From 4acc313cba2c236023c4df0f2bfce9f7ab797ef6 Mon Sep 17 00:00:00 2001 From: Jonas Kalderstam Date: Mon, 28 Feb 2022 15:13:05 +0100 Subject: [PATCH] Fix nested keys (#76) * Fixed RHS of keys not being transformed like LHS * Fixed deprecation warning * Fixed nested keys not being transformed like Schema * Created own test for nested keys --- target_bigquery/db_sync.py | 12 +++++-- tests/unit/test_db_sync.py | 73 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/target_bigquery/db_sync.py b/target_bigquery/db_sync.py index 892f208..c2a47aa 100644 --- a/target_bigquery/db_sync.py +++ b/target_bigquery/db_sync.py @@ -1,6 +1,7 @@ import json import sys import singer +from collections.abc import MutableMapping import re import itertools import time @@ -229,7 +230,11 @@ def flatten_record(d, parent_key=[], sep='__', level=0, max_level=0): if isinstance(v, MutableMapping) and level < max_level: items.extend(flatten_record(v, parent_key + [k], sep=sep, level=level+1, max_level=max_level).items()) else: - items.append((new_key, v if type(v) is list or type(v) is dict else v)) + if type(v) is dict: + # Need to fix the keys of nested dicts, lowercase etc + items.append((new_key, flatten_record(v, level = 0, max_level=0))) + else: + items.append((new_key, v)) return dict(items) @@ -395,10 +400,11 @@ def record_primary_key_string(self, record): if len(self.stream_schema_message['key_properties']) == 0: return None flatten = flatten_record(record, max_level=self.data_flattening_max_level) + primary_keys = [safe_column_name(p, quotes=False) for p in self.stream_schema_message['key_properties']] try: - key_props = [str(flatten[p.lower()]) for p in self.stream_schema_message['key_properties']] + key_props = [str(flatten[p]) for p in primary_keys] except Exception as exc: - logger.info("Cannot find {} primary key(s) in record: {}".format(self.stream_schema_message['key_properties'], flatten)) + logger.info("Cannot find {} primary 
key(s) in record: {}".format(primary_keys, flatten)) raise exc return ','.join(key_props) diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index d554291..8012d8a 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py @@ -305,3 +305,76 @@ def test_flatten_record(self): "c_obj__nested_prop3__multi_nested_prop1": "multi_value_1", "c_obj__nested_prop3__multi_nested_prop2": "multi_value_2" }) + + def test_nested_keys(self): + """Test recursive renaming of keys in RECORD messages""" + flatten_record = db_sync.flatten_record + + empty_record = {} + # Empty record should be empty dict + self.assertEqual(flatten_record(empty_record), {}) + + not_nested_record = {"c_pk": 1, "c_varchar": "1", "c_int": 1} + # NO FLATTENING - Record with simple properties should be a plain dictionary + self.assertEqual(flatten_record(not_nested_record), not_nested_record) + + # Include some uppercase and hyphens in nested keys to test that flatten_record + # fixes the key names recursively in all dicts to match schema + nested_record = { + "c_pk": 1, + "c_varchar": "1", + "C-Int": 1, + "c_obj": { + "Nested-Prop1": "value_1", + "Nested-Prop2": "value_2", + "Nested-Prop3": { + "multi_Nested-Prop1": "multi_value_1", + "multi_Nested-Prop2": "multi_value_2", + }}} + + # NO FLATTENING - No flattening (default) + self.maxDiff = None + self.assertEqual(flatten_record(nested_record), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj": {"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": { + "multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}} + }) + + # NO FLATTENING + # max_level: 0 : No flattening (default) + self.assertEqual(flatten_record(nested_record, max_level=0), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj": {"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": { + "multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}} + }) + + # SEMI FLATTENING + # 
max_level: 1 : Semi-flattening + self.assertEqual(flatten_record(nested_record, max_level=1), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj__nested_prop1": "value_1", + "c_obj__nested_prop2": "value_2", + "c_obj__nested_prop3": {"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": + "multi_value_2"} + }) + + # FLATTENING + self.assertEqual(flatten_record(nested_record, max_level=10), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj__nested_prop1": "value_1", + "c_obj__nested_prop2": "value_2", + "c_obj__nested_prop3__multi_nested_prop1": "multi_value_1", + "c_obj__nested_prop3__multi_nested_prop2": "multi_value_2" + })