Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
AP-1205 AP-1206 Fix issues with primary keys (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
Samira-El authored Apr 8, 2022
1 parent 2d4d891 commit f688bbd
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 17 deletions.
14 changes: 8 additions & 6 deletions target_snowflake/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,11 @@ def record_primary_key_string(self, record):

key_props = []
for key_prop in self.stream_schema_message['key_properties']:
if not flatten.get(key_prop):
if key_prop not in flatten or flatten[key_prop] is None:
raise PrimaryKeyNotFoundException(
f"Primary key '{key_prop}' does not exist in record or is null. "
f"Available fields: {list(flatten.keys())}")
f"Available fields: {list(flatten.keys())}"
)

key_props.append(str(flatten[key_prop]))

Expand Down Expand Up @@ -847,10 +848,11 @@ def _refresh_table_pks(self):
elif new_pks != current_pks:
self.logger.info('Changes detected in pk columns of table "%s", need to refresh PK.', table_name)
pk_list = ', '.join([safe_column_name(col) for col in new_pks])
queries.extend([
f'alter table {table_name} drop primary key;',
f'alter table {table_name} add primary key({pk_list});'
])

if current_pks:
queries.append(f'alter table {table_name} drop primary key;')

queries.append(f'alter table {table_name} add primary key({pk_list});')

# For now, we don't wish to enforce non-nullability on the pk columns
for pk in current_pks.union(new_pks):
Expand Down
13 changes: 13 additions & 0 deletions tests/integration/resources/messages-with-falsy-pk-values.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id"]}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 1, "version": 1}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_simple_table", "version": 1}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 0, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 1, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 2, "results": "xyz2", "time_created": "2019-12-03T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "c_bool": {"type": ["boolean"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id", "c_bool"]}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 2, "version": 1}}}}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 3, "c_bool": false, "results": "xyz3", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 4, "c_bool": true, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 0, "c_bool": true, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 5, "c_bool": false, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 0, "c_bool": false, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
13 changes: 13 additions & 0 deletions tests/integration/resources/messages-with-new-pk.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": []}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 1, "version": 1}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_simple_table", "version": 1}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 1, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 2, "results": "xyz2", "time_created": "2019-12-03T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 2, "version": 1}}}}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 3, "results": "xyz3", "time_created": "2019-12-09T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 4, "results": "xyz4", "time_created": "2019-12-11T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 4, "version": 1}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "name": {"type": "string"}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id", "name"]}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 5, "name": "A", "results": "xyz5", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 6, "name": "B", "results": "xyz6", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 6, "version": 1}}}}
43 changes: 43 additions & 0 deletions tests/integration/test_target_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -1331,3 +1331,46 @@ def test_stream_with_null_values_in_pks_should_fail(self):

with self.assertRaises(PrimaryKeyNotFoundException):
self.persist_lines_with_cache(tap_lines)

def test_stream_with_new_pks_should_succeed(self):
    """Check that a table loaded without PKs gains them once the stream declares some.

    Loads ``messages-with-new-pk.json`` with ``primary_key_required`` disabled,
    then verifies the row count and, column by column, the name, nullability
    and primary-key flag reported by ``desc table``.
    """
    tap_lines = test_utils.get_test_tap_lines('messages-with-new-pk.json')

    # Primary keys are made optional for this run before the lines are persisted.
    self.config['primary_key_required'] = False

    self.persist_lines_with_cache(tap_lines)

    table_desc = self.snowflake.query(
        f'desc table {self.config["default_target_schema"]}.test_simple_table;'
    )
    rows_count = self.snowflake.query(
        f'select count(1) as _count from {self.config["default_target_schema"]}.test_simple_table;'
    )

    self.assertEqual(6, rows_count[0]['_COUNT'])

    # (column name, nullable flag, primary-key flag) in the order `desc table` returns them.
    expected_columns = [
        ('ID', 'Y', 'Y'),
        ('RESULTS', 'Y', 'N'),
        ('TIME_CREATED', 'Y', 'N'),
        ('NAME', 'Y', 'Y'),
    ]

    self.assertEqual(len(expected_columns), len(table_desc))

    for position, (col_name, nullable, is_pk) in enumerate(expected_columns):
        described = table_desc[position]
        self.assertEqual(col_name, described['name'])
        self.assertEqual(nullable, described['null?'])
        self.assertEqual(is_pk, described['primary key'])

def test_stream_with_falsy_pks_should_succeed(self):
    """Check that records load when their primary key values are falsy.

    Persists ``messages-with-falsy-pk-values.json`` and expects eight rows
    in the target table afterwards.
    """
    tap_lines = test_utils.get_test_tap_lines('messages-with-falsy-pk-values.json')

    self.persist_lines_with_cache(tap_lines)

    target_schema = self.config["default_target_schema"]
    count_query = f'select count(1) as _count from {target_schema}.test_simple_table;'
    count_result = self.snowflake.query(count_query)

    self.assertEqual(8, count_result[0]['_COUNT'])
84 changes: 73 additions & 11 deletions tests/unit/test_db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,14 @@ def test_create_query_tag(self):
self.assertIsNone(db_sync.create_query_tag(None))
self.assertEqual(db_sync.create_query_tag('This is a test query tag'), 'This is a test query tag')
self.assertEqual(db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}',
database='test_database',
schema='test_schema',
table='test_table'), 'Loading into test_database.test_schema.test_table')
database='test_database',
schema='test_schema',
table='test_table'),
'Loading into test_database.test_schema.test_table')
self.assertEqual(db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}',
database=None,
schema=None,
table=None), 'Loading into ..')
database=None,
schema=None,
table=None), 'Loading into ..')

