From f688bbd989441d2515fb76d848640709d2028672 Mon Sep 17 00:00:00 2001 From: Samira El Aabidi <54845154+Samira-El@users.noreply.github.com> Date: Fri, 8 Apr 2022 12:48:50 +0300 Subject: [PATCH] AP-1205 AP-1206 Fix issues with primary keys (#262) --- target_snowflake/db_sync.py | 14 ++-- .../messages-with-falsy-pk-values.json | 13 +++ .../resources/messages-with-new-pk.json | 13 +++ tests/integration/test_target_snowflake.py | 43 ++++++++++ tests/unit/test_db_sync.py | 84 ++++++++++++++++--- 5 files changed, 150 insertions(+), 17 deletions(-) create mode 100644 tests/integration/resources/messages-with-falsy-pk-values.json create mode 100644 tests/integration/resources/messages-with-new-pk.json diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index 2e48451f..64efcac9 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -378,10 +378,11 @@ def record_primary_key_string(self, record): key_props = [] for key_prop in self.stream_schema_message['key_properties']: - if not flatten.get(key_prop): + if key_prop not in flatten or flatten[key_prop] is None: raise PrimaryKeyNotFoundException( f"Primary key '{key_prop}' does not exist in record or is null. " - f"Available fields: {list(flatten.keys())}") + f"Available fields: {list(flatten.keys())}" + ) key_props.append(str(flatten[key_prop])) @@ -847,10 +848,11 @@ def _refresh_table_pks(self): elif new_pks != current_pks: self.logger.info('Changes detected in pk columns of table "%s", need to refresh PK.', table_name) pk_list = ', '.join([safe_column_name(col) for col in new_pks]) - queries.extend([ - f'alter table {table_name} drop primary key;', - f'alter table {table_name} add primary key({pk_list});' - ]) + + if current_pks: + queries.append(f'alter table {table_name} drop primary key;') + + queries.append(f'alter table {table_name} add primary key({pk_list});') # For now, we don't wish to enforce non-nullability on the pk columns for pk in current_pks.union(new_pks): diff --git a/tests/integration/resources/messages-with-falsy-pk-values.json b/tests/integration/resources/messages-with-falsy-pk-values.json new file mode 100644 index 00000000..edd665b8 --- /dev/null +++ b/tests/integration/resources/messages-with-falsy-pk-values.json @@ -0,0 +1,13 @@ +{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id"]} +{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 1, "version": 1}}}} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_simple_table", "version": 1} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 0, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 1, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 2, "results": "xyz2", "time_created": "2019-12-03T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "c_bool": {"type": ["boolean"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id", "c_bool"]} +{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 2, "version": 1}}}} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 3, "c_bool": false, "results": "xyz3", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 4, "c_bool": true, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 0, "c_bool": true, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 5, "c_bool": false, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 0, "c_bool": false, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} \ No newline at end of file diff --git a/tests/integration/resources/messages-with-new-pk.json b/tests/integration/resources/messages-with-new-pk.json new file mode 100644 index 00000000..9fb431d3 --- /dev/null +++ b/tests/integration/resources/messages-with-new-pk.json @@ -0,0 +1,13 @@ +{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": []} +{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 1, "version": 1}}}} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_simple_table", "version": 1} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 1, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 2, "results": "xyz2", "time_created": "2019-12-03T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 2, "version": 1}}}} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 3, "results": "xyz3", "time_created": "2019-12-09T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 4, "results": "xyz4", "time_created": "2019-12-11T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 4, "version": 1}}}} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "name": {"type": "string"}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id", "name"]} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 5, "name": "A", "results": "xyz5", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 6, "name": "B", "results": "xyz6", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"} +{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 6, "version": 1}}}} \ No newline at end of file diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index d1ed9776..791acda2 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1331,3 +1331,46 @@ def test_stream_with_null_values_in_pks_should_fail(self): with self.assertRaises(PrimaryKeyNotFoundException): self.persist_lines_with_cache(tap_lines) + + def test_stream_with_new_pks_should_succeed(self): + """Test if table will have new PKs after not having any""" + tap_lines = test_utils.get_test_tap_lines('messages-with-new-pk.json') + + self.config['primary_key_required'] = False + + self.persist_lines_with_cache(tap_lines) + + table_desc = self.snowflake.query(f'desc table {self.config["default_target_schema"]}.test_simple_table;') + rows_count = self.snowflake.query(f'select count(1) as _count from' + f' {self.config["default_target_schema"]}.test_simple_table;') + + self.assertEqual(6, rows_count[0]['_COUNT']) + + self.assertEqual(4, len(table_desc)) + + self.assertEqual('ID', table_desc[0]['name']) + self.assertEqual('Y', table_desc[0]['null?']) + self.assertEqual('Y', table_desc[0]['primary key']) + + self.assertEqual('RESULTS', table_desc[1]['name']) + self.assertEqual('Y', table_desc[1]['null?']) + self.assertEqual('N', table_desc[1]['primary key']) + + self.assertEqual('TIME_CREATED', table_desc[2]['name']) + self.assertEqual('Y', table_desc[2]['null?']) + self.assertEqual('N', table_desc[2]['primary key']) + + self.assertEqual('NAME', table_desc[3]['name']) + self.assertEqual('Y', table_desc[3]['null?']) + self.assertEqual('Y', table_desc[3]['primary key']) + + def test_stream_with_falsy_pks_should_succeed(self): + """Test if data will be loaded if records have falsy values""" + tap_lines = test_utils.get_test_tap_lines('messages-with-falsy-pk-values.json') + + self.persist_lines_with_cache(tap_lines) + + rows_count = self.snowflake.query(f'select count(1) as _count from' + f' {self.config["default_target_schema"]}.test_simple_table;') + + self.assertEqual(8, rows_count[0]['_COUNT']) diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index 5301d404..e63c86a2 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py @@ -150,13 +150,14 @@ def test_create_query_tag(self): self.assertIsNone(db_sync.create_query_tag(None)) self.assertEqual(db_sync.create_query_tag('This is a test query tag'), 'This is a test query tag') self.assertEqual(db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}', - database='test_database', - schema='test_schema', - table='test_table'), 'Loading into test_database.test_schema.test_table') + database='test_database', + schema='test_schema', + table='test_table'), + 'Loading into test_database.test_schema.test_table') self.assertEqual(db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}', - database=None, - schema=None, - table=None), 'Loading into ..') + database=None, + schema=None, + table=None), 'Loading into ..') # JSON formatted query tags with variables json_query_tag = db_sync.create_query_tag( @@ -200,7 +201,7 @@ def test_create_query_tag(self): @patch('target_snowflake.db_sync.DbSync.query') def test_parallelism(self, query_patch): - query_patch.return_value = [{ 'type': 'CSV' }] + query_patch.return_value = [{'type': 'CSV'}] minimal_config = { 'account': "dummy-value", @@ -288,8 +289,10 @@ def test_record_primary_key_string(self, query_patch): stream_schema_message = {"stream": "public-table1", "schema": { "properties": { - "id": { "type": ["integer"]}, - "c_str": {"type": ["null", "string"]}}}, + "id": {"type": ["integer"]}, + "c_str": {"type": ["null", "string"]}, + "c_bool": {"type": ["boolean"]} + }}, "key_properties": ["id"]} # Single primary key string @@ -317,6 +320,16 @@ def test_record_primary_key_string(self, query_patch): r"fields: \['id', 'c_str'\]"): dbsync.record_primary_key_string({'id': None, 'c_str': 'xyz'}) + # falsy PK field accepted + stream_schema_message['key_properties'] = ['id'] + dbsync = db_sync.DbSync(minimal_config, stream_schema_message) + self.assertEqual(dbsync.record_primary_key_string({'id': 0, 'c_str': 'xyz'}), '0') + + # falsy PK field accepted + stream_schema_message['key_properties'] = ['id', 'c_bool'] + dbsync = db_sync.DbSync(minimal_config, stream_schema_message) + self.assertEqual(dbsync.record_primary_key_string({'id': 1, 'c_bool': False, 'c_str': 'xyz'}), '1,False') + @patch('target_snowflake.db_sync.DbSync.query') @patch('target_snowflake.db_sync.DbSync._load_file_merge') def test_merge_failure_message(self, load_file_merge_patch, query_patch): @@ -503,7 +516,7 @@ def test_sync_table_with_new_pk_in_stream(self, query_patch): self.assertEqual('alter table dummy-schema."TABLE1" drop primary key;', calls[2][0][0][0]) self.assertIn(calls[2][0][0][1], {'alter table dummy-schema."TABLE1" add primary key("ID", "NAME");', - 'alter table dummy-schema."TABLE1" add primary key("NAME", "ID");'}) + 'alter table dummy-schema."TABLE1" add primary key("NAME", "ID");'}) self.assertListEqual(sorted(calls[2][0][0][2:]), [ @@ -559,4 +572,53 @@ def test_sync_table_with_stream_that_changes_to_have_no_pk(self, query_patch): call('show primary keys in table dummy-db.dummy-schema."TABLE1";'), call(['alter table dummy-schema."TABLE1" drop primary key;', 'alter table dummy-schema."TABLE1" alter column "ID" drop not null;']) - ]) \ No newline at end of file + ]) + + @patch('target_snowflake.db_sync.DbSync.query') + def test_sync_table_with_stream_that_has_no_pk_but_get_a_new_one(self, query_patch): + minimal_config = { + 'account': "dummy-account", + 'dbname': "dummy-db", + 'user': "dummy-user", + 'password': "dummy-passwd", + 'warehouse': "dummy-wh", + 'default_target_schema': "dummy-schema", + 'file_format': "dummy-file-format" + } + + stream_schema_message = {"stream": "public-table1", + "schema": { + "properties": { + "id": {"type": ["integer"]}, + "c_str": {"type": ["null", "string"]}}}, + "key_properties": ['id']} + + table_cache = [ + { + 'SCHEMA_NAME': 'DUMMY-SCHEMA', + 'TABLE_NAME': 'TABLE1', + 'COLUMN_NAME': 'ID', + 'DATA_TYPE': 'NUMBER' + }, + { + 'SCHEMA_NAME': 'DUMMY-SCHEMA', + 'TABLE_NAME': 'TABLE1', + 'COLUMN_NAME': 'C_STR', + 'DATA_TYPE': 'TEXT' + } + ] + query_patch.side_effect = [ + [{'type': 'CSV'}], + [], + None + ] + + dbsync = db_sync.DbSync(minimal_config, stream_schema_message, table_cache) + dbsync.sync_table() + + query_patch.assert_has_calls([ + call('SHOW FILE FORMATS LIKE \'dummy-file-format\''), + call('show primary keys in table dummy-db.dummy-schema."TABLE1";'), + call(['alter table dummy-schema."TABLE1" add primary key("ID");', + 'alter table dummy-schema."TABLE1" alter column "ID" drop not null;']) + ])