diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index a7a396a1..a0ae6bdb 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -572,7 +572,7 @@ def get_tables(self, table_schemas=[]): # Get column data types by SHOW COLUMNS show_tables = f"SHOW TERSE TABLES IN SCHEMA {self.connection_config['dbname']}.{schema}" - # Convert output of SHOW COLUMNS to table and insert restuls into the cache COLUMNS table + # Convert output of SHOW TABLES to table select = f""" SELECT "schema_name" AS schema_name ,"name" AS table_name @@ -581,15 +581,25 @@ def get_tables(self, table_schemas=[]): queries.extend([show_tables, select]) # Run everything in one transaction - self.query(show_tables, max_records=9999) + try: + tables = self.query(queries, max_records=9999) + + # Catch exception when schema does not exist and SHOW TABLES throws a ProgrammingError + # Regexp to extract snowflake error code and message from the exception message + # Do nothing if schema does not exist + except snowflake.connector.errors.ProgrammingError as exc: + if re.match(r'002043 \(02000\):.*\n.*does not exist.*', str(exc)): + pass + else: + raise else: raise Exception("Cannot get table columns. 
List of table schemas empty") return tables - def get_table_columns(self, table_schemas=[], table_name=None): + def get_table_columns(self, table_schemas=[]): table_columns = [] - if table_schemas or table_name: + if table_schemas: for schema in table_schemas: queries = [] @@ -643,13 +653,17 @@ def update_columns(self): stream_schema_message = self.stream_schema_message stream = stream_schema_message['stream'] table_name = self.table_name(stream, False, True) + all_table_columns = [] if self.table_cache: - columns = list(filter(lambda x: x['SCHEMA_NAME'] == self.schema_name.upper() and - f'"{x["TABLE_NAME"].upper()}"' == table_name, - self.table_cache)) + all_table_columns = self.table_cache else: - columns = self.get_table_columns(table_schemas=[self.schema_name], table_name=table_name) + all_table_columns = self.get_table_columns(table_schemas=[self.schema_name]) + + # Find the specific table + columns = list(filter(lambda x: x['SCHEMA_NAME'] == self.schema_name.upper() and + f'"{x["TABLE_NAME"].upper()}"' == table_name, + all_table_columns)) columns_dict = {column['COLUMN_NAME'].upper(): column for column in columns} diff --git a/tests/integration/resources/messages-with-three-streams-modified-column.json b/tests/integration/resources/messages-with-three-streams-modified-column.json index 1ebed08b..38bf8b25 100644 --- a/tests/integration/resources/messages-with-three-streams-modified-column.json +++ b/tests/integration/resources/messages-with-three-streams-modified-column.json @@ -6,18 +6,18 @@ {"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_one", "version": 1} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}}}} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}} -{"type": "SCHEMA", "stream": 
"tap_mysql_test-test_table_two", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_two", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"inclusion": "available", "type": ["null", "string"]}, "c_new_column": {"inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]} {"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_two", "version": 3} -{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} -{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_date": "2019-02-15 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00", "c_new_column": "data 1"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_date": "2019-02-15 02:00:00", "c_new_column": "data 2"}, 
"version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_wo": {"initial_full_table_complete": true}}}} {"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 3} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}} -{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_three", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_time_renamed": {"format": "time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_three", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_new_column": {"inclusion": "available", "type": ["null", "string"]}, "c_time_renamed": {"format": "time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]} {"type": "ACTIVATE_VERSION", 
"stream": "tap_mysql_test-test_table_three", "version": 2} -{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time_renamed": "08:15:00"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"} -{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 4, "c_varchar": "4", "c_int": 4, "c_time_renamed": "23:00:03", "_sdc_deleted_at": "2019-02-10T15:51:50.215998Z"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_new_column": "data 1", "c_time_renamed": "08:15:00"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 4, "c_varchar": "4", "c_int": 4, "c_new_column": "data 2", "c_time_renamed": "23:00:03", "_sdc_deleted_at": "2019-02-10T15:51:50.215998Z"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}} {"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 2} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}} diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 6f1c8188..4f6026e9 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -38,6 +38,10 @@ def setUp(self): if 
self.config['default_target_schema']: snowflake.query("DROP SCHEMA IF EXISTS {}".format(self.config['default_target_schema'])) + def persist_lines(self, lines): + """Loads singer messages into snowflake without table caching option""" + target_snowflake.persist_lines(self.config, lines) + def persist_lines_with_cache(self, lines): """Enables table caching option and loads singer messages into snowflake. @@ -571,27 +575,92 @@ def test_column_name_change(self): table_one, [{'C_INT': 1, 'C_PK': 1, 'C_VARCHAR': '1'}]) - # Table two should have versioned column + # Table two should have a versioned column and a new column + self.assertEqual( + table_two, + [ + {previous_column_name: datetime.datetime(2019, 2, 1, 15, 12, 45), 'C_INT': 1, 'C_PK': 1, + 'C_VARCHAR': '1', 'C_DATE': None, 'C_NEW_COLUMN': None}, + {previous_column_name: datetime.datetime(2019, 2, 10, 2), 'C_INT': 2, 'C_PK': 2, 'C_VARCHAR': '2', + 'C_DATE': '2019-02-12 02:00:00', 'C_NEW_COLUMN': 'data 1'}, + {previous_column_name: None, 'C_INT': 3, 'C_PK': 3, 'C_VARCHAR': '2', 'C_DATE': '2019-02-15 02:00:00', + 'C_NEW_COLUMN': 'data 2'} + ] + ) + + # Table three should have renamed columns and a new column + self.assertEqual( + table_three, + [ + {'C_INT': 1, 'C_PK': 1, 'C_TIME': datetime.time(4, 0), 'C_VARCHAR': '1', 'C_TIME_RENAMED': None, + 'C_NEW_COLUMN': None}, + {'C_INT': 2, 'C_PK': 2, 'C_TIME': datetime.time(7, 15), 'C_VARCHAR': '2', 'C_TIME_RENAMED': None, + 'C_NEW_COLUMN': None}, + {'C_INT': 3, 'C_PK': 3, 'C_TIME': datetime.time(23, 0, 3), 'C_VARCHAR': '3', + 'C_TIME_RENAMED': datetime.time(8, 15), 'C_NEW_COLUMN': 'data 1'}, + {'C_INT': 4, 'C_PK': 4, 'C_TIME': None, 'C_VARCHAR': '4', 'C_TIME_RENAMED': datetime.time(23, 0, 3), + 'C_NEW_COLUMN': 'data 2'} + ]) + + def test_column_name_change_without_table_cache(self): + """Tests correct renaming of snowflake columns after source change without using table caching""" + tap_lines_before_column_name_change = 
test_utils.get_test_tap_lines('messages-with-three-streams.json') + tap_lines_after_column_name_change = test_utils.get_test_tap_lines( + 'messages-with-three-streams-modified-column.json') + + # Load with default settings + self.persist_lines(tap_lines_before_column_name_change) + self.persist_lines(tap_lines_after_column_name_change) + + # Get loaded rows from tables + snowflake = DbSync(self.config) + target_schema = self.config.get('default_target_schema', '') + table_one = snowflake.query("SELECT * FROM {}.test_table_one ORDER BY c_pk".format(target_schema)) + table_two = snowflake.query("SELECT * FROM {}.test_table_two ORDER BY c_pk".format(target_schema)) + table_three = snowflake.query("SELECT * FROM {}.test_table_three ORDER BY c_pk".format(target_schema)) + + # Get the previous column name from information schema in test_table_two + previous_column_name = snowflake.query(""" + SELECT column_name + FROM information_schema.columns + WHERE table_catalog = '{}' + AND table_schema = '{}' + AND table_name = 'TEST_TABLE_TWO' + AND ordinal_position = 1 + """.format( + self.config.get('dbname', '').upper(), + target_schema.upper()))[0]["COLUMN_NAME"] + + # Table one should have no changes + self.assertEqual( + table_one, + [{'C_INT': 1, 'C_PK': 1, 'C_VARCHAR': '1'}]) + + # Table two should have a versioned column and a new column self.assertEquals( table_two, [ {previous_column_name: datetime.datetime(2019, 2, 1, 15, 12, 45), 'C_INT': 1, 'C_PK': 1, - 'C_VARCHAR': '1', 'C_DATE': None}, + 'C_VARCHAR': '1', 'C_DATE': None, 'C_NEW_COLUMN': None}, {previous_column_name: datetime.datetime(2019, 2, 10, 2), 'C_INT': 2, 'C_PK': 2, 'C_VARCHAR': '2', - 'C_DATE': '2019-02-12 02:00:00'}, - {previous_column_name: None, 'C_INT': 3, 'C_PK': 3, 'C_VARCHAR': '2', 'C_DATE': '2019-02-15 02:00:00'} + 'C_DATE': '2019-02-12 02:00:00', 'C_NEW_COLUMN': 'data 1'}, + {previous_column_name: None, 'C_INT': 3, 'C_PK': 3, 'C_VARCHAR': '2', 'C_DATE': '2019-02-15 02:00:00', + 'C_NEW_COLUMN': 
'data 2'} ] ) - # Table three should have renamed columns + # Table three should have renamed columns and a new column self.assertEqual( table_three, [ - {'C_INT': 1, 'C_PK': 1, 'C_TIME': datetime.time(4, 0), 'C_VARCHAR': '1', 'C_TIME_RENAMED': None}, - {'C_INT': 2, 'C_PK': 2, 'C_TIME': datetime.time(7, 15), 'C_VARCHAR': '2', 'C_TIME_RENAMED': None}, + {'C_INT': 1, 'C_PK': 1, 'C_TIME': datetime.time(4, 0), 'C_VARCHAR': '1', 'C_TIME_RENAMED': None, + 'C_NEW_COLUMN': None}, + {'C_INT': 2, 'C_PK': 2, 'C_TIME': datetime.time(7, 15), 'C_VARCHAR': '2', 'C_TIME_RENAMED': None, + 'C_NEW_COLUMN': None}, {'C_INT': 3, 'C_PK': 3, 'C_TIME': datetime.time(23, 0, 3), 'C_VARCHAR': '3', - 'C_TIME_RENAMED': datetime.time(8, 15)}, - {'C_INT': 4, 'C_PK': 4, 'C_TIME': None, 'C_VARCHAR': '4', 'C_TIME_RENAMED': datetime.time(23, 0, 3)} + 'C_TIME_RENAMED': datetime.time(8, 15), 'C_NEW_COLUMN': 'data 1'}, + {'C_INT': 4, 'C_PK': 4, 'C_TIME': None, 'C_VARCHAR': '4', 'C_TIME_RENAMED': datetime.time(23, 0, 3), + 'C_NEW_COLUMN': 'data 2'} ]) def test_logical_streams_from_pg_with_hard_delete_and_default_batch_size_should_pass(self):