From 023112840502d679c0a9e3f3c326c93b770f487c Mon Sep 17 00:00:00 2001
From: Peter Kosztolanyi
Date: Thu, 5 Mar 2020 12:32:28 +0000
Subject: [PATCH] [AP-549] Support reserved words as table names (#65)

---
 target_snowflake/__init__.py                  |  3 ++-
 target_snowflake/db_sync.py                   | 19 ++++++++++-------
 ...ages-with-reserved-name-as-table-name.json | 16 ++++++++++++++
 tests/integration/test_target_snowflake.py    | 21 ++++++++++++++++---
 4 files changed, 48 insertions(+), 11 deletions(-)
 create mode 100644 tests/integration/resources/messages-with-reserved-name-as-table-name.json

diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index 1a22963f..d3095559 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -286,7 +286,8 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
             try:
                 stream_to_sync[stream].create_schema_if_not_exists()
                 stream_to_sync[stream].sync_table()
-            except Exception:
+            except Exception as ex:
+                LOGGER.exception(ex)
                 raise InvalidTableStructureException("""
                     Cannot sync table structure in Snowflake schema: {} .
                     Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index 25eee2f4..38e2a1f9 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -334,9 +334,9 @@ def table_name(self, stream_name, is_temporary, without_schema = False):
             sf_table_name = '{}_temp'.format(sf_table_name)
 
         if without_schema:
-            return '{}'.format(sf_table_name)
+            return f'"{sf_table_name.upper()}"'
 
-        return '{}.{}'.format(self.schema_name, sf_table_name)
+        return f'{self.schema_name}."{sf_table_name.upper()}"'
 
     def record_primary_key_string(self, record):
         if len(self.stream_schema_message['key_properties']) == 0:
@@ -611,11 +611,14 @@ def update_columns(self):
         stream_schema_message = self.stream_schema_message
         stream = stream_schema_message['stream']
         table_name = self.table_name(stream, False, True)
-        columns = []
+
         if self.information_schema_columns:
-            columns = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and x['TABLE_NAME'].lower() == table_name, self.information_schema_columns))
+            columns = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and
+                                            f'"{x["TABLE_NAME"].upper()}"' == table_name,
+                                  self.information_schema_columns))
         else:
             columns = self.get_table_columns(table_schemas=[self.schema_name], table_name=table_name)
+
         columns_dict = {column['COLUMN_NAME'].lower(): column for column in columns}
 
         columns_to_add = [
@@ -681,12 +684,14 @@ def sync_table(self):
         stream = stream_schema_message['stream']
         table_name = self.table_name(stream, False, True)
         table_name_with_schema = self.table_name(stream, False)
-        found_tables = []
+
         if self.information_schema_columns:
-            found_tables = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and x['TABLE_NAME'].lower() == table_name, self.information_schema_columns))
+            found_tables = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and
+                                                 f'"{x["TABLE_NAME"].upper()}"' == table_name,
+                                       self.information_schema_columns))
         else:
-            found_tables = [table for table in (self.get_tables(self.schema_name.lower())) if table['TABLE_NAME'].lower() == table_name]
+            found_tables = [table for table in (self.get_tables(self.schema_name.lower()))
+                            if f'"{table["TABLE_NAME"].upper()}"' == table_name]
 
         if len(found_tables) == 0:
            query = self.create_table_query()
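A minimal standalone sketch of the quoting rule the db_sync.py hunks above implement; this is not part of the patch, and the helper name quoted_table_name and the example inputs are illustrative only:

def quoted_table_name(schema_name, sf_table_name, without_schema=False):
    # Upper-case the name and wrap it in double quotes so Snowflake treats
    # reserved words such as ORDER as ordinary identifiers.
    if without_schema:
        return f'"{sf_table_name.upper()}"'
    return f'{schema_name}."{sf_table_name.upper()}"'

assert quoted_table_name('my_schema', 'order') == 'my_schema."ORDER"'
assert quoted_table_name('my_schema', 'order', without_schema=True) == '"ORDER"'

Quoting is also why the update_columns and sync_table lookups above compare f'"{x["TABLE_NAME"].upper()}"' against table_name: the information_schema cache stores bare names, while table_name() now returns the quoted form.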
diff --git a/tests/integration/resources/messages-with-reserved-name-as-table-name.json b/tests/integration/resources/messages-with-reserved-name-as-table-name.json
new file mode 100644
index 00000000..280d002f
--- /dev/null
+++ b/tests/integration/resources/messages-with-reserved-name-as-table-name.json
@@ -0,0 +1,16 @@
+{"type": "STATE", "value": {"currently_syncing": "my_db-order"}}
+{"type": "SCHEMA", "stream": "my_db-order", "schema": {"properties": {"data": {"inclusion": "available", "format": "binary", "type": ["null", "string"]}, "id": {"inclusion": "automatic", "format": "binary", "type": ["null", "string"]}, "created_at": {"inclusion": "available", "format": "date-time", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"]}
+{"type": "ACTIVATE_VERSION", "stream": "my_db-order", "version": 1576670613163}
+{"type": "RECORD", "stream": "my_db-order", "record": {"data": "6461746132", "id": "706b32", "created_at": "2019-12-17T16:02:55+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
+{"type": "RECORD", "stream": "my_db-order", "record": {"data": "64617461313030", "id": "706b33", "created_at": "2019-12-18T11:46:38+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
+{"type": "RECORD", "stream": "my_db-order", "record": {"data": "6461746134", "id": "706b34", "created_at": "2019-12-17T16:32:22+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
+{"type": "STATE", "value": {"currently_syncing": "my_db-order", "bookmarks": {"my_db-order": {"version": 1576670613163}}}}
+{"type": "ACTIVATE_VERSION", "stream": "my_db-order", "version": 1576670613163}
+{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"my_db-order": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 945}}}}
+{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"my_db-order": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 945}}}}
+{"type": "SCHEMA", "stream": "my_db-order", "schema": {"properties": {"data": {"inclusion": "available", "format": "binary", "type": ["null", "string"]}, "created_at": {"inclusion": "available", "format": "date-time", "type": ["null", "string"]}, "id": {"inclusion": "automatic", "format": "binary", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"]}
+{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b35", "data": "6461746135", "created_at": "2019-12-18T13:19:20+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
+{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
+{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b33", "data": "64617461313030", "created_at": "2019-12-18T11:46:38+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
+{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
+{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"my_db-order": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 1867}}}}
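For orientation, and not part of the patch: the fixture above is a stream of newline-delimited Singer messages for a stream named my_db-order, which maps to the reserved table name ORDER. A short sketch, using only the standard library and one RECORD line copied from the new file, of the property the hard-delete test relies on:

import json

# One RECORD message from the fixture above. With hard_delete enabled,
# the presence of _sdc_deleted_at means the row is deleted from the
# target table instead of being upserted.
line = ('{"type": "RECORD", "stream": "my_db-order", "record": '
        '{"id": "706b33", "data": "64617461313030", '
        '"created_at": "2019-12-18T11:46:38+00:00", '
        '"_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, '
        '"version": 1576670613163}')

message = json.loads(line)
assert message['type'] == 'RECORD'
assert '_sdc_deleted_at' in message['record']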
diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py
index 86db2ee1..9ade2b01 100644
--- a/tests/integration/test_target_snowflake.py
+++ b/tests/integration/test_target_snowflake.py
@@ -247,11 +247,11 @@ def assert_logical_streams_are_in_snowflake_and_are_empty(self):
         self.assertEqual(table_three, [])
         self.assertEqual(table_four, [])
 
-    def assert_binary_data_are_in_snowflake(self, should_metadata_columns_exist=False):
+    def assert_binary_data_are_in_snowflake(self, table_name, should_metadata_columns_exist=False):
         # Get loaded rows from tables
         snowflake = DbSync(self.config)
         target_schema = self.config.get('default_target_schema', '')
-        table_one = snowflake.query("SELECT * FROM {}.test_binary ORDER BY ID".format(target_schema))
+        table_one = snowflake.query("SELECT * FROM {}.{} ORDER BY ID".format(target_schema, table_name))
 
         # ----------------------------------------------------------------------
         # Check rows in table_one
@@ -370,6 +370,21 @@ def test_loading_tables_with_binary_columns_and_hard_delete(self):
 
         # Check if data loaded correctly and metadata columns exist
         self.assert_binary_data_are_in_snowflake(
+            table_name='test_binary',
+            should_metadata_columns_exist=True
+        )
+
+    def test_loading_table_with_reserved_word_as_name_and_hard_delete(self):
+        """Loading a table where the name is a reserved word with deleted rows"""
+        tap_lines = test_utils.get_test_tap_lines('messages-with-reserved-name-as-table-name.json')
+
+        # Turning on hard delete mode
+        self.config['hard_delete'] = True
+        self.persist_lines_with_cache(tap_lines)
+
+        # Check if data loaded correctly and metadata columns exist
+        self.assert_binary_data_are_in_snowflake(
+            table_name='"ORDER"',
             should_metadata_columns_exist=True
         )
 
@@ -664,7 +679,7 @@ def test_information_schema_cache_outdated(self):
         """.format(snowflake.pipelinewise_schema, target_schema))
 
         # Loading into an outdated information_schema cache should fail with table not exists
-        with self.assertRaises(Exception):
+        with self.assertRaises(target_snowflake.InvalidTableStructureException):
             self.persist_lines_with_cache(tap_lines_with_multi_streams)
 
         # 2) Simulate an out of data cache:
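A closing sketch, not part of the patch, of why the new test passes table_name='"ORDER"' pre-quoted: the target now creates the table as an upper-case quoted identifier, so the assertion helper's query must quote it as well; an unquoted ORDER would clash with the ORDER BY keyword and fail to parse. The target_schema value is illustrative; the tests read it from config:

target_schema = 'test_schema'  # illustrative; the tests take this from config
table_name = '"ORDER"'         # exactly as the new test passes it

query = "SELECT * FROM {}.{} ORDER BY ID".format(target_schema, table_name)
assert query == 'SELECT * FROM test_schema."ORDER" ORDER BY ID'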