Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Attempt ALTER of COLUMN instead of DROP/ADD (#11)
Browse files Browse the repository at this point in the history
* Attempt ALTER of COLUMN instead of DROP/ADD

* Instead of DROPPING the exiting column, RENAME it

* improve naming

* formatting

* Use _ instead of -

* Tests added

* Add missing test json
  • Loading branch information
louis-pie authored and koszti committed Jul 10, 2019
1 parent 2de6919 commit 34110fb
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 6 deletions.
18 changes: 12 additions & 6 deletions target_snowflake/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,25 @@ def update_columns(self, table_columns_cache=None):
]

for (column_name, column) in columns_to_replace:
self.drop_column(column_name, stream)
# self.drop_column(column_name, stream)
self.version_column(column_name, stream)
self.add_column(column, stream)

def add_column(self, column, stream):
add_column = "ALTER TABLE {} ADD COLUMN {}".format(self.table_name(stream, False), column)
logger.info('Adding column: {}'.format(add_column))
self.query(add_column)

def drop_column(self, column_name, stream):
drop_column = "ALTER TABLE {} DROP COLUMN {}".format(self.table_name(stream, False), column_name)
logger.info('Dropping column: {}'.format(drop_column))
self.query(drop_column)

def version_column(self, column_name, stream):
version_column = "ALTER TABLE {} RENAME COLUMN {} TO \"{}_{}\"".format(self.table_name(stream, False), column_name, column_name.replace("\"",""), time.strftime("%Y%m%d_%H%M"))
logger.info('Dropping column: {}'.format(version_column))
self.query(version_column)

def add_column(self, column, stream):
add_column = "ALTER TABLE {} ADD COLUMN {}".format(self.table_name(stream, False), column)
logger.info('Adding column: {}'.format(add_column))
self.query(add_column)

def sync_table(self, table_columns_cache=None):
stream_schema_message = self.stream_schema_message
stream = stream_schema_message['stream']
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one"}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_one", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_one", "version": 1}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_one", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one"}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_one", "version": 1}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}}}}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_two", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_two", "version": 3}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_date": "2019-02-15 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_wo": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 3}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_three", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_time_renamed": {"format": "time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 2}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time_renamed": "08:15:00"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_three", "record": {"c_pk": 4, "c_varchar": "4", "c_int": 4, "c_time_renamed": "23:00:03", "_sdc_deleted_at": "2019-02-10T15:51:50.215998Z"}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 2}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_three", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}}
53 changes: 53 additions & 0 deletions tests/integration/test_target_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,56 @@ def test_nested_schema_flattening(self):
'C_NESTED_OBJECT__NESTED_PROP_3__MULTI_NESTED_PROP_2': 'multi_value_2',
}])


def test_column_name_change(self):
"""Tests correct renaming of snowflake columns after source change"""
tap_lines_before_column_name_change = test_utils.get_test_tap_lines('messages-with-three-streams.json')
tap_lines_after_column_name_change = test_utils.get_test_tap_lines('messages-with-three-streams-modified-column.json')

# Load with default settings
target_snowflake.persist_lines(self.config, tap_lines_before_column_name_change)
target_snowflake.persist_lines(self.config, tap_lines_after_column_name_change)

# Get loaded rows from tables
snowflake = DbSync(self.config)
target_schema = self.config.get('default_target_schema', '')
table_one = snowflake.query("SELECT * FROM {}.test_table_one ORDER BY c_pk".format(target_schema))
table_two = snowflake.query("SELECT * FROM {}.test_table_two ORDER BY c_pk".format(target_schema))
table_three = snowflake.query("SELECT * FROM {}.test_table_three ORDER BY c_pk".format(target_schema))

# Get the previous column name from information schema in test_table_two
previous_column_name = snowflake.query("""
SELECT column_name
FROM information_schema.columns
WHERE table_catalog = '{}'
AND table_schema = '{}'
AND table_name = 'TEST_TABLE_TWO'
AND ordinal_position = 1
""".format(
self.config.get('dbname', '').upper(),
target_schema.upper()))[0]["COLUMN_NAME"]

# Table one should have no changes
self.assertEqual(
table_one,
[{'C_INT': 1, 'C_PK': 1, 'C_VARCHAR': '1'}])

# Table two should have versioned column
self.assertEquals(
table_two,
[
{previous_column_name: datetime.datetime(2019, 2, 1, 15, 12, 45), 'C_INT': 1, 'C_PK': 1, 'C_VARCHAR': '1', 'C_DATE': None},
{previous_column_name: datetime.datetime(2019, 2, 10, 2), 'C_INT': 2, 'C_PK': 2, 'C_VARCHAR': '2', 'C_DATE': '2019-02-12 02:00:00'},
{previous_column_name: None, 'C_INT': 3, 'C_PK': 3, 'C_VARCHAR': '2', 'C_DATE': '2019-02-15 02:00:00'}
]
)

# Table three should have renamed columns
self.assertEqual(
table_three,
[
{'C_INT': 1, 'C_PK': 1, 'C_TIME': datetime.time(4, 0), 'C_VARCHAR': '1', 'C_TIME_RENAMED': None},
{'C_INT': 2, 'C_PK': 2, 'C_TIME': datetime.time(7, 15), 'C_VARCHAR': '2', 'C_TIME_RENAMED': None},
{'C_INT': 3, 'C_PK': 3, 'C_TIME': datetime.time(23, 0, 3), 'C_VARCHAR': '3', 'C_TIME_RENAMED': datetime.time(8, 15)},
{'C_INT': 4, 'C_PK': 4, 'C_TIME': None, 'C_VARCHAR': '4', 'C_TIME_RENAMED': datetime.time(23, 0, 3)}
])

0 comments on commit 34110fb

Please sign in to comment.