This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Do not overwrite data with null when updating (#275) #276

Open · wants to merge 3 commits into master · showing changes from all commits
1 change: 1 addition & 0 deletions .gitignore
@@ -14,6 +14,7 @@ __pycache__/
 *~
 dist/
 .coverage
+/build

 # Singer JSON files
 properties.json
15 changes: 12 additions & 3 deletions target_snowflake/__init__.py
@@ -166,12 +166,19 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
                 if primary_key_string not in records_to_load[stream]:
                     row_count[stream] += 1
                     total_row_count[stream] += 1
+                    records_to_load[stream][primary_key_string] = {}

-                # append record
+                # merge record into batch
                 if config.get('add_metadata_columns') or config.get('hard_delete'):
-                    records_to_load[stream][primary_key_string] = stream_utils.add_metadata_values_to_record(o)
+                    records_to_load[stream][primary_key_string] = merge_records(
+                        records_to_load[stream][primary_key_string],
+                        stream_utils.add_metadata_values_to_record(o)
+                    )
                 else:
-                    records_to_load[stream][primary_key_string] = o['record']
+                    records_to_load[stream][primary_key_string] = merge_records(
+                        records_to_load[stream][primary_key_string],
+                        o['record']
+                    )

                 if archive_load_files and stream in archive_load_files_data:
                     # Keep track of min and max of the designated column
@@ -334,6 +341,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
     # emit latest state
     emit_state(copy.deepcopy(flushed_state))

+def merge_records(existing: dict, update: dict):
+    return {**existing, **update}

 # pylint: disable=too-many-arguments
 def flush_streams(
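The in-batch effect of `merge_records` is easiest to see with the insert-then-delete sequence that the integration test below exercises. A minimal sketch (plain Python, not part of the diff; the record shapes are borrowed from the `logical1_table2` test case):

```python
# A batch maps primary-key strings to accumulated records, as
# records_to_load[stream] does in persist_lines().
batch = {}

insert_record = {'cid': 4, 'cvarchar': 'hello', '_sdc_deleted_at': None}
delete_record = {'cid': 4, '_sdc_deleted_at': '2019-10-13T14:06:31Z'}  # partial

for record in (insert_record, delete_record):
    batch.setdefault('4', {})
    # merge_records(existing, update): the update wins key-by-key
    batch['4'] = {**batch['4'], **record}

# Columns the delete event did not mention survive the merge:
assert batch['4'] == {'cid': 4, 'cvarchar': 'hello',
                      '_sdc_deleted_at': '2019-10-13T14:06:31Z'}
```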
11 changes: 8 additions & 3 deletions target_snowflake/db_sync.py
@@ -505,7 +505,7 @@ def _load_file_merge(self, s3_key, stream, columns_with_trans) -> Tuple[int, int
                 s3_key=s3_key,
                 file_format_name=self.connection_config['file_format'],
                 columns=columns_with_trans,
-                pk_merge_condition=self.primary_key_merge_condition()
+                pk_merge_condition=self.primary_key_merge_condition(columns_with_trans)
             )
             self.logger.debug('Running query: %s', merge_sql)
             cur.execute(merge_sql)
@@ -536,11 +536,16 @@ def _load_file_copy(self, s3_key, stream, columns_with_trans) -> int:
         inserts = results[0].get('rows_loaded', 0)
         return inserts

-    def primary_key_merge_condition(self):
+    def primary_key_merge_condition(self, columns_with_trans):
         """Generate SQL join condition on primary keys for merge SQL statements"""
         stream_schema_message = self.stream_schema_message
         names = primary_column_names(stream_schema_message)
-        return ' AND '.join([f's.{c} = t.{c}' for c in names])
+
+        trans = {}
+        for column in names:
+            trans[column] = next(item['trans'] for item in columns_with_trans if item["name"] == column)
+
+        return ' AND '.join([f'{trans[c]}(s.{c}) = t.{c}' for c in names])

     def column_names(self):
         """Get list of columns in the schema"""
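Why `primary_key_merge_condition` now takes `columns_with_trans`: the staged sub-select in `create_merge_sql` no longer applies each column's transformation up front (see the file-format changes below), so the join condition has to transform the source side itself before comparing against the target. A sketch of the generated condition, under assumed inputs shaped like the unit tests in this PR:

```python
# Assumed column metadata ({'name', 'trans'} dicts) and primary keys.
columns_with_trans = [
    {'name': 'COL_1', 'trans': ''},
    {'name': 'COL_2', 'trans': ''},
    {'name': 'COL_3', 'trans': 'parse_json'},
]
names = ['COL_1', 'COL_3']  # hypothetical composite primary key

# Same lookup the method performs: each PK column's transformation.
trans = {c: next(item['trans'] for item in columns_with_trans
                 if item['name'] == c)
         for c in names}

condition = ' AND '.join(f'{trans[c]}(s.{c}) = t.{c}' for c in names)
assert condition == '(s.COL_1) = t.COL_1 AND parse_json(s.COL_3) = t.COL_3'
```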
22 changes: 18 additions & 4 deletions target_snowflake/file_formats/csv.py
@@ -8,6 +8,8 @@

 from target_snowflake import flattening

+IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'
+

 def create_copy_sql(table_name: str,
                     stage_name: str,
@@ -29,10 +31,21 @@ def create_merge_sql(table_name: str,
                      columns: List,
                      pk_merge_condition: str) -> str:
     """Generate a CSV compatible snowflake MERGE INTO command"""
-    p_source_columns = ', '.join([f"{c['trans']}(${i + 1}) {c['name']}" for i, c in enumerate(columns)])
-    p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns])
+    p_source_columns = ', '.join([f"(${i + 1}) {c['name']}" for i, c in enumerate(columns)])
+    p_update = ', '.join(
+        [
+            f"""
+            {c['name']} = CASE
+                WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']}
+                ELSE {c['trans']}(s.{c['name']})
+            END"""
+            for c in columns
+        ]
+    )
     p_insert_cols = ', '.join([c['name'] for c in columns])
-    p_insert_values = ', '.join([f"s.{c['name']}" for c in columns])
+    p_insert_values = ', '.join(
+        [f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns]
+    )

     return f"MERGE INTO {table_name} t USING (" \
            f"SELECT {p_source_columns} " \
@@ -64,7 +77,8 @@ def record_to_csv_line(record: dict,
     return ','.join(
         [
             ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and (
-                flatten_record[column] == 0 or flatten_record[column]) else ''
+                flatten_record[column] == 0 or flatten_record[column])
+            else '' if column in flatten_record else IGNORE_VALUE
             for column in schema
         ]
     )
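The serializer is the other half of the sentinel scheme: `record_to_csv_line` now distinguishes a column that is explicitly null (empty CSV field, loaded as NULL) from one that is absent from the record entirely (the `IGNORE_VALUE` sentinel, which the `CASE` expressions above turn into "keep the target's current value" on update and NULL on insert). A standalone restatement of the per-column logic, assuming an already-flattened record:

```python
import ujson

IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'

def csv_field(flat_record: dict, column: str) -> str:
    """Per-column logic of record_to_csv_line, restated for clarity."""
    if column in flat_record and (flat_record[column] == 0 or flat_record[column]):
        return ujson.dumps(flat_record[column], ensure_ascii=False)  # real value
    if column in flat_record:
        return ''           # explicit null/falsy -> empty field -> NULL on load
    return IGNORE_VALUE     # key absent -> leave the target column untouched

flat = {'cid': 17, 'cvarchar': None}  # '_sdc_deleted_at' not present at all
assert csv_field(flat, 'cid') == '17'
assert csv_field(flat, 'cvarchar') == ''
assert csv_field(flat, '_sdc_deleted_at') == IGNORE_VALUE
```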
20 changes: 16 additions & 4 deletions target_snowflake/file_formats/parquet.py
@@ -7,6 +7,8 @@

 from target_snowflake import flattening

+IGNORE_VALUE = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb'
+

 def create_copy_sql(table_name: str,
                     stage_name: str,
@@ -30,11 +32,21 @@ def create_merge_sql(table_name: str,
                      columns: List,
                      pk_merge_condition: str) -> str:
     """Generate a Parquet compatible snowflake MERGE INTO command"""
-    p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}"
-                                  for i, c in enumerate(columns)])
-    p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns])
+    p_source_columns = ', '.join([f"($1:{c['json_element_name']}) {c['name']}" for i, c in enumerate(columns)])
+    p_update = ', '.join(
+        [
+            f"""
+            {c['name']} = CASE
+                WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN t.{c['name']}
+                ELSE {c['trans']}(s.{c['name']})
+            END"""
+            for c in columns
+        ]
+    )
     p_insert_cols = ', '.join([c['name'] for c in columns])
-    p_insert_values = ', '.join([f"s.{c['name']}" for c in columns])
+    p_insert_values = ', '.join(
+        [f"CASE WHEN s.{c['name']} = '{IGNORE_VALUE}' THEN NULL ELSE {c['trans']}(s.{c['name']}) END" for c in columns]
+    )

     return f"MERGE INTO {table_name} t USING (" \
            f"SELECT {p_source_columns} " \
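As in the CSV formatter, the transformations move out of the staged `SELECT` and into the `CASE` expressions; the only format-specific difference is that Parquet addresses staged values by JSON element name rather than by position. A before/after sketch of `p_source_columns`, using two of the columns from the unit tests below:

```python
columns = [
    {'name': 'COL_1', 'json_element_name': 'col_1', 'trans': ''},
    {'name': 'COL_3', 'json_element_name': 'col_3', 'trans': 'parse_json'},
]

# Before this PR: transformation applied while reading from the stage.
before = ', '.join(f"{c['trans']}($1:{c['json_element_name']}) {c['name']}"
                   for c in columns)
# After: raw values selected; transformation deferred to the CASE branches.
after = ', '.join(f"($1:{c['json_element_name']}) {c['name']}"
                  for c in columns)

assert before == "($1:col_1) COL_1, parse_json($1:col_3) COL_3"
assert after == "($1:col_1) COL_1, ($1:col_3) COL_3"
```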
3 changes: 3 additions & 0 deletions tests/integration/resources/messages-pg-logical-streams-update.json
@@ -0,0 +1,3 @@
{"type": "SCHEMA", "stream": "logical1-logical1_edgydata", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cjson": {"type": ["null", "object"]}, "cjsonb": {"type": ["null", "object"]}, "ctimentz": {"format": "time", "type": ["null", "string"]}, "ctimetz": {"format": "time", "type": ["null", "string"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]}
{"type": "RECORD", "stream": "logical1-logical1_edgydata", "record": {"cid": 17, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723618, "time_extracted": "2019-10-13T14:06:31.838328Z"}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240873, "version": 1570922723618, "xmin": null}}}}
27 changes: 27 additions & 0 deletions tests/integration/test_target_snowflake.py
@@ -1374,3 +1374,30 @@ def test_stream_with_falsy_pks_should_succeed(self):
                                           f' {self.config["default_target_schema"]}.test_simple_table;')

         self.assertEqual(8, rows_count[0]['_COUNT'])
+
+    def test_deletion_does_not_set_column_data_to_null(self):
+        """Test if table columns will still have original data when doing a partial update during a delete"""
+        tap_lines_initial = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')
+        self.persist_lines_with_cache(tap_lines_initial)
+
+        subject = self.snowflake.query(f'SELECT cid, cjson, cjsonb FROM'
+                                       f' {self.config["default_target_schema"]}.logical1_edgydata WHERE cid = \'17\';')
+
+        for _column, value in subject[0].items():
+            self.assertIsNotNone(value)
+
+        # Insert and Delete for cid 4 in table logical1_table2 happens in a single batch. Validate that record message
+        # of the deletion does not overwrite all data from the insert within the batch.
+        subject = self.snowflake.query(f'SELECT cid, cvarchar, _sdc_deleted_at FROM'
+                                       f' {self.config["default_target_schema"]}.logical1_table2 WHERE cid = \'4\';')
+        for _column, value in subject[0].items():
+            self.assertIsNotNone(value)
+
+        tap_lines_update = test_utils.get_test_tap_lines('messages-pg-logical-streams-update.json')
+        self.persist_lines_with_cache(tap_lines_update)
+
+        subject = self.snowflake.query(f'SELECT cid, cjsonb, cjson, _sdc_deleted_at FROM'
+                                       f' {self.config["default_target_schema"]}.logical1_edgydata WHERE cid = \'17\';')
+
+        for _column, value in subject[0].items():
+            self.assertIsNotNone(value)
46 changes: 28 additions & 18 deletions tests/unit/file_formats/test_csv.py
@@ -2,6 +2,7 @@
 import os
 import gzip
 import tempfile
+import re

 import target_snowflake.file_formats.csv as csv

@@ -109,21 +110,30 @@ def test_create_copy_sql(self):
                          "FILE_FORMAT = (format_name='foo_file_format')")

     def test_create_merge_sql(self):
-        self.assertEqual(csv.create_merge_sql(table_name='foo_table',
-                                              stage_name='foo_stage',
-                                              s3_key='foo_s3_key.csv',
-                                              file_format_name='foo_file_format',
-                                              columns=[{'name': 'COL_1', 'trans': ''},
-                                                       {'name': 'COL_2', 'trans': ''},
-                                                       {'name': 'COL_3', 'trans': 'parse_json'}],
-                                              pk_merge_condition='s.COL_1 = t.COL_1'),
-
-                         "MERGE INTO foo_table t USING ("
-                         "SELECT ($1) COL_1, ($2) COL_2, parse_json($3) COL_3 "
-                         "FROM '@foo_stage/foo_s3_key.csv' "
-                         "(FILE_FORMAT => 'foo_file_format')) s "
-                         "ON s.COL_1 = t.COL_1 "
-                         "WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 "
-                         "WHEN NOT MATCHED THEN "
-                         "INSERT (COL_1, COL_2, COL_3) "
-                         "VALUES (s.COL_1, s.COL_2, s.COL_3)")
+        subject = csv.create_merge_sql(table_name='foo_table',
+                                       stage_name='foo_stage',
+                                       s3_key='foo_s3_key.csv',
+                                       file_format_name='foo_file_format',
+                                       columns=[{'name': 'COL_1', 'trans': ''},
+                                                {'name': 'COL_2', 'trans': ''},
+                                                {'name': 'COL_3', 'trans': 'parse_json'}],
+                                       pk_merge_condition='s.COL_1 = t.COL_1').replace("\n", "")
+        sanitized_subject = re.sub(' +', ' ', subject)
+
+        self.assertEqual(
+            sanitized_subject,
+            "MERGE INTO foo_table t USING ("
+            "SELECT ($1) COL_1, ($2) COL_2, ($3) COL_3 "
+            "FROM '@foo_stage/foo_s3_key.csv' "
+            "(FILE_FORMAT => 'foo_file_format')) s "
+            "ON s.COL_1 = t.COL_1 "
+            "WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' "
+            "THEN t.COL_1 ELSE (s.COL_1) END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' "
+            "THEN t.COL_2 ELSE (s.COL_2) END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' "
+            "THEN t.COL_3 ELSE parse_json(s.COL_3) END "
+            "WHEN NOT MATCHED THEN "
+            "INSERT (COL_1, COL_2, COL_3) "
+            "VALUES (CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, CASE WHEN "
+            "s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, "
+            "CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)"
+        )
51 changes: 31 additions & 20 deletions tests/unit/file_formats/test_parquet.py
@@ -1,4 +1,5 @@
 import unittest
+import re

 from pandas._testing import assert_frame_equal
 from pandas import DataFrame
@@ -67,24 +68,34 @@ def test_create_copy_sql(self):
                          "FILE_FORMAT = (format_name='foo_file_format')")

     def test_create_merge_sql(self):
-        self.assertEqual(parquet.create_merge_sql(table_name='foo_table',
-                                                  stage_name='foo_stage',
-                                                  s3_key='foo_s3_key.parquet',
-                                                  file_format_name='foo_file_format',
-                                                  columns=[
-                                                      {'name': 'COL_1', 'json_element_name': 'col_1', 'trans': ''},
-                                                      {'name': 'COL_2', 'json_element_name': 'colTwo', 'trans': ''},
-                                                      {'name': 'COL_3', 'json_element_name': 'col_3',
-                                                       'trans': 'parse_json'}
-                                                  ],
-                                                  pk_merge_condition='s.COL_1 = t.COL_1'),
-
-                         "MERGE INTO foo_table t USING ("
-                         "SELECT ($1:col_1) COL_1, ($1:colTwo) COL_2, parse_json($1:col_3) COL_3 "
-                         "FROM '@foo_stage/foo_s3_key.parquet' "
-                         "(FILE_FORMAT => 'foo_file_format')) s "
-                         "ON s.COL_1 = t.COL_1 "
-                         "WHEN MATCHED THEN UPDATE SET COL_1=s.COL_1, COL_2=s.COL_2, COL_3=s.COL_3 "
-                         "WHEN NOT MATCHED THEN "
-                         "INSERT (COL_1, COL_2, COL_3) "
-                         "VALUES (s.COL_1, s.COL_2, s.COL_3)")
+        subject = parquet.create_merge_sql(table_name='foo_table',
+                                           stage_name='foo_stage',
+                                           s3_key='foo_s3_key.parquet',
+                                           file_format_name='foo_file_format',
+                                           columns=[
+                                               {'name': 'COL_1', 'json_element_name': 'col_1', 'trans': ''},
+                                               {'name': 'COL_2', 'json_element_name': 'colTwo', 'trans': ''},
+                                               {'name': 'COL_3', 'json_element_name': 'col_3',
+                                                'trans': 'parse_json'}
+                                           ],
+                                           pk_merge_condition='s.COL_1 = t.COL_1').replace("\n", "")
+        sanitized_subject = re.sub(' +', ' ', subject)
+
+        self.assertEqual(
+            sanitized_subject,
+            "MERGE INTO foo_table t USING ("
+            "SELECT ($1:col_1) COL_1, ($1:colTwo) COL_2, ($1:col_3) COL_3 "
+            "FROM '@foo_stage/foo_s3_key.parquet' "
+            "(FILE_FORMAT => 'foo_file_format')) s "
+            "ON s.COL_1 = t.COL_1 "
+            "WHEN MATCHED THEN UPDATE SET COL_1 = CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_1 "
+            "ELSE (s.COL_1) END, COL_2 = CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_2 "
+            "ELSE (s.COL_2) END, COL_3 = CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN t.COL_3 "
+            "ELSE parse_json(s.COL_3) END "
+            "WHEN NOT MATCHED THEN "
+            "INSERT (COL_1, COL_2, COL_3) "
+            "VALUES (CASE WHEN s.COL_1 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_1) END, "
+            "CASE WHEN s.COL_2 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE (s.COL_2) END, "
+            "CASE WHEN s.COL_3 = 'ppw_ignore-4BVdmNiaHxpsFC3wDkwb' THEN NULL ELSE parse_json(s.COL_3) END)"
+        )
10 changes: 10 additions & 0 deletions tests/unit/test_target_snowflake.py
@@ -174,3 +174,13 @@ def test_persist_lines_with_only_state_messages(self, dbSync_mock, flush_streams
             buf.getvalue().strip(),
             '{"bookmarks":{"tap_mysql_test-test_simple_table":{"replication_key":"id",'
             '"replication_key_value":100,"version":1}}}')
+
+    def test_merge_records(self):
+        existing_record = {'a': 1, 'b': None, 'c': 'foo', 'd': 1}
+        new_record = {'a': 2, 'c': None, 'e': '2'}
+
+        merged_records = target_snowflake.merge_records(existing_record, new_record)
+
+        expected = {'a': 2, 'b': None, 'c': None, 'd': 1, 'e': '2'}
+
+        self.assertEqual(merged_records, expected)