This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Add optional validate_records option
koszti authored Jan 7, 2020
1 parent 0a200a3 commit 5f6ba6a
Showing 5 changed files with 58 additions and 12 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -121,6 +121,7 @@ Full list of options in `config.json`:
| hard_delete | Boolean | | (Default: False) When the `hard_delete` option is true, DELETE SQL commands are run in Snowflake to delete rows in tables. This is achieved by continuously checking the `_SDC_DELETED_AT` metadata column sent by the singer tap. Because deleting rows requires metadata columns, the `hard_delete` option automatically enables the `add_metadata_columns` option as well. |
| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.<br><br>When value is 0 (default) then flattening functionality is turned off. |
| primary_key_required | Boolean | | (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined. |
| validate_records | Boolean | | (Default: False) Validate every single RECORD message against the corresponding JSON schema. This option is disabled by default, so invalid RECORD messages fail only at load time, in Snowflake. Enabling this option detects invalid records earlier but can cause performance degradation. |
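
For illustration only (this snippet is not part of the README or the target's own code), the example below shows what validating a RECORD message against its stream's JSON schema means in practice. It assumes the `jsonschema` package, which singer taps and targets commonly rely on, and a schema shaped like the test fixture added in this commit:

```python
# Minimal illustration of per-record JSON schema validation (assumes the
# jsonschema package; not target-snowflake's own validation code).
from jsonschema import ValidationError, validate

schema = {
    "type": "object",
    "properties": {"c_pk": {"type": ["null", "integer"]}},
}

validate({"c_pk": 1}, schema)  # a conforming record passes silently

try:
    validate({"c_pk": "Non Numeric PK"}, schema)  # string where an integer is expected
except ValidationError as err:
    print(f"invalid record rejected before load: {err.message}")
```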

### To run tests:

39 changes: 27 additions & 12 deletions target_snowflake/__init__.py
@@ -32,6 +32,21 @@
MAX_TIME = '23:59:59.999999'


class RecordValidationException(Exception):
"""Exception to raise when record validation failed"""
pass


class InvalidValidationOperationException(Exception):
"""Exception to raise when internal JSON schema validation process failed"""
pass


class InvalidTableStructureException(Exception):
"""Exception to raise when target table structure not compatible with singer messages"""
pass


def float_to_decimal(value):
"""Walk the given data structure and turn all instances of float into double."""
if isinstance(value, float):
@@ -174,15 +189,16 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
adjust_timestamps_in_record(o['record'], schemas[stream])

# Validate record
try:
validators[stream].validate(float_to_decimal(o['record']))
except Exception as ex:
if type(ex).__name__ == "InvalidOperation":
logger.error(
"Data validation failed and cannot load to destination. RECORD: {}\n'multipleOf' "
"validations that allows long precisions are not supported (i.e. with 15 digits or more). "
"Try removing 'multipleOf' methods from JSON schema.".format(o['record']))
raise ex
if config.get('validate_records'):
try:
validators[stream].validate(float_to_decimal(o['record']))
except Exception as ex:
if type(ex).__name__ == "InvalidOperation":
raise InvalidValidationOperationException(
f"Data validation failed and cannot load to destination. RECORD: {o['record']}\n"
"multipleOf validations that allows long precisions are not supported (i.e. with 15 digits"
"or more) Try removing 'multipleOf' methods from JSON schema.")
raise RecordValidationException(f"Record does not pass schema validation. RECORD: {o['record']}")

primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
if not primary_key_string:
@@ -266,14 +282,13 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
try:
stream_to_sync[stream].create_schema_if_not_exists()
stream_to_sync[stream].sync_table()
except Exception as e:
logger.error("""
except Exception:
raise InvalidTableStructureException("""
Cannot sync table structure in Snowflake schema: {} .
Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
""".format(
stream_to_sync[stream].schema_name,
stream_to_sync[stream].pipelinewise_schema.upper()))
raise e

row_count[stream] = 0
total_row_count[stream] = 0
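Taken together, the new code paths behave roughly like the standalone sketch below. This is a simplified reconstruction rather than the module's actual implementation: the `Draft4Validator` construction, the trimmed-down `float_to_decimal`, and the `maybe_validate_record` helper are assumptions made for illustration.

```python
# Standalone sketch of the opt-in validation gate (simplified reconstruction,
# not the module's actual code). Assumes the jsonschema package.
from decimal import Decimal

from jsonschema import Draft4Validator


class RecordValidationException(Exception):
    """Raised when a RECORD message does not match its stream's JSON schema."""


class InvalidValidationOperationException(Exception):
    """Raised when the validation process itself fails (e.g. on long 'multipleOf' precisions)."""


def float_to_decimal(value):
    """Walk the given data structure and turn all instances of float into Decimal."""
    if isinstance(value, float):
        return Decimal(str(value))
    if isinstance(value, list):
        return [float_to_decimal(child) for child in value]
    if isinstance(value, dict):
        return {k: float_to_decimal(v) for k, v in value.items()}
    return value


def maybe_validate_record(config: dict, schema: dict, record: dict) -> None:
    """Validate one record against its stream schema, but only when validate_records is enabled."""
    if not config.get('validate_records'):
        return  # validation is opt-in; invalid records will otherwise only fail at load time

    try:
        Draft4Validator(schema).validate(float_to_decimal(record))
    except Exception as ex:
        if type(ex).__name__ == "InvalidOperation":
            raise InvalidValidationOperationException(
                f"Data validation failed and cannot load to destination. RECORD: {record}\n"
                "'multipleOf' validations that allow long precisions are not supported "
                "(i.e. with 15 digits or more). Try removing 'multipleOf' methods from the JSON schema.")
        raise RecordValidationException(f"Record does not pass schema validation. RECORD: {record}")


if __name__ == '__main__':
    schema = {"type": "object", "properties": {"c_pk": {"type": ["null", "integer"]}}}
    maybe_validate_record({'validate_records': True}, schema, {"c_pk": 1})                  # passes
    maybe_validate_record({'validate_records': False}, schema, {"c_pk": "Non Numeric PK"})  # skipped
    try:
        maybe_validate_record({'validate_records': True}, schema, {"c_pk": "Non Numeric PK"})
    except RecordValidationException as err:
        print(err)  # the bad record is rejected before anything is loaded
```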
14 changes: 14 additions & 0 deletions tests/integration/resources/messages-with-invalid-records.json
@@ -0,0 +1,14 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_invalid_record"}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_invalid_record", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_invalid_record", "version": 1}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": "Non Numeric PK", "c_varchar": "Hello World", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 2, "c_varchar": "Hello Asia", "c_int": 2}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 3, "c_varchar": "Hello Europe", "c_int": 3}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 4, "c_varchar": "Hello Americas", "c_int": 4}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 5, "c_varchar": "Hello Africa", "c_int": 5}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 6, "c_varchar": "Hello Antarctica", "c_int": 6}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 7, "c_varchar": "Hello Oceania", "c_int": 7}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 8, "c_varchar": "Hello Australia", "c_int": 8}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_invalid_record", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_invalid_record", "version": 1}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_table_invalid_record": {"initial_full_table_complete": true}}}}
15 changes: 15 additions & 0 deletions tests/integration/test_target_snowflake.py
@@ -7,6 +7,7 @@
from nose.tools import assert_raises

import target_snowflake
from target_snowflake import RecordValidationException, InvalidTableStructureException
from target_snowflake.db_sync import DbSync

from snowflake.connector.errors import ProgrammingError
@@ -852,3 +853,17 @@ def test_flush_streams_with_intermediate_flushes_on_all_streams(self, mock_emit_

# Every table should be loaded correctly
self.assert_logical_streams_are_in_snowflake(True)

def test_record_validation(self):
"""Test validating records"""
tap_lines = test_utils.get_test_tap_lines('messages-with-invalid-records.json')

# Loading invalid records when record validation enabled should fail during validation
self.config['validate_records'] = True
with assert_raises(RecordValidationException):
self.persist_lines_with_cache(tap_lines)

# Loading invalid records when record validation disabled should fail at load time
self.config['validate_records'] = False
with assert_raises(ProgrammingError):
self.persist_lines_with_cache(tap_lines)
1 change: 1 addition & 0 deletions tests/integration/utils.py
@@ -39,6 +39,7 @@ def get_db_config():
config['add_metadata_columns'] = None
config['hard_delete'] = None
config['flush_all_streams'] = None
config['validate_records'] = None

return config

