From bb483e902fe7fb6a3a6ccab5873288b31df40c0d Mon Sep 17 00:00:00 2001
From: Lauri Lehtinen
Date: Mon, 12 Apr 2021 10:19:08 +0300
Subject: [PATCH] Add batch_wait_limit_seconds (#164)

* Add batch_wait_limit_seconds
---
 .gitignore                                 |  1 +
 README.md                                  |  1 +
 target_snowflake/__init__.py               | 16 +++++++++++
 tests/integration/test_target_snowflake.py | 16 +++++++++++
 tests/unit/test_target_snowflake.py        | 33 ++++++++++++++++++++++
 5 files changed, 67 insertions(+)

diff --git a/.gitignore b/.gitignore
index 6c183788..a005a21d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 # IDE
 .vscode
 .idea/*
+*.iml
 
 # Python
 __pycache__/
diff --git a/README.md b/README.md
index 326b9b88..7b665a6e 100644
--- a/README.md
+++ b/README.md
@@ -158,6 +158,7 @@ Full list of options in `config.json`:
 | stage | String | No | Named external stage name created at pre-requirements section. Has to be a fully qualified name including the schema name. If not specified, table internal stage are used. When this is defined then `s3_bucket` has to be defined as well. |
 | file_format | String | Yes | Named file format name created at pre-requirements section. Has to be a fully qualified name including the schema name. |
 | batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
+| batch_wait_limit_seconds | Integer | | (Default: None) Maximum time (in seconds) to wait for a batch to reach `batch_size_rows` before flushing. |
 | flush_all_streams | Boolean | | (Default: False) Flush and load every stream into Snowflake when one batch is full. Warning: This may trigger the COPY command to use files with low number of records, and may cause performance problems. |
 | parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max. **Parallelism works only with external stages. If no s3_bucket defined with an external stage then flusing tables is enforced to use a single thread.**|
 | parallelism_max | Integer | | (Default: 16) Max number of parallel threads to use when flushing tables. |
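
For orientation, here is a minimal Python sketch of a target config that enables the new option, written as the dict that `persist_lines(config, lines)` receives. The connection keys and values are placeholders for illustration only; they come from the project's wider config reference, not from this patch:

    # Illustrative config only; connection values are placeholders.
    config = {
        'account': 'rt12345.eu-central-1',   # placeholder Snowflake account
        'dbname': 'ANALYTICS_DB',
        'user': 'LOADER',
        'password': 'secret',
        'warehouse': 'LOAD_WH',
        'default_target_schema': 'PUBLIC',
        'file_format': 'PUBLIC.CSV_FORMAT',  # required named file format (see table above)

        'batch_size_rows': 100000,           # flush once a stream has buffered this many rows...
        'batch_wait_limit_seconds': 30,      # ...or 30 seconds after the previous flush, whichever comes first
    }

When `batch_wait_limit_seconds` is left unset the behaviour is unchanged and only `batch_size_rows` triggers a flush.
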
diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index cbf7abae..c59681a0 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -12,6 +12,7 @@
 from joblib import Parallel, delayed, parallel_backend
 from jsonschema import Draft7Validator, FormatChecker
 from singer import get_logger
+from datetime import datetime, timedelta
 
 import target_snowflake.file_formats.csv as csv
 import target_snowflake.file_formats.parquet as parquet
@@ -110,6 +111,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
     stream_to_sync = {}
     total_row_count = {}
     batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS)
+    batch_wait_limit_seconds = config.get('batch_wait_limit_seconds', None)
+    flush_timestamp = datetime.utcnow()
 
     # Loop over lines from stdin
     for line in lines:
@@ -167,7 +170,18 @@
             else:
                 records_to_load[stream][primary_key_string] = o['record']
 
+            flush = False
             if row_count[stream] >= batch_size_rows:
+                flush = True
+                LOGGER.info("Flush triggered by batch_size_rows (%s) reached in %s",
+                            batch_size_rows, stream)
+            elif (batch_wait_limit_seconds and
+                  datetime.utcnow() >= (flush_timestamp + timedelta(seconds=batch_wait_limit_seconds))):
+                flush = True
+                LOGGER.info("Flush triggered by batch_wait_limit_seconds (%s)",
+                            batch_wait_limit_seconds)
+
+            if flush:
                 # flush all streams, delete records if needed, reset counts and then emit current state
                 if config.get('flush_all_streams'):
                     filter_streams = None
@@ -184,6 +198,8 @@
                                               flushed_state,
                                               filter_streams=filter_streams)
 
+                flush_timestamp = datetime.utcnow()
+
                 # emit last encountered state
                 emit_state(copy.deepcopy(flushed_state))
 
diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py
index 06be8b90..e229a243 100644
--- a/tests/integration/test_target_snowflake.py
+++ b/tests/integration/test_target_snowflake.py
@@ -4,6 +4,7 @@
 import mock
 import os
 import botocore
+import itertools
 
 import target_snowflake
 from target_snowflake import RecordValidationException
@@ -874,6 +875,21 @@ def test_flush_streams_with_intermediate_flushes_on_all_streams(self, mock_emit_
         # Every table should be loaded correctly
         self.assert_logical_streams_are_in_snowflake(True)
 
+    @mock.patch('target_snowflake.emit_state')
+    def test_flush_streams_based_on_batch_wait_limit(self, mock_emit_state):
+        """Tests that streams are flushed once batch_wait_limit_seconds is exceeded"""
+        tap_lines = test_utils.get_test_tap_lines('messages-pg-logical-streams.json')
+
+        mock_emit_state.get.return_value = None
+
+        self.config['hard_delete'] = True
+        self.config['batch_size_rows'] = 1000
+        self.config['batch_wait_limit_seconds'] = 0.1
+        self.persist_lines_with_cache(tap_lines)
+
+        self.assert_logical_streams_are_in_snowflake(True)
+        self.assertGreater(mock_emit_state.call_count, 1, 'Expecting multiple flushes')
+
     def test_record_validation(self):
         """Test validating records"""
         tap_lines = test_utils.get_test_tap_lines('messages-with-invalid-records.json')
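
The `target_snowflake/__init__.py` hunks above make the per-record flush decision: flush when the row threshold is reached, or when `batch_wait_limit_seconds` have passed since `flush_timestamp`, which is reset after every flush. Below is a minimal, self-contained sketch of that condition, using a hypothetical `should_flush` helper that does not exist in the patch (the patch inlines the check inside `persist_lines`):

    from datetime import datetime, timedelta

    def should_flush(row_count, batch_size_rows, flush_timestamp, batch_wait_limit_seconds=None):
        """Mirror the inlined condition: row limit reached, or wait limit elapsed since the last flush."""
        if row_count >= batch_size_rows:
            return True
        if not batch_wait_limit_seconds:  # None (the default) disables the time-based trigger
            return False
        return datetime.utcnow() >= flush_timestamp + timedelta(seconds=batch_wait_limit_seconds)

    # A nearly empty batch still flushes once the wait limit has passed.
    last_flush = datetime.utcnow() - timedelta(seconds=45)
    assert should_flush(row_count=3, batch_size_rows=100000,
                        flush_timestamp=last_flush, batch_wait_limit_seconds=30)

Because `flush_timestamp` is reset to `datetime.utcnow()` after each flush, the limit measures time since the last flush rather than since the first record of the run.
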
diff --git a/tests/unit/test_target_snowflake.py b/tests/unit/test_target_snowflake.py
index d50e60eb..4417ac41 100644
--- a/tests/unit/test_target_snowflake.py
+++ b/tests/unit/test_target_snowflake.py
@@ -1,6 +1,8 @@
 import unittest
 import os
+import itertools
+from datetime import datetime, timedelta
 
 from unittest.mock import patch
 
 import target_snowflake
@@ -53,3 +55,34 @@ def test_persist_lines_with_same_schema_expect_flushing_once(self, dbSync_mock,
         target_snowflake.persist_lines(self.config, lines)
 
         flush_streams_mock.assert_called_once()
+
+    @patch('target_snowflake.datetime')
+    @patch('target_snowflake.flush_streams')
+    @patch('target_snowflake.DbSync')
+    def test_persist_40_records_with_batch_wait_limit(self, dbSync_mock, flush_streams_mock, dateTime_mock):
+
+        start_time = datetime(2021, 4, 6, 0, 0, 0)
+        increment = 11
+        counter = itertools.count()
+
+        # Move time forward by `increment` seconds every time utcnow() is called
+        dateTime_mock.utcnow.side_effect = lambda: start_time + timedelta(seconds=increment * next(counter))
+
+        self.config['batch_size_rows'] = 100
+        self.config['batch_wait_limit_seconds'] = 10
+        self.config['flush_all_streams'] = True
+
+        # Expecting 40 records
+        with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f:
+            lines = f.readlines()
+
+        instance = dbSync_mock.return_value
+        instance.create_schema_if_not_exists.return_value = None
+        instance.sync_table.return_value = None
+
+        flush_streams_mock.return_value = '{"currently_syncing": null}'
+
+        target_snowflake.persist_lines(self.config, lines)
+
+        # Expecting a flush after every record, plus one final flush at the end
+        assert flush_streams_mock.call_count == 41
\ No newline at end of file
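
A note on the unit test above: `persist_lines` reads the clock through the module-level `datetime` imported in this patch, so the test patches `target_snowflake.datetime` and advances a fake clock by 11 seconds per `utcnow()` call. With `batch_wait_limit_seconds = 10` and `batch_size_rows = 100`, every record check sees the limit exceeded, which matches the expected 40 per-record flushes plus one final flush (41 calls). A minimal sketch of the same advancing-clock pattern outside a test case, assuming the patched package is importable:

    import itertools
    from datetime import datetime, timedelta
    from unittest.mock import patch

    import target_snowflake  # assumes this patch is applied, so the module exposes `datetime`

    start_time = datetime(2021, 4, 6, 0, 0, 0)
    ticks = itertools.count()

    # Every utcnow() call returns a timestamp 11 seconds later than the previous one.
    with patch('target_snowflake.datetime') as fake_dt:
        fake_dt.utcnow.side_effect = lambda: start_time + timedelta(seconds=11 * next(ticks))
        print(target_snowflake.datetime.utcnow())  # 2021-04-06 00:00:00
        print(target_snowflake.datetime.utcnow())  # 2021-04-06 00:00:11
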