Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
[AP-1052] Switch to ujson (#204)
Browse files Browse the repository at this point in the history
* Switch to ujson

* Catch the correct exception on json parsing failure

* Remove not required requirement

* Fix unit test

* Update db_sync.py

* Update setup.py

Co-authored-by: Samira El Aabidi <[email protected]>
Co-authored-by: Jeet Parekh <[email protected]>
  • Loading branch information
3 people authored May 5, 2022
1 parent 36fa521 commit 935b857
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 27 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
'snowflake-connector-python[pandas]==2.7.*',
'inflection==0.5.1',
'joblib==1.1.0',
'ujson==5.2.0',
'boto3==1.21',
],
extras_require={
Expand Down
10 changes: 5 additions & 5 deletions target_snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import argparse
import io
import json
import ujson
import logging
import os
import sys
Expand Down Expand Up @@ -55,7 +55,7 @@ def add_metadata_columns_to_schema(schema_message):
def emit_state(state: Optional[Dict]):
"""Print state to stdout"""
if state is not None:
line = json.dumps(state)
line = ujson.dumps(state)
LOGGER.info('Emitting state %s', line)
sys.stdout.write(f"{line}\n")
sys.stdout.flush()
Expand Down Expand Up @@ -120,8 +120,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
# Loop over lines from stdin
for line in lines:
try:
o = json.loads(line)
except json.decoder.JSONDecodeError:
o = ujson.loads(line)
except ValueError:
LOGGER.error('Unable to parse:\n%s', line)
raise

Expand Down Expand Up @@ -514,7 +514,7 @@ def main():

if args.config:
with open(args.config, encoding="utf8") as config_input:
config = json.load(config_input)
config = ujson.load(config_input)
else:
config = {}

Expand Down
10 changes: 5 additions & 5 deletions target_snowflake/db_sync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json
import ujson
import sys
import snowflake.connector
import re
Expand Down Expand Up @@ -156,9 +156,9 @@ def create_query_tag(query_tag_pattern: str, database: str = None, schema: str =

# replace tokens, taking care of json formatted value compatibility
for k, v in {
'{{database}}': json.dumps(database.strip('"')).strip('"') if database else None,
'{{schema}}': json.dumps(schema.strip('"')).strip('"') if schema else None,
'{{table}}': json.dumps(table.strip('"')).strip('"') if table else None
'{{database}}': ujson.dumps(database.strip('"')).strip('"') if database else None,
'{{schema}}': ujson.dumps(schema.strip('"')).strip('"') if schema else None,
'{{table}}': ujson.dumps(table.strip('"')).strip('"') if table else None
}.items():
if k in query_tag:
query_tag = query_tag.replace(k, v or '')
Expand Down Expand Up @@ -490,7 +490,7 @@ def load_file(self, s3_key, count, size_bytes):
self.logger.info(
'Loading into %s: %s',
self.table_name(stream, False),
json.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes})
ujson.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes})
)

