
Add batch_wait_limit_seconds (#164)
* Add batch_wait_limit_seconds
Lauri Lehtinen authored Apr 12, 2021
1 parent 5bdc5e4 commit bb483e9
Showing 5 changed files with 67 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
# IDE
.vscode
.idea/*
*.iml

# Python
__pycache__/
1 change: 1 addition & 0 deletions README.md
@@ -158,6 +158,7 @@ Full list of options in `config.json`:
| stage | String | No | Name of the external stage created in the pre-requirements section. Has to be a fully qualified name including the schema name. If not specified, the table's internal stage is used. When this is defined, `s3_bucket` has to be defined as well. |
| file_format | String | Yes | Name of the file format created in the pre-requirements section. Has to be a fully qualified name including the schema name. |
| batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
| batch_wait_limit_seconds | Integer | | (Default: None) Maximum time to wait for a batch to reach `batch_size_rows` before it is flushed. |
| flush_all_streams | Boolean | | (Default: False) Flush and load every stream into Snowflake when one batch is full. Warning: This may trigger the COPY command to use files with a low number of records, which can cause performance problems. |
| parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max. **Parallelism works only with external stages. If no s3_bucket is defined with an external stage, then flushing tables is forced to use a single thread.**|
| parallelism_max | Integer | | (Default: 16) Max number of parallel threads to use when flushing tables. |
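The new option works together with `batch_size_rows`: whichever limit is hit first triggers a flush. A minimal sketch of how the two settings might be combined, shown as the Python dict that `persist_lines` receives in the tests below; only the two batch keys come from this change, and the remaining keys are illustrative placeholders rather than a complete or verified config:

```python
# Hypothetical config sketch -- the connection-related values are placeholders.
config = {
    "account": "xy12345.eu-central-1",         # placeholder Snowflake account
    "dbname": "MY_DATABASE",                   # placeholder database name
    "file_format": "MY_SCHEMA.MY_FILE_FORMAT", # named file format (see table above)
    "batch_size_rows": 100000,                 # flush once 100k rows are buffered...
    "batch_wait_limit_seconds": 30             # ...or 30 seconds after the last flush,
                                               # whichever comes first
}
```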
16 changes: 16 additions & 0 deletions target_snowflake/__init__.py
@@ -12,6 +12,7 @@
from joblib import Parallel, delayed, parallel_backend
from jsonschema import Draft7Validator, FormatChecker
from singer import get_logger
from datetime import datetime, timedelta

import target_snowflake.file_formats.csv as csv
import target_snowflake.file_formats.parquet as parquet
@@ -110,6 +111,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
    stream_to_sync = {}
    total_row_count = {}
    batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS)
    batch_wait_limit_seconds = config.get('batch_wait_limit_seconds', None)
    flush_timestamp = datetime.utcnow()

    # Loop over lines from stdin
    for line in lines:
@@ -167,7 +170,18 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
            else:
                records_to_load[stream][primary_key_string] = o['record']

            flush = False
            if row_count[stream] >= batch_size_rows:
                flush = True
                LOGGER.info("Flush triggered by batch_size_rows (%s) reached in %s",
                            batch_size_rows, stream)
            elif (batch_wait_limit_seconds and
                  datetime.utcnow() >= (flush_timestamp + timedelta(seconds=batch_wait_limit_seconds))):
                flush = True
                LOGGER.info("Flush triggered by batch_wait_limit_seconds (%s)",
                            batch_wait_limit_seconds)

            if flush:
                # flush all streams, delete records if needed, reset counts and then emit current state
                if config.get('flush_all_streams'):
                    filter_streams = None
@@ -184,6 +198,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
                    flushed_state,
                    filter_streams=filter_streams)

                flush_timestamp = datetime.utcnow()

                # emit last encountered state
                emit_state(copy.deepcopy(flushed_state))

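The change above amounts to a dual flush trigger inside the record loop: a batch is flushed either when it reaches `batch_size_rows` or when `batch_wait_limit_seconds` has passed since the last flush, and `flush_timestamp` is reset after every flush. A simplified, self-contained sketch of the same pattern (not the target's actual implementation; `batch_records` and its arguments are hypothetical names):

```python
from datetime import datetime, timedelta


def batch_records(records, batch_size_rows=100_000, batch_wait_limit_seconds=None):
    """Yield batches when they fill up or when the wait limit expires.

    Simplified sketch of the trigger logic in persist_lines(); as in the real
    loop, the time check only runs when a new record arrives.
    """
    batch = []
    flush_timestamp = datetime.utcnow()

    for record in records:
        batch.append(record)

        flush = len(batch) >= batch_size_rows
        if not flush and batch_wait_limit_seconds:
            flush = datetime.utcnow() >= flush_timestamp + timedelta(seconds=batch_wait_limit_seconds)

        if flush:
            yield batch
            batch = []
            flush_timestamp = datetime.utcnow()  # reset the clock after each flush

    if batch:
        yield batch  # whatever is left at the end of input
```

Because the time check runs only when a record arrives, an idle stream is not flushed until the next record shows up; the wait limit bounds how long a non-empty batch can sit before loading, not end-to-end latency.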
16 changes: 16 additions & 0 deletions tests/integration/test_target_snowflake.py
@@ -4,6 +4,7 @@
import mock
import os
import botocore
import itertools

import target_snowflake
from target_snowflake import RecordValidationException
@@ -874,6 +875,21 @@ def test_flush_streams_with_intermediate_flushes_on_all_streams(self, mock_emit_
        # Every table should be loaded correctly
        self.assert_logical_streams_are_in_snowflake(True)

    @mock.patch('target_snowflake.emit_state')
    def test_flush_streams_based_on_batch_wait_limit(self, mock_emit_state):
        """Tests that a small batch_wait_limit_seconds triggers intermediate flushes"""
        tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')

        mock_emit_state.get.return_value = None

        self.config['hard_delete'] = True
        self.config['batch_size_rows'] = 1000
        self.config['batch_wait_limit_seconds'] = 0.1
        self.persist_lines_with_cache(tap_lines)

        self.assert_logical_streams_are_in_snowflake(True)
        self.assertGreater(mock_emit_state.call_count, 1, 'Expecting multiple flushes')

    def test_record_validation(self):
        """Test validating records"""
        tap_lines = test_utils.get_test_tap_lines('messages-with-invalid-records.json')
33 changes: 33 additions & 0 deletions tests/unit/test_target_snowflake.py
@@ -1,6 +1,8 @@
import unittest
import os
import itertools

from datetime import datetime, timedelta
from unittest.mock import patch

import target_snowflake
@@ -53,3 +55,34 @@ def test_persist_lines_with_same_schema_expect_flushing_once(self, dbSync_mock,
        target_snowflake.persist_lines(self.config, lines)

        flush_streams_mock.assert_called_once()

    @patch('target_snowflake.datetime')
    @patch('target_snowflake.flush_streams')
    @patch('target_snowflake.DbSync')
    def test_persist_40_records_with_batch_wait_limit(self, dbSync_mock, flush_streams_mock, dateTime_mock):

        start_time = datetime(2021, 4, 6, 0, 0, 0)
        increment = 11
        counter = itertools.count()

        # Move time forward by `increment` seconds every time utcnow() is called
        dateTime_mock.utcnow.side_effect = lambda: start_time + timedelta(seconds=increment * next(counter))

        self.config['batch_size_rows'] = 100
        self.config['batch_wait_limit_seconds'] = 10
        self.config['flush_all_streams'] = True

        # Expecting 40 records
        with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f:
            lines = f.readlines()

        instance = dbSync_mock.return_value
        instance.create_schema_if_not_exists.return_value = None
        instance.sync_table.return_value = None

        flush_streams_mock.return_value = '{"currently_syncing": null}'

        target_snowflake.persist_lines(self.config, lines)

        # utcnow() advances 11 seconds per call and the wait limit is 10 seconds,
        # so every record triggers a flush: 40 flushes plus 1 final flush at the end
        assert flush_streams_mock.call_count == 41
