From 4a5f7eda88685a5f533578b17519173aca92a44c Mon Sep 17 00:00:00 2001 From: Hal Ali Date: Wed, 13 Jul 2022 11:58:28 -0400 Subject: [PATCH 1/3] Add support for more binary types --- tap_mysql/discover_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_mysql/discover_utils.py b/tap_mysql/discover_utils.py index 1cdd789..0c6cbbe 100644 --- a/tap_mysql/discover_utils.py +++ b/tap_mysql/discover_utils.py @@ -53,7 +53,7 @@ DATETIME_TYPES = {'datetime', 'timestamp', 'time', 'date'} -BINARY_TYPES = {'binary', 'varbinary'} +BINARY_TYPES = {'binary', 'varbinary', 'tinyblob', 'blob', 'mediumblob', 'longblob'} SPATIAL_TYPES = {'geometry', 'point', 'linestring', 'polygon', 'multipoint', 'multilinestring', From 8a02d584019b0c34d4fa831ec001098d9b7699d6 Mon Sep 17 00:00:00 2001 From: Haleemur Ali Date: Tue, 19 Jul 2022 21:44:51 -0400 Subject: [PATCH 2/3] Add test_blob & Fix KeyError in test_binlog_stream_with_gtid --- tests/integration/test_tap_mysql.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_tap_mysql.py b/tests/integration/test_tap_mysql.py index 0af047d..ce032df 100644 --- a/tests/integration/test_tap_mysql.py +++ b/tests/integration/test_tap_mysql.py @@ -335,6 +335,16 @@ def test_geometrycollection(self): 'sql-datatype': 'geometrycollection', 'datatype': 'geometrycollection'}) + def test_blob(self): + actual = self.schema.properties['c_blob'] + self.assertEqual(actual, + Schema(['null', 'string'], + format='binary', + inclusion='available')) + self.assertEqual(self.get_metadata_for_column('c_blob'), + {'selected-by-default': True, + 'sql-datatype': 'blob', + 'datatype': 'blob'}) class TestSelectsAppropriateColumns(unittest.TestCase): @@ -715,10 +725,10 @@ def setUp(self): cursor.execute('CREATE TABLE binlog_1 (id int, updated datetime, ' 'created_date Date)') cursor.execute(""" - CREATE TABLE binlog_2 (id int, - updated datetime, - is_good bool default False, - ctime time, + CREATE TABLE binlog_2 (id int, + updated datetime, + is_good bool default False, + ctime time, cjson json) """) cursor.execute( @@ -1012,7 +1022,7 @@ def test_binlog_stream_with_gtid(self): global SINGER_MESSAGES engine = os.getenv('TAP_MYSQL_ENGINE', MYSQL_ENGINE) - gtid = binlog.fetch_current_gtid_pos(self.conn, os.environ['TAP_MYSQL_ENGINE']) + gtid = binlog.fetch_current_gtid_pos(self.conn, engine) config = test_utils.get_db_config() config['use_gtid'] = True From 20f7e7cc0a14c2a9d3d2bf955a2cf43a04381d69 Mon Sep 17 00:00:00 2001 From: Haleemur Ali Date: Tue, 19 Jul 2022 22:25:39 -0400 Subject: [PATCH 3/3] Add test TestBinaryMapping --- tests/integration/test_tap_mysql.py | 32 +++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/integration/test_tap_mysql.py b/tests/integration/test_tap_mysql.py index ce032df..d7a3cb1 100644 --- a/tests/integration/test_tap_mysql.py +++ b/tests/integration/test_tap_mysql.py @@ -1,3 +1,4 @@ +import codecs import os import unittest from unittest.mock import patch @@ -1483,3 +1484,34 @@ def tearDown(self) -> None: with connect_with_backoff(self.conn) as open_conn: with open_conn.cursor() as cursor: cursor.execute('DROP TABLE bit_booleans_table;') + +class TestBinaryMapping(unittest.TestCase): + + def setUp(self): + self.conn = test_utils.get_test_connection() + self.blob_data = b'hello world' + with connect_with_backoff(self.conn) as conn: + with conn.cursor() as cursor: + cursor.execute("CREATE TABLE binary_table(id int, c_blob blob)") + cursor.execute("INSERT INTO binary_table VALUES (%s, %s)", (1, self.blob_data)) + self.catalog = test_utils.discover_catalog(self.conn, {}) + + def tearDown(self): + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as cursor: + cursor.execute('DROP TABLE binary_table') + + def test_sync_messages_are_correct(self): + self.catalog.streams[0] = test_utils.set_replication_method_and_key(self.catalog.streams[0], 'FULL_TABLE', None) + self.catalog.streams[0] = test_utils.set_selected(self.catalog.streams[0], True) + + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + + tap_mysql.do_sync(self.conn, {}, self.catalog, {}) + + record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES)) + + self.assertEqual(len(record_messages), 1) + self.assertEqual([rec.record for rec in record_messages], + [{'id': 1, 'c_blob': codecs.encode(self.blob_data, 'hex').decode('utf-8').upper()}])