diff --git a/.circleci/config.yml b/.circleci/config.yml
index 6c4c9e8d..ab7ee938 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -12,22 +12,21 @@ jobs:
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
- pip install .
- pip install pylint
+ pip install .[test]
pylint target_snowflake -d C,W,unexpected-keyword-arg,duplicate-code
- run:
name: 'Unit Tests'
command: |
. venv/bin/activate
- pip install nose
+ pip install .[test]
nosetests --where=tests/unit
- run:
name: 'Integration Tests'
command: |
. venv/bin/activate
- pip install nose
+ pip install .[test]
nosetests --where=tests/integration/
workflows:
diff --git a/README.md b/README.md
index 9637a337..e95417c9 100644
--- a/README.md
+++ b/README.md
@@ -107,7 +107,10 @@ Full list of options in `config.json`:
| s3_key_prefix | String | | (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket. |
| stage | String | Yes | Named external stage name created at pre-requirements section. Has to be a fully qualified name including the schema name |
| file_format | String | Yes | Named file format name created at pre-requirements section. Has to be a fully qualified name including the schema name. |
-| batch_size | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
+| batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
+| flush_all_streams | Boolean | | (Default: False) Flush and load every stream into Snowflake when one batch is full. Warning: This may trigger the COPY command to use files with a low number of records, and may cause performance problems. |
+| parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max. |
+| parallelism_max | Integer | | (Default: 16) Max number of parallel threads to use when flushing tables. |
| default_target_schema | String | | Name of the schema where the tables will be created. If `schema_mapping` is not defined then every stream sent by the tap is loaded into this schema. |
| default_target_schema_select_permission | String | | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific role or a list of roles. If `schema_mapping` is not defined then every stream sent by the tap is granted accordingly. |
| schema_mapping | Object | | Useful if you want to load multiple streams from one tap to multiple Snowflake schemas.
If the tap sends the `stream_id` in `<schema_name>-<table_name>` format then this option overwrites the `default_target_schema` value. Note that using `schema_mapping` you can overwrite the `default_target_schema_select_permission` value to grant SELECT permissions to different groups per schema, or you can optionally create indices automatically for the replicated tables.
**Note**: This is an experimental feature and it is recommended to use it via PipelineWise YAML files, which generate the object mapping in the right JSON format. For further info check a [PipelineWise YAML Example]
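For illustration, a minimal sketch of how the new batching and parallelism options could be combined in a `config.json` (written here as a small Python helper; the values and the `analytics` schema name are placeholders, and the rest of the required Snowflake options documented above still apply):

```
import json

# Illustrative values only - not a complete config.json
config = {
    "default_target_schema": "analytics",
    "batch_size_rows": 50000,     # flush a stream after 50k buffered rows
    "flush_all_streams": False,   # flush only the stream whose batch filled up
    "parallelism": 0,             # 0 = one thread per stream to flush...
    "parallelism_max": 16         # ...but never more than this many threads
}

with open("config.json", "w") as config_file:
    json.dump(config, config_file, indent=4)
```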
@@ -139,13 +142,12 @@ Full list of options in `config.json`:
export CLIENT_SIDE_ENCRYPTION_STAGE_OBJECT=
```
-2. Install python dependencies in a virtual env and run nose unit and integration tests
+2. Install python test dependencies in a virtual env and run nose unit and integration tests
```
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
- pip install .
- pip install nose
+ pip install .[test]
```
3. To run unit tests:
diff --git a/setup.py b/setup.py
index 10d1de6d..8173f6ad 100644
--- a/setup.py
+++ b/setup.py
@@ -25,6 +25,13 @@
'inflection==0.3.1',
'joblib==0.13.2'
],
+ extras_require={
+ "test": [
+ "nose==1.3.7",
+ "mock==3.0.5",
+ "pylint==2.4.2"
+ ]
+ },
entry_points="""
[console_scripts]
target-snowflake=target_snowflake:main
diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index 0b4a9896..4eb9dcbe 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -5,6 +5,7 @@
import json
import os
import sys
+import copy
import tempfile
from datetime import datetime
from decimal import Decimal
@@ -18,6 +19,9 @@
logger = singer.get_logger()
+DEFAULT_BATCH_SIZE_ROWS = 100000
+DEFAULT_PARALLELISM = 0 # 0 means auto parallelism: one thread per stream to flush, up to DEFAULT_MAX_PARALLELISM
+DEFAULT_MAX_PARALLELISM = 16 # Don't use more than this number of threads by default when flushing streams in parallel
def float_to_decimal(value):
"""Walk the given data structure and turn all instances of float into double."""
@@ -61,7 +65,7 @@ def add_metadata_values_to_record(record_message, stream_to_sync):
def emit_state(state):
if state is not None:
line = json.dumps(state)
- logger.debug('Emitting state {}'.format(line))
+ logger.info('Emitting state {}'.format(line))
sys.stdout.write("{}\n".format(line))
sys.stdout.flush()
@@ -97,6 +101,7 @@ def load_information_schema_cache(config):
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def persist_lines(config, lines, information_schema_cache=None) -> None:
state = None
+ flushed_state = None
schemas = {}
key_properties = {}
validators = {}
@@ -105,7 +110,7 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
row_count = {}
stream_to_sync = {}
total_row_count = {}
- batch_size_rows = config.get('batch_size_rows', 100000)
+ batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS)
# Loop over lines from stdin
for line in lines:
@@ -159,9 +164,23 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
if row_count[stream] >= batch_size_rows:
# flush all streams, delete records if needed, reset counts and then emit current state
- flush_all_streams(records_to_load, row_count, stream_to_sync, config)
+ if config.get('flush_all_streams'):
+ filter_streams = None
+ else:
+ filter_streams = [stream]
+
+ # Flush and return a new state dict with new positions only for the flushed streams
+ flushed_state = flush_streams(
+ records_to_load,
+ row_count,
+ stream_to_sync,
+ config,
+ state,
+ flushed_state,
+ filter_streams=filter_streams)
+
# emit last encountered state
- emit_state(state)
+ emit_state(copy.deepcopy(flushed_state))
elif t == 'SCHEMA':
if 'stream' not in o:
@@ -177,9 +196,10 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
# if same stream has been encountered again, it means the schema might have been altered
# so previous records need to be flushed
if row_count.get(stream, 0) > 0:
- flush_all_streams(records_to_load, row_count, stream_to_sync, config)
+ flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state)
+
# emit latest encountered state
- emit_state(state)
+ emit_state(flushed_state)
# key_properties key must be available in the SCHEMA message.
if 'key_properties' not in o:
@@ -227,6 +247,10 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
logger.debug('Setting state to {}'.format(o['value']))
state = o['value']
+ # Initially set flushed state
+ if not flushed_state:
+ flushed_state = copy.deepcopy(state)
+
else:
raise Exception("Unknown message type {} in message {}"
.format(o['type'], o))
@@ -235,34 +259,83 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
# then flush all buckets.
if len(row_count.values()) > 0:
# flush all streams one last time, delete records if needed, reset counts and then emit current state
- flush_all_streams(records_to_load, row_count, stream_to_sync, config)
+ flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state)
# emit latest state
- emit_state(state)
-
-
-def flush_all_streams(records_to_load, row_count, stream_to_sync, config) -> None:
+ emit_state(copy.deepcopy(flushed_state))
+
+
+# pylint: disable=too-many-arguments
+def flush_streams(
+ streams,
+ row_count,
+ stream_to_sync,
+ config,
+ state,
+ flushed_state,
+ filter_streams=None):
"""
Flushes all buckets and resets records count to 0 as well as empties records to load list
- :param records_to_load: dictionary with records to load per stream
+ :param streams: dictionary with records to load per stream
:param row_count: dictionary with row count per stream
:param stream_to_sync: Snowflake db sync instance per stream
:param config: dictionary containing the configuration
- :return:
+ :param state: dictionary containing the original state from tap
+ :param flushed_state: dictionary containing updated states only when streams got flushed
+ :param filter_streams: Keys of streams to flush from the streams dict. Default is every stream
+ :return: State dict with flushed positions
"""
+ parallelism = config.get("parallelism", DEFAULT_PARALLELISM)
+    max_parallelism = config.get("parallelism_max", DEFAULT_MAX_PARALLELISM)
+
+ # Parallelism 0 means auto parallelism:
+ #
+    # Auto parallelism tries to flush streams efficiently with an automatically
+    # defined number of threads: one thread per stream that needs to be loaded,
+    # but never more than max_parallelism
+ if parallelism == 0:
+ n_streams_to_flush = len(streams.keys())
+ if n_streams_to_flush > max_parallelism:
+ parallelism = max_parallelism
+ else:
+ parallelism = n_streams_to_flush
+
+ # Select the required streams to flush
+ if filter_streams:
+ streams_to_flush = {k: v for k, v in streams.items() if k in filter_streams}
+ else:
+ streams_to_flush = streams
+
# Single-host, thread-based parallelism
- with parallel_backend('threading', n_jobs=-1):
+ with parallel_backend('threading', n_jobs=parallelism):
Parallel()(delayed(load_stream_batch)(
stream=stream,
- records_to_load=records_to_load[stream],
+ records_to_load=streams[stream],
row_count=row_count,
db_sync=stream_to_sync[stream],
delete_rows=config.get('hard_delete')
- ) for (stream) in records_to_load.keys())
+ ) for (stream) in streams_to_flush.keys())
+
+    # reset flushed stream records to empty to avoid flushing the same records again
+    # and update the flushed positions of the flushed streams
+    for stream in streams_to_flush.keys():
+        streams[stream] = {}
+
+        # update flushed_state position if only a subset of streams got flushed and
+        # we have state information for the stream
+        if filter_streams and state is not None and stream in state.get('bookmarks', {}):
+            # Create bookmarks key if not exists
+            if 'bookmarks' not in flushed_state:
+                flushed_state['bookmarks'] = {}
+            # Copy the stream bookmark from the latest state
+            flushed_state['bookmarks'][stream] = copy.deepcopy(state['bookmarks'][stream])
+
+    # If we flushed every bucket use the latest state
+    if not filter_streams:
+        flushed_state = copy.deepcopy(state)
- # reset all stream records to empty to avoid flushing same records
- for stream in records_to_load.keys():
- records_to_load[stream] = {}
+ # Return with state message with flushed positions
+ return flushed_state
def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=False):
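As a standalone sketch of the flushing logic above (the helper names `resolve_parallelism` and `select_streams_to_flush` are illustrative, not part of the module), auto parallelism resolves to one thread per buffered stream capped at the configured maximum, and `filter_streams` narrows which buckets actually get flushed:

```
def resolve_parallelism(n_streams, parallelism=0, max_parallelism=16):
    """0 means auto: one thread per buffered stream, capped at max_parallelism."""
    if parallelism == 0:
        return min(n_streams, max_parallelism)
    return parallelism  # -1 lets joblib use one thread per CPU core


def select_streams_to_flush(streams, filter_streams=None):
    """Pick only the requested streams, or every buffered stream if no filter is given."""
    if filter_streams:
        return {k: v for k, v in streams.items() if k in filter_streams}
    return streams


# Example: three buffered streams, but only one of them reached batch_size_rows
buffers = {'public-city': {}, 'public-country': {}, 'logical1-table1': {}}
to_flush = select_streams_to_flush(buffers, filter_streams=['public-city'])
n_jobs = resolve_parallelism(len(buffers))  # -> 3, one thread per buffered stream
```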
diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py
index da6cb519..8c488d40 100644
--- a/tests/integration/test_target_snowflake.py
+++ b/tests/integration/test_target_snowflake.py
@@ -1,6 +1,7 @@
import datetime
import json
import unittest
+import mock
import os
from nose.tools import assert_raises
@@ -26,6 +27,7 @@ class TestIntegration(unittest.TestCase):
"""
Integration Tests
"""
+ maxDiff = None
def setUp(self):
self.config = test_utils.get_test_config()
@@ -286,6 +288,17 @@ def test_loading_tables_with_metadata_columns(self):
# Check if data loaded correctly and metadata columns exist
self.assert_three_streams_are_into_snowflake(should_metadata_columns_exist=True)
+ def test_loading_tables_with_defined_parallelism(self):
+        """Loading multiple tables from the same input tap with defined parallelism"""
+ tap_lines = test_utils.get_test_tap_lines('messages-with-three-streams.json')
+
+ # Using fixed 1 thread parallelism
+ self.config['parallelism'] = 1
+ self.persist_lines_with_cache(tap_lines)
+
+        # Check if data loaded correctly
+ self.assert_three_streams_are_into_snowflake()
+
def test_loading_tables_with_hard_delete(self):
"""Loading multiple tables from the same input tap with deleted rows"""
tap_lines = test_utils.get_test_tap_lines('messages-with-three-streams.json')
@@ -605,3 +618,180 @@ def test_information_schema_cache_outdated(self):
# It should try adding the new column based on the values in cache but the column already exists
with self.assertRaises(Exception):
self.persist_lines_with_cache(tap_lines_with_multi_streams)
+
+ @mock.patch('target_snowflake.emit_state')
+ def test_flush_streams_with_no_intermediate_flushes(self, mock_emit_state):
+ """Test emitting states when no intermediate flush required"""
+ mock_emit_state.get.return_value = None
+ tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')
+
+        # Set batch size big enough to never have to flush in the middle
+ self.config['hard_delete'] = True
+ self.config['batch_size_rows'] = 1000
+ self.persist_lines_with_cache(tap_lines)
+
+ # State should be emitted only once with the latest received STATE message
+ self.assertEquals(
+ mock_emit_state.mock_calls,
+ [
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}})
+ ])
+
+ # Every table should be loaded correctly
+ self.assert_logical_streams_are_in_snowflake(True)
+
+ @mock.patch('target_snowflake.emit_state')
+ def test_flush_streams_with_intermediate_flushes(self, mock_emit_state):
+ """Test emitting states when intermediate flushes required"""
+ mock_emit_state.get.return_value = None
+ tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')
+
+ # Set batch size small enough to trigger multiple stream flushes
+ self.config['hard_delete'] = True
+ self.config['batch_size_rows'] = 10
+ self.persist_lines_with_cache(tap_lines)
+
+ # State should be emitted multiple times, updating the positions only in the stream which got flushed
+ self.assertEquals(
+ mock_emit_state.call_args_list,
+ [
+ # Flush #1 - Flushed edgydata until lsn: 108197216
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #2 - Flushed logical1-logical1_table2 until lsn: 108201336
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #3 - Flushed logical1-logical1_table2 until lsn: 108237600
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #4 - Flushed logical1-logical1_table2 until lsn: 108238768
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+                # Flush #5 - Flushed logical1-logical1_table2 until lsn: 108239896,
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108196176, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #6 - Last flush, update every stream lsn: 108240872,
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ ])
+
+ # Every table should be loaded correctly
+ self.assert_logical_streams_are_in_snowflake(True)
+
+ @mock.patch('target_snowflake.emit_state')
+ def test_flush_streams_with_intermediate_flushes_on_all_streams(self, mock_emit_state):
+ """Test emitting states when intermediate flushes required and flush_all_streams is enabled"""
+ mock_emit_state.get.return_value = None
+ tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')
+
+ # Set batch size small enough to trigger multiple stream flushes
+ self.config['hard_delete'] = True
+ self.config['batch_size_rows'] = 10
+ self.config['flush_all_streams'] = True
+ self.persist_lines_with_cache(tap_lines)
+
+ # State should be emitted 6 times, flushing every stream and updating every stream position
+ self.assertEquals(
+ mock_emit_state.call_args_list,
+ [
+ # Flush #1 - Flush every stream until lsn: 108197216
+ mock.call({"currently_syncing": None, "bookmarks": {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108197216, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #2 - Flush every stream until lsn 108201336
+ mock.call({'currently_syncing': None, 'bookmarks': {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108201336, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #3 - Flush every stream until lsn: 108237600
+ mock.call({'currently_syncing': None, 'bookmarks': {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108237600, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #4 - Flush every stream until lsn: 108238768
+ mock.call({'currently_syncing': None, 'bookmarks': {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108238768, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+                # Flush #5 - Flush every stream until lsn: 108239896,
+ mock.call({'currently_syncing': None, 'bookmarks': {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108239896, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ # Flush #6 - Last flush, update every stream until lsn: 108240872,
+ mock.call({'currently_syncing': None, 'bookmarks': {
+ "logical1-logical1_edgydata": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723596, "xmin": None},
+ "logical1-logical1_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723618, "xmin": None},
+ "logical1-logical1_table2": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723635, "xmin": None},
+ "logical2-logical2_table1": {"last_replication_method": "LOG_BASED", "lsn": 108240872, "version": 1570922723651, "xmin": None},
+ "public-city": {"last_replication_method": "INCREMENTAL", "replication_key": "id", "version": 1570922723667, "replication_key_value": 4079},
+ "public-country": {"last_replication_method": "FULL_TABLE", "version": 1570922730456, "xmin": None},
+ "public2-wearehere": {}}}),
+ ])
+
+ # Every table should be loaded correctly
+ self.assert_logical_streams_are_in_snowflake(True)
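A hedged unit-test sketch of the new bookmark filtering (not one of the tests added above; stream names and bookmark values are made up) could call `flush_streams` directly with `load_stream_batch` mocked out:

```
import copy
import unittest
import mock

import target_snowflake


class TestFlushStreamsBookmarks(unittest.TestCase):

    @mock.patch('target_snowflake.load_stream_batch')
    def test_filtered_flush_updates_only_flushed_bookmarks(self, load_stream_batch_mock):
        # Two buffered streams, but only stream_a is asked to be flushed
        streams = {'stream_a': {1: {'id': 1}}, 'stream_b': {2: {'id': 2}}}
        row_count = {'stream_a': 1, 'stream_b': 1}
        stream_to_sync = {'stream_a': mock.Mock(), 'stream_b': mock.Mock()}
        state = {'bookmarks': {'stream_a': {'lsn': 200}, 'stream_b': {'lsn': 200}}}
        flushed_state = {'bookmarks': {'stream_a': {'lsn': 100}, 'stream_b': {'lsn': 100}}}

        new_flushed_state = target_snowflake.flush_streams(
            streams, row_count, stream_to_sync, config={}, state=state,
            flushed_state=copy.deepcopy(flushed_state), filter_streams=['stream_a'])

        # Only the flushed stream advances; the other keeps its last flushed position
        self.assertEqual(new_flushed_state['bookmarks']['stream_a'], {'lsn': 200})
        self.assertEqual(new_flushed_state['bookmarks']['stream_b'], {'lsn': 100})
```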
diff --git a/tests/integration/utils.py b/tests/integration/utils.py
index f892bda9..41b72922 100644
--- a/tests/integration/utils.py
+++ b/tests/integration/utils.py
@@ -30,7 +30,6 @@ def get_db_config():
# External stage in snowflake with client side encryption details
config['client_side_encryption_master_key'] = os.environ.get('CLIENT_SIDE_ENCRYPTION_MASTER_KEY')
-
# --------------------------------------------------------------------------
    # The following variables need to be empty.
    # The test cases will set them automatically whenever needed
@@ -39,7 +38,7 @@ def get_db_config():
config['schema_mapping'] = None
config['add_metadata_columns'] = None
config['hard_delete'] = None
-
+ config['flush_all_streams'] = None
return config
@@ -57,4 +56,3 @@ def get_test_tap_lines(filename):
lines.append(line)
return lines
-