Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
1.1.8: create binary column for the binary data (#53)
Browse files Browse the repository at this point in the history
* 1.1.8: create binary column for the binary data

* bump minor

* update test to get rid of repeated input
  • Loading branch information
Samira-El authored Jan 7, 2020
1 parent 2d1d795 commit 0a200a3
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 47 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
long_description = f.read()

setup(name="pipelinewise-target-snowflake",
version="1.1.7",
version="1.2.0",
description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
4 changes: 4 additions & 0 deletions target_snowflake/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def column_type(schema_property):
column_type = 'timestamp_ntz'
elif property_format == 'time':
column_type = 'time'
elif property_format == 'binary':
column_type = 'binary'
elif 'number' in property_type:
column_type = 'float'
elif 'integer' in property_type and 'string' in property_type:
Expand All @@ -82,6 +84,8 @@ def column_trans(schema_property):
column_trans = ''
if 'object' in property_type or 'array' in property_type:
column_trans = 'parse_json'
elif schema_property.get('format') == 'binary':
column_trans = 'to_binary'

return column_trans

Expand Down
16 changes: 16 additions & 0 deletions tests/integration/resources/messages-with-binary-columns.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_binary"}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_binary", "schema": {"properties": {"data": {"inclusion": "available", "format": "binary", "type": ["null", "string"]}, "id": {"inclusion": "automatic", "format": "binary", "type": ["null", "string"]}, "created_at": {"inclusion": "available", "format": "date-time", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_binary", "version": 1576670613163}
{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"data": "6461746132", "id": "706b32", "created_at": "2019-12-17T16:02:55+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"data": "64617461313030", "id": "706b33", "created_at": "2019-12-18T11:46:38+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"data": "6461746134", "id": "706b34", "created_at": "2019-12-17T16:32:22+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_binary", "bookmarks": {"tap_mysql_test-test_binary": {"version": 1576670613163}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_binary", "version": 1576670613163}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_binary": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 945}}}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_binary": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 945}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_binary", "schema": {"properties": {"data": {"inclusion": "available", "format": "binary", "type": ["null", "string"]}, "created_at": {"inclusion": "available", "format": "date-time", "type": ["null", "string"]}, "id": {"inclusion": "automatic", "format": "binary", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"]}
{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b35", "data": "6461746135", "created_at": "2019-12-18T13:19:20+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b33", "data": "64617461313030", "created_at": "2019-12-18T11:46:38+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_binary": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 1867}}}}
32 changes: 32 additions & 0 deletions tests/integration/test_target_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,25 @@ def assert_logical_streams_are_in_snowflake_and_are_empty(self):
self.assertEqual(table_three, [])
self.assertEqual(table_four, [])

def assert_binary_data_are_in_snowflake(self, should_metadata_columns_exist=False):
    """Assert that the binary-columns tap messages were loaded into the
    test_binary table with their hex payloads decoded to raw bytes.

    should_metadata_columns_exist -- when True, strip the _sdc metadata
    columns from the fetched rows before comparing.
    """
    # Fetch everything currently in the target table; ORDER BY gives a
    # deterministic row order for the comparison below.
    db = DbSync(self.config)
    schema_name = self.config.get('default_target_schema', '')
    actual_rows = db.query("SELECT * FROM {}.test_binary ORDER BY ID".format(schema_name))

    # Only pk2/pk4 should survive (the fixture marks pk3/pk5 as deleted);
    # ID and DATA come back as bytes because the columns are BINARY.
    expected_rows = [
        {'ID': b'pk2', 'DATA': b'data2', 'CREATED_AT': datetime.datetime(2019, 12, 17, 16, 2, 55)},
        {'ID': b'pk4', 'DATA': b'data4', 'CREATED_AT': datetime.datetime(2019, 12, 17, 16, 32, 22)},
    ]

    # Drop _sdc_* columns first when the run is expected to have added them
    rows_to_compare = (
        self.remove_metadata_columns_from_rows(actual_rows)
        if should_metadata_columns_exist
        else actual_rows
    )
    self.assertEqual(rows_to_compare, expected_rows)

#################################
# TESTS #
#################################
Expand Down Expand Up @@ -340,6 +359,19 @@ def test_loading_with_multiple_schema(self):
should_hard_deleted_rows=False
)

def test_loading_tables_with_binary_columns_and_hard_delete(self):
    """Load a stream with binary-formatted columns while hard delete mode is on"""
    # Fixture contains RECORDs whose id/data values are hex strings with
    # "format": "binary" in the SCHEMA, plus _sdc_deleted_at tombstones.
    tap_lines = test_utils.get_test_tap_lines('messages-with-binary-columns.json')

    # Turning on hard delete mode
    self.config['hard_delete'] = True
    self.persist_lines_with_cache(tap_lines)

    # Check if data loaded correctly and metadata columns exist
    self.assert_binary_data_are_in_snowflake(
        should_metadata_columns_exist=True
    )

def test_loading_unicode_characters(self):
"""Loading unicode encoded characters"""
tap_lines = test_utils.get_test_tap_lines('messages-with-unicode-characters.json')
Expand Down
126 changes: 80 additions & 46 deletions tests/unit/test_db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ class TestDBSync(unittest.TestCase):
def setUp(self):
    """Reset the target config and build shared JSON-schema fixtures."""
    # Each test starts from an empty configuration
    self.config = {}

    # Canonical JSON schema snippets, keyed by a short mnemonic, reused by
    # the column-type and column-transformation mapping tests.
    self.json_types = dict(
        str={'type': ['string']},
        str_or_null={'type': ['string', 'null']},
        dt={'type': ['string'], 'format': 'date-time'},
        dt_or_null={'type': ['string', 'null'], 'format': 'date-time'},
        time={'type': ['string'], 'format': 'time'},
        time_or_null={'type': ['string', 'null'], 'format': 'time'},
        binary={'type': ['string', 'null'], 'format': 'binary'},
        num={'type': ['number']},
        int={'type': ['integer']},
        int_or_str={'type': ['integer', 'string']},
        bool={'type': ['boolean']},
        obj={'type': ['object']},
        arr={'type': ['array']},
    )

def test_config_validation(self):
"""Test configuration validator"""
validator = db_sync.validate_config
Expand Down Expand Up @@ -57,63 +73,81 @@ def test_column_type_mapping(self):
"""Test JSON type to Snowflake column type mappings"""
mapper = db_sync.column_type

# Incoming JSON schema types
json_str = {"type": ["string"]}
json_str_or_null = {"type": ["string", "null"]}
json_dt = {"type": ["string"], "format": "date-time"}
json_dt_or_null = {"type": ["string", "null"], "format": "date-time"}
json_t = {"type": ["string"], "format": "time"}
json_t_or_null = {"type": ["string", "null"], "format": "time"}
json_num = {"type": ["number"]}
json_int = {"type": ["integer"]}
json_int_or_str = {"type": ["integer", "string"]}
json_bool = {"type": ["boolean"]}
json_obj = {"type": ["object"]}
json_arr = {"type": ["array"]}

# Mapping from JSON schema types ot Snowflake column types
self.assertEquals(mapper(json_str), 'text')
self.assertEquals(mapper(json_str_or_null), 'text')
self.assertEquals(mapper(json_dt), 'timestamp_ntz')
self.assertEquals(mapper(json_dt_or_null), 'timestamp_ntz')
self.assertEquals(mapper(json_t), 'time')
self.assertEquals(mapper(json_t_or_null), 'time')
self.assertEquals(mapper(json_num), 'float')
self.assertEquals(mapper(json_int), 'number')
self.assertEquals(mapper(json_int_or_str), 'text')
self.assertEquals(mapper(json_bool), 'boolean')
self.assertEquals(mapper(json_obj), 'variant')
self.assertEquals(mapper(json_arr), 'variant')
# Snowflake column types
sf_types = {
'str': 'text',
'str_or_null': 'text',
'dt': 'timestamp_ntz',
'dt_or_null': 'timestamp_ntz',
'time': 'time',
'time_or_null': 'time',
'binary': 'binary',
'num': 'float',
'int': 'number',
'int_or_str': 'text',
'bool': 'boolean',
'obj': 'variant',
'arr': 'variant',
}

# Mapping from JSON schema types to Snowflake column types
for key, val in self.json_types.items():
self.assertEqual(mapper(val), sf_types[key])

def test_column_trans(self):
    """Test column transformation"""
    trans = db_sync.column_trans

    # Only three JSON types need a load-time transformation: objects and
    # arrays go through parse_json, binary goes through to_binary.  Every
    # other type maps to the empty string (no transformation).
    special_trans = {
        'obj': 'parse_json',
        'arr': 'parse_json',
        'binary': 'to_binary',
    }

    # Getting transformations for every JSON type
    for name, schema in self.json_types.items():
        self.assertEqual(trans(schema), special_trans.get(name, ''))

def test_stream_name_to_dict(self):
    """Test identifying catalog, schema and table names from fully qualified stream and table names"""
    # Singer stream name format (Default '-' separator)
    self.assertEqual(
        db_sync.stream_name_to_dict('my_table'),
        {"catalog_name": None, "schema_name": None, "table_name": "my_table"})

    # Singer stream name format (Default '-' separator)
    self.assertEqual(
        db_sync.stream_name_to_dict('my_schema-my_table'),
        {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"})

    # Singer stream name format (Default '-' separator)
    self.assertEqual(
        db_sync.stream_name_to_dict('my_catalog-my_schema-my_table'),
        {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"})

    # Snowflake table format (Custom '.' separator)
    self.assertEqual(
        db_sync.stream_name_to_dict('my_table', separator='.'),
        {"catalog_name": None, "schema_name": None, "table_name": "my_table"})

    # Snowflake table format (Custom '.' separator)
    self.assertEqual(
        db_sync.stream_name_to_dict('my_schema.my_table', separator='.'),
        {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"})

    # Snowflake table format (Custom '.' separator)
    self.assertEqual(
        db_sync.stream_name_to_dict('my_catalog.my_schema.my_table', separator='.'),
        {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"})

Expand All @@ -123,7 +157,7 @@ def test_flatten_schema(self):

# Schema with no object properties should be empty dict
schema_with_no_properties = {"type": "object"}
self.assertEquals(flatten_schema(schema_with_no_properties), {})
self.assertEqual(flatten_schema(schema_with_no_properties), {})

not_nested_schema = {
"type": "object",
Expand All @@ -133,7 +167,7 @@ def test_flatten_schema(self):
"c_int": {"type": ["null", "integer"]}}}

# NO FLATTENING - Schema with simple properties should be a plain dictionary
self.assertEquals(flatten_schema(not_nested_schema), not_nested_schema['properties'])
self.assertEqual(flatten_schema(not_nested_schema), not_nested_schema['properties'])

nested_schema_with_no_properties = {
"type": "object",
Expand All @@ -144,7 +178,7 @@ def test_flatten_schema(self):
"c_obj": {"type": ["null", "object"]}}}

# NO FLATTENING - Schema with object type property but without further properties should be a plain dictionary
self.assertEquals(flatten_schema(nested_schema_with_no_properties),
self.assertEqual(flatten_schema(nested_schema_with_no_properties),
nested_schema_with_no_properties['properties'])

nested_schema_with_properties = {
Expand Down Expand Up @@ -172,16 +206,16 @@ def test_flatten_schema(self):

# NO FLATTENING - Schema with object type property but without further properties should be a plain dictionary
# No flattening (default)
self.assertEquals(flatten_schema(nested_schema_with_properties), nested_schema_with_properties['properties'])
self.assertEqual(flatten_schema(nested_schema_with_properties), nested_schema_with_properties['properties'])

# NO FLATTENING - Schema with object type property but without further properties should be a plain dictionary
# max_level: 0 : No flattening (default)
self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=0),
self.assertEqual(flatten_schema(nested_schema_with_properties, max_level=0),
nested_schema_with_properties['properties'])

# FLATTENING - Schema with object type property but without further properties should be a dict with
# flattened properties
self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=1),
self.assertEqual(flatten_schema(nested_schema_with_properties, max_level=1),
{
'c_pk': {'type': ['null', 'integer']},
'c_varchar': {'type': ['null', 'string']},
Expand All @@ -199,7 +233,7 @@ def test_flatten_schema(self):

# FLATTENING - Schema with object type property but without further properties should be a dict with
# flattened properties
self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=10),
self.assertEqual(flatten_schema(nested_schema_with_properties, max_level=10),
{
'c_pk': {'type': ['null', 'integer']},
'c_varchar': {'type': ['null', 'string']},
Expand All @@ -216,11 +250,11 @@ def test_flatten_record(self):

empty_record = {}
# Empty record should be empty dict
self.assertEquals(flatten_record(empty_record), {})
self.assertEqual(flatten_record(empty_record), {})

not_nested_record = {"c_pk": 1, "c_varchar": "1", "c_int": 1}
# NO FLATTENING - Record with simple properties should be a plain dictionary
self.assertEquals(flatten_record(not_nested_record), not_nested_record)
self.assertEqual(flatten_record(not_nested_record), not_nested_record)

nested_record = {
"c_pk": 1,
Expand All @@ -235,7 +269,7 @@ def test_flatten_record(self):
}}}

# NO FLATTENING - No flattening (default)
self.assertEquals(flatten_record(nested_record),
self.assertEqual(flatten_record(nested_record),
{
"c_pk": 1,
"c_varchar": "1",
Expand All @@ -246,7 +280,7 @@ def test_flatten_record(self):

# NO FLATTENING
# max_level: 0 : No flattening (default)
self.assertEquals(flatten_record(nested_record, max_level=0),
self.assertEqual(flatten_record(nested_record, max_level=0),
{
"c_pk": 1,
"c_varchar": "1",
Expand All @@ -257,7 +291,7 @@ def test_flatten_record(self):

# SEMI FLATTENING
# max_level: 1 : Semi-flattening (default)
self.assertEquals(flatten_record(nested_record, max_level=1),
self.assertEqual(flatten_record(nested_record, max_level=1),
{
"c_pk": 1,
"c_varchar": "1",
Expand All @@ -269,7 +303,7 @@ def test_flatten_record(self):
})

# FLATTENING
self.assertEquals(flatten_record(nested_record, max_level=10),
self.assertEqual(flatten_record(nested_record, max_level=10),
{
"c_pk": 1,
"c_varchar": "1",
Expand Down

0 comments on commit 0a200a3

Please sign in to comment.