diff --git a/setup.py b/setup.py
index 42b39460..82e81429 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 
 setup(name="pipelinewise-target-snowflake",
-      version="1.1.7",
+      version="1.2.0",
       description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
       long_description=long_description,
       long_description_content_type='text/markdown',
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index b0fa4217..143d0ae1 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -65,6 +65,8 @@ def column_type(schema_property):
         column_type = 'timestamp_ntz'
     elif property_format == 'time':
         column_type = 'time'
+    elif property_format == 'binary':
+        column_type = 'binary'
     elif 'number' in property_type:
         column_type = 'float'
     elif 'integer' in property_type and 'string' in property_type:
@@ -82,6 +84,8 @@ def column_trans(schema_property):
     column_trans = ''
     if 'object' in property_type or 'array' in property_type:
         column_trans = 'parse_json'
+    elif schema_property.get('format') == 'binary':
+        column_trans = 'to_binary'
 
     return column_trans
 
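Note on the db_sync.py hunks above: a property whose JSON schema carries "format": "binary" now maps to a Snowflake BINARY column, and column_trans wraps the staged value in to_binary(...) at load time. Taps such as tap-mysql emit binary values as hex strings, and Snowflake's TO_BINARY treats its input as hex by default, so the values round-trip without extra configuration. A minimal sketch of the two helpers in action; the SQL in the comments is illustrative, not the target's exact generated DDL:

    # Minimal sketch of the new binary mapping, runnable against this package.
    from target_snowflake import db_sync

    binary_prop = {"type": ["null", "string"], "format": "binary"}

    print(db_sync.column_type(binary_prop))   # -> 'binary'    (column DDL type)
    print(db_sync.column_trans(binary_prop))  # -> 'to_binary' (load-time wrapper,
                                              #    e.g. SELECT to_binary(s.data) ...)
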
{"inclusion": "automatic", "format": "binary", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"]} +{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b35", "data": "6461746135", "created_at": "2019-12-18T13:19:20+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b33", "data": "64617461313030", "created_at": "2019-12-18T11:46:38+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_binary", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_binary": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 1867}}}} diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 57786964..4fc0c2bf 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -246,6 +246,25 @@ def assert_logical_streams_are_in_snowflake_and_are_empty(self): self.assertEqual(table_three, []) self.assertEqual(table_four, []) + def assert_binary_data_are_in_snowflake(self, should_metadata_columns_exist=False): + # Get loaded rows from tables + snowflake = DbSync(self.config) + target_schema = self.config.get('default_target_schema', '') + table_one = snowflake.query("SELECT * FROM {}.test_binary ORDER BY ID".format(target_schema)) + + # ---------------------------------------------------------------------- + # Check rows in table_one + # ---------------------------------------------------------------------- + expected_table_one = [ + {'ID': b'pk2', 'DATA': b'data2', 'CREATED_AT': datetime.datetime(2019, 12, 17, 16, 2, 55)}, + {'ID': b'pk4', 'DATA': b'data4', "CREATED_AT": datetime.datetime(2019, 12, 17, 16, 32, 22)}, + ] + + if should_metadata_columns_exist: + self.assertEqual(self.remove_metadata_columns_from_rows(table_one), expected_table_one) + else: + self.assertEqual(table_one, expected_table_one) + ################################# # TESTS # ################################# @@ -340,6 +359,19 @@ def test_loading_with_multiple_schema(self): should_hard_deleted_rows=False ) + def test_loading_tables_with_binary_columns_and_hard_delete(self): + """Loading multiple tables from the same input tap with deleted rows""" + tap_lines = test_utils.get_test_tap_lines('messages-with-binary-columns.json') + + # Turning on hard delete mode + self.config['hard_delete'] = True + self.persist_lines_with_cache(tap_lines) + + # Check if data loaded correctly and metadata columns exist + self.assert_binary_data_are_in_snowflake( + should_metadata_columns_exist=True + ) + def test_loading_unicode_characters(self): """Loading unicode encoded characters""" tap_lines = test_utils.get_test_tap_lines('messages-with-unicode-characters.json') diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index 5276809e..aa5905b6 100644 --- 
diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py
index 5276809e..aa5905b6 100644
--- a/tests/unit/test_db_sync.py
+++ b/tests/unit/test_db_sync.py
@@ -11,6 +11,22 @@ class TestDBSync(unittest.TestCase):
     def setUp(self):
         self.config = {}
 
+        self.json_types = {
+            'str': {"type": ["string"]},
+            'str_or_null': {"type": ["string", "null"]},
+            'dt': {"type": ["string"], "format": "date-time"},
+            'dt_or_null': {"type": ["string", "null"], "format": "date-time"},
+            'time': {"type": ["string"], "format": "time"},
+            'time_or_null': {"type": ["string", "null"], "format": "time"},
+            'binary': {"type": ["string", "null"], "format": "binary"},
+            'num': {"type": ["number"]},
+            'int': {"type": ["integer"]},
+            'int_or_str': {"type": ["integer", "string"]},
+            'bool': {"type": ["boolean"]},
+            'obj': {"type": ["object"]},
+            'arr': {"type": ["array"]},
+        }
+
     def test_config_validation(self):
         """Test configuration validator"""
         validator = db_sync.validate_config
@@ -57,63 +73,81 @@ def test_column_type_mapping(self):
         """Test JSON type to Snowflake column type mappings"""
         mapper = db_sync.column_type
 
-        # Incoming JSON schema types
-        json_str = {"type": ["string"]}
-        json_str_or_null = {"type": ["string", "null"]}
-        json_dt = {"type": ["string"], "format": "date-time"}
-        json_dt_or_null = {"type": ["string", "null"], "format": "date-time"}
-        json_t = {"type": ["string"], "format": "time"}
-        json_t_or_null = {"type": ["string", "null"], "format": "time"}
-        json_num = {"type": ["number"]}
-        json_int = {"type": ["integer"]}
-        json_int_or_str = {"type": ["integer", "string"]}
-        json_bool = {"type": ["boolean"]}
-        json_obj = {"type": ["object"]}
-        json_arr = {"type": ["array"]}
-
-        # Mapping from JSON schema types ot Snowflake column types
-        self.assertEquals(mapper(json_str), 'text')
-        self.assertEquals(mapper(json_str_or_null), 'text')
-        self.assertEquals(mapper(json_dt), 'timestamp_ntz')
-        self.assertEquals(mapper(json_dt_or_null), 'timestamp_ntz')
-        self.assertEquals(mapper(json_t), 'time')
-        self.assertEquals(mapper(json_t_or_null), 'time')
-        self.assertEquals(mapper(json_num), 'float')
-        self.assertEquals(mapper(json_int), 'number')
-        self.assertEquals(mapper(json_int_or_str), 'text')
-        self.assertEquals(mapper(json_bool), 'boolean')
-        self.assertEquals(mapper(json_obj), 'variant')
-        self.assertEquals(mapper(json_arr), 'variant')
+        # Snowflake column types
+        sf_types = {
+            'str': 'text',
+            'str_or_null': 'text',
+            'dt': 'timestamp_ntz',
+            'dt_or_null': 'timestamp_ntz',
+            'time': 'time',
+            'time_or_null': 'time',
+            'binary': 'binary',
+            'num': 'float',
+            'int': 'number',
+            'int_or_str': 'text',
+            'bool': 'boolean',
+            'obj': 'variant',
+            'arr': 'variant',
+        }
+
+        # Mapping from JSON schema types to Snowflake column types
+        for key, val in self.json_types.items():
+            self.assertEqual(mapper(val), sf_types[key])
+
+    def test_column_trans(self):
+        """Test column transformation"""
+        trans = db_sync.column_trans
+
+        # Snowflake column transformations
+        sf_trans = {
+            'str': '',
+            'str_or_null': '',
+            'dt': '',
+            'dt_or_null': '',
+            'time': '',
+            'time_or_null': '',
+            'binary': 'to_binary',
+            'num': '',
+            'int': '',
+            'int_or_str': '',
+            'bool': '',
+            'obj': 'parse_json',
+            'arr': 'parse_json',
+        }
+
+        # Getting transformations for every JSON type
+        for key, val in self.json_types.items():
+            self.assertEqual(trans(val), sf_trans[key])
 
     def test_stream_name_to_dict(self):
         """Test identifying catalog, schema and table names from fully qualified stream and table names"""
         # Singer stream name format (Default '-' separator)
-        self.assertEquals(
+        self.assertEqual(
             db_sync.stream_name_to_dict('my_table'),
{"catalog_name": None, "schema_name": None, "table_name": "my_table"}) # Singer stream name format (Default '-' separator) - self.assertEquals( + self.assertEqual( db_sync.stream_name_to_dict('my_schema-my_table'), {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"}) # Singer stream name format (Default '-' separator) - self.assertEquals( + self.assertEqual( db_sync.stream_name_to_dict('my_catalog-my_schema-my_table'), {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) # Snowflake table format (Custom '.' separator) - self.assertEquals( + self.assertEqual( db_sync.stream_name_to_dict('my_table', separator='.'), {"catalog_name": None, "schema_name": None, "table_name": "my_table"}) # Snowflake table format (Custom '.' separator) - self.assertEquals( + self.assertEqual( db_sync.stream_name_to_dict('my_schema.my_table', separator='.'), {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"}) # Snowflake table format (Custom '.' separator) - self.assertEquals( + self.assertEqual( db_sync.stream_name_to_dict('my_catalog.my_schema.my_table', separator='.'), {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) @@ -123,7 +157,7 @@ def test_flatten_schema(self): # Schema with no object properties should be empty dict schema_with_no_properties = {"type": "object"} - self.assertEquals(flatten_schema(schema_with_no_properties), {}) + self.assertEqual(flatten_schema(schema_with_no_properties), {}) not_nested_schema = { "type": "object", @@ -133,7 +167,7 @@ def test_flatten_schema(self): "c_int": {"type": ["null", "integer"]}}} # NO FLATTENING - Schema with simple properties should be a plain dictionary - self.assertEquals(flatten_schema(not_nested_schema), not_nested_schema['properties']) + self.assertEqual(flatten_schema(not_nested_schema), not_nested_schema['properties']) nested_schema_with_no_properties = { "type": "object", @@ -144,7 +178,7 @@ def test_flatten_schema(self): "c_obj": {"type": ["null", "object"]}}} # NO FLATTENING - Schema with object type property but without further properties should be a plain dictionary - self.assertEquals(flatten_schema(nested_schema_with_no_properties), + self.assertEqual(flatten_schema(nested_schema_with_no_properties), nested_schema_with_no_properties['properties']) nested_schema_with_properties = { @@ -172,16 +206,16 @@ def test_flatten_schema(self): # NO FLATTENING - Schema with object type property but without further properties should be a plain dictionary # No flattening (default) - self.assertEquals(flatten_schema(nested_schema_with_properties), nested_schema_with_properties['properties']) + self.assertEqual(flatten_schema(nested_schema_with_properties), nested_schema_with_properties['properties']) # NO FLATTENING - Schema with object type property but without further properties should be a plain dictionary # max_level: 0 : No flattening (default) - self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=0), + self.assertEqual(flatten_schema(nested_schema_with_properties, max_level=0), nested_schema_with_properties['properties']) # FLATTENING - Schema with object type property but without further properties should be a dict with # flattened properties - self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=1), + self.assertEqual(flatten_schema(nested_schema_with_properties, max_level=1), { 'c_pk': {'type': ['null', 'integer']}, 'c_varchar': {'type': ['null', 'string']}, @@ -199,7 +233,7 @@ def 
@@ -199,7 +233,7 @@ def test_flatten_schema(self):
 
         # FLATTENING - Schema with object type property but without further properties should be a dict with
         # flattened properties
-        self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=10),
+        self.assertEqual(flatten_schema(nested_schema_with_properties, max_level=10),
                           {
                               'c_pk': {'type': ['null', 'integer']},
                               'c_varchar': {'type': ['null', 'string']},
@@ -216,11 +250,11 @@ def test_flatten_record(self):
 
         empty_record = {}
         # Empty record should be empty dict
-        self.assertEquals(flatten_record(empty_record), {})
+        self.assertEqual(flatten_record(empty_record), {})
 
         not_nested_record = {"c_pk": 1, "c_varchar": "1", "c_int": 1}
         # NO FLATTENING - Record with simple properties should be a plain dictionary
-        self.assertEquals(flatten_record(not_nested_record), not_nested_record)
+        self.assertEqual(flatten_record(not_nested_record), not_nested_record)
 
         nested_record = {
             "c_pk": 1,
@@ -235,7 +269,7 @@ def test_flatten_record(self):
             }}}
 
         # NO FLATTENING - No flattening (default)
-        self.assertEquals(flatten_record(nested_record),
+        self.assertEqual(flatten_record(nested_record),
                           {
                               "c_pk": 1,
                               "c_varchar": "1",
@@ -246,7 +280,7 @@ def test_flatten_record(self):
 
         # NO FLATTENING
         # max_level: 0 : No flattening (default)
-        self.assertEquals(flatten_record(nested_record, max_level=0),
+        self.assertEqual(flatten_record(nested_record, max_level=0),
                           {
                               "c_pk": 1,
                               "c_varchar": "1",
@@ -257,7 +291,7 @@ def test_flatten_record(self):
 
         # SEMI FLATTENING
         # max_level: 1 : Semi-flattening (default)
-        self.assertEquals(flatten_record(nested_record, max_level=1),
+        self.assertEqual(flatten_record(nested_record, max_level=1),
                           {
                               "c_pk": 1,
                               "c_varchar": "1",
@@ -269,7 +303,7 @@ def test_flatten_record(self):
                           })
 
         # FLATTENING
-        self.assertEquals(flatten_record(nested_record, max_level=10),
+        self.assertEqual(flatten_record(nested_record, max_level=10),
                           {
                               "c_pk": 1,
                               "c_varchar": "1",
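The bulk of the churn in this test module is mechanical: unittest's assertEquals is a deprecated alias of assertEqual and emits a DeprecationWarning under Python 3, so the diff normalizes every call site. A tiny standalone example of the preferred spelling:

    # assertEquals still works but warns; assertEqual is the supported name.
    import unittest

    class Demo(unittest.TestCase):
        def test_preferred_spelling(self):
            self.assertEqual(1 + 1, 2)
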