This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Support blob / binary data types #119

Open
wants to merge 4 commits into base: master
2 changes: 1 addition & 1 deletion tap_mysql/discover_utils.py
@@ -53,7 +53,7 @@

DATETIME_TYPES = {'datetime', 'timestamp', 'time', 'date'}

-BINARY_TYPES = {'binary', 'varbinary'}
+BINARY_TYPES = {'binary', 'varbinary', 'tinyblob', 'blob', 'mediumblob', 'longblob'}

SPATIAL_TYPES = {'geometry', 'point', 'linestring',
                 'polygon', 'multipoint', 'multilinestring',
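The integration test added below (test_blob) expects discovery to map these blob types to a nullable string schema with format 'binary'. A minimal sketch of that mapping, using a hypothetical helper name that is not part of the tap's actual API:

from singer.schema import Schema

# Mirrors the expanded BINARY_TYPES set above.
BINARY_TYPES = {'binary', 'varbinary', 'tinyblob', 'blob', 'mediumblob', 'longblob'}

def schema_for_binary_column(sql_datatype):
    # Hypothetical helper: any binary/blob column becomes a nullable string
    # with format='binary', matching the assertion in test_blob below.
    if sql_datatype.lower() in BINARY_TYPES:
        return Schema(type=['null', 'string'], format='binary', inclusion='available')
    raise ValueError(f'{sql_datatype} is not a binary column type')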
52 changes: 47 additions & 5 deletions tests/integration/test_tap_mysql.py
@@ -1,3 +1,4 @@
import codecs
import os
import unittest
from unittest.mock import patch
@@ -335,6 +336,16 @@ def test_geometrycollection(self):
                          'sql-datatype': 'geometrycollection',
                          'datatype': 'geometrycollection'})

    def test_blob(self):
        actual = self.schema.properties['c_blob']
        self.assertEqual(actual,
                         Schema(['null', 'string'],
                                format='binary',
                                inclusion='available'))
        self.assertEqual(self.get_metadata_for_column('c_blob'),
                         {'selected-by-default': True,
                          'sql-datatype': 'blob',
                          'datatype': 'blob'})

class TestSelectsAppropriateColumns(unittest.TestCase):

@@ -715,10 +726,10 @@ def setUp(self):
        cursor.execute('CREATE TABLE binlog_1 (id int, updated datetime, '
                       'created_date Date)')
        cursor.execute("""
            CREATE TABLE binlog_2 (id int,
                                   updated datetime,
                                   is_good bool default False,
                                   ctime time,
                                   cjson json)
        """)
        cursor.execute(
@@ -1012,7 +1023,7 @@ def test_binlog_stream_with_gtid(self):
        global SINGER_MESSAGES

        engine = os.getenv('TAP_MYSQL_ENGINE', MYSQL_ENGINE)
-       gtid = binlog.fetch_current_gtid_pos(self.conn, os.environ['TAP_MYSQL_ENGINE'])
+       gtid = binlog.fetch_current_gtid_pos(self.conn, engine)

        config = test_utils.get_db_config()
        config['use_gtid'] = True
@@ -1473,3 +1484,34 @@ def tearDown(self) -> None:
        with connect_with_backoff(self.conn) as open_conn:
            with open_conn.cursor() as cursor:
                cursor.execute('DROP TABLE bit_booleans_table;')

class TestBinaryMapping(unittest.TestCase):

    def setUp(self):
        self.conn = test_utils.get_test_connection()
        self.blob_data = b'hello world'
        with connect_with_backoff(self.conn) as conn:
            with conn.cursor() as cursor:
                cursor.execute("CREATE TABLE binary_table(id int, c_blob blob)")
                cursor.execute("INSERT INTO binary_table VALUES (%s, %s)", (1, self.blob_data))
        self.catalog = test_utils.discover_catalog(self.conn, {})

    def tearDown(self):
        with connect_with_backoff(self.conn) as open_conn:
            with open_conn.cursor() as cursor:
                cursor.execute('DROP TABLE binary_table')

    def test_sync_messages_are_correct(self):
        self.catalog.streams[0] = test_utils.set_replication_method_and_key(self.catalog.streams[0], 'FULL_TABLE', None)
        self.catalog.streams[0] = test_utils.set_selected(self.catalog.streams[0], True)

        global SINGER_MESSAGES
        SINGER_MESSAGES.clear()

        tap_mysql.do_sync(self.conn, {}, self.catalog, {})

        record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES))

        self.assertEqual(len(record_messages), 1)
        self.assertEqual([rec.record for rec in record_messages],
                         [{'id': 1, 'c_blob': codecs.encode(self.blob_data, 'hex').decode('utf-8').upper()}])
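As the assertion above shows, blob values are emitted as upper-case hex strings rather than raw bytes. A small usage sketch (not part of the PR) of how a consumer of the tap's output could recover the original bytes:

import codecs

# The value emitted for b'hello world', as asserted in the test above.
hex_value = codecs.encode(b'hello world', 'hex').decode('utf-8').upper()

# bytes.fromhex accepts upper- or lower-case digits, so the original payload
# can be reconstructed on the consuming side.
assert bytes.fromhex(hex_value) == b'hello world'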