Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Add test TestBinaryMapping
Browse files Browse the repository at this point in the history
  • Loading branch information
haleemur committed Jul 20, 2022
1 parent 8a02d58 commit 20f7e7c
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions tests/integration/test_tap_mysql.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import codecs
import os
import unittest
from unittest.mock import patch
Expand Down Expand Up @@ -1483,3 +1484,34 @@ def tearDown(self) -> None:
with connect_with_backoff(self.conn) as open_conn:
with open_conn.cursor() as cursor:
cursor.execute('DROP TABLE bit_booleans_table;')

class TestBinaryMapping(unittest.TestCase):

def setUp(self):
self.conn = test_utils.get_test_connection()
self.blob_data = b'hello world'
with connect_with_backoff(self.conn) as conn:
with conn.cursor() as cursor:
cursor.execute("CREATE TABLE binary_table(id int, c_blob blob)")
cursor.execute("INSERT INTO binary_table VALUES (%s, %s)", (1, self.blob_data))
self.catalog = test_utils.discover_catalog(self.conn, {})

def tearDown(self):
with connect_with_backoff(self.conn) as open_conn:
with open_conn.cursor() as cursor:
cursor.execute('DROP TABLE binary_table')

def test_sync_messages_are_correct(self):
self.catalog.streams[0] = test_utils.set_replication_method_and_key(self.catalog.streams[0], 'FULL_TABLE', None)
self.catalog.streams[0] = test_utils.set_selected(self.catalog.streams[0], True)

global SINGER_MESSAGES
SINGER_MESSAGES.clear()

tap_mysql.do_sync(self.conn, {}, self.catalog, {})

record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES))

self.assertEqual(len(record_messages), 1)
self.assertEqual([rec.record for rec in record_messages],
[{'id': 1, 'c_blob': codecs.encode(self.blob_data, 'hex').decode('utf-8').upper()}])

0 comments on commit 20f7e7c

Please sign in to comment.