From 34110fb541a3350a412ddf9c3eff378f177e5359 Mon Sep 17 00:00:00 2001 From: Louis Pieterse <45560107+louispietw@users.noreply.github.com> Date: Wed, 10 Jul 2019 15:47:16 +0100 Subject: [PATCH] Attempt ALTER of COLUMN instead of DROP/ADD (#11) * Attempt ALTER of COLUMN instead of DROP/ADD * Instead of DROPPING the exiting column, RENAME it * improve naming * formatting * Use _ instead of - * Tests added * Add missing test json --- target_snowflake/db_sync.py | 18 ++++--- ...es-with-three-streams-modified-column.json | 24 +++++++++ tests/integration/test_target_snowflake.py | 53 +++++++++++++++++++ 3 files changed, 89 insertions(+), 6 deletions(-) create mode 100644 tests/integration/resources/messages-with-three-streams-modified-column.json diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index c834a6b6..735f708d 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -566,19 +566,25 @@ def update_columns(self, table_columns_cache=None): ] for (column_name, column) in columns_to_replace: - self.drop_column(column_name, stream) + # self.drop_column(column_name, stream) + self.version_column(column_name, stream) self.add_column(column, stream) - def add_column(self, column, stream): - add_column = "ALTER TABLE {} ADD COLUMN {}".format(self.table_name(stream, False), column) - logger.info('Adding column: {}'.format(add_column)) - self.query(add_column) - def drop_column(self, column_name, stream): drop_column = "ALTER TABLE {} DROP COLUMN {}".format(self.table_name(stream, False), column_name) logger.info('Dropping column: {}'.format(drop_column)) self.query(drop_column) + def version_column(self, column_name, stream): + version_column = "ALTER TABLE {} RENAME COLUMN {} TO \"{}_{}\"".format(self.table_name(stream, False), column_name, column_name.replace("\"",""), time.strftime("%Y%m%d_%H%M")) + logger.info('Dropping column: {}'.format(version_column)) + self.query(version_column) + + def add_column(self, column, stream): + add_column = "ALTER TABLE {} ADD COLUMN {}".format(self.table_name(stream, False), column) + logger.info('Adding column: {}'.format(add_column)) + self.query(add_column) + def sync_table(self, table_columns_cache=None): stream_schema_message = self.stream_schema_message stream = stream_schema_message['stream'] diff --git a/tests/integration/resources/messages-with-three-streams-modified-column.json b/tests/integration/resources/messages-with-three-streams-modified-column.json new file mode 100644 index 00000000..1ebed08b --- /dev/null +++ b/tests/integration/resources/messages-with-three-streams-modified-column.json @@ -0,0 +1,24 @@ +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one"}} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_one", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_one", "version": 1} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_one", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one"}} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_one", "version": 1} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}}}} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_two", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_two", "version": 3} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_date": "2019-02-15 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_wo": {"initial_full_table_complete": true}}}} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 3} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_three", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_time_renamed": {"format": "time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 2} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time_renamed": "08:15:00"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 4, "c_varchar": "4", "c_int": 4, "c_time_renamed": "23:00:03", "_sdc_deleted_at": "2019-02-10T15:51:50.215998Z"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 2} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}} diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 444777ea..8f84e4d7 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -342,3 +342,56 @@ def test_nested_schema_flattening(self): 'C_NESTED_OBJECT__NESTED_PROP_3__MULTI_NESTED_PROP_2': 'multi_value_2', }]) + + def test_column_name_change(self): + """Tests correct renaming of snowflake columns after source change""" + tap_lines_before_column_name_change = test_utils.get_test_tap_lines('messages-with-three-streams.json') + tap_lines_after_column_name_change = test_utils.get_test_tap_lines('messages-with-three-streams-modified-column.json') + + # Load with default settings + target_snowflake.persist_lines(self.config, tap_lines_before_column_name_change) + target_snowflake.persist_lines(self.config, tap_lines_after_column_name_change) + + # Get loaded rows from tables + snowflake = DbSync(self.config) + target_schema = self.config.get('default_target_schema', '') + table_one = snowflake.query("SELECT * FROM {}.test_table_one ORDER BY c_pk".format(target_schema)) + table_two = snowflake.query("SELECT * FROM {}.test_table_two ORDER BY c_pk".format(target_schema)) + table_three = snowflake.query("SELECT * FROM {}.test_table_three ORDER BY c_pk".format(target_schema)) + + # Get the previous column name from information schema in test_table_two + previous_column_name = snowflake.query(""" + SELECT column_name + FROM information_schema.columns + WHERE table_catalog = '{}' + AND table_schema = '{}' + AND table_name = 'TEST_TABLE_TWO' + AND ordinal_position = 1 + """.format( + self.config.get('dbname', '').upper(), + target_schema.upper()))[0]["COLUMN_NAME"] + + # Table one should have no changes + self.assertEqual( + table_one, + [{'C_INT': 1, 'C_PK': 1, 'C_VARCHAR': '1'}]) + + # Table two should have versioned column + self.assertEquals( + table_two, + [ + {previous_column_name: datetime.datetime(2019, 2, 1, 15, 12, 45), 'C_INT': 1, 'C_PK': 1, 'C_VARCHAR': '1', 'C_DATE': None}, + {previous_column_name: datetime.datetime(2019, 2, 10, 2), 'C_INT': 2, 'C_PK': 2, 'C_VARCHAR': '2', 'C_DATE': '2019-02-12 02:00:00'}, + {previous_column_name: None, 'C_INT': 3, 'C_PK': 3, 'C_VARCHAR': '2', 'C_DATE': '2019-02-15 02:00:00'} + ] + ) + + # Table three should have renamed columns + self.assertEqual( + table_three, + [ + {'C_INT': 1, 'C_PK': 1, 'C_TIME': datetime.time(4, 0), 'C_VARCHAR': '1', 'C_TIME_RENAMED': None}, + {'C_INT': 2, 'C_PK': 2, 'C_TIME': datetime.time(7, 15), 'C_VARCHAR': '2', 'C_TIME_RENAMED': None}, + {'C_INT': 3, 'C_PK': 3, 'C_TIME': datetime.time(23, 0, 3), 'C_VARCHAR': '3', 'C_TIME_RENAMED': datetime.time(8, 15)}, + {'C_INT': 4, 'C_PK': 4, 'C_TIME': None, 'C_VARCHAR': '4', 'C_TIME_RENAMED': datetime.time(23, 0, 3)} + ])