diff --git a/setup.py b/setup.py
index 82e81429..41b4ed45 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 
 setup(name="pipelinewise-target-snowflake",
-      version="1.2.0",
+      version="1.2.1",
       description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
       long_description=long_description,
       long_description_content_type='text/markdown',
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index 143d0ae1..c0c6da38 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -144,15 +144,25 @@ def flatten_schema(d, parent_key=[], sep='__', level=0, max_level=0):
 
     return dict(sorted_items)
 
+def _should_json_dump_value(key, value, flatten_schema=None):
+    if isinstance(value, (dict, list)):
+        return True
 
-def flatten_record(d, parent_key=[], sep='__', level=0, max_level=0):
+    if flatten_schema and key in flatten_schema and 'type' in flatten_schema[key] and set(flatten_schema[key]['type']) == {'null', 'object', 'array'}:
+        return True
+
+    return False
+
+#pylint: disable-msg=too-many-arguments
+def flatten_record(d, flatten_schema=None, parent_key=[], sep='__', level=0, max_level=0):
     items = []
     for k, v in d.items():
         new_key = flatten_key(k, parent_key, sep)
         if isinstance(v, collections.MutableMapping) and level < max_level:
-            items.extend(flatten_record(v, parent_key + [k], sep=sep, level=level+1, max_level=max_level).items())
+            items.extend(flatten_record(v, flatten_schema, parent_key + [k], sep=sep, level=level+1, max_level=max_level).items())
         else:
-            items.append((new_key, json.dumps(v) if type(v) is list or type(v) is dict else v))
+            items.append((new_key, json.dumps(v) if _should_json_dump_value(k, v, flatten_schema) else v))
+
     return dict(items)
 
 
@@ -332,7 +342,7 @@ def table_name(self, stream_name, is_temporary, without_schema = False):
     def record_primary_key_string(self, record):
         if len(self.stream_schema_message['key_properties']) == 0:
             return None
-        flatten = flatten_record(record, max_level=self.data_flattening_max_level)
+        flatten = flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
         try:
             key_props = [str(flatten[p]) for p in self.stream_schema_message['key_properties']]
         except Exception as exc:
@@ -341,7 +351,8 @@
         return ','.join(key_props)
 
     def record_to_csv_line(self, record):
-        flatten = flatten_record(record, max_level=self.data_flattening_max_level)
+        flatten = flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
+
         return ','.join(
             [
                 json.dumps(flatten[name], ensure_ascii=False) if name in flatten and (flatten[name] == 0 or flatten[name]) else ''
@@ -691,4 +702,3 @@ def sync_table(self):
         else:
             logger.info("Table '{}' exists".format(table_name_with_schema))
             self.update_columns()
-
diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py
index aa5905b6..ca241c4a 100644
--- a/tests/unit/test_db_sync.py
+++ b/tests/unit/test_db_sync.py
@@ -313,3 +313,45 @@ def test_flatten_record(self):
             "c_obj__nested_prop3__multi_nested_prop1": "multi_value_1",
             "c_obj__nested_prop3__multi_nested_prop2": "multi_value_2"
         })
+
+    def test_flatten_record_with_flatten_schema(self):
+        flatten_record = db_sync.flatten_record
+
+        flatten_schema = {
+            "id": {
+                "type": [
+                    "object",
+                    "array",
+                    "null"
+                ]
+            }
+        }
+
+        test_cases = [
+            (
+                True,
+                {
+                    "id": 1,
+                    "data": "xyz"
+                },
+                {
+                    "id": "1",
+                    "data": "xyz"
+                }
+            ),
+            (
+                False,
+                {
+                    "id": 1,
+                    "data": "xyz"
+                },
+                {
+                    "id": 1,
+                    "data": "xyz"
+                }
+            )
+        ]
+
+        for idx, (should_use_flatten_schema, record, expected_output) in enumerate(test_cases):
+            output = flatten_record(record, flatten_schema if should_use_flatten_schema else None)
+            self.assertEqual(output, expected_output, f"Test {idx} failed. Testcase: {test_cases[idx]}")
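
For reviewers, a minimal sketch of what the new `flatten_schema` argument changes, mirroring the unit test above (this assumes the patched `target_snowflake.db_sync` module is importable; it is not part of the diff):

```python
from target_snowflake import db_sync

# The catalog declares "id" as object/array/null, so every value arriving in
# that column is JSON-dumped before loading (presumably so it lands cleanly
# in a Snowflake VARIANT column), even when the value is a scalar.
flatten_schema = {"id": {"type": ["object", "array", "null"]}}

record = {"id": 1, "data": "xyz"}

# With the schema hint, the scalar 1 is serialized to the JSON string "1".
print(db_sync.flatten_record(record, flatten_schema))
# -> {'id': '1', 'data': 'xyz'}

# Without it, behaviour is unchanged from 1.2.0: scalars pass through as-is.
print(db_sync.flatten_record(record))
# -> {'id': 1, 'data': 'xyz'}
```

The second call matches the `False` test case above, confirming that callers which do not pass a `flatten_schema` keep the old behaviour.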