diff --git a/.circleci/config.yml b/.circleci/config.yml index ab7ee938..8693c410 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -13,8 +13,14 @@ jobs: . venv/bin/activate pip install --upgrade pip pip install .[test] + + - run: + name: 'Pylinting' + command: | + . venv/bin/activate + pip install .[test] pylint target_snowflake -d C,W,unexpected-keyword-arg,duplicate-code - + - run: name: 'Unit Tests' command: | diff --git a/setup.py b/setup.py index 01d46ab2..4a1b5dfd 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ long_description = f.read() setup(name="pipelinewise-target-snowflake", - version="1.1.0", + version="1.1.1", description="Singer.io target for loading data to Snowflake - PipelineWise compatible", long_description=long_description, long_description_content_type='text/markdown', diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py index 4eb9dcbe..0c2da1c3 100644 --- a/target_snowflake/__init__.py +++ b/target_snowflake/__init__.py @@ -7,6 +7,7 @@ import sys import copy import tempfile +import logging from datetime import datetime from decimal import Decimal from tempfile import NamedTemporaryFile @@ -18,6 +19,7 @@ from target_snowflake.db_sync import DbSync logger = singer.get_logger() +logger.setLevel(logging.ERROR) DEFAULT_BATCH_SIZE_ROWS = 100000 DEFAULT_PARALLELISM = 0 # 0 The number of threads used to flush tables @@ -257,14 +259,13 @@ def persist_lines(config, lines, information_schema_cache=None) -> None: # if some bucket has records that need to be flushed but haven't reached batch size # then flush all buckets. 
- if len(row_count.values()) > 0: + if sum(row_count.values()) > 0: # flush all streams one last time, delete records if needed, reset counts and then emit current state flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state) # emit latest state emit_state(copy.deepcopy(flushed_state)) - # pylint: disable=too-many-arguments def flush_streams( streams, diff --git a/tests/integration/resources/messages-pg-logical-streams-no-records.json b/tests/integration/resources/messages-pg-logical-streams-no-records.json new file mode 100644 index 00000000..44b635fb --- /dev/null +++ b/tests/integration/resources/messages-pg-logical-streams-no-records.json @@ -0,0 +1,8 @@ +{"type": "SCHEMA", "stream": "logical1-logical1_edgydata", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cjson": {"type": ["null", "object"]}, "cjsonb": {"type": ["null", "object"]}, "ctimentz": {"format": "time", "type": ["null", "string"]}, "ctimetz": {"format": "time", "type": ["null", "string"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", 
"string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]} +{"type": "SCHEMA", "stream": "logical1-logical1_table1", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "cvarchar2": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]} +{"type": "SCHEMA", "stream": "logical1-logical1_table2", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, 
"sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]} +{"type": "SCHEMA", "stream": "logical2-logical2_table1", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723596, "xmin": null}, 
"logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": 
{"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}} \ No newline at end of file diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 8c488d40..57786964 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -232,6 +232,20 @@ def assert_logical_streams_are_in_snowflake(self, should_metadata_columns_exist= self.assertEqual(table_three, expected_table_three) self.assertEqual(table_four, expected_table_four) + def assert_logical_streams_are_in_snowflake_and_are_empty(self): + # Get loaded rows from tables + snowflake = DbSync(self.config) + target_schema = self.config.get('default_target_schema', '') + table_one = snowflake.query("SELECT * FROM {}.logical1_table1 ORDER BY CID".format(target_schema)) + table_two = 
snowflake.query("SELECT * FROM {}.logical1_table2 ORDER BY CID".format(target_schema)) + table_three = snowflake.query("SELECT * FROM {}.logical2_table1 ORDER BY CID".format(target_schema)) + table_four = snowflake.query("SELECT CID, CTIMENTZ, CTIMETZ FROM {}.logical1_edgydata WHERE CID IN(1,2,3,4,5,6,8,9) ORDER BY CID".format(target_schema)) + + self.assertEqual(table_one, []) + self.assertEqual(table_two, []) + self.assertEqual(table_three, []) + self.assertEqual(table_four, []) + ################################# # TESTS # ################################# @@ -491,7 +505,7 @@ def test_column_name_change(self): {'C_INT': 4, 'C_PK': 4, 'C_TIME': None, 'C_VARCHAR': '4', 'C_TIME_RENAMED': datetime.time(23, 0, 3)} ]) - def test_logical_streams_from_pg_with_hard_delete_and_default_batch_size(self): + def test_logical_streams_from_pg_with_hard_delete_and_default_batch_size_should_pass(self): """Tests logical streams from pg with inserts, updates and deletes""" tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json') @@ -501,7 +515,7 @@ def test_logical_streams_from_pg_with_hard_delete_and_default_batch_size(self): self.assert_logical_streams_are_in_snowflake(True) - def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5(self): + def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5_should_pass(self): """Tests logical streams from pg with inserts, updates and deletes""" tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json') @@ -512,6 +526,17 @@ def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5(self): self.assert_logical_streams_are_in_snowflake(True) + def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5_and_no_records_should_pass(self): + """Tests logical streams from pg that contain only SCHEMA and STATE messages (no records): tables must exist and stay empty""" + tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams-no-records.json') + + # Turning on hard delete mode + self.config['hard_delete'] 
= True + self.config['batch_size_rows'] = 5 + self.persist_lines_with_cache(tap_lines) + + self.assert_logical_streams_are_in_snowflake_and_are_empty() + def test_information_schema_cache_create_and_update(self): """Newly created and altered tables must be cached automatically for later use.