diff --git a/README.md b/README.md
index 7748cb36..8f48679a 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ Full list of options in `config.json`:
| hard_delete | Boolean | | (Default: False) When `hard_delete` option is true then DELETE SQL commands will be performed in Snowflake to delete rows in tables. It's achieved by continuously checking the `_SDC_DELETED_AT` metadata column sent by the singer tap. Due to deleting rows requires metadata columns, `hard_delete` option automatically enables the `add_metadata_columns` option as well. |
| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.
When value is 0 (default) then flattening functionality is turned off. |
| primary_key_required | Boolean | | (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined. |
+| validate_records | Boolean | | (Default: False) Validate every single record message against the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time, when rejected by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. |
### To run tests:
diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index 18ec9f9f..51bba10a 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -32,6 +32,21 @@
MAX_TIME = '23:59:59.999999'
+class RecordValidationException(Exception):
+    """Exception to raise when record validation fails"""
+ pass
+
+
+class InvalidValidationOperationException(Exception):
+    """Exception to raise when the internal JSON schema validation process fails"""
+ pass
+
+
+class InvalidTableStructureException(Exception):
+    """Exception to raise when the target table structure is not compatible with singer messages"""
+ pass
+
+
def float_to_decimal(value):
"""Walk the given data structure and turn all instances of float into double."""
if isinstance(value, float):
@@ -174,15 +189,16 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
adjust_timestamps_in_record(o['record'], schemas[stream])
# Validate record
- try:
- validators[stream].validate(float_to_decimal(o['record']))
- except Exception as ex:
- if type(ex).__name__ == "InvalidOperation":
- logger.error(
- "Data validation failed and cannot load to destination. RECORD: {}\n'multipleOf' "
- "validations that allows long precisions are not supported (i.e. with 15 digits or more). "
- "Try removing 'multipleOf' methods from JSON schema.".format(o['record']))
- raise ex
+ if config.get('validate_records'):
+ try:
+ validators[stream].validate(float_to_decimal(o['record']))
+ except Exception as ex:
+ if type(ex).__name__ == "InvalidOperation":
+ raise InvalidValidationOperationException(
+ f"Data validation failed and cannot load to destination. RECORD: {o['record']}\n"
+                            "multipleOf validations that allow long precisions are not supported (i.e. with 15 digits "
+                            "or more). Try removing 'multipleOf' methods from JSON schema.")
+ raise RecordValidationException(f"Record does not pass schema validation. RECORD: {o['record']}")
primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
if not primary_key_string:
@@ -266,14 +282,13 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
try:
stream_to_sync[stream].create_schema_if_not_exists()
stream_to_sync[stream].sync_table()
- except Exception as e:
- logger.error("""
+ except Exception:
+ raise InvalidTableStructureException("""
Cannot sync table structure in Snowflake schema: {} .
Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
""".format(
stream_to_sync[stream].schema_name,
stream_to_sync[stream].pipelinewise_schema.upper()))
- raise e
row_count[stream] = 0
total_row_count[stream] = 0
diff --git a/tests/integration/resources/messages-with-invalid-records.json b/tests/integration/resources/messages-with-invalid-records.json
new file mode 100644
index 00000000..de5707fc
--- /dev/null
+++ b/tests/integration/resources/messages-with-invalid-records.json
@@ -0,0 +1,14 @@
+{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_invalid_record"}}
+{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_invalid_record", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]}
+{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_invalid_record", "version": 1}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": "Non Numeric PK", "c_varchar": "Hello World", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 2, "c_varchar": "Hello Asia", "c_int": 2}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 3, "c_varchar": "Hello Europe", "c_int": 3}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 4, "c_varchar": "Hello Americas", "c_int": 4}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 5, "c_varchar": "Hello Africa", "c_int": 5}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 6, "c_varchar": "Hello Antarctica", "c_int": 6}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 7, "c_varchar": "Hello Oceania", "c_int": 7}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "RECORD", "stream": "tap_mysql_test-test_table_invalid_record", "record": {"c_pk": 8, "c_varchar": "Hello Australia", "c_int": 8}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
+{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_invalid_record", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}}
+{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_invalid_record", "version": 1}
+{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_table_invalid_record": {"initial_full_table_complete": true}}}}
diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py
index 4fc0c2bf..87708e6b 100644
--- a/tests/integration/test_target_snowflake.py
+++ b/tests/integration/test_target_snowflake.py
@@ -7,6 +7,7 @@
from nose.tools import assert_raises
import target_snowflake
+from target_snowflake import RecordValidationException, InvalidTableStructureException
from target_snowflake.db_sync import DbSync
from snowflake.connector.errors import ProgrammingError
@@ -852,3 +853,17 @@ def test_flush_streams_with_intermediate_flushes_on_all_streams(self, mock_emit_
# Every table should be loaded correctly
self.assert_logical_streams_are_in_snowflake(True)
+
+ def test_record_validation(self):
+ """Test validating records"""
+ tap_lines = test_utils.get_test_tap_lines('messages-with-invalid-records.json')
+
+        # Loading invalid records when record validation enabled should fail at validation time
+ self.config['validate_records'] = True
+ with assert_raises(RecordValidationException):
+ self.persist_lines_with_cache(tap_lines)
+
+ # Loading invalid records when record validation disabled should fail at load time
+ self.config['validate_records'] = False
+ with assert_raises(ProgrammingError):
+ self.persist_lines_with_cache(tap_lines)
diff --git a/tests/integration/utils.py b/tests/integration/utils.py
index 41b72922..b9131404 100644
--- a/tests/integration/utils.py
+++ b/tests/integration/utils.py
@@ -39,6 +39,7 @@ def get_db_config():
config['add_metadata_columns'] = None
config['hard_delete'] = None
config['flush_all_streams'] = None
+ config['validate_records'] = None
return config