def _load_file_merge(self, s3_key, stream, columns_with_trans) -> Tuple[int, int]:
Expand Down
4 changes: 2 additions & 2 deletions target_snowflake/file_formats/csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""CSV file format functions"""
import gzip
import json
import ujson
import os

from typing import Callable, Dict, List
Expand Down Expand Up @@ -63,7 +63,7 @@ def record_to_csv_line(record: dict,

return ','.join(
[
json.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and (
ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and (
flatten_record[column] == 0 or flatten_record[column]) else ''
for column in schema
]
Expand Down
4 changes: 2 additions & 2 deletions target_snowflake/flattening.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import collections
import inflection
import itertools
import json
import ujson
import re


Expand Down Expand Up @@ -108,6 +108,6 @@ def flatten_record(d, schema=None, parent_key=None, sep='__', level=0, max_level
items.extend(flatten_record(v, schema, parent_key + [k], sep=sep, level=level + 1,
max_level=max_level).items())
else:
items.append((new_key, json.dumps(v) if _should_json_dump_value(k, v, schema) else v))
items.append((new_key, ujson.dumps(v) if _should_json_dump_value(k, v, schema) else v))

return dict(items)
4 changes: 2 additions & 2 deletions tests/integration/test_target_snowflake.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
import gzip
import json
import ujson
import tempfile
import unittest
import os
Expand Down Expand Up @@ -296,7 +296,7 @@ def assert_binary_data_are_in_snowflake(self, table_name, should_metadata_column
def test_invalid_json(self):
"""Receiving invalid JSONs should raise an exception"""
tap_lines = test_utils.get_test_tap_lines('invalid-json.json')
with self.assertRaises(json.decoder.JSONDecodeError):
with self.assertRaises(ValueError):
self.persist_lines_with_cache(tap_lines)

def test_message_order(self):
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_db_sync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json
import ujson
import unittest

from unittest.mock import patch, call
Expand Down Expand Up @@ -166,7 +166,7 @@ def test_create_query_tag(self):
schema='test_schema',
table='test_table')
# Load the generated JSON formatted query tag to make sure it's a valid JSON
self.assertEqual(json.loads(json_query_tag), {
self.assertEqual(ujson.loads(json_query_tag), {
'database': 'test_database',
'schema': 'test_schema',
'table': 'test_table'
Expand All @@ -180,7 +180,7 @@ def test_create_query_tag(self):
table='test"table')

# Load the generated JSON formatted query tag to make sure it's a valid JSON
self.assertEqual(json.loads(json_query_tag), {
self.assertEqual(ujson.loads(json_query_tag), {
'database': 'test"database',
'schema': 'test"schema',
'table': 'test"table'
Expand All @@ -193,7 +193,7 @@ def test_create_query_tag(self):
schema='"test_schema"',
table='"test_table"')
# Load the generated JSON formatted query tag to make sure it's a valid JSON
self.assertEqual(json.loads(json_query_tag), {
self.assertEqual(ujson.loads(json_query_tag), {
'database': 'test_database',
'schema': 'test_schema',
'table': 'test_table'
Expand Down
10 changes: 5 additions & 5 deletions tests/unit/test_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ def test_flatten_record(self):
"c_pk": 1,
"c_varchar": "1",
"c_int": 1,
"c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {'
'"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}'
"c_obj": '{"nested_prop1":"value_1","nested_prop2":"value_2","nested_prop3":{'
'"multi_nested_prop1":"multi_value_1","multi_nested_prop2":"multi_value_2"}}'
})

# NO FLATTENING
Expand All @@ -142,8 +142,8 @@ def test_flatten_record(self):
"c_pk": 1,
"c_varchar": "1",
"c_int": 1,
"c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {'
'"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}'
"c_obj": '{"nested_prop1":"value_1","nested_prop2":"value_2","nested_prop3":{'
'"multi_nested_prop1":"multi_value_1","multi_nested_prop2":"multi_value_2"}}'
})

# SEMI FLATTENING
Expand All @@ -155,7 +155,7 @@ def test_flatten_record(self):
"c_int": 1,
"c_obj__nested_prop1": "value_1",
"c_obj__nested_prop2": "value_2",
"c_obj__nested_prop3": '{"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": '
"c_obj__nested_prop3": '{"multi_nested_prop1":"multi_value_1","multi_nested_prop2":'
'"multi_value_2"}'
})

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_target_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,5 @@ def test_persist_lines_with_only_state_messages(self, dbSync_mock, flush_streams

self.assertEqual(
buf.getvalue().strip(),
'{"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", '
'"replication_key_value": 100, "version": 1}}}')
'{"bookmarks":{"tap_mysql_test-test_simple_table":{"replication_key":"id",'
'"replication_key_value":100,"version":1}}}')

0 comments on commit 935b857

Please sign in to comment.