From 2c8f41fe297189c5eca9f197d9de225461f5b8b3 Mon Sep 17 00:00:00 2001 From: Nils Mueller Date: Tue, 10 May 2022 18:09:12 +0200 Subject: [PATCH] Do not overwrite data with null when updating (#275) Record messages for some updates from certain taps, e.g. Postgres with log-based replication when doing deletes, will only contain metadata and pkey columns. The current MERGE logic would then set all non-included columns in the target to null. --- target_snowflake/file_formats/csv.py | 12 +++++++++-- .../messages-pg-logical-streams-update.json | 3 +++ tests/integration/test_target_snowflake.py | 20 +++++++++++++++++++ tests/unit/file_formats/test_csv.py | 2 +- 4 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 tests/integration/resources/messages-pg-logical-streams-update.json diff --git a/target_snowflake/file_formats/csv.py b/target_snowflake/file_formats/csv.py index c7379356..2134eb5f 100644 --- a/target_snowflake/file_formats/csv.py +++ b/target_snowflake/file_formats/csv.py @@ -8,6 +8,8 @@ from target_snowflake import flattening +IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' + def create_copy_sql(table_name: str, stage_name: str, @@ -30,7 +32,12 @@ def create_merge_sql(table_name: str, pk_merge_condition: str) -> str: """Generate a CSV compatible snowflake MERGE INTO command""" p_source_columns = ', '.join([f"{c['trans']}(${i + 1}) {c['name']}" for i, c in enumerate(columns)]) - p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns]) + p_update = ', '.join( + [ + f"{c['name']} = CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']} ELSE s.{c['name']} END" + for c in columns + ] + ) p_insert_cols = ', '.join([c['name'] for c in columns]) p_insert_values = ', '.join([f"s.{c['name']}" for c in columns]) @@ -64,7 +71,8 @@ def record_to_csv_line(record: dict, return ','.join( [ ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and ( - flatten_record[column] == 0 or flatten_record[column]) else '' + flatten_record[column] == 0 or flatten_record[column]) + else '' if column in flatten_record else IGNORE_VALUE for column in schema ] ) diff --git a/tests/integration/resources/messages-pg-logical-streams-update.json b/tests/integration/resources/messages-pg-logical-streams-update.json new file mode 100644 index 00000000..9efee3d3 --- /dev/null +++ b/tests/integration/resources/messages-pg-logical-streams-update.json @@ -0,0 +1,3 @@ +{"type": "SCHEMA", "stream": "logical1-logical1_table1", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "cvarchar2": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]} +{"type": "RECORD", "stream": "logical1-logical1_table1", "record": {"cid": 2, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723618, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240873, "version": 1570922723618, "xmin": null}}}} diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index fca9a7f3..17e00915 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1374,3 +1374,23 @@ def test_stream_with_falsy_pks_should_succeed(self): f' {self.config["default_target_schema"]}.test_simple_table;') self.assertEqual(8, rows_count[0]['_COUNT']) + + def test_deletion_does_not_set_column_data_to_null(self): + """Test if table columns will still have original data when doing a partial update during a delete""" + tap_lines_initial = test_utils.get_test_tap_lines('messages-pg-logical-streams.json') + self.persist_lines_with_cache(tap_lines_initial) + + subject = self.snowflake.query(f'SELECT cid, cvarchar, cvarchar2 FROM' + f' {self.config["default_target_schema"]}.logical1_table1 WHERE cid = \'2\';') + + for _column, value in subject[0].items(): + self.assertIsNotNone(value) + + tap_lines_update = test_utils.get_test_tap_lines('messages-pg-logical-streams-update.json') + self.persist_lines_with_cache(tap_lines_update) + + subject = self.snowflake.query(f'SELECT cid, cvarchar, cvarchar2, _sdc_deleted_at FROM' + f' {self.config["default_target_schema"]}.logical1_table1 WHERE cid = \'2\';') + + for _column, value in subject[0].items(): + self.assertIsNotNone(value) diff --git a/tests/unit/file_formats/test_csv.py b/tests/unit/file_formats/test_csv.py index 900ba9f1..d475e195 100644 --- a/tests/unit/file_formats/test_csv.py +++ b/tests/unit/file_formats/test_csv.py @@ -123,7 +123,7 @@ def test_create_merge_sql(self): "FROM '@foo_stage/foo_s3_key.csv' " "(FILE_FORMAT => 'foo_file_format')) s " "ON s.COL_1 = t.COL_1 " - "WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 " + "WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_1 ELSE s.COL_1 END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_2 ELSE s.COL_2 END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_3 ELSE s.COL_3 END " "WHEN NOT MATCHED THEN " "INSERT (COL_1, COL_2, COL_3) " "VALUES (s.COL_1, s.COL_2, s.COL_3)")