Do not overwrite data with null when updating (transferwise#275)
Record messages from certain taps, e.g. Postgres with log-based
replication when doing deletes, will sometimes contain only metadata
and pkey columns. The current MERGE logic would then set every column
not included in the record to null in the target.
Tolsto committed May 10, 2022
1 parent b5536e2 commit 2c8f41f
Showing 4 changed files with 34 additions and 3 deletions.
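
To make the fix concrete, here is a minimal, self-contained sketch of how the generated MERGE UPDATE clause changes. This is illustrative code, not the repository's API: IGNORE_VALUE and the column-dict shape come from the diff below, while the column names are made up.

# Sketch of the change to the MERGE UPDATE clause generation.
IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'
columns = [{'name': 'CID'}, {'name': 'CVARCHAR'}, {'name': '_SDC_DELETED_AT'}]

# Before: every target column was unconditionally overwritten with the
# staged value, so columns missing from a partial record became NULL.
old_update = ', '.join(f"{c['name']}=s.{c['name']}" for c in columns)

# After: a staged cell holding the sentinel means "column absent from the
# record", so the CASE keeps the target's existing value instead.
new_update = ', '.join(
    f"{c['name']} = CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' "
    f"THEN t.{c['name']} ELSE s.{c['name']} END"
    for c in columns
)
print(old_update)
print(new_update)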
12 changes: 10 additions & 2 deletions target_snowflake/file_formats/csv.py
@@ -8,6 +8,8 @@
 
 from target_snowflake import flattening
 
+IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'
+
 
 def create_copy_sql(table_name: str,
                     stage_name: str,
@@ -30,7 +32,12 @@ def create_merge_sql(table_name: str,
                      pk_merge_condition: str) -> str:
     """Generate a CSV compatible snowflake MERGE INTO command"""
     p_source_columns = ', '.join([f"{c['trans']}(${i + 1}) {c['name']}" for i, c in enumerate(columns)])
-    p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns])
+    p_update = ', '.join(
+        [
+            f"{c['name']} = CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']} ELSE s.{c['name']} END"
+            for c in columns
+        ]
+    )
     p_insert_cols = ', '.join([c['name'] for c in columns])
     p_insert_values = ', '.join([f"s.{c['name']}" for c in columns])

@@ -64,7 +71,8 @@ def record_to_csv_line(record: dict,
     return ','.join(
         [
             ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and (
-                flatten_record[column] == 0 or flatten_record[column]) else ''
+                flatten_record[column] == 0 or flatten_record[column])
+            else '' if column in flatten_record else IGNORE_VALUE
             for column in schema
         ]
     )
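
The rewritten ternary above distinguishes three cases per column. A hedged, standalone re-implementation (the real function also flattens the record first; csv_cell is a made-up helper name for illustration) shows what each case serializes to:

import ujson

IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'

def csv_cell(flatten_record: dict, column: str) -> str:
    # Truthy value (or a literal 0): JSON-encode it.
    if column in flatten_record and (flatten_record[column] == 0 or flatten_record[column]):
        return ujson.dumps(flatten_record[column], ensure_ascii=False)
    # Key present but null/empty: empty cell, which loads as NULL and still
    # overwrites the target. Key absent entirely: the sentinel, which the
    # MERGE's CASE translates into "keep the target value".
    return '' if column in flatten_record else IGNORE_VALUE

record = {'cid': 2, 'cvarchar': None}          # 'cvarchar2' absent, as in a log-based delete
schema = ['cid', 'cvarchar', 'cvarchar2']
print([csv_cell(record, c) for c in schema])
# ['2', '', 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb']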
3 changes: 3 additions & 0 deletions tests/integration/resources/messages-pg-logical-streams-update.json
@@ -0,0 +1,3 @@
{"type": "SCHEMA", "stream": "logical1-logical1_table1", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "cvarchar2": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]}
{"type": "RECORD", "stream": "logical1-logical1_table1", "record": {"cid": 2, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723618, "time_extracted": "2019-10-13T14:06:31.838328Z"}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240873, "version": 1570922723618, "xmin": null}}}}
20 changes: 20 additions & 0 deletions tests/integration/test_target_snowflake.py
@@ -1374,3 +1374,23 @@ def test_stream_with_falsy_pks_should_succeed(self):
                                           f' {self.config["default_target_schema"]}.test_simple_table;')
 
         self.assertEqual(8, rows_count[0]['_COUNT'])
+
+    def test_deletion_does_not_set_column_data_to_null(self):
+        """Test if table columns will still have original data when doing a partial update during a delete"""
+        tap_lines_initial = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')
+        self.persist_lines_with_cache(tap_lines_initial)
+
+        subject = self.snowflake.query(f'SELECT cid, cvarchar, cvarchar2 FROM'
+                                       f' {self.config["default_target_schema"]}.logical1_table1 WHERE cid = \'2\';')
+
+        for _column, value in subject[0].items():
+            self.assertIsNotNone(value)
+
+        tap_lines_update = test_utils.get_test_tap_lines('messages-pg-logical-streams-update.json')
+        self.persist_lines_with_cache(tap_lines_update)
+
+        subject = self.snowflake.query(f'SELECT cid, cvarchar, cvarchar2, _sdc_deleted_at FROM'
+                                       f' {self.config["default_target_schema"]}.logical1_table1 WHERE cid = \'2\';')
+
+        for _column, value in subject[0].items():
+            self.assertIsNotNone(value)
2 changes: 1 addition & 1 deletion tests/unit/file_formats/test_csv.py
@@ -123,7 +123,7 @@ def test_create_merge_sql(self):
             "FROM '@foo_stage/foo_s3_key.csv' "
             "(FILE_FORMAT => 'foo_file_format')) s "
             "ON s.COL_1 = t.COL_1 "
-            "WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 "
+            "WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_1 ELSE s.COL_1 END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_2 ELSE s.COL_2 END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_3 ELSE s.COL_3 END "
             "WHEN NOT MATCHED THEN "
             "INSERT (COL_1, COL_2, COL_3) "
             "VALUES (s.COL_1, s.COL_2, s.COL_3)")
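
For orientation, a hedged reconstruction of the create_merge_sql call that would yield the SQL asserted above. Only table_name, stage_name, the columns dicts ('name'/'trans' keys) and pk_merge_condition are visible in this diff; the s3_key and file_format_name parameter names, and all concrete argument values, are assumptions inferred from the expected output:

from target_snowflake.file_formats import csv

merge_sql = csv.create_merge_sql(table_name='db_name.schema_name.table_name',  # illustrative
                                 stage_name='foo_stage',
                                 s3_key='foo_s3_key.csv',            # assumed parameter name
                                 file_format_name='foo_file_format',  # assumed parameter name
                                 columns=[{'name': 'COL_1', 'trans': ''},
                                          {'name': 'COL_2', 'trans': ''},
                                          {'name': 'COL_3', 'trans': ''}],
                                 pk_merge_condition='s.COL_1 = t.COL_1')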
