From 5d917caf0564ade283f068a5e30b9358ad8c8a4c Mon Sep 17 00:00:00 2001
From: Samira-El <54845154+Samira-El@users.noreply.github.com>
Date: Mon, 9 Dec 2019 09:30:45 +0200
Subject: [PATCH] 1.1.7: review all dates/times in records and adjust them if
 they're not valid before insert/update (#48)

---
 setup.py                            |  2 +-
 target_snowflake/__init__.py        | 47 +++++++++++++++++++++++---
 tests/unit/test_target_snowflake.py | 51 ++++++++++++++++++++++++++++-
 3 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/setup.py b/setup.py
index 0f4d0993..cac91e0d 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 
 setup(name="pipelinewise-target-snowflake",
-      version="1.1.6",
+      version="1.1.7",
       description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
       long_description=long_description,
       long_description_content_type='text/markdown',
diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index e3f43bb4..18ec9f9f 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -6,11 +6,14 @@
 import os
 import sys
 import copy
+import singer
+
 from datetime import datetime
 from decimal import Decimal
 from tempfile import NamedTemporaryFile, mkstemp
-
-import singer
+from typing import Dict
+from dateutil import parser
+from dateutil.parser import ParserError
 from joblib import Parallel, delayed, parallel_backend
 from jsonschema import Draft4Validator, FormatChecker
 
@@ -22,6 +25,12 @@
 DEFAULT_PARALLELISM = 0  # 0 The number of threads used to flush tables
 DEFAULT_MAX_PARALLELISM = 16  # Don't use more than this number of threads by default when flushing streams in parallel
 
+# max timestamp/datetime supported in SF, used to reset all invalid dates that are beyond this value
+MAX_TIMESTAMP = '9999-12-31 23:59:59.999999'
+
+# max time supported in SF, used to reset all invalid times that are beyond this value
+MAX_TIME = '23:59:59.999999'
+
 
 def float_to_decimal(value):
     """Walk the given data structure and turn all instances of float into double."""
@@ -98,6 +107,33 @@ def load_information_schema_cache(config):
     return information_schema_cache
 
 
+def adjust_timestamps_in_record(record: Dict, schema: Dict) -> None:
+    """
+    Goes through every field that is of type date/datetime/time and, if its value is out of range,
+    resets it to the corresponding MAX value
+    Args:
+        record: record containing properties and values
+        schema: json schema that has types of each property
+    """
+    # internal helper to avoid duplicating code and deeply nested blocks
+    def reset_new_value(record: Dict, key: str, format: str):
+        try:
+            parser.parse(record[key])
+        except ParserError:
+            record[key] = MAX_TIMESTAMP if format != 'time' else MAX_TIME
+
+    for key, value in record.items():
+        if value is not None and key in schema['properties']:
+            if 'anyOf' in schema['properties'][key]:
+                for type_dict in schema['properties'][key]['anyOf']:
+                    if 'string' in type_dict['type'] and type_dict.get('format', None) in {'date-time', 'time', 'date'}:
+                        reset_new_value(record, key, type_dict['format'])
+                        break
+            else:
+                if 'string' in schema['properties'][key]['type'] and \
+                        schema['properties'][key].get('format', None) in {'date-time', 'time', 'date'}:
+                    reset_new_value(record, key, schema['properties'][key]['format'])
+
 # pylint: disable=too-many-locals,too-many-branches,too-many-statements
 def persist_lines(config, lines, information_schema_cache=None) -> None:
     state = None
@@ -135,6 +171,8 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
             # Get schema for this record's stream
             stream = o['stream']
 
+            adjust_timestamps_in_record(o['record'], schemas[stream])
+
             # Validate record
             try:
                 validators[stream].validate(float_to_decimal(o['record']))
@@ -190,9 +228,8 @@
 
             stream = o['stream']
 
-            schemas[stream] = o
-            schema = float_to_decimal(o['schema'])
-            validators[stream] = Draft4Validator(schema, format_checker=FormatChecker())
+            schemas[stream] = float_to_decimal(o['schema'])
+            validators[stream] = Draft4Validator(schemas[stream], format_checker=FormatChecker())
 
             # flush records from previous stream SCHEMA
             # if same stream has been encountered again, it means the schema might have been altered
diff --git a/tests/unit/test_target_snowflake.py b/tests/unit/test_target_snowflake.py
index bab65c20..668e82c0 100644
--- a/tests/unit/test_target_snowflake.py
+++ b/tests/unit/test_target_snowflake.py
@@ -1,6 +1,5 @@
 import unittest
 import os
-import logging
 
 from unittest.mock import patch
 
@@ -31,3 +30,53 @@ def test_persist_lines_with_40_records_and_batch_size_of_20_expect_flushing_once
         target_snowflake.persist_lines(self.config, lines)
 
         flush_streams_mock.assert_called_once()
+
+    def test_adjust_timestamps_in_record(self):
+        record = {
+            'key1': '1',
+            'key2': '2030-01-22',
+            'key3': '10000-01-22 12:04:22',
+            'key4': '25:01:01',
+            'key5': 'I\'m good',
+            'key6': None
+        }
+
+        schema = {
+            'properties': {
+                'key1': {
+                    'type': ['null', 'string', 'integer'],
+                },
+                'key2': {
+                    'anyOf': [
+                        {'type': ['null', 'string'], 'format': 'date'},
+                        {'type': ['null', 'string']}
+                    ]
+                },
+                'key3': {
+                    'type': ['null', 'string'], 'format': 'date-time',
+                },
+                'key4': {
+                    'anyOf': [
+                        {'type': ['null', 'string'], 'format': 'time'},
+                        {'type': ['null', 'string']}
+                    ]
+                },
+                'key5': {
+                    'type': ['null', 'string'],
+                },
+                'key6': {
+                    'type': ['null', 'string'], 'format': 'time',
+                },
+            }
+        }
+
+        target_snowflake.adjust_timestamps_in_record(record, schema)
+
+        self.assertDictEqual({
+            'key1': '1',
+            'key2': '2030-01-22',
+            'key3': '9999-12-31 23:59:59.999999',
+            'key4': '23:59:59.999999',
+            'key5': 'I\'m good',
+            'key6': None
+        }, record)
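
Note for reviewers (not part of the applied patch): the new helper relies on dateutil
raising ParserError both for unparseable strings and for values whose components are out
of range, because dateutil wraps the ValueError from datetime construction in a
ParserError. ParserError only exists in python-dateutil 2.8.1 and newer, so that is an
assumed minimum version. Below is a minimal standalone sketch of that behaviour; the
coerce() helper name is illustrative and not from the codebase:

    from dateutil import parser
    from dateutil.parser import ParserError  # available since python-dateutil 2.8.1

    # mirror the patch constants: Snowflake's max timestamp and max time
    MAX_TIMESTAMP = '9999-12-31 23:59:59.999999'
    MAX_TIME = '23:59:59.999999'

    def coerce(value: str, fmt: str) -> str:
        # One except clause covers both cases: "I'm good" fails tokenizing,
        # while year 10000 or hour 25 fails datetime construction, and
        # dateutil surfaces both as ParserError.
        try:
            parser.parse(value)
            return value
        except ParserError:
            return MAX_TIMESTAMP if fmt != 'time' else MAX_TIME

    assert coerce('2030-01-22', 'date') == '2030-01-22'            # valid, unchanged
    assert coerce('10000-01-22 12:04:22', 'date-time') == MAX_TIMESTAMP
    assert coerce('25:01:01', 'time') == MAX_TIME

Resetting out-of-range values to the Snowflake maxima, instead of rejecting the record,
keeps the row loadable so a single bad timestamp does not fail the whole batch.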