
Commit 0231128
[AP-549] Support reserved words as table names (#65)
koszti authored Mar 5, 2020
1 parent 9ae36cb commit 0231128
Showing 4 changed files with 48 additions and 11 deletions.
3 changes: 2 additions & 1 deletion target_snowflake/__init__.py
@@ -286,7 +286,8 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
             try:
                 stream_to_sync[stream].create_schema_if_not_exists()
                 stream_to_sync[stream].sync_table()
-            except Exception:
+            except Exception as ex:
+                LOGGER.exception(ex)
                 raise InvalidTableStructureException("""
                     Cannot sync table structure in Snowflake schema: {} .
                     Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
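The widened handler now logs the original traceback before re-raising as the target's own exception type, so the root cause of a failed sync is no longer swallowed. A minimal sketch of the pattern, with a hypothetical `sync_table()` standing in for the real `stream_to_sync[stream].sync_table()` call:

```python
import logging

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class InvalidTableStructureException(Exception):
    """Raised when the target cannot sync a table's structure."""

def sync_table():
    # Hypothetical stand-in for stream_to_sync[stream].sync_table()
    raise ValueError("column type mismatch")

try:
    sync_table()
except Exception as ex:
    # LOGGER.exception() records the message plus the full traceback,
    # so the original error survives even though a different exception
    # type is raised next.
    LOGGER.exception(ex)
    raise InvalidTableStructureException("Cannot sync table structure")
```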
19 changes: 12 additions & 7 deletions target_snowflake/db_sync.py
@@ -334,9 +334,9 @@ def table_name(self, stream_name, is_temporary, without_schema = False):
             sf_table_name = '{}_temp'.format(sf_table_name)

         if without_schema:
-            return '{}'.format(sf_table_name)
+            return f'"{sf_table_name.upper()}"'

-        return '{}.{}'.format(self.schema_name, sf_table_name)
+        return f'{self.schema_name}."{sf_table_name.upper()}"'

     def record_primary_key_string(self, record):
         if len(self.stream_schema_message['key_properties']) == 0:
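Upper-casing the table name and wrapping it in double quotes is the core of the fix: Snowflake parses a quoted identifier literally, so a table can be named ORDER even though ORDER is a reserved word. A standalone sketch of the new behaviour (the stream-to-table mapping is simplified here; the real method derives the name from the stream's schema message):

```python
def table_name(schema_name, stream_name, is_temporary=False, without_schema=False):
    # Simplified: keep only the table part of a 'db-table' stream name
    sf_table_name = stream_name.split('-')[-1]
    if is_temporary:
        sf_table_name = '{}_temp'.format(sf_table_name)
    if without_schema:
        return f'"{sf_table_name.upper()}"'
    return f'{schema_name}."{sf_table_name.upper()}"'

print(table_name('my_schema', 'my_db-order'))
# my_schema."ORDER" (quoted, so Snowflake reads an identifier, not the keyword)
```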
@@ -611,11 +611,14 @@ def update_columns(self):
         stream_schema_message = self.stream_schema_message
         stream = stream_schema_message['stream']
         table_name = self.table_name(stream, False, True)
+        columns = []

         if self.information_schema_columns:
-            columns = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and x['TABLE_NAME'].lower() == table_name, self.information_schema_columns))
+            columns = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and
+                                  f'"{x["TABLE_NAME"].upper()}"' == table_name,
+                                  self.information_schema_columns))
         else:
             columns = self.get_table_columns(table_schemas=[self.schema_name], table_name=table_name)

         columns_dict = {column['COLUMN_NAME'].lower(): column for column in columns}

         columns_to_add = [
@@ -681,12 +684,14 @@ def sync_table(self):
         stream = stream_schema_message['stream']
         table_name = self.table_name(stream, False, True)
         table_name_with_schema = self.table_name(stream, False)
+        found_tables = []

         if self.information_schema_columns:
-            found_tables = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and x['TABLE_NAME'].lower() == table_name, self.information_schema_columns))
+            found_tables = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and
+                                       f'"{x["TABLE_NAME"].upper()}"' == table_name,
+                                       self.information_schema_columns))
         else:
-            found_tables = [table for table in (self.get_tables(self.schema_name.lower())) if table['TABLE_NAME'].lower() == table_name]
+            found_tables = [table for table in (self.get_tables(self.schema_name.lower()))
+                            if f'"{table["TABLE_NAME"].upper()}"' == table_name]

         if len(found_tables) == 0:
             query = self.create_table_query()
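Both lookups above compare the cached information_schema rows in the same quoted form that `table_name()` now returns. A minimal sketch of the match, assuming cache rows shaped like Snowflake's information_schema output:

```python
# Hypothetical cached rows from an information_schema query
information_schema_columns = [
    {'TABLE_SCHEMA': 'my_schema', 'TABLE_NAME': 'order', 'COLUMN_NAME': 'ID'},
    {'TABLE_SCHEMA': 'my_schema', 'TABLE_NAME': 'other', 'COLUMN_NAME': 'ID'},
]

schema_name = 'MY_SCHEMA'
table_name = '"ORDER"'  # as returned by table_name(stream, False, True)

columns = list(filter(lambda x: x['TABLE_SCHEMA'] == schema_name.lower() and
                      f'"{x["TABLE_NAME"].upper()}"' == table_name,
                      information_schema_columns))
print([c['TABLE_NAME'] for c in columns])  # ['order']
```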
16 changes: 16 additions & 0 deletions messages-with-reserved-name-as-table-name.json
@@ -0,0 +1,16 @@
{"type": "STATE", "value": {"currently_syncing": "my_db-order"}}
{"type": "SCHEMA", "stream": "my_db-order", "schema": {"properties": {"data": {"inclusion": "available", "format": "binary", "type": ["null", "string"]}, "id": {"inclusion": "automatic", "format": "binary", "type": ["null", "string"]}, "created_at": {"inclusion": "available", "format": "date-time", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"]}
{"type": "ACTIVATE_VERSION", "stream": "my_db-order", "version": 1576670613163}
{"type": "RECORD", "stream": "my_db-order", "record": {"data": "6461746132", "id": "706b32", "created_at": "2019-12-17T16:02:55+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
{"type": "RECORD", "stream": "my_db-order", "record": {"data": "64617461313030", "id": "706b33", "created_at": "2019-12-18T11:46:38+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
{"type": "RECORD", "stream": "my_db-order", "record": {"data": "6461746134", "id": "706b34", "created_at": "2019-12-17T16:32:22+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T12:03:33.174343Z"}
{"type": "STATE", "value": {"currently_syncing": "my_db-order", "bookmarks": {"my_db-order": {"version": 1576670613163}}}}
{"type": "ACTIVATE_VERSION", "stream": "my_db-order", "version": 1576670613163}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"my_db-order": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 945}}}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"my_db-order": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 945}}}}
{"type": "SCHEMA", "stream": "my_db-order", "schema": {"properties": {"data": {"inclusion": "available", "format": "binary", "type": ["null", "string"]}, "created_at": {"inclusion": "available", "format": "date-time", "type": ["null", "string"]}, "id": {"inclusion": "automatic", "format": "binary", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["id"]}
{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b35", "data": "6461746135", "created_at": "2019-12-18T13:19:20+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b33", "data": "64617461313030", "created_at": "2019-12-18T11:46:38+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "RECORD", "stream": "my_db-order", "record": {"id": "706b35", "data": "64617461313030", "created_at": "2019-12-18T13:19:35+00:00", "_sdc_deleted_at": "2019-12-18T13:19:44+00:00+00:00"}, "version": 1576670613163, "time_extracted": "2019-12-18T13:24:31.441849Z"}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"my_db-order": {"version": 1576670613163, "log_file": "mysql-bin.000004", "log_pos": 1867}}}}
21 changes: 18 additions & 3 deletions tests/integration/test_target_snowflake.py
@@ -247,11 +247,11 @@ def assert_logical_streams_are_in_snowflake_and_are_empty(self):
         self.assertEqual(table_three, [])
         self.assertEqual(table_four, [])

-    def assert_binary_data_are_in_snowflake(self, should_metadata_columns_exist=False):
+    def assert_binary_data_are_in_snowflake(self, table_name, should_metadata_columns_exist=False):
         # Get loaded rows from tables
         snowflake = DbSync(self.config)
         target_schema = self.config.get('default_target_schema', '')
-        table_one = snowflake.query("SELECT * FROM {}.test_binary ORDER BY ID".format(target_schema))
+        table_one = snowflake.query("SELECT * FROM {}.{} ORDER BY ID".format(target_schema, table_name))

         # ----------------------------------------------------------------------
         # Check rows in table_one
@@ -370,6 +370,21 @@ def test_loading_tables_with_binary_columns_and_hard_delete(self):

         # Check if data loaded correctly and metadata columns exist
         self.assert_binary_data_are_in_snowflake(
+            table_name='test_binary',
             should_metadata_columns_exist=True
         )

+    def test_loading_table_with_reserved_word_as_name_and_hard_delete(self):
+        """Loading a table where the name is a reserved word with deleted rows"""
+        tap_lines = test_utils.get_test_tap_lines('messages-with-reserved-name-as-table-name.json')
+
+        # Turning on hard delete mode
+        self.config['hard_delete'] = True
+        self.persist_lines_with_cache(tap_lines)
+
+        # Check if data loaded correctly and metadata columns exist
+        self.assert_binary_data_are_in_snowflake(
+            table_name='"ORDER"',
+            should_metadata_columns_exist=True
+        )
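Because the helper splices the table name into the query with `str.format`, the tests can pass the pre-quoted name straight through. What the two calls execute, with an illustrative `target_schema`:

```python
target_schema = 'my_schema'  # hypothetical default_target_schema value
for table_name in ('test_binary', '"ORDER"'):
    print("SELECT * FROM {}.{} ORDER BY ID".format(target_schema, table_name))
# SELECT * FROM my_schema.test_binary ORDER BY ID
# SELECT * FROM my_schema."ORDER" ORDER BY ID
```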

@@ -664,7 +679,7 @@ def test_information_schema_cache_outdated(self):
         """.format(snowflake.pipelinewise_schema, target_schema))

         # Loading into an outdated information_schema cache should fail with table not exists
-        with self.assertRaises(Exception):
+        with self.assertRaises(target_snowflake.InvalidTableStructureException):
             self.persist_lines_with_cache(tap_lines_with_multi_streams)

         # 2) Simulate an out of data cache:
