From 224046226499e4ad5ef50689ecd93ce34687cfc2 Mon Sep 17 00:00:00 2001 From: Nils Mueller Date: Tue, 10 May 2022 18:09:12 +0200 Subject: [PATCH 1/3] Do not overwrite data with null when updating (#275) Record messages for some updates from certain taps, e.g. Postgres with log-based replication when doing deletes, will only contain metadata and pkey columns. The current MERGE logic would then set all non-included columns in the target to null. --- target_snowflake/db_sync.py | 11 ++-- target_snowflake/file_formats/csv.py | 22 ++++++-- target_snowflake/file_formats/parquet.py | 20 ++++++-- .../messages-pg-logical-streams-update.json | 3 ++ tests/integration/test_target_snowflake.py | 20 ++++++++ tests/unit/file_formats/test_csv.py | 46 ++++++++++------- tests/unit/file_formats/test_parquet.py | 51 +++++++++++-------- 7 files changed, 124 insertions(+), 49 deletions(-) create mode 100644 tests/integration/resources/messages-pg-logical-streams-update.json diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index d9959d93..6a5375fd 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -505,7 +505,7 @@ def _load_file_merge(self, s3_key, stream, columns_with_trans) -> Tuple[int, int s3_key=s3_key, file_format_name=self.connection_config['file_format'], columns=columns_with_trans, - pk_merge_condition=self.primary_key_merge_condition() + pk_merge_condition=self.primary_key_merge_condition(columns_with_trans) ) self.logger.debug('Running query: %s', merge_sql) cur.execute(merge_sql) @@ -536,11 +536,16 @@ def _load_file_copy(self, s3_key, stream, columns_with_trans) -> int: inserts = results[0].get('rows_loaded', 0) return inserts - def primary_key_merge_condition(self): + def primary_key_merge_condition(self, columns_with_trans): """Generate SQL join condition on primary keys for merge SQL statements""" stream_schema_message = self.stream_schema_message names = primary_column_names(stream_schema_message) - return ' AND '.join([f's.{c} = t.{c}' for c in names]) + + trans = {} + for column in names: + trans[column] = next(item['trans'] for item in columns_with_trans if item["name"] == column) + + return ' AND '.join([f'{trans[c]}(s.{c}) = t.{c}' for c in names]) def column_names(self): """Get list of columns in the schema""" diff --git a/target_snowflake/file_formats/csv.py b/target_snowflake/file_formats/csv.py index c7379356..1f003f51 100644 --- a/target_snowflake/file_formats/csv.py +++ b/target_snowflake/file_formats/csv.py @@ -8,6 +8,8 @@ from target_snowflake import flattening +IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' + def create_copy_sql(table_name: str, stage_name: str, @@ -29,10 +31,21 @@ def create_merge_sql(table_name: str, columns: List, pk_merge_condition: str) -> str: """Generate a CSV compatible snowflake MERGE INTO command""" - p_source_columns = ', '.join([f"{c['trans']}(${i + 1}) {c['name']}" for i, c in enumerate(columns)]) - p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns]) + p_source_columns = ', '.join([f"(${i + 1}) {c['name']}" for i, c in enumerate(columns)]) + p_update = ', '.join( + [ + f""" + {c['name']} = CASE + WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']} + ELSE {c['trans']}(s.{c['name']}) + END""" + for c in columns + ] + ) p_insert_cols = ', '.join([c['name'] for c in columns]) - p_insert_values = ', '.join([f"s.{c['name']}" for c in columns]) + p_insert_values = ', '.join( + [f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns] + 
) return f"MERGE INTO {table_name} t USING (" \ f"SELECT {p_source_columns} " \ @@ -64,7 +77,8 @@ def record_to_csv_line(record: dict, return ','.join( [ ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and ( - flatten_record[column] == 0 or flatten_record[column]) else '' + flatten_record[column] == 0 or flatten_record[column]) + else '' if column in flatten_record else IGNORE_VALUE for column in schema ] ) diff --git a/target_snowflake/file_formats/parquet.py b/target_snowflake/file_formats/parquet.py index ad02e6a5..6c068fdf 100644 --- a/target_snowflake/file_formats/parquet.py +++ b/target_snowflake/file_formats/parquet.py @@ -7,6 +7,8 @@ from target_snowflake import flattening +IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' + def create_copy_sql(table_name: str, stage_name: str, @@ -30,11 +32,21 @@ def create_merge_sql(table_name: str, columns: List, pk_merge_condition: str) -> str: """Generate a Parquet compatible snowflake MERGE INTO command""" - p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}" - for i, c in enumerate(columns)]) - p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns]) + p_source_columns = ', '.join([f"($1:{c['json_element_name']}) {c['name']}" for i, c in enumerate(columns)]) + p_update = ', '.join( + [ + f""" + {c['name']} = CASE + WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']} + ELSE {c['trans']}(s.{c['name']}) + END""" + for c in columns + ] + ) p_insert_cols = ', '.join([c['name'] for c in columns]) - p_insert_values = ', '.join([f"s.{c['name']}" for c in columns]) + p_insert_values = ', '.join( + [f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns] + ) return f"MERGE INTO {table_name} t USING (" \ f"SELECT {p_source_columns} " \ diff --git a/tests/integration/resources/messages-pg-logical-streams-update.json b/tests/integration/resources/messages-pg-logical-streams-update.json new file mode 100644 index 00000000..55fb5698 --- /dev/null +++ b/tests/integration/resources/messages-pg-logical-streams-update.json @@ -0,0 +1,3 @@ +{"type": "SCHEMA", "stream": "logical1-logical1_edgydata", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cjson": {"type": ["null", "object"]}, "cjsonb": {"type": ["null", "object"]}, "ctimentz": {"format": "time", "type": ["null", "string"]}, "ctimetz": {"format": "time", "type": ["null", "string"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]} +{"type": 
"RECORD", "stream": "logical1-logical1_edgydata", "record": {"cid": 17, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723618, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240873, "version": 1570922723618, "xmin": null}}}} diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index fca9a7f3..38d7757a 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1374,3 +1374,23 @@ def test_stream_with_falsy_pks_should_succeed(self): f' {self.config["default_target_schema"]}.test_simple_table;') self.assertEqual(8, rows_count[0]['_COUNT']) + + def test_deletion_does_not_set_column_data_to_null(self): + """Test if table columns will still have original data when doing a partial update during a delete""" + tap_lines_initial = test_utils.get_test_tap_lines('messages-pg-logical-streams.json') + self.persist_lines_with_cache(tap_lines_initial) + + subject = self.snowflake.query(f'SELECT cid, cjson, cjsonb FROM' + f' {self.config["default_target_schema"]}.logical1_edgydata WHERE cid = \'17\';') + + for _column, value in subject[0].items(): + self.assertIsNotNone(value) + + tap_lines_update = test_utils.get_test_tap_lines('messages-pg-logical-streams-update.json') + self.persist_lines_with_cache(tap_lines_update) + + subject = self.snowflake.query(f'SELECT cid, cjsonb, cjson, _sdc_deleted_at FROM' + f' {self.config["default_target_schema"]}.logical1_edgydata WHERE cid = \'17\';') + + for _column, value in subject[0].items(): + self.assertIsNotNone(value) diff --git a/tests/unit/file_formats/test_csv.py b/tests/unit/file_formats/test_csv.py index 900ba9f1..2d2aef85 100644 --- a/tests/unit/file_formats/test_csv.py +++ b/tests/unit/file_formats/test_csv.py @@ -2,6 +2,7 @@ import os import gzip import tempfile +import re import target_snowflake.file_formats.csv as csv @@ -109,21 +110,30 @@ def test_create_copy_sql(self): "FILE_FORMAT = (format_name='foo_file_format')") def test_create_merge_sql(self): - self.assertEqual(csv.create_merge_sql(table_name='foo_table', - stage_name='foo_stage', - s3_key='foo_s3_key.csv', - file_format_name='foo_file_format', - columns=[{'name': 'COL_1', 'trans': ''}, - {'name': 'COL_2', 'trans': ''}, - {'name': 'COL_3', 'trans': 'parse_json'}], - pk_merge_condition='s.COL_1 = t.COL_1'), - - "MERGE INTO foo_table t USING (" - "SELECT ($1) COL_1, ($2) COL_2, parse_json($3) COL_3 " - "FROM '@foo_stage/foo_s3_key.csv' " - "(FILE_FORMAT => 'foo_file_format')) s " - "ON s.COL_1 = t.COL_1 " - "WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 " - "WHEN NOT MATCHED THEN " - "INSERT (COL_1, COL_2, COL_3) " - "VALUES (s.COL_1, s.COL_2, s.COL_3)") + subject = csv.create_merge_sql(table_name='foo_table', + stage_name='foo_stage', + s3_key='foo_s3_key.csv', + file_format_name='foo_file_format', + columns=[{'name': 'COL_1', 'trans': ''}, + {'name': 'COL_2', 'trans': ''}, + {'name': 'COL_3', 'trans': 'parse_json'}], + pk_merge_condition='s.COL_1 = t.COL_1').replace("\n", "") + sanitized_subject = re.sub(' +', ' ', subject) + + self.assertEqual( + sanitized_subject, + "MERGE INTO foo_table t USING (" + "SELECT ($1) COL_1, ($2) COL_2, ($3) COL_3 " + "FROM '@foo_stage/foo_s3_key.csv' " + "(FILE_FORMAT => 'foo_file_format')) s " + "ON s.COL_1 = t.COL_1 " + "WHEN MATCHED THEN UPDATE SET COL_1 = CASE 
WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' " + "THEN t.COL_1 ELSE (s.COL_1) END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' " + "THEN t.COL_2 ELSE (s.COL_2) END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' " + "THEN t.COL_3 ELSE parse_json(s.COL_3) END " + "WHEN NOT MATCHED THEN " + "INSERT (COL_1, COL_2, COL_3) " + "VALUES (CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, CASE WHEN " + "s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, " + "CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)" + ) diff --git a/tests/unit/file_formats/test_parquet.py b/tests/unit/file_formats/test_parquet.py index c20aa44e..e9dfe092 100644 --- a/tests/unit/file_formats/test_parquet.py +++ b/tests/unit/file_formats/test_parquet.py @@ -1,4 +1,5 @@ import unittest +import re from pandas._testing import assert_frame_equal from pandas import DataFrame @@ -67,24 +68,34 @@ def test_create_copy_sql(self): "FILE_FORMAT = (format_name='foo_file_format')") def test_create_merge_sql(self): - self.assertEqual(parquet.create_merge_sql(table_name='foo_table', - stage_name='foo_stage', - s3_key='foo_s3_key.parquet', - file_format_name='foo_file_format', - columns=[ - {'name': 'COL_1', 'json_element_name': 'col_1', 'trans': ''}, - {'name': 'COL_2', 'json_element_name': 'colTwo', 'trans': ''}, - {'name': 'COL_3', 'json_element_name': 'col_3', - 'trans': 'parse_json'} - ], - pk_merge_condition='s.COL_1 = t.COL_1'), - "MERGE INTO foo_table t USING (" - "SELECT ($1:col_1) COL_1, ($1:colTwo) COL_2, parse_json($1:col_3) COL_3 " - "FROM '@foo_stage/foo_s3_key.parquet' " - "(FILE_FORMAT => 'foo_file_format')) s " - "ON s.COL_1 = t.COL_1 " - "WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 " - "WHEN NOT MATCHED THEN " - "INSERT (COL_1, COL_2, COL_3) " - "VALUES (s.COL_1, s.COL_2, s.COL_3)") + subject = parquet.create_merge_sql(table_name='foo_table', + stage_name='foo_stage', + s3_key='foo_s3_key.parquet', + file_format_name='foo_file_format', + columns=[ + {'name': 'COL_1', 'json_element_name': 'col_1', 'trans': ''}, + {'name': 'COL_2', 'json_element_name': 'colTwo', 'trans': ''}, + {'name': 'COL_3', 'json_element_name': 'col_3', + 'trans': 'parse_json'} + ], + pk_merge_condition='s.COL_1 = t.COL_1').replace("\n", "") + sanitized_subject = re.sub(' +', ' ', subject) + + self.assertEqual( + sanitized_subject, + "MERGE INTO foo_table t USING (" + "SELECT ($1:col_1) COL_1, ($1:colTwo) COL_2, ($1:col_3) COL_3 " + "FROM '@foo_stage/foo_s3_key.parquet' " + "(FILE_FORMAT => 'foo_file_format')) s " + "ON s.COL_1 = t.COL_1 " + "WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_1 " + "ELSE (s.COL_1) END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_2 " + "ELSE (s.COL_2) END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_3 " + "ELSE parse_json(s.COL_3) END " + "WHEN NOT MATCHED THEN " + "INSERT (COL_1, COL_2, COL_3) " + "VALUES (CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, " + "CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, " + "CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)" + ) From 962f6872883b717f6903f555e242c0dc0ba47a12 Mon Sep 17 00:00:00 2001 From: Nils Mueller Date: Wed, 11 May 2022 17:42:31 +0200 Subject: [PATCH 2/3] Add build directory to 
.gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index a005a21d..bdb44f6d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ __pycache__/ *~ dist/ .coverage +/build # Singer JSON files properties.json From b4307f180b496b195ebaf0689b951344ef3e8f70 Mon Sep 17 00:00:00 2001 From: Nils Mueller Date: Wed, 11 May 2022 23:53:44 +0200 Subject: [PATCH 3/3] Merge records into batch We cannot assume here that the latest version of a record will always contain all columns. For instance, in the case of delete records coming from Postgres via log-based replication the record message will only contain the primary key columns. --- target_snowflake/__init__.py | 15 ++++++++++++--- tests/integration/test_target_snowflake.py | 7 +++++++ tests/unit/test_target_snowflake.py | 10 ++++++++++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py index 7754421a..f13fe92d 100644 --- a/target_snowflake/__init__.py +++ b/target_snowflake/__init__.py @@ -166,12 +166,19 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT if primary_key_string not in records_to_load[stream]: row_count[stream] += 1 total_row_count[stream] += 1 + records_to_load[stream][primary_key_string] = {} - # append record + # merge record into batch if config.get('add_metadata_columns') or config.get('hard_delete'): - records_to_load[stream][primary_key_string] = stream_utils.add_metadata_values_to_record(o) + records_to_load[stream][primary_key_string] = merge_records( + records_to_load[stream][primary_key_string], + stream_utils.add_metadata_values_to_record(o) + ) else: - records_to_load[stream][primary_key_string] = o['record'] + records_to_load[stream][primary_key_string] = merge_records( + records_to_load[stream][primary_key_string], + o['record'] + ) if archive_load_files and stream in archive_load_files_data: # Keep track of min and max of the designated column @@ -334,6 +341,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT # emit latest state emit_state(copy.deepcopy(flushed_state)) +def merge_records(existing: dict, update: dict): + return {**existing, **update} # pylint: disable=too-many-arguments def flush_streams( diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 38d7757a..dcb2129d 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1386,6 +1386,13 @@ def test_deletion_does_not_set_column_data_to_null(self): for _column, value in subject[0].items(): self.assertIsNotNone(value) + # Insert and Delete for cid 4 in table logical1_table2 happens in a single batch. Validate that record message + # of the deletion does not overwrite all data from the insert within the batch. 
+ subject = self.snowflake.query(f'SELECT cid, cvarchar, _sdc_deleted_at FROM' + f' {self.config["default_target_schema"]}.logical1_table2 WHERE cid = \'4\';') + for _column, value in subject[0].items(): + self.assertIsNotNone(value) + tap_lines_update = test_utils.get_test_tap_lines('messages-pg-logical-streams-update.json') self.persist_lines_with_cache(tap_lines_update) diff --git a/tests/unit/test_target_snowflake.py b/tests/unit/test_target_snowflake.py index 63e34b27..f96725fa 100644 --- a/tests/unit/test_target_snowflake.py +++ b/tests/unit/test_target_snowflake.py @@ -174,3 +174,13 @@ def test_persist_lines_with_only_state_messages(self, dbSync_mock, flush_streams buf.getvalue().strip(), '{"bookmarks":{"tap_mysql_test-test_simple_table":{"replication_key":"id",' '"replication_key_value":100,"version":1}}}') + + def test_merge_records(self): + existing_record = {'a': 1, 'b': None, 'c': 'foo', 'd': 1} + new_record = {'a': 2, 'c': None, 'e': '2'} + + merged_records = target_snowflake.merge_records(existing_record, new_record) + + expected = {'a': 2, 'b': None, 'c': None, 'd': 1, 'e': '2'} + + self.assertEqual(merged_records, expected)
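
Note on PATCH 1/3: record_to_csv_line now distinguishes a column the tap omitted entirely (written as the IGNORE_VALUE sentinel) from a column that is present but null/empty (written as an empty cell), and the generated MERGE turns the sentinel into "keep the target's current value" on update and into NULL on insert. A minimal sketch of that cell mapping, using illustrative column names and stdlib json in place of ujson; this is not the library function itself, the real logic lives in target_snowflake/file_formats/csv.py:

    # Simplified illustration of the column-to-CSV-cell mapping from PATCH 1/3;
    # not the library code itself.
    import json

    IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'

    def cell_for(column, record):
        if column not in record:
            return IGNORE_VALUE  # tap omitted the column entirely -> MERGE keeps the target value
        value = record[column]
        if value == 0 or value:
            return json.dumps(value, ensure_ascii=False)
        return ''  # present but null/empty -> loads as NULL

    # A log-based delete record carries only the pkey and metadata columns:
    schema = ['cid', 'cvarchar', '_sdc_deleted_at']
    delete_record = {'cid': 17, '_sdc_deleted_at': '2019-10-13T14:06:31.838328+00:00'}
    print(','.join(cell_for(column, delete_record) for column in schema))
    # 17,ppw_ignore-4BVdmNiaHxpsFC3wDkwb,"2019-10-13T14:06:31.838328+00:00"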
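
Note on PATCH 3/3: records are now merged into the batch per primary key instead of overwritten, so a partial message (for example a log-based delete that only carries the pkey and _sdc_deleted_at) no longer drops the columns buffered from an earlier message in the same batch. A short walk-through with made-up column values; only merge_records itself comes from the patch:

    # merge_records as added to target_snowflake/__init__.py: keys present in the
    # update win, everything else is kept from the record already in the batch.
    def merge_records(existing: dict, update: dict):
        return {**existing, **update}

    # Illustrative only: a record buffered from an earlier INSERT message...
    buffered_insert = {'cid': 4, 'cvarchar': 'some earlier value', '_sdc_deleted_at': None}
    # ...followed in the same batch by a DELETE message that only carries the pkey
    # and the deletion timestamp (timestamp taken from the test fixture).
    delete_message = {'cid': 4, '_sdc_deleted_at': '2019-10-13T14:06:31.838328+00:00'}

    print(merge_records(buffered_insert, delete_message))
    # {'cid': 4, 'cvarchar': 'some earlier value', '_sdc_deleted_at': '2019-10-13T14:06:31.838328+00:00'}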