From 935b857702d31d3fe23538d32b7a41378d96b05c Mon Sep 17 00:00:00 2001 From: Peter Kosztolanyi Date: Thu, 5 May 2022 11:55:13 +0200 Subject: [PATCH] [AP-1052] Switch to ujson (#204) * Switch to ujson * Catch the correct exception on json parsing failure * Remove not required requirement * Fix unit test * Update db_sync.py * Update setup.py Co-authored-by: Samira El Aabidi <54845154+Samira-El@users.noreply.github.com> Co-authored-by: Jeet Parekh <94441288+jeet-parekh-wise@users.noreply.github.com> --- setup.py | 1 + target_snowflake/__init__.py | 10 +++++----- target_snowflake/db_sync.py | 10 +++++----- target_snowflake/file_formats/csv.py | 4 ++-- target_snowflake/flattening.py | 4 ++-- tests/integration/test_target_snowflake.py | 4 ++-- tests/unit/test_db_sync.py | 8 ++++---- tests/unit/test_flattening.py | 10 +++++----- tests/unit/test_target_snowflake.py | 4 ++-- 9 files changed, 28 insertions(+), 27 deletions(-) diff --git a/setup.py b/setup.py index b004d528..15af10ae 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ 'snowflake-connector-python[pandas]==2.7.*', 'inflection==0.5.1', 'joblib==1.1.0', + 'ujson==5.2.0', 'boto3==1.21', ], extras_require={ diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py index 1abbaebe..7754421a 100644 --- a/target_snowflake/__init__.py +++ b/target_snowflake/__init__.py @@ -2,7 +2,7 @@ import argparse import io -import json +import ujson import logging import os import sys @@ -55,7 +55,7 @@ def add_metadata_columns_to_schema(schema_message): def emit_state(state: Optional[Dict]): """Print state to stdout""" if state is not None: - line = json.dumps(state) + line = ujson.dumps(state) LOGGER.info('Emitting state %s', line) sys.stdout.write(f"{line}\n") sys.stdout.flush() @@ -120,8 +120,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT # Loop over lines from stdin for line in lines: try: - o = json.loads(line) - except json.decoder.JSONDecodeError: + o = ujson.loads(line) + except ValueError: LOGGER.error('Unable to parse:\n%s', line) raise @@ -514,7 +514,7 @@ def main(): if args.config: with open(args.config, encoding="utf8") as config_input: - config = json.load(config_input) + config = ujson.load(config_input) else: config = {} diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index 64efcac9..d9959d93 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -1,4 +1,4 @@ -import json +import ujson import sys import snowflake.connector import re @@ -156,9 +156,9 @@ def create_query_tag(query_tag_pattern: str, database: str = None, schema: str = # replace tokens, taking care of json formatted value compatibility for k, v in { - '{{database}}': json.dumps(database.strip('"')).strip('"') if database else None, - '{{schema}}': json.dumps(schema.strip('"')).strip('"') if schema else None, - '{{table}}': json.dumps(table.strip('"')).strip('"') if table else None + '{{database}}': ujson.dumps(database.strip('"')).strip('"') if database else None, + '{{schema}}': ujson.dumps(schema.strip('"')).strip('"') if schema else None, + '{{table}}': ujson.dumps(table.strip('"')).strip('"') if table else None }.items(): if k in query_tag: query_tag = query_tag.replace(k, v or '') @@ -490,7 +490,7 @@ def load_file(self, s3_key, count, size_bytes): self.logger.info( 'Loading into %s: %s', self.table_name(stream, False), - json.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes}) + ujson.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes}) ) def _load_file_merge(self, s3_key, stream, columns_with_trans) -> Tuple[int, int]: diff --git a/target_snowflake/file_formats/csv.py b/target_snowflake/file_formats/csv.py index 42d76ee4..c7379356 100644 --- a/target_snowflake/file_formats/csv.py +++ b/target_snowflake/file_formats/csv.py @@ -1,6 +1,6 @@ """CSV file format functions""" import gzip -import json +import ujson import os from typing import Callable, Dict, List @@ -63,7 +63,7 @@ def record_to_csv_line(record: dict, return ','.join( [ - json.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and ( + ujson.dumps(flatten_record[column], ensure_ascii=False) if column in flatten_record and ( flatten_record[column] == 0 or flatten_record[column]) else '' for column in schema ] diff --git a/target_snowflake/flattening.py b/target_snowflake/flattening.py index a6536ecc..b424efb0 100644 --- a/target_snowflake/flattening.py +++ b/target_snowflake/flattening.py @@ -1,7 +1,7 @@ import collections import inflection import itertools -import json +import ujson import re @@ -108,6 +108,6 @@ def flatten_record(d, schema=None, parent_key=None, sep='__', level=0, max_level items.extend(flatten_record(v, schema, parent_key + [k], sep=sep, level=level + 1, max_level=max_level).items()) else: - items.append((new_key, json.dumps(v) if _should_json_dump_value(k, v, schema) else v)) + items.append((new_key, ujson.dumps(v) if _should_json_dump_value(k, v, schema) else v)) return dict(items) diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 791acda2..fca9a7f3 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1,6 +1,6 @@ import datetime import gzip -import json +import ujson import tempfile import unittest import os @@ -296,7 +296,7 @@ def assert_binary_data_are_in_snowflake(self, table_name, should_metadata_column def test_invalid_json(self): """Receiving invalid JSONs should raise an exception""" tap_lines = test_utils.get_test_tap_lines('invalid-json.json') - with self.assertRaises(json.decoder.JSONDecodeError): + with self.assertRaises(ValueError): self.persist_lines_with_cache(tap_lines) def test_message_order(self): diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index e63c86a2..a17368b7 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py @@ -1,4 +1,4 @@ -import json +import ujson import unittest from unittest.mock import patch, call @@ -166,7 +166,7 @@ def test_create_query_tag(self): schema='test_schema', table='test_table') # Load the generated JSON formatted query tag to make sure it's a valid JSON - self.assertEqual(json.loads(json_query_tag), { + self.assertEqual(ujson.loads(json_query_tag), { 'database': 'test_database', 'schema': 'test_schema', 'table': 'test_table' @@ -180,7 +180,7 @@ def test_create_query_tag(self): table='test"table') # Load the generated JSON formatted query tag to make sure it's a valid JSON - self.assertEqual(json.loads(json_query_tag), { + self.assertEqual(ujson.loads(json_query_tag), { 'database': 'test"database', 'schema': 'test"schema', 'table': 'test"table' @@ -193,7 +193,7 @@ def test_create_query_tag(self): schema='"test_schema"', table='"test_table"') # Load the generated JSON formatted query tag to make sure it's a valid JSON - self.assertEqual(json.loads(json_query_tag), { + self.assertEqual(ujson.loads(json_query_tag), { 'database': 'test_database', 'schema': 'test_schema', 'table': 'test_table' diff --git a/tests/unit/test_flattening.py b/tests/unit/test_flattening.py index 99f47d4c..2c24f79c 100644 --- a/tests/unit/test_flattening.py +++ b/tests/unit/test_flattening.py @@ -131,8 +131,8 @@ def test_flatten_record(self): "c_pk": 1, "c_varchar": "1", "c_int": 1, - "c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {' - '"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}' + "c_obj": '{"nested_prop1":"value_1","nested_prop2":"value_2","nested_prop3":{' + '"multi_nested_prop1":"multi_value_1","multi_nested_prop2":"multi_value_2"}}' }) # NO FLATTENING @@ -142,8 +142,8 @@ def test_flatten_record(self): "c_pk": 1, "c_varchar": "1", "c_int": 1, - "c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {' - '"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}' + "c_obj": '{"nested_prop1":"value_1","nested_prop2":"value_2","nested_prop3":{' + '"multi_nested_prop1":"multi_value_1","multi_nested_prop2":"multi_value_2"}}' }) # SEMI FLATTENING @@ -155,7 +155,7 @@ def test_flatten_record(self): "c_int": 1, "c_obj__nested_prop1": "value_1", "c_obj__nested_prop2": "value_2", - "c_obj__nested_prop3": '{"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": ' + "c_obj__nested_prop3": '{"multi_nested_prop1":"multi_value_1","multi_nested_prop2":' '"multi_value_2"}' }) diff --git a/tests/unit/test_target_snowflake.py b/tests/unit/test_target_snowflake.py index d3eb7637..63e34b27 100644 --- a/tests/unit/test_target_snowflake.py +++ b/tests/unit/test_target_snowflake.py @@ -172,5 +172,5 @@ def test_persist_lines_with_only_state_messages(self, dbSync_mock, flush_streams self.assertEqual( buf.getvalue().strip(), - '{"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", ' - '"replication_key_value": 100, "version": 1}}}') + '{"bookmarks":{"tap_mysql_test-test_simple_table":{"replication_key":"id",' + '"replication_key_value":100,"version":1}}}')