From 56c302849180e565716de30716757435ffa8b6c1 Mon Sep 17 00:00:00 2001 From: Samira-El <54845154+Samira-El@users.noreply.github.com> Date: Fri, 25 Oct 2019 17:54:25 +0300 Subject: [PATCH] AP-389 fix last flush condition and incremental record count (#40) * fix last flush condition * 1.1.2: increment record count only when encountering a new PK in the current batch * unit test * update streams_to_flush variable types --- setup.py | 2 +- target_snowflake/__init__.py | 39 +-- tests/unit/resources/logical-streams.json | 59 +++++ tests/unit/test_db_sync.py | 281 ++++++++++++++++++++++ tests/unit/test_target_snowflake.py | 33 +++ tests/unit/test_unit.py | 277 --------------------- 6 files changed, 395 insertions(+), 296 deletions(-) create mode 100644 tests/unit/resources/logical-streams.json create mode 100644 tests/unit/test_db_sync.py create mode 100644 tests/unit/test_target_snowflake.py delete mode 100644 tests/unit/test_unit.py diff --git a/setup.py b/setup.py index 4a1b5dfd..3fda34f8 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ long_description = f.read() setup(name="pipelinewise-target-snowflake", - version="1.1.1", + version="1.1.2", description="Singer.io target for loading data to Snowflake - PipelineWise compatible", long_description=long_description, long_description_content_type='text/markdown', diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py index 4bcd51e6..4f6eab9b 100644 --- a/target_snowflake/__init__.py +++ b/target_snowflake/__init__.py @@ -6,10 +6,9 @@ import os import sys import copy -import tempfile from datetime import datetime from decimal import Decimal -from tempfile import NamedTemporaryFile +from tempfile import NamedTemporaryFile, mkstemp import singer from joblib import Parallel, delayed, parallel_backend @@ -23,6 +22,7 @@ DEFAULT_PARALLELISM = 0 # 0 The number of threads used to flush tables DEFAULT_MAX_PARALLELISM = 16 # Don't use more than this number of threads by default when flushing streams in 
parallel + def float_to_decimal(value): """Walk the given data structure and turn all instances of float into double.""" if isinstance(value, float): @@ -141,8 +141,9 @@ def persist_lines(config, lines, information_schema_cache=None) -> None: except Exception as ex: if type(ex).__name__ == "InvalidOperation": logger.error( - "Data validation failed and cannot load to destination. RECORD: {}\n'multipleOf' validations that allows long precisions are not supported (i.e. with 15 digits or more). Try removing 'multipleOf' methods from JSON schema." - .format(o['record'])) + "Data validation failed and cannot load to destination. RECORD: {}\n'multipleOf' " + "validations that allows long precisions are not supported (i.e. with 15 digits or more). " + "Try removing 'multipleOf' methods from JSON schema.".format(o['record'])) raise ex primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record']) @@ -152,9 +153,10 @@ def persist_lines(config, lines, information_schema_cache=None) -> None: if stream not in records_to_load: records_to_load[stream] = {} - # increment row count with every record line - row_count[stream] += 1 - total_row_count[stream] += 1 + # increment row count only when a new PK is encountered in the current batch + if primary_key_string not in records_to_load[stream]: + row_count[stream] += 1 + total_row_count[stream] += 1 # append record if config.get('add_metadata_columns') or config.get('hard_delete'): @@ -264,6 +266,7 @@ def persist_lines(config, lines, information_schema_cache=None) -> None: # emit latest state emit_state(copy.deepcopy(flushed_state)) + # pylint: disable=too-many-arguments def flush_streams( streams, @@ -301,9 +304,9 @@ def flush_streams( # Select the required streams to flush if filter_streams: - streams_to_flush = {k: v for k, v in streams.items() if k in filter_streams} + streams_to_flush = filter_streams else: - streams_to_flush = streams + streams_to_flush = streams.keys() # Single-host, thread-based 
parallelism with parallel_backend('threading', n_jobs=parallelism): @@ -313,14 +316,14 @@ def flush_streams( row_count=row_count, db_sync=stream_to_sync[stream], delete_rows=config.get('hard_delete') - ) for (stream) in streams_to_flush.keys()) + ) for stream in streams_to_flush) # reset flushed stream records to empty to avoid flushing same records - # Update flushed streams - if filter_streams: - for stream in streams_to_flush.keys(): - streams[stream] = {} + for stream in streams_to_flush: + streams[stream] = {} + # Update flushed streams + if filter_streams: # update flushed_state position if we have state information for the stream if stream in state.get('bookmarks', {}): # Create bookmark key if not exists @@ -329,9 +332,9 @@ def flush_streams( # Copy the stream bookmark from the latest state flushed_state['bookmarks'][stream] = copy.deepcopy(state['bookmarks'][stream]) - # If we flush every bucket use the latest state - else: - flushed_state = copy.deepcopy(state) + # If we flush every bucket use the latest state + else: + flushed_state = copy.deepcopy(state) # Return with state message with flushed positions return flushed_state @@ -351,7 +354,7 @@ def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=F def flush_records(stream, records_to_load, row_count, db_sync): - csv_fd, csv_file = tempfile.mkstemp() + csv_fd, csv_file = mkstemp() with open(csv_fd, 'w+b') as f: for record in records_to_load.values(): csv_line = db_sync.record_to_csv_line(record) diff --git a/tests/unit/resources/logical-streams.json b/tests/unit/resources/logical-streams.json new file mode 100644 index 00000000..365234e6 --- /dev/null +++ b/tests/unit/resources/logical-streams.json @@ -0,0 +1,59 @@ +{"type": "SCHEMA", "stream": "logical1-logical1_table2", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": 
{"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 1, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 2, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 3, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108200520, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108200520, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108200520, "version": 
1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108200520, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 4, "cvarchar": "delete later", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 5, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 6, "cvarchar": "delete later", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108200928, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108200928, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108200928, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108200928, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 7, "cvarchar": 
"inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 8, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 9, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 10, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108236664, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108236664, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 
108236664, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108236664, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 4, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 6, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108237120, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237120, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108237120, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237120, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 1, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": 
{"cid": 2, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 3, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108237336, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237336, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108237336, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237336, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 5, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 7, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 8, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 
1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 9, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 10, "cvarchar": "updated row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 11, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108237864, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237864, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108237864, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237864, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, 
"replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 12, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 13, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108238224, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108238224, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108238224, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108238224, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 14, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 15, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 16, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 
1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 17, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 18, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 19, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 20, "cvarchar": "inserted row", "_sdc_deleted_at": null}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108239176, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": 
{"last_replication_method": "LOG_BASED", "lsn": 108239176, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108239176, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239176, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 11, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 12, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 13, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108239512, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239512, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108239512, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239512, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, 
"public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 14, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 15, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 16, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108239704, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239704, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108239704, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239704, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 17, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 18, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, 
"time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 19, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "RECORD", "stream": "logical1-logical1_table2", "record": {"cid": 20, "_sdc_deleted_at": "2019-10-13T14:06:31.838328+00:00"}, "version": 1570922723635, "time_extracted": "2019-10-13T14:06:31.838328Z"} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240192, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240192, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108240192, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240192, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", 
"version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240696, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240696, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108240696, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240696, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240796, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240796, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108240796, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240796, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": 
{"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py new file mode 100644 index 00000000..5276809e --- /dev/null +++ b/tests/unit/test_db_sync.py @@ -0,0 +1,281 @@ +import unittest + +from target_snowflake import db_sync + + +class TestDBSync(unittest.TestCase): + """ + Unit Tests + """ + + def setUp(self): + self.config = {} + + def test_config_validation(self): + """Test configuration validator""" + validator = db_sync.validate_config + empty_config = {} + minimal_config = { + 'account': "dummy-value", + 'dbname': "dummy-value", + 'user': "dummy-value", + 'password': "dummy-value", + 'warehouse': "dummy-value", + 'aws_access_key_id': "dummy-value", + 'aws_secret_access_key': "dummy-value", + 's3_bucket': "dummy-value", + 'default_target_schema': "dummy-value", + 'stage': "dummy-value", + 'file_format': "dummy-value" + } + + # Config validator returns a list of errors + # If the list is empty then the configuration is valid otherwise invalid + + # Empty configuration should fail - (nr_of_errors >= 0) + self.assertGreater(len(validator(empty_config)), 0) + + # Minimal configuratino should pass - (nr_of_errors == 0) + self.assertEqual(len(validator(minimal_config)), 0) + + # Configuration without schema 
references - (nr_of_errors >= 0) + config_with_no_schema = minimal_config.copy() + config_with_no_schema.pop('default_target_schema') + self.assertGreater(len(validator(config_with_no_schema)), 0) + + # Configuration with schema mapping - (nr_of_errors >= 0) + config_with_schema_mapping = minimal_config.copy() + config_with_schema_mapping.pop('default_target_schema') + config_with_schema_mapping['schema_mapping'] = { + "dummy_stream": { + "target_schema": "dummy_schema" + } + } + self.assertEqual(len(validator(config_with_schema_mapping)), 0) + + def test_column_type_mapping(self): + """Test JSON type to Snowflake column type mappings""" + mapper = db_sync.column_type + + # Incoming JSON schema types + json_str = {"type": ["string"]} + json_str_or_null = {"type": ["string", "null"]} + json_dt = {"type": ["string"], "format": "date-time"} + json_dt_or_null = {"type": ["string", "null"], "format": "date-time"} + json_t = {"type": ["string"], "format": "time"} + json_t_or_null = {"type": ["string", "null"], "format": "time"} + json_num = {"type": ["number"]} + json_int = {"type": ["integer"]} + json_int_or_str = {"type": ["integer", "string"]} + json_bool = {"type": ["boolean"]} + json_obj = {"type": ["object"]} + json_arr = {"type": ["array"]} + + # Mapping from JSON schema types ot Snowflake column types + self.assertEquals(mapper(json_str), 'text') + self.assertEquals(mapper(json_str_or_null), 'text') + self.assertEquals(mapper(json_dt), 'timestamp_ntz') + self.assertEquals(mapper(json_dt_or_null), 'timestamp_ntz') + self.assertEquals(mapper(json_t), 'time') + self.assertEquals(mapper(json_t_or_null), 'time') + self.assertEquals(mapper(json_num), 'float') + self.assertEquals(mapper(json_int), 'number') + self.assertEquals(mapper(json_int_or_str), 'text') + self.assertEquals(mapper(json_bool), 'boolean') + self.assertEquals(mapper(json_obj), 'variant') + self.assertEquals(mapper(json_arr), 'variant') + + def test_stream_name_to_dict(self): + """Test identifying 
catalog, schema and table names from fully qualified stream and table names""" + # Singer stream name format (Default '-' separator) + self.assertEquals( + db_sync.stream_name_to_dict('my_table'), + {"catalog_name": None, "schema_name": None, "table_name": "my_table"}) + + # Singer stream name format (Default '-' separator) + self.assertEquals( + db_sync.stream_name_to_dict('my_schema-my_table'), + {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"}) + + # Singer stream name format (Default '-' separator) + self.assertEquals( + db_sync.stream_name_to_dict('my_catalog-my_schema-my_table'), + {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) + + # Snowflake table format (Custom '.' separator) + self.assertEquals( + db_sync.stream_name_to_dict('my_table', separator='.'), + {"catalog_name": None, "schema_name": None, "table_name": "my_table"}) + + # Snowflake table format (Custom '.' separator) + self.assertEquals( + db_sync.stream_name_to_dict('my_schema.my_table', separator='.'), + {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"}) + + # Snowflake table format (Custom '.' 
separator) + self.assertEquals( + db_sync.stream_name_to_dict('my_catalog.my_schema.my_table', separator='.'), + {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) + + def test_flatten_schema(self): + """Test flattening of SCHEMA messages""" + flatten_schema = db_sync.flatten_schema + + # Schema with no object properties should be empty dict + schema_with_no_properties = {"type": "object"} + self.assertEquals(flatten_schema(schema_with_no_properties), {}) + + not_nested_schema = { + "type": "object", + "properties": { + "c_pk": {"type": ["null", "integer"]}, + "c_varchar": {"type": ["null", "string"]}, + "c_int": {"type": ["null", "integer"]}}} + + # NO FLATTENING - Schema with simple properties should be a plain dictionary + self.assertEquals(flatten_schema(not_nested_schema), not_nested_schema['properties']) + + nested_schema_with_no_properties = { + "type": "object", + "properties": { + "c_pk": {"type": ["null", "integer"]}, + "c_varchar": {"type": ["null", "string"]}, + "c_int": {"type": ["null", "integer"]}, + "c_obj": {"type": ["null", "object"]}}} + + # NO FLATTENING - Schema with object type property but without further properties should be a plain dictionary + self.assertEquals(flatten_schema(nested_schema_with_no_properties), + nested_schema_with_no_properties['properties']) + + nested_schema_with_properties = { + "type": "object", + "properties": { + "c_pk": {"type": ["null", "integer"]}, + "c_varchar": {"type": ["null", "string"]}, + "c_int": {"type": ["null", "integer"]}, + "c_obj": { + "type": ["null", "object"], + "properties": { + "nested_prop1": {"type": ["null", "string"]}, + "nested_prop2": {"type": ["null", "string"]}, + "nested_prop3": { + "type": ["null", "object"], + "properties": { + "multi_nested_prop1": {"type": ["null", "string"]}, + "multi_nested_prop2": {"type": ["null", "string"]} + } + } + } + } + } + } + + # NO FLATTENING - Schema with object type property but without further properties should be a 
plain dictionary + # No flattening (default) + self.assertEquals(flatten_schema(nested_schema_with_properties), nested_schema_with_properties['properties']) + + # NO FLATTENING - Schema with object type property and nested properties should be a plain dictionary + # max_level: 0 : No flattening (default) + self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=0), + nested_schema_with_properties['properties']) + + # FLATTENING - Schema with object type property and nested properties should be a dict with + # flattened properties + self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=1), + { + 'c_pk': {'type': ['null', 'integer']}, + 'c_varchar': {'type': ['null', 'string']}, + 'c_int': {'type': ['null', 'integer']}, + 'c_obj__nested_prop1': {'type': ['null', 'string']}, + 'c_obj__nested_prop2': {'type': ['null', 'string']}, + 'c_obj__nested_prop3': { + 'type': ['null', 'object'], + "properties": { + "multi_nested_prop1": {"type": ["null", "string"]}, + "multi_nested_prop2": {"type": ["null", "string"]} + } + } + }) + + # FLATTENING - Schema with object type property and nested properties should be a dict with + # flattened properties + self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=10), + { + 'c_pk': {'type': ['null', 'integer']}, + 'c_varchar': {'type': ['null', 'string']}, + 'c_int': {'type': ['null', 'integer']}, + 'c_obj__nested_prop1': {'type': ['null', 'string']}, + 'c_obj__nested_prop2': {'type': ['null', 'string']}, + 'c_obj__nested_prop3__multi_nested_prop1': {'type': ['null', 'string']}, + 'c_obj__nested_prop3__multi_nested_prop2': {'type': ['null', 'string']} + }) + + def test_flatten_record(self): + """Test flattening of RECORD messages""" + flatten_record = db_sync.flatten_record + + empty_record = {} + # Empty record should be empty dict + self.assertEquals(flatten_record(empty_record), {}) + + not_nested_record = {"c_pk": 1, "c_varchar": "1", "c_int": 1} + #
NO FLATTENING - Record with simple properties should be a plain dictionary + self.assertEquals(flatten_record(not_nested_record), not_nested_record) + + nested_record = { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj": { + "nested_prop1": "value_1", + "nested_prop2": "value_2", + "nested_prop3": { + "multi_nested_prop1": "multi_value_1", + "multi_nested_prop2": "multi_value_2", + }}} + + # NO FLATTENING - No flattening (default) + self.assertEquals(flatten_record(nested_record), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {' + '"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}' + }) + + # NO FLATTENING + # max_level: 0 : No flattening (default) + self.assertEquals(flatten_record(nested_record, max_level=0), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {' + '"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}' + }) + + # SEMI FLATTENING + # max_level: 1 : Semi-flattening (first nesting level only) + self.assertEquals(flatten_record(nested_record, max_level=1), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj__nested_prop1": "value_1", + "c_obj__nested_prop2": "value_2", + "c_obj__nested_prop3": '{"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": ' + '"multi_value_2"}' + }) + + # FLATTENING + self.assertEquals(flatten_record(nested_record, max_level=10), + { + "c_pk": 1, + "c_varchar": "1", + "c_int": 1, + "c_obj__nested_prop1": "value_1", + "c_obj__nested_prop2": "value_2", + "c_obj__nested_prop3__multi_nested_prop1": "multi_value_1", + "c_obj__nested_prop3__multi_nested_prop2": "multi_value_2" + }) diff --git a/tests/unit/test_target_snowflake.py b/tests/unit/test_target_snowflake.py new file mode 100644 index 00000000..bab65c20 --- /dev/null +++ b/tests/unit/test_target_snowflake.py @@ -0,0 +1,33 @@ +import unittest
+import os +import logging + +from unittest.mock import patch + +import target_snowflake + + +class TestTargetSnowflake(unittest.TestCase): + + def setUp(self): + self.config = {} + + @patch('target_snowflake.NamedTemporaryFile') + @patch('target_snowflake.flush_streams') + @patch('target_snowflake.DbSync') + def test_persist_lines_with_40_records_and_batch_size_of_20_expect_flushing_once(self, dbSync_mock, flush_streams_mock, temp_file_mock): + self.config['batch_size_rows'] = 20 + self.config['flush_all_streams'] = True + + with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f: + lines = f.readlines() + + instance = dbSync_mock.return_value + instance.create_schema_if_not_exists.return_value = None + instance.sync_table.return_value = None + + flush_streams_mock.return_value = '{"currently_syncing": null}' + + target_snowflake.persist_lines(self.config, lines) + + flush_streams_mock.assert_called_once() diff --git a/tests/unit/test_unit.py b/tests/unit/test_unit.py deleted file mode 100644 index b2c248db..00000000 --- a/tests/unit/test_unit.py +++ /dev/null @@ -1,277 +0,0 @@ -import unittest -from nose.tools import assert_raises - -import target_snowflake - - -class TestUnit(unittest.TestCase): - """ - Unit Tests - """ - @classmethod - def setUp(self): - self.config = {} - - - def test_config_validation(self): - """Test configuration validator""" - validator = target_snowflake.db_sync.validate_config - empty_config = {} - minimal_config = { - 'account': "dummy-value", - 'dbname': "dummy-value", - 'user': "dummy-value", - 'password': "dummy-value", - 'warehouse': "dummy-value", - 'aws_access_key_id': "dummy-value", - 'aws_secret_access_key': "dummy-value", - 's3_bucket': "dummy-value", - 'default_target_schema': "dummy-value", - 'stage': "dummy-value", - 'file_format': "dummy-value" - } - - # Config validator returns a list of errors - # If the list is empty then the configuration is valid otherwise invalid - - # Empty configuration 
should fail - (nr_of_errors >= 0) - self.assertGreater(len(validator(empty_config)), 0) - - # Minimal configuratino should pass - (nr_of_errors == 0) - self.assertEqual(len(validator(minimal_config)), 0) - - # Configuration without schema references - (nr_of_errors >= 0) - config_with_no_schema = minimal_config.copy() - config_with_no_schema.pop('default_target_schema') - self.assertGreater(len(validator(config_with_no_schema)), 0) - - # Configuration with schema mapping - (nr_of_errors >= 0) - config_with_schema_mapping = minimal_config.copy() - config_with_schema_mapping.pop('default_target_schema') - config_with_schema_mapping['schema_mapping'] = { - "dummy_stream": { - "target_schema": "dummy_schema" - } - } - self.assertEqual(len(validator(config_with_schema_mapping)), 0) - - - def test_column_type_mapping(self): - """Test JSON type to Snowflake column type mappings""" - mapper = target_snowflake.db_sync.column_type - - # Incoming JSON schema types - json_str = {"type": ["string"] } - json_str_or_null = {"type": ["string", "null"] } - json_dt = {"type": ["string"] , "format": "date-time"} - json_dt_or_null = {"type": ["string", "null"] , "format": "date-time"} - json_t = {"type": ["string"] , "format": "time"} - json_t_or_null = {"type": ["string", "null"] , "format": "time"} - json_num = {"type": ["number"] } - json_int = {"type": ["integer"] } - json_int_or_str = {"type": ["integer", "string"] } - json_bool = {"type": ["boolean"] } - json_obj = {"type": ["object"] } - json_arr = {"type": ["array"] } - - # Mapping from JSON schema types ot Snowflake column types - self.assertEquals(mapper(json_str) , 'text') - self.assertEquals(mapper(json_str_or_null) , 'text') - self.assertEquals(mapper(json_dt) , 'timestamp_ntz') - self.assertEquals(mapper(json_dt_or_null) , 'timestamp_ntz') - self.assertEquals(mapper(json_t) , 'time') - self.assertEquals(mapper(json_t_or_null) , 'time') - self.assertEquals(mapper(json_num) , 'float') - self.assertEquals(mapper(json_int) , 
'number') - self.assertEquals(mapper(json_int_or_str) , 'text') - self.assertEquals(mapper(json_bool) , 'boolean') - self.assertEquals(mapper(json_obj) , 'variant') - self.assertEquals(mapper(json_arr) , 'variant') - - - def test_stream_name_to_dict(self): - """Test identifying catalog, schema and table names from fully qualified stream and table names""" - # Singer stream name format (Default '-' separator) - self.assertEquals( - target_snowflake.db_sync.stream_name_to_dict('my_table'), - {"catalog_name": None, "schema_name": None, "table_name": "my_table"}) - - # Singer stream name format (Default '-' separator) - self.assertEquals( - target_snowflake.db_sync.stream_name_to_dict('my_schema-my_table'), - {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"}) - - # Singer stream name format (Default '-' separator) - self.assertEquals( - target_snowflake.db_sync.stream_name_to_dict('my_catalog-my_schema-my_table'), - {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) - - # Snowflake table format (Custom '.' separator) - self.assertEquals( - target_snowflake.db_sync.stream_name_to_dict('my_table', separator='.'), - {"catalog_name": None, "schema_name": None, "table_name": "my_table"}) - - # Snowflake table format (Custom '.' separator) - self.assertEquals( - target_snowflake.db_sync.stream_name_to_dict('my_schema.my_table', separator='.'), - {"catalog_name": None, "schema_name": "my_schema", "table_name": "my_table"}) - - # Snowflake table format (Custom '.' 
separator) - self.assertEquals( - target_snowflake.db_sync.stream_name_to_dict('my_catalog.my_schema.my_table', separator='.'), - {"catalog_name": "my_catalog", "schema_name": "my_schema", "table_name": "my_table"}) - - - def test_flatten_schema(self): - """Test flattening of SCHEMA messages""" - flatten_schema = target_snowflake.db_sync.flatten_schema - - # Schema with no object properties should be empty dict - schema_with_no_properties = {"type": "object"} - self.assertEquals(flatten_schema(schema_with_no_properties), {}) - - not_nested_schema = { - "type": "object", - "properties": { - "c_pk": {"type": ["null", "integer"]}, - "c_varchar": {"type": ["null", "string"]}, - "c_int": {"type": ["null", "integer"]}}} - # NO FLATTENNING - Schema with simple properties should be a plain dictionary - self.assertEquals(flatten_schema(not_nested_schema), not_nested_schema['properties']) - - nested_schema_with_no_properties = { - "type": "object", - "properties": { - "c_pk": {"type": ["null", "integer"]}, - "c_varchar": {"type": ["null", "string"]}, - "c_int": {"type": ["null", "integer"]}, - "c_obj": {"type": ["null", "object"]}}} - # NO FLATTENNING - Schema with object type property but without further properties should be a plain dictionary - self.assertEquals(flatten_schema(nested_schema_with_no_properties), nested_schema_with_no_properties['properties']) - - nested_schema_with_properties = { - "type": "object", - "properties": { - "c_pk": {"type": ["null", "integer"]}, - "c_varchar": {"type": ["null", "string"]}, - "c_int": {"type": ["null", "integer"]}, - "c_obj": { - "type": ["null", "object"], - "properties": { - "nested_prop1": {"type": ["null", "string"]}, - "nested_prop2": {"type": ["null", "string"]}, - "nested_prop3": { - "type": ["null", "object"], - "properties": { - "multi_nested_prop1": {"type": ["null", "string"]}, - "multi_nested_prop2": {"type": ["null", "string"]} - } - } - } - } - } - } - # NO FLATTENNING - Schema with object type property but without 
further properties should be a plain dictionary - # No flattening (default) - self.assertEquals(flatten_schema(nested_schema_with_properties), nested_schema_with_properties['properties']) - - # NO FLATTENNING - Schema with object type property but without further properties should be a plain dictionary - # max_level: 0 : No flattening (default) - self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=0), nested_schema_with_properties['properties']) - - # FLATTENNING - Schema with object type property but without further properties should be a dict with flattened properties - self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=1), - { - 'c_pk': {'type': ['null', 'integer']}, - 'c_varchar': {'type': ['null', 'string']}, - 'c_int': {'type': ['null', 'integer']}, - 'c_obj__nested_prop1': {'type': ['null', 'string']}, - 'c_obj__nested_prop2': {'type': ['null', 'string']}, - 'c_obj__nested_prop3': { - 'type': ['null', 'object'], - "properties": { - "multi_nested_prop1": {"type": ["null", "string"]}, - "multi_nested_prop2": {"type": ["null", "string"]} - } - } - }) - - # FLATTENNING - Schema with object type property but without further properties should be a dict with flattened properties - self.assertEquals(flatten_schema(nested_schema_with_properties, max_level=10), - { - 'c_pk': {'type': ['null', 'integer']}, - 'c_varchar': {'type': ['null', 'string']}, - 'c_int': {'type': ['null', 'integer']}, - 'c_obj__nested_prop1': {'type': ['null', 'string']}, - 'c_obj__nested_prop2': {'type': ['null', 'string']}, - 'c_obj__nested_prop3__multi_nested_prop1': {'type': ['null', 'string']}, - 'c_obj__nested_prop3__multi_nested_prop2': {'type': ['null', 'string']} - }) - - - def test_flatten_record(self): - """Test flattening of RECORD messages""" - flatten_record = target_snowflake.db_sync.flatten_record - - empty_record = {} - # Empty record should be empty dict - self.assertEquals(flatten_record(empty_record), {}) - - not_nested_record = 
{"c_pk": 1, "c_varchar": "1", "c_int": 1} - # NO FLATTENNING - Record with simple properties should be a plain dictionary - self.assertEquals(flatten_record(not_nested_record), not_nested_record) - - nested_record = { - "c_pk": 1, - "c_varchar": "1", - "c_int": 1, - "c_obj": { - "nested_prop1": "value_1", - "nested_prop2": "value_2", - "nested_prop3": { - "multi_nested_prop1": "multi_value_1", - "multi_nested_prop2": "multi_value_2", - }}} - - # NO FLATTENNING - No flattening (default) - self.assertEquals(flatten_record(nested_record), - { - "c_pk": 1, - "c_varchar": "1", - "c_int": 1, - "c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}' - }) - - # NO FLATTENNING - # max_level: 0 : No flattening (default) - self.assertEquals(flatten_record(nested_record, max_level=0), - { - "c_pk": 1, - "c_varchar": "1", - "c_int": 1, - "c_obj": '{"nested_prop1": "value_1", "nested_prop2": "value_2", "nested_prop3": {"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}}' - }) - - # SEMI FLATTENNING - # max_level: 1 : Semi-flattening (default) - self.assertEquals(flatten_record(nested_record, max_level=1), - { - "c_pk": 1, - "c_varchar": "1", - "c_int": 1, - "c_obj__nested_prop1": "value_1", - "c_obj__nested_prop2": "value_2", - "c_obj__nested_prop3": '{"multi_nested_prop1": "multi_value_1", "multi_nested_prop2": "multi_value_2"}' - }) - - # FLATTENNING - self.assertEquals(flatten_record(nested_record, max_level=10), - { - "c_pk": 1, - "c_varchar": "1", - "c_int": 1, - "c_obj__nested_prop1": "value_1", - "c_obj__nested_prop2": "value_2", - "c_obj__nested_prop3__multi_nested_prop1": "multi_value_1", - "c_obj__nested_prop3__multi_nested_prop2": "multi_value_2" - }) \ No newline at end of file