def primary_key_merge_condition(self, columns_with_trans):
    """Generate SQL join condition on primary keys for merge SQL statements.

    For every name returned by ``primary_column_names`` the source side
    (alias ``s``) is wrapped in the transformation registered for that column
    in ``columns_with_trans`` and compared with the target side (alias ``t``).

    Args:
        columns_with_trans: iterable of dicts; the ``name`` and ``trans``
            entries of each dict are read here.

    Returns:
        The per-column conditions joined with ``' AND '``. An empty string
        when there are no primary key columns.

    Raises:
        ValueError: if a primary key column has no matching ``name`` in
            ``columns_with_trans``.
    """
    names = list(primary_column_names(self.stream_schema_message))

    # Index the transformations once instead of rescanning the column list for
    # every key. Only key columns are indexed and the first match wins, which
    # is what the per-key ``next(...)`` scan used to select.
    wanted = set(names)
    trans = {}
    for item in columns_with_trans:
        name = item['name']
        if name in wanted and name not in trans:
            trans[name] = item['trans']

    # A bare ``next()`` would leak StopIteration here, which is easy to
    # misread as normal iterator exhaustion further up the call stack.
    missing = [c for c in names if c not in trans]
    if missing:
        raise ValueError(
            f'No column transformation found for primary key column(s): {", ".join(missing)}'
        )

    return ' AND '.join(f'{trans[c]}(s.{c}) = t.{c}' for c in names)
