
Commit

json dump value in case of json column (#58)
* json dump value in case of json column
ivan-transferwise authored Jan 13, 2020
1 parent 8d31e29 commit 7f28237
Showing 3 changed files with 59 additions and 7 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 
 setup(name="pipelinewise-target-snowflake",
-      version="1.2.0",
+      version="1.2.1",
       description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
       long_description=long_description,
       long_description_content_type='text/markdown',
22 changes: 16 additions & 6 deletions target_snowflake/db_sync.py
@@ -144,15 +144,25 @@ def flatten_schema(d, parent_key=[], sep='__', level=0, max_level=0):
 
     return dict(sorted_items)
 
+def _should_json_dump_value(key, value, flatten_schema=None):
+    if isinstance(value, (dict, list)):
+        return True
 
-def flatten_record(d, parent_key=[], sep='__', level=0, max_level=0):
+    if flatten_schema and key in flatten_schema and 'type' in flatten_schema[key] and set(flatten_schema[key]['type']) == {'null', 'object', 'array'}:
+        return True
+
+    return False
+
+#pylint: disable-msg=too-many-arguments
+def flatten_record(d, flatten_schema=None, parent_key=[], sep='__', level=0, max_level=0):
     items = []
     for k, v in d.items():
         new_key = flatten_key(k, parent_key, sep)
         if isinstance(v, collections.MutableMapping) and level < max_level:
-            items.extend(flatten_record(v, parent_key + [k], sep=sep, level=level+1, max_level=max_level).items())
+            items.extend(flatten_record(v, flatten_schema, parent_key + [k], sep=sep, level=level+1, max_level=max_level).items())
         else:
-            items.append((new_key, json.dumps(v) if type(v) is list or type(v) is dict else v))
+            items.append((new_key, json.dumps(v) if _should_json_dump_value(k, v, flatten_schema) else v))
 
     return dict(items)
 

@@ -332,7 +342,7 @@ def table_name(self, stream_name, is_temporary, without_schema = False):
     def record_primary_key_string(self, record):
         if len(self.stream_schema_message['key_properties']) == 0:
             return None
-        flatten = flatten_record(record, max_level=self.data_flattening_max_level)
+        flatten = flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
         try:
             key_props = [str(flatten[p]) for p in self.stream_schema_message['key_properties']]
         except Exception as exc:
@@ -341,7 +351,8 @@ def record_primary_key_string(self, record):
         return ','.join(key_props)
 
     def record_to_csv_line(self, record):
-        flatten = flatten_record(record, max_level=self.data_flattening_max_level)
+        flatten = flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
+
         return ','.join(
             [
                 json.dumps(flatten[name], ensure_ascii=False) if name in flatten and (flatten[name] == 0 or flatten[name]) else ''
@@ -691,4 +702,3 @@ def sync_table(self):
         else:
             logger.info("Table '{}' exists".format(table_name_with_schema))
             self.update_columns()
-
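For reference, here is a minimal, standalone sketch (not part of the commit) of what the db_sync.py change does in practice: once a flatten_schema is passed in, flatten_record JSON-dumps not only dict and list values but also any value whose column is typed as {'null', 'object', 'array'} in the schema, so JSON columns always receive valid JSON text even for scalar inputs. The record and schema below (id/payload/tags) are made up for illustration, and the sketch assumes a version of pipelinewise-target-snowflake that contains this commit is importable.

# Hypothetical example; assumes pipelinewise-target-snowflake with this change is installed.
from target_snowflake.db_sync import flatten_record

# Column 'payload' is declared as a JSON ('object'/'array') column in the flattened schema.
flatten_schema = {
    "payload": {"type": ["null", "object", "array"]}
}

record = {"id": 1, "payload": 42, "tags": ["a", "b"]}

# Without the schema, only genuine dicts/lists are dumped to JSON strings.
print(flatten_record(record))
# {'id': 1, 'payload': 42, 'tags': '["a", "b"]'}

# With the schema, the JSON-typed column is dumped as well, even for a scalar value.
print(flatten_record(record, flatten_schema))
# {'id': 1, 'payload': '42', 'tags': '["a", "b"]'}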
42 changes: 42 additions & 0 deletions tests/unit/test_db_sync.py
@@ -313,3 +313,45 @@ def test_flatten_record(self):
             "c_obj__nested_prop3__multi_nested_prop1": "multi_value_1",
             "c_obj__nested_prop3__multi_nested_prop2": "multi_value_2"
         })
+
+    def test_flatten_record_with_flatten_schema(self):
+        flatten_record = db_sync.flatten_record
+
+        flatten_schema = {
+            "id": {
+                "type": [
+                    "object",
+                    "array",
+                    "null"
+                ]
+            }
+        }
+
+        test_cases = [
+            (
+                True,
+                {
+                    "id": 1,
+                    "data": "xyz"
+                },
+                {
+                    "id": "1",
+                    "data": "xyz"
+                }
+            ),
+            (
+                False,
+                {
+                    "id": 1,
+                    "data": "xyz"
+                },
+                {
+                    "id": 1,
+                    "data": "xyz"
+                }
+            )
+        ]
+
+        for idx, (should_use_flatten_schema, record, expected_output) in enumerate(test_cases):
+            output = flatten_record(record, flatten_schema if should_use_flatten_schema else None)
+            self.assertEqual(output, expected_output, f"Test {idx} failed. Testcase: {test_cases[idx]}")
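The new test exercises flatten_record both with and without the flatten_schema argument. A quick way to run just this test locally (the exact invocation is an assumption, not part of the commit; it presumes pytest is installed alongside the package's dev dependencies):

python -m pytest tests/unit/test_db_sync.py -k test_flatten_record_with_flatten_schema -v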
