Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
[AP-415] Add temp_dir optional parameter to config (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti authored Jan 15, 2020
1 parent 7f28237 commit e6e4ef6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ Full list of options in `config.json`:
| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.<br><br>When value is 0 (default) then flattening functionality is turned off. |
| primary_key_required | Boolean | | (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined. |
| validate_records | Boolean | | (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. |
| temp_dir | String | | (Default: platform-dependent) Directory where the temporary CSV files with RECORD messages are created before loading into Snowflake. |

### To run tests:

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
long_description = f.read()

setup(name="pipelinewise-target-snowflake",
version="1.2.1",
version="1.3.0",
description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
14 changes: 9 additions & 5 deletions target_snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ def flush_streams(
records_to_load=streams[stream],
row_count=row_count,
db_sync=stream_to_sync[stream],
delete_rows=config.get('hard_delete')
delete_rows=config.get('hard_delete'),
temp_dir=config.get('temp_dir')
) for stream in streams_to_flush)

# reset flushed stream records to empty to avoid flushing same records
Expand All @@ -392,10 +393,10 @@ def flush_streams(
return flushed_state


def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=False):
def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=False, temp_dir=None):
# Load into snowflake
if row_count[stream] > 0:
flush_records(stream, records_to_load, row_count[stream], db_sync)
flush_records(stream, records_to_load, row_count[stream], db_sync, temp_dir)

# Delete soft-deleted, flagged rows - where _sdc_deleted at is not null
if delete_rows:
Expand All @@ -405,8 +406,11 @@ def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=F
row_count[stream] = 0


def flush_records(stream, records_to_load, row_count, db_sync):
csv_fd, csv_file = mkstemp()
def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None):
if temp_dir:
os.makedirs(temp_dir, exist_ok=True)
csv_fd, csv_file = mkstemp(suffix='.csv', prefix='records_', dir=temp_dir)

with open(csv_fd, 'w+b') as f:
for record in records_to_load.values():
csv_line = db_sync.record_to_csv_line(record)
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_target_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,3 +886,13 @@ def test_pg_records_validation(self):

self.config['validate_records'] = True
self.persist_lines_with_cache(tap_lines_valid_records)

def test_loading_tables_with_custom_temp_dir(self):
"""Loading multiple tables from the same input tap using custom temp directory"""
tap_lines = test_utils.get_test_tap_lines('messages-with-three-streams.json')

# Turning on client-side encryption and load
self.config['temp_dir'] = ('~/.pipelinewise/tmp')
self.persist_lines_with_cache(tap_lines)

self.assert_three_streams_are_into_snowflake()

0 comments on commit e6e4ef6

Please sign in to comment.