This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

AP-1011 archive_load_files feature (#178)

Lauri Lehtinen authored Jun 14, 2021
1 parent ad0d71b commit 4de4250

Showing 14 changed files with 319 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -25,7 +25,7 @@ jobs:
command: |
. venv/bin/activate
export LOGGING_CONF_FILE=$(pwd)/sample_logging.conf
pytest tests/unit -vv --cov target_snowflake --cov-fail-under=55
pytest tests/unit -vv --cov target_snowflake --cov-fail-under=60
- run:
name: 'Integration Tests'
1 change: 1 addition & 0 deletions README.md
@@ -176,6 +176,7 @@ Full list of options in `config.json`:
| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. |
| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `s3_bucket` under the prefix `archive/{tap_id}/{table_name}/` (see the sketch below the table). All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`. |
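
For illustration, here is a minimal sketch of the config keys involved in archiving, shown as a Python dict in the shape the target consumes (the bucket name and tap id are placeholder values; all other required connection settings are omitted):

```python
# Hypothetical config fragment for archive_load_files (placeholder values).
config = {
    "s3_bucket": "my-bucket",      # archiving requires an external S3 stage
    "tap_id": "my_tap",            # used in the archive prefix: archive/{tap_id}/{table_name}/
    "archive_load_files": True,    # copy each load file to the archive prefix after loading
}
```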

### To run tests:

105 changes: 94 additions & 11 deletions target_snowflake/__init__.py
@@ -74,7 +74,7 @@ def get_snowflake_statics(config):
if not ('disable_table_cache' in config and config['disable_table_cache']):
LOGGER.info('Getting catalog objects from table cache...')

db = DbSync(config) # pylint: disable=invalid-name
db = DbSync(config) # pylint: disable=invalid-name
table_cache = db.get_table_columns(
table_schemas=stream_utils.get_schema_names_from_config(config))

@@ -83,8 +83,9 @@ def get_snowflake_statics(config):

return table_cache, file_format_type


# pylint: disable=too-many-locals,too-many-branches,too-many-statements,invalid-name
def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatTypes=None) -> None:
def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatTypes = None) -> None:
"""Main loop to read and consume singer messages from stdin
Params:
@@ -113,6 +114,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS)
batch_wait_limit_seconds = config.get('batch_wait_limit_seconds', None)
flush_timestamp = datetime.utcnow()
archive_load_files = config.get('archive_load_files', False)
archive_load_files_data = {}

# Loop over lines from stdin
for line in lines:
@@ -170,6 +173,21 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
else:
records_to_load[stream][primary_key_string] = o['record']

if archive_load_files and stream in archive_load_files_data:
# Keep track of min and max of the designated column
stream_archive_load_files_values = archive_load_files_data[stream]
if 'column' in stream_archive_load_files_values:
incremental_key_column_name = stream_archive_load_files_values['column']
incremental_key_value = o['record'][incremental_key_column_name]
min_value = stream_archive_load_files_values['min']
max_value = stream_archive_load_files_values['max']

if min_value is None or min_value > incremental_key_value:
stream_archive_load_files_values['min'] = incremental_key_value

if max_value is None or max_value < incremental_key_value:
stream_archive_load_files_values['max'] = incremental_key_value

flush = False
if row_count[stream] >= batch_size_rows:
flush = True
@@ -196,6 +214,7 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
config,
state,
flushed_state,
archive_load_files_data,
filter_streams=filter_streams)

flush_timestamp = datetime.utcnow()
@@ -226,7 +245,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
stream_to_sync,
config,
state,
flushed_state)
flushed_state,
archive_load_files_data)

# emit latest encountered state
emit_state(flushed_state)
@@ -257,6 +277,27 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
else:
stream_to_sync[stream] = DbSync(config, o, table_cache, file_format_type)

if archive_load_files:
archive_load_files_data[stream] = {
'tap': config.get('tap_id'),
}

# In case of incremental replication, track min/max of the replication key.
# Incremental replication is assumed if o['bookmark_properties'][0] is one of the columns.
incremental_key_column_name = stream_utils.get_incremental_key(o)
if incremental_key_column_name:
LOGGER.info("Using %s as incremental_key_column_name", incremental_key_column_name)
archive_load_files_data[stream].update(
column=incremental_key_column_name,
min=None,
max=None
)
else:
LOGGER.warning(
"archive_load_files is enabled, but no incremental_key_column_name was found. "
"Min/max values will not be added to metadata for stream %s.", stream
)

stream_to_sync[stream].create_schema_if_not_exists()
stream_to_sync[stream].sync_table()

@@ -281,7 +322,8 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
# then flush all buckets.
if sum(row_count.values()) > 0:
# flush all streams one last time, delete records if needed, reset counts and then emit current state
flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state)
flushed_state = flush_streams(records_to_load, row_count, stream_to_sync, config, state, flushed_state,
archive_load_files_data)

# emit latest state
emit_state(copy.deepcopy(flushed_state))
Expand All @@ -295,6 +337,7 @@ def flush_streams(
config,
state,
flushed_state,
archive_load_files_data,
filter_streams=None):
"""
Flushes all buckets and resets records count to 0 as well as empties records to load list
@@ -305,6 +348,7 @@ def flush_streams(
:param state: dictionary containing the original state from tap
:param flushed_state: dictionary containing updated states only when streams got flushed
:param filter_streams: Keys of streams to flush from the streams dict. Default is every stream
:param archive_load_files_data: dictionary of dictionaries containing archive load files data
:return: State dict with flushed positions
"""
parallelism = config.get("parallelism", DEFAULT_PARALLELISM)
@@ -337,7 +381,8 @@ def flush_streams(
db_sync=stream_to_sync[stream],
no_compression=config.get('no_compression'),
delete_rows=config.get('hard_delete'),
temp_dir=config.get('temp_dir')
temp_dir=config.get('temp_dir'),
archive_load_files=copy.copy(archive_load_files_data.get(stream, None))
) for stream in streams_to_flush)

# reset flushed stream records to empty to avoid flushing same records
@@ -358,16 +403,20 @@ def flush_streams(
else:
flushed_state = copy.deepcopy(state)

if stream in archive_load_files_data:
archive_load_files_data[stream]['min'] = None
archive_load_files_data[stream]['max'] = None

# Return with state message with flushed positions
return flushed_state


def load_stream_batch(stream, records, row_count, db_sync, no_compression=False, delete_rows=False,
temp_dir=None):
temp_dir=None, archive_load_files=None):
"""Load one batch of the stream into target table"""
# Load into snowflake
if row_count[stream] > 0:
flush_records(stream, records, db_sync, temp_dir, no_compression)
flush_records(stream, records, db_sync, temp_dir, no_compression, archive_load_files)

# Delete soft-deleted, flagged rows - where _sdc_deleted at is not null
if delete_rows:
@@ -381,7 +430,8 @@ def flush_records(stream: str,
records: List[Dict],
db_sync: DbSync,
temp_dir: str = None,
no_compression: bool = False) -> None:
no_compression: bool = False,
archive_load_files: Dict = None) -> None:
"""
Takes a list of record messages and loads it into the snowflake target table
@@ -391,8 +441,9 @@ def flush_records(stream: str,
column value
row_count:
db_sync: A DbSync object
temp_dir: Directory where intermediate temporary files will be created. (Default: OS specificy temp directory)
temp_dir: Directory where intermediate temporary files will be created. (Default: OS specific temp directory)
no_compression: Disable to use compressed files. (Default: False)
archive_load_files: Data needed for archive load files. (Default: None)
Returns:
None
@@ -403,7 +454,7 @@ def flush_records(stream: str,
compression=not no_compression,
dest_dir=temp_dir,
data_flattening_max_level=
db_sync.data_flattening_max_level)
db_sync.data_flattening_max_level)

# Get file stats
row_count = len(records)
@@ -413,8 +464,40 @@ def flush_records(stream: str,
s3_key = db_sync.put_to_stage(filepath, stream, row_count, temp_dir=temp_dir)
db_sync.load_file(s3_key, row_count, size_bytes)

# Delete file from local disk and from s3
# Delete file from local disk
os.remove(filepath)

if archive_load_files:
stream_name_parts = stream_utils.stream_name_to_dict(stream)
if 'schema_name' not in stream_name_parts or 'table_name' not in stream_name_parts:
raise Exception("Failed to extract schema and table names from stream '{}'".format(stream))

archive_schema = stream_name_parts['schema_name']
archive_table = stream_name_parts['table_name']
archive_tap = archive_load_files['tap']

archive_metadata = {
'tap': archive_tap,
'schema': archive_schema,
'table': archive_table,
'archived-by': 'pipelinewise_target_snowflake'
}

if 'column' in archive_load_files:
archive_metadata.update({
'incremental-key': archive_load_files['column'],
'incremental-key-min': str(archive_load_files['min']),
'incremental-key-max': str(archive_load_files['max'])
})

# Use same file name as in import
archive_file = s3_key.split('/')[-1]
archive_folder = "archive/{}/{}".format(archive_tap, archive_table)
archive_key = "{}/{}".format(archive_folder, archive_file)

db_sync.copy_to_archive(s3_key, archive_key, archive_metadata)

# Delete file from S3
db_sync.delete_from_stage(stream, s3_key)


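To make the archiving step concrete, here is a short sketch of how the archive key and S3 metadata are assembled for one hypothetical load file, mirroring the `flush_records` logic above (the stage key, stream name and values are invented for illustration):

```python
# Hypothetical inputs (the real s3_key comes from db_sync.put_to_stage).
s3_key = "pipelinewise_my_schema-my_table_20210614.csv.gz"
stream_name_parts = {"schema_name": "my_schema", "table_name": "my_table"}
archive_load_files = {"tap": "my_tap", "column": "updated_at",
                      "min": "2021-06-01T00:00:00", "max": "2021-06-14T12:34:56"}

archive_file = s3_key.split("/")[-1]        # keep the same file name as the staged copy
archive_key = "archive/{}/{}/{}".format(archive_load_files["tap"],
                                        stream_name_parts["table_name"],
                                        archive_file)   # archive/my_tap/my_table/<file>
archive_metadata = {
    "tap": "my_tap",
    "schema": "my_schema",
    "table": "my_table",
    "archived-by": "pipelinewise_target_snowflake",
    "incremental-key": archive_load_files["column"],
    "incremental-key-min": archive_load_files["min"],
    "incremental-key-max": archive_load_files["max"],
}
# db_sync.copy_to_archive(s3_key, archive_key, archive_metadata) then copies the
# staged file to the archive location before the stage copy is deleted.
```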
10 changes: 10 additions & 0 deletions target_snowflake/db_sync.py
@@ -63,6 +63,11 @@ def validate_config(config):
if not config_default_target_schema and not config_schema_mapping:
errors.append("Neither 'default_target_schema' (string) nor 'schema_mapping' (object) keys set in config.")

# Check if archive load files option is using external stages
archive_load_files = config.get('archive_load_files', False)
if archive_load_files and not config.get('s3_bucket', None):
errors.append('Archive load files option can be used only with external s3 stages. Please define s3_bucket.')

return errors
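
As a usage note, a minimal sketch of how the new guard behaves when archiving is requested without an external stage (the check is restated standalone here for illustration):

```python
# Standalone restatement of the new check; a config without s3_bucket is rejected.
config = {"archive_load_files": True}   # note: no "s3_bucket"
errors = []
if config.get("archive_load_files", False) and not config.get("s3_bucket", None):
    errors.append("Archive load files option can be used only with external s3 stages. "
                  "Please define s3_bucket.")
print(errors)   # non-empty: the config fails validation
```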


@@ -397,6 +402,11 @@ def delete_from_stage(self, stream, s3_key):
self.logger.info('Deleting %s from stage', format(s3_key))
self.upload_client.delete_object(stream, s3_key)

def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata):
"""Copy file from snowflake stage to archive"""
self.logger.info('Copying %s to archive location %s', s3_source_key, s3_archive_key)
self.upload_client.copy_object(s3_source_key, s3_archive_key, s3_archive_metadata)

def get_stage_name(self, stream):
"""Generate snowflake stage name"""
stage = self.connection_config.get('stage', None)
4 changes: 4 additions & 0 deletions target_snowflake/exceptions.py
@@ -23,3 +23,7 @@ class FileFormatNotFoundException(Exception):

class InvalidFileFormatException(Exception):
"""Exception to raise when name file format is not compatible"""


class UnexpectedMessageTypeException(Exception):
"""Exception to raise when provided message doesn't match the expected type"""
14 changes: 14 additions & 0 deletions target_snowflake/stream_utils.py
@@ -8,6 +8,7 @@
from singer import get_logger

from target_snowflake.exceptions import UnexpectedValueTypeException
from target_snowflake.exceptions import UnexpectedMessageTypeException

LOGGER = get_logger('target_snowflake')

@@ -114,3 +115,16 @@ def stream_name_to_dict(stream_name, separator='-'):
'schema_name': schema_name,
'table_name': table_name
}


def get_incremental_key(singer_msg: Dict):
"""Derive incremental key from a Singer message dictionary"""
if singer_msg['type'] != "SCHEMA":
raise UnexpectedMessageTypeException("Expecting type SCHEMA, got {}".format(singer_msg['type']))

if 'bookmark_properties' in singer_msg and len(singer_msg['bookmark_properties']) > 0:
col = singer_msg['bookmark_properties'][0]
if col in singer_msg['schema']['properties']:
return col

return None
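
For context, a hedged usage sketch of the new helper on a hypothetical Singer SCHEMA message (the message shape follows the Singer spec; stream and column names are illustrative):

```python
from target_snowflake.stream_utils import get_incremental_key

schema_msg = {
    "type": "SCHEMA",
    "stream": "my_schema-my_table",
    "schema": {"properties": {"id": {"type": "integer"},
                              "updated_at": {"type": "string", "format": "date-time"}}},
    "key_properties": ["id"],
    "bookmark_properties": ["updated_at"],
}

print(get_incremental_key(schema_msg))   # -> "updated_at"
print(get_incremental_key({**schema_msg, "bookmark_properties": []}))   # -> None
# Passing a non-SCHEMA message (e.g. type "RECORD") raises UnexpectedMessageTypeException.
```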
6 changes: 6 additions & 0 deletions target_snowflake/upload_clients/base_upload_client.py
@@ -24,3 +24,9 @@ def delete_object(self, stream: str, key: str) -> None:
"""
Delete object
"""

@abstractmethod
def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
"""
Copy object
"""
11 changes: 11 additions & 0 deletions target_snowflake/upload_clients/s3_upload_client.py
@@ -97,3 +97,14 @@ def delete_object(self, stream: str, key: str) -> None:
self.logger.info('Deleting %s from external snowflake stage on S3', key)
bucket = self.connection_config['s3_bucket']
self.s3_client.delete_object(Bucket=bucket, Key=key)

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
"""Copy object to another location on S3"""
self.logger.info('Copying %s to %s', source_key, target_key)
bucket = self.connection_config['s3_bucket']

copy_source = "{}/{}".format(bucket, source_key)

# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
self.s3_client.copy_object(CopySource=copy_source, Bucket=bucket, Key=target_key, Metadata=target_metadata,
MetadataDirective="REPLACE")
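
One note on the boto3 call above: `MetadataDirective="REPLACE"` is what makes S3 attach the metadata passed in `Metadata` instead of carrying the source object's metadata over on copy. A hedged sketch of reading that metadata back from an archived object (bucket and key are placeholders):

```python
import boto3

s3_client = boto3.client("s3")

# Inspect an archived file's user metadata (placeholder bucket and key).
response = s3_client.head_object(Bucket="my-bucket",
                                 Key="archive/my_tap/my_table/my_file.csv.gz")
print(response["Metadata"])
# Expected to contain the keys written by flush_records, e.g.
# {'tap': 'my_tap', 'schema': 'my_schema', 'table': 'my_table',
#  'archived-by': 'pipelinewise_target_snowflake', ...}
```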
4 changes: 4 additions & 0 deletions target_snowflake/upload_clients/snowflake_upload_client.py
@@ -38,3 +38,7 @@ def delete_object(self, stream: str, key: str) -> None:

with self.dblink.open_connection() as connection:
connection.cursor().execute(f"REMOVE '@{stage}/{key}'")

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
raise NotImplementedError(
"Copying objects is not supported with a Snowflake upload client.")