# JSON formatted query tags with variables
json_query_tag = db_sync.create_query_tag(
Expand Down Expand Up @@ -200,7 +201,7 @@ def test_create_query_tag(self):

@patch('target_snowflake.db_sync.DbSync.query')
def test_parallelism(self, query_patch):
query_patch.return_value = [{ 'type': 'CSV' }]
query_patch.return_value = [{'type': 'CSV'}]

minimal_config = {
'account': "dummy-value",
Expand Down Expand Up @@ -288,8 +289,10 @@ def test_record_primary_key_string(self, query_patch):
stream_schema_message = {"stream": "public-table1",
"schema": {
"properties": {
"id": { "type": ["integer"]},
"c_str": {"type": ["null", "string"]}}},
"id": {"type": ["integer"]},
"c_str": {"type": ["null", "string"]},
"c_bool": {"type": ["boolean"]}
}},
"key_properties": ["id"]}

# Single primary key string
Expand Down Expand Up @@ -317,6 +320,16 @@ def test_record_primary_key_string(self, query_patch):
r"fields: \['id', 'c_str'\]"):
dbsync.record_primary_key_string({'id': None, 'c_str': 'xyz'})

# falsy PK field accepted
stream_schema_message['key_properties'] = ['id']
dbsync = db_sync.DbSync(minimal_config, stream_schema_message)
self.assertEqual(dbsync.record_primary_key_string({'id': 0, 'c_str': 'xyz'}), '0')

# falsy PK field accepted
stream_schema_message['key_properties'] = ['id', 'c_bool']
dbsync = db_sync.DbSync(minimal_config, stream_schema_message)
self.assertEqual(dbsync.record_primary_key_string({'id': 1, 'c_bool': False, 'c_str': 'xyz'}), '1,False')

@patch('target_snowflake.db_sync.DbSync.query')
@patch('target_snowflake.db_sync.DbSync._load_file_merge')
def test_merge_failure_message(self, load_file_merge_patch, query_patch):
Expand Down Expand Up @@ -503,7 +516,7 @@ def test_sync_table_with_new_pk_in_stream(self, query_patch):
self.assertEqual('alter table dummy-schema."TABLE1" drop primary key;', calls[2][0][0][0])

self.assertIn(calls[2][0][0][1], {'alter table dummy-schema."TABLE1" add primary key("ID", "NAME");',
'alter table dummy-schema."TABLE1" add primary key("NAME", "ID");'})
'alter table dummy-schema."TABLE1" add primary key("NAME", "ID");'})

self.assertListEqual(sorted(calls[2][0][0][2:]),
[
Expand Down Expand Up @@ -559,4 +572,53 @@ def test_sync_table_with_stream_that_changes_to_have_no_pk(self, query_patch):
call('show primary keys in table dummy-db.dummy-schema."TABLE1";'),
call(['alter table dummy-schema."TABLE1" drop primary key;',
'alter table dummy-schema."TABLE1" alter column "ID" drop not null;'])
])
])

@patch('target_snowflake.db_sync.DbSync.query')
def test_sync_table_with_stream_that_has_no_pk_but_get_a_new_one(self, query_patch):
    """Check the queries issued when a table without a PK receives one from the stream.

    The patched ``query`` returns an empty result for the primary-key lookup,
    and the test expects an ``add primary key`` statement with no preceding
    ``drop primary key``.
    """
    config = {
        'account': "dummy-account",
        'dbname': "dummy-db",
        'user': "dummy-user",
        'password': "dummy-passwd",
        'warehouse': "dummy-wh",
        'default_target_schema': "dummy-schema",
        'file_format': "dummy-file-format",
    }

    schema_message = {
        "stream": "public-table1",
        "schema": {
            "properties": {
                "id": {"type": ["integer"]},
                "c_str": {"type": ["null", "string"]},
            },
        },
        "key_properties": ['id'],
    }

    # Cached column metadata for the existing TABLE1 (two columns).
    cached_columns = [
        {
            'SCHEMA_NAME': 'DUMMY-SCHEMA',
            'TABLE_NAME': 'TABLE1',
            'COLUMN_NAME': column_name,
            'DATA_TYPE': data_type,
        }
        for column_name, data_type in (('ID', 'NUMBER'), ('C_STR', 'TEXT'))
    ]

    # Successive return values of the patched query(), matching the call order asserted below:
    # the file-format lookup, the primary-key lookup (empty), then the alter statements.
    query_patch.side_effect = [
        [{'type': 'CSV'}],
        [],
        None,
    ]

    syncer = db_sync.DbSync(config, schema_message, cached_columns)
    syncer.sync_table()

    expected_calls = [
        call("SHOW FILE FORMATS LIKE 'dummy-file-format'"),
        call('show primary keys in table dummy-db.dummy-schema."TABLE1";'),
        call([
            'alter table dummy-schema."TABLE1" add primary key("ID");',
            'alter table dummy-schema."TABLE1" alter column "ID" drop not null;',
        ]),
    ]
    query_patch.assert_has_calls(expected_calls)

0 comments on commit f688bbd

Please sign in to comment.