Do not overwrite data with null when updating (transferwise#275)
RECORD messages for some updates from certain taps, e.g.
Postgres with log-based replication when processing deletes,
may contain only metadata and primary-key columns. The current
MERGE logic would then set every column not present in the
message to null in the target table.
Tolsto committed May 11, 2022
1 parent b5536e2 commit 2240462
Showing 7 changed files with 124 additions and 49 deletions.
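For context, the fix writes a sentinel string into the staged file for every schema column that is absent from a record, and the generated MERGE keeps the target's existing value whenever it encounters that sentinel. A minimal Python sketch of the idea, using the sentinel value and CASE shape from the diff below (the helper function itself is illustrative, not part of the codebase):

# Illustrative only: mirrors the CASE expressions generated in
# target_snowflake/file_formats/csv.py and parquet.py below.
IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'

def update_expression(name: str, trans: str = '') -> str:
    """Keep the target's current value when the staged source holds the sentinel."""
    return (
        f"{name} = CASE "
        f"WHEN s.{name} = '{IGNORE_VALUE}' THEN t.{name} "
        f"ELSE {trans}(s.{name}) END"
    )

print(update_expression('CJSON', 'parse_json'))
# CJSON = CASE WHEN s.CJSON = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.CJSON ELSE parse_json(s.CJSON) END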
11 changes: 8 additions & 3 deletions target_snowflake/db_sync.py
@@ -505,7 +505,7 @@ def _load_file_merge(self, s3_key, stream, columns_with_trans) -> Tuple[int, int
s3_key=s3_key,
file_format_name=self.connection_config['file_format'],
columns=columns_with_trans,
pk_merge_condition=self.primary_key_merge_condition()
pk_merge_condition=self.primary_key_merge_condition(columns_with_trans)
)
self.logger.debug('Running query: %s', merge_sql)
cur.execute(merge_sql)
@@ -536,11 +536,16 @@ def _load_file_copy(self, s3_key, stream, columns_with_trans) -> int:
inserts = results[0].get('rows_loaded', 0)
return inserts

def primary_key_merge_condition(self):
def primary_key_merge_condition(self, columns_with_trans):
"""Generate SQL join condition on primary keys for merge SQL statements"""
stream_schema_message = self.stream_schema_message
names = primary_column_names(stream_schema_message)
return ' AND '.join([f's.{c} = t.{c}' for c in names])

trans = {}
for column in names:
trans[column] = next(item['trans'] for item in columns_with_trans if item["name"] == column)

return ' AND '.join([f'{trans[c]}(s.{c}) = t.{c}' for c in names])

def column_names(self):
"""Get list of columns in the schema"""
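A hedged standalone sketch of the new primary_key_merge_condition logic above (the inputs below are hypothetical; the real method reads the primary-key names from the stream schema message):

def pk_merge_condition(names, columns_with_trans):
    # Look up the column transformation (e.g. 'parse_json', or '') for each primary-key column.
    trans = {
        column: next(item['trans'] for item in columns_with_trans if item['name'] == column)
        for column in names
    }
    return ' AND '.join(f'{trans[c]}(s.{c}) = t.{c}' for c in names)

# An untransformed key yields "(s.CID) = t.CID"; a JSON-typed key would be
# wrapped by its transformation, e.g. "parse_json(s.CJSON) = t.CJSON".
print(pk_merge_condition(['CID'], [{'name': 'CID', 'trans': ''}]))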
22 changes: 18 additions & 4 deletions target_snowflake/file_formats/csv.py
@@ -8,6 +8,8 @@

from target_snowflake import flattening

IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'


def create_copy_sql(table_name: str,
stage_name: str,
@@ -29,10 +31,21 @@ def create_merge_sql(table_name: str,
columns: List,
pk_merge_condition: str) -> str:
"""Generate a CSV compatible snowflake MERGE INTO command"""
p_source_columns = ', '.join([f"{c['trans']}(${i + 1}) {c['name']}" for i, c in enumerate(columns)])
p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns])
p_source_columns = ', '.join([f"(${i + 1}) {c['name']}" for i, c in enumerate(columns)])
p_update = ', '.join(
[
f"""
{c['name']} = CASE
WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']}
ELSE {c['trans']}(s.{c['name']})
END"""
for c in columns
]
)
p_insert_cols = ', '.join([c['name'] for c in columns])
p_insert_values = ', '.join([f"s.{c['name']}" for c in columns])
p_insert_values = ', '.join(
[f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns]
)

return f"MERGE INTO {table_name} t USING (" \
f"SELECT {p_source_columns} " \
@@ -64,7 +77,8 @@ def record_to_csv_line(record: dict,
return ','.join(
[
ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and (
flatten_record[column] == 0 or flatten_record[column]) else ''
flatten_record[column] == 0 or flatten_record[column])
else '' if column in flatten_record else IGNORE_VALUE
for column in schema
]
)
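A minimal sketch of the CSV serialization rule above, assuming a pre-flattened record and a list of schema column names (the function below is illustrative and does not reproduce the real record_to_csv_line signature):

import ujson

IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'

def serialize(flatten_record: dict, schema_columns: list) -> str:
    # Columns present in the record are JSON-dumped (empty string for null-ish values);
    # columns missing from the record get the sentinel so the MERGE leaves them untouched.
    return ','.join(
        ujson.dumps(flatten_record[column], ensure_ascii=False)
        if column in flatten_record and (flatten_record[column] == 0 or flatten_record[column])
        else '' if column in flatten_record else IGNORE_VALUE
        for column in schema_columns
    )

# A delete record carrying only the pkey and metadata keeps 'cvarchar' intact in the target:
print(serialize({'cid': 17, '_sdc_deleted_at': '2019-10-13T14:06:31Z'},
                ['cid', 'cvarchar', '_sdc_deleted_at']))
# -> 17,ppw_ignore-4BVdmNiaHxpsFC3wDkwb,"2019-10-13T14:06:31Z"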
20 changes: 16 additions & 4 deletions target_snowflake/file_formats/parquet.py
@@ -7,6 +7,8 @@

from target_snowflake import flattening

IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'


def create_copy_sql(table_name: str,
stage_name: str,
@@ -30,11 +32,21 @@ def create_merge_sql(table_name: str,
columns: List,
pk_merge_condition: str) -> str:
"""Generate a Parquet compatible snowflake MERGE INTO command"""
p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}"
for i, c in enumerate(columns)])
p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns])
p_source_columns = ', '.join([f"($1:{c['json_element_name']}) {c['name']}" for i, c in enumerate(columns)])
p_update = ', '.join(
[
f"""
{c['name']} = CASE
WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']}
ELSE {c['trans']}(s.{c['name']})
END"""
for c in columns
]
)
p_insert_cols = ', '.join([c['name'] for c in columns])
p_insert_values = ', '.join([f"s.{c['name']}" for c in columns])
p_insert_values = ', '.join(
[f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns]
)

return f"MERGE INTO {table_name} t USING (" \
f"SELECT {p_source_columns} " \
3 changes: 3 additions & 0 deletions tests/integration/resources/messages-pg-logical-streams-update.json
@@ -0,0 +1,3 @@
{"type": "SCHEMA", "stream": "logical1-logical1_edgydata", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cjson": {"type": ["null", "object"]}, "cjsonb": {"type": ["null", "object"]}, "ctimentz": {"format": "time", "type": ["null", "string"]}, "ctimetz": {"format": "time", "type": ["null", "string"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]}
{"type": "RECORD", "stream": "logical1-logical1_edgydata", "record": {"cid": 17, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723618, "time_extracted": "2019-10-13T14:06:31.838328Z"}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240873, "version": 1570922723618, "xmin": null}}}}
20 changes: 20 additions & 0 deletions tests/integration/test_target_snowflake.py
@@ -1374,3 +1374,23 @@ def test_stream_with_falsy_pks_should_succeed(self):
f' {self.config["default_target_schema"]}.test_simple_table;')

self.assertEqual(8, rows_count[0]['_COUNT'])

def test_deletion_does_not_set_column_data_to_null(self):
"""Test if table columns will still have original data when doing a partial update during a delete"""
tap_lines_initial = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')
self.persist_lines_with_cache(tap_lines_initial)

subject = self.snowflake.query(f'SELECT cid, cjson, cjsonb FROM'
f' {self.config["default_target_schema"]}.logical1_edgydata WHERE cid = \'17\';')

for _column, value in subject[0].items():
self.assertIsNotNone(value)

tap_lines_update = test_utils.get_test_tap_lines('messages-pg-logical-streams-update.json')
self.persist_lines_with_cache(tap_lines_update)

subject = self.snowflake.query(f'SELECT cid, cjsonb, cjson, _sdc_deleted_at FROM'
f' {self.config["default_target_schema"]}.logical1_edgydata WHERE cid = \'17\';')

for _column, value in subject[0].items():
self.assertIsNotNone(value)
46 changes: 28 additions & 18 deletions tests/unit/file_formats/test_csv.py
Expand Up @@ -2,6 +2,7 @@
import os
import gzip
import tempfile
import re

import target_snowflake.file_formats.csv as csv

@@ -109,21 +110,30 @@ def test_create_copy_sql(self):
"FILE_FORMAT = (format_name='foo_file_format')")

def test_create_merge_sql(self):
self.assertEqual(csv.create_merge_sql(table_name='foo_table',
stage_name='foo_stage',
s3_key='foo_s3_key.csv',
file_format_name='foo_file_format',
columns=[{'name': 'COL_1', 'trans': ''},
{'name': 'COL_2', 'trans': ''},
{'name': 'COL_3', 'trans': 'parse_json'}],
pk_merge_condition='s.COL_1 = t.COL_1'),

"MERGE INTO foo_table t USING ("
"SELECT ($1) COL_1, ($2) COL_2, parse_json($3) COL_3 "
"FROM '@foo_stage/foo_s3_key.csv' "
"(FILE_FORMAT => 'foo_file_format')) s "
"ON s.COL_1 = t.COL_1 "
"WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 "
"WHEN NOT MATCHED THEN "
"INSERT (COL_1, COL_2, COL_3) "
"VALUES (s.COL_1, s.COL_2, s.COL_3)")
subject = csv.create_merge_sql(table_name='foo_table',
stage_name='foo_stage',
s3_key='foo_s3_key.csv',
file_format_name='foo_file_format',
columns=[{'name': 'COL_1', 'trans': ''},
{'name': 'COL_2', 'trans': ''},
{'name': 'COL_3', 'trans': 'parse_json'}],
pk_merge_condition='s.COL_1 = t.COL_1').replace("\n", "")
sanitized_subject = re.sub(' +', ' ', subject)

self.assertEqual(
sanitized_subject,
"MERGE INTO foo_table t USING ("
"SELECT ($1) COL_1, ($2) COL_2, ($3) COL_3 "
"FROM '@foo_stage/foo_s3_key.csv' "
"(FILE_FORMAT => 'foo_file_format')) s "
"ON s.COL_1 = t.COL_1 "
"WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' "
"THEN t.COL_1 ELSE (s.COL_1) END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' "
"THEN t.COL_2 ELSE (s.COL_2) END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' "
"THEN t.COL_3 ELSE parse_json(s.COL_3) END "
"WHEN NOT MATCHED THEN "
"INSERT (COL_1, COL_2, COL_3) "
"VALUES (CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, CASE WHEN "
"s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, "
"CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)"
)
51 changes: 31 additions & 20 deletions tests/unit/file_formats/test_parquet.py
@@ -1,4 +1,5 @@
import unittest
import re

from pandas._testing import assert_frame_equal
from pandas import DataFrame
@@ -67,24 +68,34 @@ def test_create_copy_sql(self):
"FILE_FORMAT = (format_name='foo_file_format')")

def test_create_merge_sql(self):
self.assertEqual(parquet.create_merge_sql(table_name='foo_table',
stage_name='foo_stage',
s3_key='foo_s3_key.parquet',
file_format_name='foo_file_format',
columns=[
{'name': 'COL_1', 'json_element_name': 'col_1', 'trans': ''},
{'name': 'COL_2', 'json_element_name': 'colTwo', 'trans': ''},
{'name': 'COL_3', 'json_element_name': 'col_3',
'trans': 'parse_json'}
],
pk_merge_condition='s.COL_1 = t.COL_1'),

"MERGE INTO foo_table t USING ("
"SELECT ($1:col_1) COL_1, ($1:colTwo) COL_2, parse_json($1:col_3) COL_3 "
"FROM '@foo_stage/foo_s3_key.parquet' "
"(FILE_FORMAT => 'foo_file_format')) s "
"ON s.COL_1 = t.COL_1 "
"WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 "
"WHEN NOT MATCHED THEN "
"INSERT (COL_1, COL_2, COL_3) "
"VALUES (s.COL_1, s.COL_2, s.COL_3)")
subject = parquet.create_merge_sql(table_name='foo_table',
stage_name='foo_stage',
s3_key='foo_s3_key.parquet',
file_format_name='foo_file_format',
columns=[
{'name': 'COL_1', 'json_element_name': 'col_1', 'trans': ''},
{'name': 'COL_2', 'json_element_name': 'colTwo', 'trans': ''},
{'name': 'COL_3', 'json_element_name': 'col_3',
'trans': 'parse_json'}
],
pk_merge_condition='s.COL_1 = t.COL_1').replace("\n", "")
sanitized_subject = re.sub(' +', ' ', subject)

self.assertEqual(
sanitized_subject,
"MERGE INTO foo_table t USING ("
"SELECT ($1:col_1) COL_1, ($1:colTwo) COL_2, ($1:col_3) COL_3 "
"FROM '@foo_stage/foo_s3_key.parquet' "
"(FILE_FORMAT => 'foo_file_format')) s "
"ON s.COL_1 = t.COL_1 "
"WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_1 "
"ELSE (s.COL_1) END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_2 "
"ELSE (s.COL_2) END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_3 "
"ELSE parse_json(s.COL_3) END "
"WHEN NOT MATCHED THEN "
"INSERT (COL_1, COL_2, COL_3) "
"VALUES (CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, "
"CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, "
"CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)"
)
