Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Json support (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhindery authored May 13, 2020
1 parent 135e28b commit 34cbd9b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
3 changes: 3 additions & 0 deletions tap_mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def schema_for_column(column):
elif data_type in FLOAT_TYPES:
result.type = ['null', 'number']

elif data_type == 'json':
result.type = ['null', 'object']

elif data_type == 'decimal':
result.type = ['null', 'number']
result.multipleOf = 10 ** (0 - column.numeric_scale)
Expand Down
12 changes: 12 additions & 0 deletions tap_mysql/sync_strategies/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import codecs
import copy
import datetime
import json
import pymysql.connections
import pymysql.err
import pytz
Expand Down Expand Up @@ -114,6 +115,14 @@ def fetch_server_id(mysql_conn):
return server_id


def json_bytes_to_string(data):
    """Return a copy of *data* with every ``bytes`` value decoded to ``str``.

    The structure is walked recursively: dictionaries (keys and values),
    tuples and lists are rebuilt with their contents converted, so the
    result can be handed to ``json.dumps``, which rejects ``bytes``.

    Args:
        data: Any value. ``bytes``, ``dict``, ``tuple`` and ``list`` are
            converted; every other type is returned unchanged.

    Returns:
        The converted value. Containers keep their type (``dict`` stays
        ``dict``, ``tuple`` stays ``tuple``, ``list`` stays ``list``).

    Raises:
        UnicodeDecodeError: If a ``bytes`` value is not valid UTF-8.
    """
    if isinstance(data, bytes):
        # bytes.decode() with no arguments uses UTF-8 in strict mode.
        return data.decode()
    if isinstance(data, dict):
        # Keys may be bytes as well, so both sides of each item are converted.
        return {
            json_bytes_to_string(key): json_bytes_to_string(value)
            for key, value in data.items()
        }
    if isinstance(data, tuple):
        return tuple(json_bytes_to_string(item) for item in data)
    if isinstance(data, list):
        return [json_bytes_to_string(item) for item in data]
    return data


def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extracted):
row_to_persist = {}

Expand Down Expand Up @@ -141,6 +150,9 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac
timedelta_from_epoch = datetime.datetime.utcfromtimestamp(0) + val
row_to_persist[column_name] = timedelta_from_epoch.isoformat() + '+00:00'

elif db_column_type == FIELD_TYPE.JSON:
row_to_persist[column_name] = json.dumps(json_bytes_to_string(val))

elif isinstance(val, bytes):
# encode bytes as hex bytes then to utf8 string
row_to_persist[column_name] = codecs.encode(val, 'hex').decode('utf-8')
Expand Down
32 changes: 32 additions & 0 deletions tests/test_tap_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,38 @@ def runTest(self):
self.assertEqual(record_message.record, {'b c': 1})


class TestJsonTables(unittest.TestCase):
    """Check that a MySQL ``json`` column is emitted as a JSON string.

    ``setUp`` creates a one-column table holding a single JSON document and
    selects it for FULL_TABLE replication; ``runTest`` runs a sync and checks
    the first record message that was captured.
    """

    def setUp(self):
        """Create the ``json_table`` fixture and select every stream for sync."""
        self.conn = test_utils.get_test_connection()

        with connect_with_backoff(self.conn) as open_conn:
            with open_conn.cursor() as cursor:
                cursor.execute('CREATE TABLE json_table (val json)')
                cursor.execute('INSERT INTO json_table (val) VALUES ( \'{"a": 10, "b": "c"}\')')

        self.catalog = test_utils.discover_catalog(self.conn, {})
        for stream in self.catalog.streams:
            stream.key_properties = []

            # Select the table itself and its single `val` column.
            stream.metadata = [
                {'breadcrumb': (), 'metadata': {'selected': True, 'database-name': 'tap_mysql_test'}},
                {'breadcrumb': ('properties', 'val'), 'metadata': {'selected': True}}
            ]

            stream.stream = stream.table
            test_utils.set_replication_method_and_key(stream, 'FULL_TABLE', None)

    def runTest(self):
        """Sync the catalog and compare the first record with the inserted JSON."""
        # SINGER_MESSAGES is a module-level list defined elsewhere in this
        # file; clear() empties it in place, so no `global` statement is needed.
        # NOTE(review): presumably the test harness appends every emitted
        # message to it during do_sync -- confirm against the module setup.
        SINGER_MESSAGES.clear()
        tap_mysql.do_sync(self.conn, {}, self.catalog, {})

        record_messages = [
            message for message in SINGER_MESSAGES
            if isinstance(message, singer.RecordMessage)
        ]
        # Fail with a readable message instead of an IndexError when the
        # sync produced no records at all.
        self.assertTrue(record_messages, 'sync emitted no RECORD messages')
        self.assertEqual(record_messages[0].record, {'val': '{"a": 10, "b": "c"}'})


class TestSupportedPK(unittest.TestCase):

def setUp(self):
Expand Down

0 comments on commit 34cbd9b

Please sign in to comment.