'.join([f"{c['name']}=s.{c['name']}" for c in columns]) + p_source_columns = ', '.join([f"(${i + 1}) {c['name']}" for i, c in enumerate(columns)]) + p_update = ', '.join( + [ + f""" + {c['name']} = CASE + WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']} + ELSE {c['trans']}(s.{c['name']}) + END""" + for c in columns + ] + ) p_insert_cols = ', '.join([c['name'] for c in columns]) - p_insert_values = ', '.join([f"s.{c['name']}" for c in columns]) + p_insert_values = ', '.join( + [f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns] + ) return f"MERGE INTO {table_name} t USING (" \ f"SELECT {p_source_columns} " \ @@ -64,7 +77,8 @@ def record_to_csv_line(record: dict, return ','.join( [ ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and ( - flatten_record[column] == 0 or flatten_record[column]) else '' + flatten_record[column] == 0 or flatten_record[column]) + else '' if column in flatten_record else IGNORE_VALUE for column in schema ] ) diff --git a/target_snowflake/file_formats/parquet.py b/target_snowflake/file_formats/parquet.py index ad02e6a5..6c068fdf 100644 --- a/target_snowflake/file_formats/parquet.py +++ b/target_snowflake/file_formats/parquet.py @@ -7,6 +7,8 @@ from target_snowflake import flattening +IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' + def create_copy_sql(table_name: str, stage_name: str, @@ -30,11 +32,21 @@ def create_merge_sql(table_name: str, columns: List, pk_merge_condition: str) -> str: """Generate a Parquet compatible snowflake MERGE INTO command""" - p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}" - for i, c in enumerate(columns)]) - p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns]) + p_source_columns = ', '.join([f"($1:{c['json_element_name']}) {c['name']}" for i, c in enumerate(columns)]) + p_update = ', '.join( + [ + f""" + {c['name']} = CASE + WHEN s.{c['name']} = 
'{IGNORE_VALUE}' THEN t.{c['name']} + ELSE {c['trans']}(s.{c['name']}) + END""" + for c in columns + ] + ) p_insert_cols = ', '.join([c['name'] for c in columns]) - p_insert_values = ', '.join([f"s.{c['name']}" for c in columns]) + p_insert_values = ', '.join( + [f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns] + ) return f"MERGE INTO {table_name} t USING (" \ f"SELECT {p_source_columns} " \ diff --git a/tests/integration/resources/messages-pg-logical-streams-update.json b/tests/integration/resources/messages-pg-logical-streams-update.json new file mode 100644 index 00000000..55fb5698 --- /dev/null +++ b/tests/integration/resources/messages-pg-logical-streams-update.json @@ -0,0 +1,3 @@ +{"type": "SCHEMA", "stream": "logical1-logical1_edgydata", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cjson": {"type": ["null", "object"]}, "cjsonb": {"type": ["null", "object"]}, "ctimentz": {"format": "time", "type": ["null", "string"]}, "ctimetz": {"format": "time", "type": ["null", "string"]}, "cvarchar": {"type": ["null", 
def test_deletion_does_not_set_column_data_to_null(self):
    """A delete message that only carries the key and _sdc_deleted_at must keep the stored column data"""

    def fetch_first_row(column_list):
        # The target schema is read at query time, once per query.
        rows = self.snowflake.query(f'SELECT {column_list} FROM'
                                    f' {self.config["default_target_schema"]}.logical1_edgydata WHERE cid = \'17\';')
        return rows[0]

    def assert_all_populated(row):
        for _name, cell in row.items():
            self.assertIsNotNone(cell)

    # Baseline load: the selected columns of row 17 are expected to be populated.
    self.persist_lines_with_cache(test_utils.get_test_tap_lines('messages-pg-logical-streams.json'))
    assert_all_populated(fetch_first_row('cid, cjson, cjsonb'))

    # Partial record for the same key: only cid and _sdc_deleted_at are sent.
    self.persist_lines_with_cache(test_utils.get_test_tap_lines('messages-pg-logical-streams-update.json'))
    assert_all_populated(fetch_first_row('cid, cjsonb, cjson, _sdc_deleted_at'))
def test_create_merge_sql(self):
    """The CSV MERGE statement keeps the target value for columns that carry the ignore marker"""
    columns = [{'name': 'COL_1', 'trans': ''},
               {'name': 'COL_2', 'trans': ''},
               {'name': 'COL_3', 'trans': 'parse_json'}]

    merge_sql = csv.create_merge_sql(table_name='foo_table',
                                     stage_name='foo_stage',
                                     s3_key='foo_s3_key.csv',
                                     file_format_name='foo_file_format',
                                     columns=columns,
                                     pk_merge_condition='s.COL_1 = t.COL_1')

    # The generated statement contains line breaks and runs of spaces;
    # flatten both before comparing against the single-line expectation.
    flattened = re.sub(' +', ' ', merge_sql.replace("\n", ""))

    expected = (
        "MERGE INTO foo_table t USING ("
        "SELECT ($1) COL_1, ($2) COL_2, ($3) COL_3 "
        "FROM '@foo_stage/foo_s3_key.csv' "
        "(FILE_FORMAT => 'foo_file_format')) s "
        "ON s.COL_1 = t.COL_1 "
        "WHEN MATCHED THEN UPDATE SET "
        "COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_1 ELSE (s.COL_1) END, "
        "COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_2 ELSE (s.COL_2) END, "
        "COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_3 "
        "ELSE parse_json(s.COL_3) END "
        "WHEN NOT MATCHED THEN "
        "INSERT (COL_1, COL_2, COL_3) "
        "VALUES ("
        "CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, "
        "CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, "
        "CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)"
    )

    self.assertEqual(flattened, expected)
'json_element_name': 'colTwo', 'trans': ''}, + {'name': 'COL_3', 'json_element_name': 'col_3', + 'trans': 'parse_json'} + ], + pk_merge_condition='s.COL_1 = t.COL_1').replace("\n", "") + sanitized_subject = re.sub(' +', ' ', subject) + + self.assertEqual( + sanitized_subject, + "MERGE INTO foo_table t USING (" + "SELECT ($1:col_1) COL_1, ($1:colTwo) COL_2, ($1:col_3) COL_3 " + "FROM '@foo_stage/foo_s3_key.parquet' " + "(FILE_FORMAT => 'foo_file_format')) s " + "ON s.COL_1 = t.COL_1 " + "WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_1 " + "ELSE (s.COL_1) END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_2 " + "ELSE (s.COL_2) END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_3 " + "ELSE parse_json(s.COL_3) END " + "WHEN NOT MATCHED THEN " + "INSERT (COL_1, COL_2, COL_3) " + "VALUES (CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, " + "CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, " + "CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)" + )