Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
fix last flush condition (#37)
Browse files Browse the repository at this point in the history
* fix last flush condition

* update circle ci config to have a separate step for pylinting

* install pylint in pylinting step

* bump version
  • Loading branch information
Samira-El authored Oct 18, 2019
1 parent 1279949 commit e787101
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 6 deletions.
8 changes: 7 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ jobs:
. venv/bin/activate
pip install --upgrade pip
pip install .[test]
- run:
name: 'Pylinting'
command: |
. venv/bin/activate
pip install .[test]
pylint target_snowflake -d C,W,unexpected-keyword-arg,duplicate-code
- run:
name: 'Unit Tests'
command: |
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
long_description = f.read()

setup(name="pipelinewise-target-snowflake",
version="1.1.0",
version="1.1.1",
description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
5 changes: 3 additions & 2 deletions target_snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import copy
import tempfile
import logging
from datetime import datetime
from decimal import Decimal
from tempfile import NamedTemporaryFile
Expand All @@ -18,6 +19,7 @@
from target_snowflake.db_sync import DbSync

logger = singer.get_logger()
logger.setLevel(logging.ERROR)

DEFAULT_BATCH_SIZE_ROWS = 100000
DEFAULT_PARALLELISM = 0 # The number of threads used to flush tables
Expand Down Expand Up @@ -257,14 +259,13 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:

# if some bucket has records that need to be flushed but haven't reached batch size
# then flush all buckets.
if len(row_count.values()) > 0:
if sum(row_count.values()) > 0:
# flush all streams one last time, delete records if needed, reset counts and then emit current state
flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state)

# emit latest state
emit_state(copy.deepcopy(flushed_state))


# pylint: disable=too-many-arguments
def flush_streams(
streams,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"type": "SCHEMA", "stream": "logical1-logical1_edgydata", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cjson": {"type": ["null", "object"]}, "cjsonb": {"type": ["null", "object"]}, "ctimentz": {"format": "time", "type": ["null", "string"]}, "ctimetz": {"format": "time", "type": ["null", "string"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]}
{"type": "SCHEMA", "stream": "logical1-logical1_table1", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "cvarchar2": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]}
{"type": "SCHEMA", "stream": "logical1-logical1_table2", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]}
{"type": "SCHEMA", "stream": "logical2-logical2_table1", "schema": {"definitions": {"sdc_recursive_boolean_array": {"items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}, "type": ["null", "boolean", "array"]}, "sdc_recursive_integer_array": {"items": {"$ref": "#/definitions/sdc_recursive_integer_array"}, "type": ["null", "integer", "array"]}, "sdc_recursive_number_array": {"items": {"$ref": "#/definitions/sdc_recursive_number_array"}, "type": ["null", "number", "array"]}, "sdc_recursive_object_array": {"items": {"$ref": "#/definitions/sdc_recursive_object_array"}, "type": ["null", "object", "array"]}, "sdc_recursive_string_array": {"items": {"$ref": "#/definitions/sdc_recursive_string_array"}, "type": ["null", "string", "array"]}, "sdc_recursive_timestamp_array": {"format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}, "type": ["null", "string", "array"]}}, "properties": {"cid": {"maximum": 2147483647, "minimum": -2147483648, "type": ["integer"]}, "cvarchar": {"type": ["null", "string"]}, "_sdc_deleted_at": {"type": ["null", "string"], "format": "date-time"}}, "type": "object"}, "key_properties": ["cid"], "bookmark_properties": ["lsn"]}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196760, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723596, "xmin": null}, "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723618, "xmin": null}, "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723635, "xmin": null}, "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197672, "version": 1570922723651, "xmin": null}, "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079}, "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": null}, "public2-wearehere": {}}}}
29 changes: 27 additions & 2 deletions tests/integration/test_target_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,20 @@ def assert_logical_streams_are_in_snowflake(self, should_metadata_columns_exist=
self.assertEqual(table_three, expected_table_three)
self.assertEqual(table_four, expected_table_four)

def assert_logical_streams_are_in_snowflake_and_are_empty(self):
    """Assert that the logical stream tables can be queried and return no rows.

    Runs one SELECT per logical stream table in the configured
    ``default_target_schema`` and checks that every result is an empty list.
    """
    db_sync = DbSync(self.config)
    schema = self.config.get('default_target_schema', '')

    # NOTE(review): the logical1_edgydata query only looks at CIDs 1-6, 8 and 9,
    # so rows with any other CID would not fail this check - confirm this is intended.
    select_templates = (
        "SELECT * FROM {}.logical1_table1 ORDER BY CID",
        "SELECT * FROM {}.logical1_table2 ORDER BY CID",
        "SELECT * FROM {}.logical2_table1 ORDER BY CID",
        "SELECT CID, CTIMENTZ, CTIMETZ FROM {}.logical1_edgydata WHERE CID IN(1,2,3,4,5,6,8,9) ORDER BY CID",
    )

    # Run every query before asserting anything, so all tables are read
    # even when an earlier one turns out not to be empty.
    fetched = [db_sync.query(template.format(schema)) for template in select_templates]

    for rows in fetched:
        self.assertEqual(rows, [])

#################################
# TESTS #
#################################
Expand Down Expand Up @@ -491,7 +505,7 @@ def test_column_name_change(self):
{'C_INT': 4, 'C_PK': 4, 'C_TIME': None, 'C_VARCHAR': '4', 'C_TIME_RENAMED': datetime.time(23, 0, 3)}
])

def test_logical_streams_from_pg_with_hard_delete_and_default_batch_size(self):
def test_logical_streams_from_pg_with_hard_delete_and_default_batch_size_should_pass(self):
"""Tests logical streams from pg with inserts, updates and deletes"""
tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')

Expand All @@ -501,7 +515,7 @@ def test_logical_streams_from_pg_with_hard_delete_and_default_batch_size(self):

self.assert_logical_streams_are_in_snowflake(True)

def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5(self):
def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5_should_pass(self):
"""Tests logical streams from pg with inserts, updates and deletes"""
tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')

Expand All @@ -512,6 +526,17 @@ def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5(self):

self.assert_logical_streams_are_in_snowflake(True)

def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5_and_no_records_should_pass(self):
    """Tests that logical streams from pg with no records leave the target tables empty"""
    no_record_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams-no-records.json')

    # Hard delete mode on, with a batch size of 5 rows
    self.config.update({'hard_delete': True, 'batch_size_rows': 5})
    self.persist_lines_with_cache(no_record_lines)

    self.assert_logical_streams_are_in_snowflake_and_are_empty()

def test_information_schema_cache_create_and_update(self):
"""Newly created and altered tables must be cached automatically for later use.
Expand Down

0 comments on commit e787101

Please sign in to comment.