diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py index 6c5f3a2..4401bc9 100644 --- a/tap_mysql/__init__.py +++ b/tap_mysql/__init__.py @@ -86,6 +86,9 @@ def schema_for_column(column): elif data_type in FLOAT_TYPES: result.type = ['null', 'number'] + elif data_type == 'json': + result.type = ['null', 'object'] + elif data_type == 'decimal': result.type = ['null', 'number'] result.multipleOf = 10 ** (0 - column.numeric_scale) diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 865d318..d157797 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -3,6 +3,7 @@ import codecs import copy import datetime +import json import pymysql.connections import pymysql.err import pytz @@ -114,6 +115,14 @@ def fetch_server_id(mysql_conn): return server_id +def json_bytes_to_string(data): + if isinstance(data, bytes): return data.decode() + if isinstance(data, dict): return dict(map(json_bytes_to_string, data.items())) + if isinstance(data, tuple): return tuple(map(json_bytes_to_string, data)) + if isinstance(data, list): return list(map(json_bytes_to_string, data)) + return data + + def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extracted): row_to_persist = {} @@ -141,6 +150,9 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac timedelta_from_epoch = datetime.datetime.utcfromtimestamp(0) + val row_to_persist[column_name] = timedelta_from_epoch.isoformat() + '+00:00' + elif db_column_type == FIELD_TYPE.JSON: + row_to_persist[column_name] = json.dumps(json_bytes_to_string(val)) + elif isinstance(val, bytes): # encode bytes as hex bytes then to utf8 string row_to_persist[column_name] = codecs.encode(val, 'hex').decode('utf-8') diff --git a/tests/test_tap_mysql.py b/tests/test_tap_mysql.py index 34bbc30..598e7b3 100644 --- a/tests/test_tap_mysql.py +++ b/tests/test_tap_mysql.py @@ -861,6 +861,38 @@ def runTest(self): self.assertEqual(record_message.record, {'b c': 1}) +class TestJsonTables(unittest.TestCase): + + def setUp(self): + self.conn = test_utils.get_test_connection() + + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as cursor: + cursor.execute('CREATE TABLE json_table (val json)') + cursor.execute('INSERT INTO json_table (val) VALUES ( \'{"a": 10, "b": "c"}\')') + + self.catalog = test_utils.discover_catalog(self.conn, {}) + for stream in self.catalog.streams: + stream.key_properties = [] + + stream.metadata = [ + {'breadcrumb': (), 'metadata': {'selected': True, 'database-name': 'tap_mysql_test'}}, + {'breadcrumb': ('properties', 'val'), 'metadata': {'selected': True}} + ] + + stream.stream = stream.table + test_utils.set_replication_method_and_key(stream, 'FULL_TABLE', None) + + def runTest(self): + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + tap_mysql.do_sync(self.conn, {}, self.catalog, {}) + + record_message = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES))[0] + self.assertTrue(isinstance(record_message, singer.RecordMessage)) + self.assertEqual(record_message.record, {'val': '{"a": 10, "b": "c"}'}) + + class TestSupportedPK(unittest.TestCase): def setUp(self):