From e6e4ef691cee96447be832b7ac6935adb2212b0d Mon Sep 17 00:00:00 2001 From: Peter Kosztolanyi Date: Wed, 15 Jan 2020 22:51:44 +0200 Subject: [PATCH] [AP-415] Add temp_dir optional parameter to config (#59) --- README.md | 1 + setup.py | 2 +- target_snowflake/__init__.py | 14 +++++++++----- tests/integration/test_target_snowflake.py | 10 ++++++++++ 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 8f48679a..f6be2ae1 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,7 @@ Full list of options in `config.json`: | data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.

When value is 0 (default) then flattening functionality is turned off. | | primary_key_required | Boolean | | (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined. | | validate_records | Boolean | | (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. | +| temp_dir | String | | (Default: platform-dependent) Directory of temporary CSV files with RECORD messages. | ### To run tests: diff --git a/setup.py b/setup.py index 41b4ed45..889bc1e3 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ long_description = f.read() setup(name="pipelinewise-target-snowflake", - version="1.2.1", + version="1.3.0", description="Singer.io target for loading data to Snowflake - PipelineWise compatible", long_description=long_description, long_description_content_type='text/markdown', diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py index 51bba10a..2321fb43 100644 --- a/target_snowflake/__init__.py +++ b/target_snowflake/__init__.py @@ -367,7 +367,8 @@ def flush_streams( records_to_load=streams[stream], row_count=row_count, db_sync=stream_to_sync[stream], - delete_rows=config.get('hard_delete') + delete_rows=config.get('hard_delete'), + temp_dir=config.get('temp_dir') ) for stream in streams_to_flush) # reset flushed stream records to empty to avoid flushing same records @@ -392,10 +393,10 @@ def flush_streams( return flushed_state -def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=False): +def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=False, temp_dir=None): # Load into snowflake if row_count[stream] > 0: - flush_records(stream, records_to_load, row_count[stream], db_sync) + flush_records(stream, records_to_load, row_count[stream], db_sync, temp_dir) # Delete soft-deleted, flagged rows - where _sdc_deleted at is not null if delete_rows: @@ -405,8 +406,11 @@ def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=F row_count[stream] = 0 -def flush_records(stream, records_to_load, row_count, db_sync): - csv_fd, csv_file = mkstemp() +def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None): + if temp_dir: + os.makedirs(temp_dir, exist_ok=True) + csv_fd, csv_file = mkstemp(suffix='.csv', prefix='records_', dir=temp_dir) + with open(csv_fd, 'w+b') as f: for record in records_to_load.values(): csv_line = db_sync.record_to_csv_line(record) diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index b503bbe2..4b5a2595 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -886,3 +886,13 @@ def test_pg_records_validation(self): self.config['validate_records'] = True self.persist_lines_with_cache(tap_lines_valid_records) + + def test_loading_tables_with_custom_temp_dir(self): + """Loading multiple tables from the same input tap using custom temp directory""" + tap_lines = test_utils.get_test_tap_lines('messages-with-three-streams.json') + + # Turning on client-side encryption and load + self.config['temp_dir'] = ('~/.pipelinewise/tmp') + self.persist_lines_with_cache(tap_lines) + + self.assert_three_streams_are_into_snowflake() \ No newline at end of file