This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

AP1011 - Custom archive bucket/folder (#180)

Lauri Lehtinen authored Jun 23, 2021
1 parent 4de4250 commit 9b90754

Showing 9 changed files with 86 additions and 29 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -176,7 +176,9 @@ Full list of options in `config.json`:
| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. |
| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `{{s3_bucket}}` under the key `/archive/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`.
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket` under the key `/{archive_load_files_s3_prefix}/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`.
| archive_load_files_s3_prefix | String | | (Default: "archive") When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix.
| archive_load_files_s3_bucket | String | | (Default: Value of `s3_bucket`) When `archive_load_files` is enabled, the archived files will be placed in this bucket.
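
To illustrate how the three archive options above combine (an illustration only, not part of the commit; the bucket, prefix, tap and table names below are hypothetical), the archive destination resolves roughly as follows:

```python
# Illustrative sketch only -- all values below are hypothetical.
config = {
    's3_bucket': 'my-load-bucket',
    'archive_load_files': True,
    'archive_load_files_s3_bucket': 'my-archive-bucket',  # optional; falls back to s3_bucket
    'archive_load_files_s3_prefix': 'archive_folder',     # optional; falls back to 'archive'
}

# Same fallback logic as copy_to_archive() in db_sync.py below
archive_bucket = config.get('archive_load_files_s3_bucket', config['s3_bucket'])
archive_prefix = config.get('archive_load_files_s3_prefix', 'archive')

# A load file for tap 'test_tap_id' and table 'orders' would be archived at roughly:
print(f"s3://{archive_bucket}/{archive_prefix}/test_tap_id/orders/<load-file>.csv.gz")
# -> s3://my-archive-bucket/archive_folder/test_tap_id/orders/<load-file>.csv.gz
```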

### To run tests:

5 changes: 2 additions & 3 deletions target_snowflake/__init__.py
@@ -491,9 +491,8 @@ def flush_records(stream: str,
})

# Use same file name as in import
archive_file = s3_key.split('/')[-1]
archive_folder = "archive/{}/{}".format(archive_tap, archive_table)
archive_key = "{}/{}".format(archive_folder, archive_file)
archive_file = os.path.basename(s3_key)
archive_key = "{}/{}/{}".format(archive_tap, archive_table, archive_file)

db_sync.copy_to_archive(s3_key, archive_key, archive_metadata)
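
A quick sketch of the key now handed to `copy_to_archive` (illustration only; the file name and tap/table values are hypothetical). The `archive/` prefix is no longer hard-coded here; it is applied inside `copy_to_archive` based on `archive_load_files_s3_prefix`:

```python
import os

# Hypothetical inputs
s3_key = 'some-s3-folder/some-name_date_batch_hash.csv.gz'
archive_tap, archive_table = 'test_tap_id', 'test_simple_table'

archive_file = os.path.basename(s3_key)
archive_key = "{}/{}/{}".format(archive_tap, archive_table, archive_file)
# -> 'test_tap_id/test_simple_table/some-name_date_batch_hash.csv.gz'
```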

32 changes: 29 additions & 3 deletions target_snowflake/db_sync.py
@@ -403,9 +403,35 @@ def delete_from_stage(self, stream, s3_key):
self.upload_client.delete_object(stream, s3_key)

def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata):
"""Copy file from snowflake stage to archive"""
self.logger.info('Copying %s to archive location %s', s3_source_key, s3_archive_key)
self.upload_client.copy_object(s3_source_key, s3_archive_key, s3_archive_metadata)
"""
Copy file from snowflake stage to archive.
s3_source_key: The s3 key to copy, assumed to exist in the bucket configured as 's3_bucket'
s3_archive_key: The key to use in archive destination. This will be prefixed with the config value
'archive_load_files_s3_prefix'. If none is specified, 'archive' will be used as the prefix.
As destination bucket, the config value 'archive_load_files_s3_bucket' will be used. If none is
specified, the bucket configured as 's3_bucket' will be used.
s3_archive_metadata: This dict will be used as the S3 metadata in the file in archive destination. Metadata in
the source file will be replaced.
"""
source_bucket = self.connection_config.get('s3_bucket')

# Get archive s3_bucket from config, or use same bucket if not specified
archive_bucket = self.connection_config.get('archive_load_files_s3_bucket', source_bucket)

# Determine prefix to use in archive s3 bucket
default_archive_prefix = 'archive'
archive_prefix = self.connection_config.get('archive_load_files_s3_prefix', default_archive_prefix)
prefixed_archive_key = '{}/{}'.format(archive_prefix, s3_archive_key)

copy_source = '{}/{}'.format(source_bucket, s3_source_key)

self.logger.info('Copying %s to archive location %s', copy_source, prefixed_archive_key)
self.upload_client.copy_object(copy_source, archive_bucket, prefixed_archive_key, s3_archive_metadata)

def get_stage_name(self, stream):
"""Generate snowflake stage name"""
2 changes: 1 addition & 1 deletion target_snowflake/upload_clients/base_upload_client.py
@@ -26,7 +26,7 @@ def delete_object(self, stream: str, key: str) -> None:
"""

@abstractmethod
def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
"""
Copy object
"""
12 changes: 4 additions & 8 deletions target_snowflake/upload_clients/s3_upload_client.py
@@ -98,13 +98,9 @@ def delete_object(self, stream: str, key: str) -> None:
bucket = self.connection_config['s3_bucket']
self.s3_client.delete_object(Bucket=bucket, Key=key)

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
"""Copy object to another location on S3"""
self.logger.info('Copying %s to %s', source_key, target_key)
bucket = self.connection_config['s3_bucket']

copy_source = "{}/{}".format(bucket, source_key)

self.logger.info('Copying %s to %s/%s', copy_source, target_bucket, target_key)
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
self.s3_client.copy_object(CopySource=copy_source, Bucket=bucket, Key=target_key, Metadata=target_metadata,
MetadataDirective="REPLACE")
self.s3_client.copy_object(CopySource=copy_source, Bucket=target_bucket, Key=target_key,
Metadata=target_metadata, MetadataDirective="REPLACE")
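
For context (an illustration, not part of the commit): boto3's `copy_object` accepts `CopySource` either as a `'bucket/key'` string, as used here, or as a dict, and `MetadataDirective="REPLACE"` tells S3 to use the supplied `Metadata` instead of copying the source object's metadata. A standalone sketch with hypothetical names:

```python
import boto3

s3_client = boto3.client('s3')

# All bucket, key and metadata values below are hypothetical
s3_client.copy_object(
    CopySource='my-load-bucket/pipelinewise/some-load-file.csv.gz',  # 'bucket/key' string form
    Bucket='my-archive-bucket',
    Key='archive/test_tap_id/orders/some-load-file.csv.gz',
    Metadata={'tap': 'test_tap_id', 'table': 'orders', 'archived-by': 'pipelinewise_target_snowflake'},
    MetadataDirective='REPLACE',  # replace, rather than copy, the source object's metadata
)
```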
2 changes: 1 addition & 1 deletion target_snowflake/upload_clients/snowflake_upload_client.py
@@ -39,6 +39,6 @@ def delete_object(self, stream: str, key: str) -> None:
with self.dblink.open_connection() as connection:
connection.cursor().execute(f"REMOVE '@{stage}/{key}'")

def copy_object(self, source_key: str, target_key: str, target_metadata: dict) -> None:
def copy_object(self, copy_source: str, target_bucket: str, target_key: str, target_metadata: dict) -> None:
raise NotImplementedError(
"Copying objects is not supported with a Snowflake upload client.")
8 changes: 3 additions & 5 deletions tests/integration/test_target_snowflake.py
@@ -1180,14 +1180,15 @@ def test_parquet(self):
def test_archive_load_files(self):
"""Test if load file is copied to archive folder"""
self.config['archive_load_files'] = True
self.config['archive_load_files_s3_prefix'] = 'archive_folder'
self.config['tap_id'] = 'test_tap_id'
self.config['client_side_encryption_master_key'] = ''

s3_bucket = self.config['s3_bucket']

# Delete any dangling files from archive
files_in_s3_archive = self.s3_client.list_objects(
Bucket=s3_bucket, Prefix="archive/test_tap_id/").get('Contents', [])
Bucket=s3_bucket, Prefix="archive_folder/test_tap_id/").get('Contents', [])
for file_in_archive in files_in_s3_archive:
key = file_in_archive["Key"]
self.s3_client.delete_object(Bucket=s3_bucket, Key=key)
@@ -1196,7 +1197,7 @@ def test_archive_load_files(self):
self.persist_lines_with_cache(tap_lines)

# Verify expected file metadata in S3
files_in_s3_archive = self.s3_client.list_objects(Bucket=s3_bucket, Prefix="archive/test_tap_id/").get(
files_in_s3_archive = self.s3_client.list_objects(Bucket=s3_bucket, Prefix="archive_folder/test_tap_id/").get(
'Contents')
self.assertIsNotNone(files_in_s3_archive)
self.assertEqual(1, len(files_in_s3_archive))
@@ -1229,6 +1230,3 @@ def test_archive_load_files(self):
4,"xyz4","not-formatted-time-4"
5,"xyz5","not-formatted-time-5"
''')

# Clean up
self.s3_client.delete_object(Bucket=s3_bucket, Key=archived_file_key)
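
For reference (an illustration, not part of the commit), reading back an archived object's metadata with boto3 could look roughly like this; the bucket and key are hypothetical and follow the `archive_folder/test_tap_id/...` layout used in this test:

```python
import boto3

s3 = boto3.client('s3')

# Hypothetical archive location
head = s3.head_object(
    Bucket='my-archive-bucket',
    Key='archive_folder/test_tap_id/test_simple_table/some-load-file.csv.gz',
)

# copy_object was issued with MetadataDirective="REPLACE", so only the metadata
# set at archive time (tap, schema, table, archived-by, incremental-key*) is present.
print(head['Metadata'])
```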
37 changes: 37 additions & 0 deletions tests/unit/test_db_sync.py
@@ -224,6 +224,43 @@ def test_parallelism(self, query_patch):
self.assertEqual(db_sync.DbSync({**minimal_config,
**table_stage_with_parallel}).connection_config['parallelism'], 1)

@patch('target_snowflake.upload_clients.s3_upload_client.S3UploadClient.copy_object')
@patch('target_snowflake.db_sync.DbSync.query')
def test_copy_to_archive(self, query_patch, copy_object_patch):
query_patch.return_value = [{'type': 'CSV'}]
minimal_config = {
'account': "dummy-value",
'dbname': "dummy-value",
'user': "dummy-value",
'password': "dummy-value",
'warehouse': "dummy-value",
'default_target_schema': "dummy-value",
'file_format': "dummy-value",
's3_bucket': 'dummy-bucket',
'stage': 'dummy_schema.dummy_stage'
}

# Assert default values (same bucket, 'archive' as the archive prefix)
s3_config = {}
dbsync = db_sync.DbSync({**minimal_config, **s3_config})
dbsync.copy_to_archive('source/file', 'tap/schema/file', {'meta': "data"})

self.assertEqual(copy_object_patch.call_args[0][0], 'dummy-bucket/source/file')
self.assertEqual(copy_object_patch.call_args[0][1], 'dummy-bucket')
self.assertEqual(copy_object_patch.call_args[0][2], 'archive/tap/schema/file')

# Assert custom archive bucket and prefix
s3_config = {
'archive_load_files_s3_bucket': "custom-bucket",
'archive_load_files_s3_prefix': "custom-prefix"
}
dbsync = db_sync.DbSync({**minimal_config, **s3_config})
dbsync.copy_to_archive('source/file', 'tap/schema/file', {'meta': "data"})

self.assertEqual(copy_object_patch.call_args[0][0], 'dummy-bucket/source/file')
self.assertEqual(copy_object_patch.call_args[0][1], 'custom-bucket')
self.assertEqual(copy_object_patch.call_args[0][2], 'custom-prefix/tap/schema/file')

def test_safe_column_name(self):
self.assertEqual(db_sync.safe_column_name("columnname"), '"COLUMNNAME"')
self.assertEqual(db_sync.safe_column_name("columnName"), '"COLUMNNAME"')
13 changes: 6 additions & 7 deletions tests/unit/test_target_snowflake.py
@@ -100,13 +100,13 @@ def test_archive_load_files_incremental_replication(self, os_remove_mock, dbSync
instance = dbSync_mock.return_value
instance.create_schema_if_not_exists.return_value = None
instance.sync_table.return_value = None
instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
instance.put_to_stage.return_value = 'some-s3-folder/some-name_date_batch_hash.csg.gz'

target_snowflake.persist_lines(self.config, lines)

copy_to_archive_args = instance.copy_to_archive.call_args[0]
assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'archive/test_tap_id/test_simple_table/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[0] == 'some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'test_tap_id/test_simple_table/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[2] == {
'tap': 'test_tap_id',
'schema': 'tap_mysql_test',
@@ -122,21 +122,20 @@ def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync
def test_archive_load_files_log_based_replication(self, os_remove_mock, dbSync_mock):
self.config['tap_id'] = 'test_tap_id'
self.config['archive_load_files'] = True
self.config['s3_bucket'] = 'dummy_bucket'

with open(f'{os.path.dirname(__file__)}/resources/logical-streams.json', 'r') as f:
lines = f.readlines()

instance = dbSync_mock.return_value
instance.create_schema_if_not_exists.return_value = None
instance.sync_table.return_value = None
instance.put_to_stage.return_value = 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
instance.put_to_stage.return_value = 'some-s3-folder/some-name_date_batch_hash.csg.gz'

target_snowflake.persist_lines(self.config, lines)

copy_to_archive_args = instance.copy_to_archive.call_args[0]
assert copy_to_archive_args[0] == 'some-s3-bucket/some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'archive/test_tap_id/logical1_table2/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[0] == 'some-s3-folder/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[1] == 'test_tap_id/logical1_table2/some-name_date_batch_hash.csg.gz'
assert copy_to_archive_args[2] == {
'tap': 'test_tap_id',
'schema': 'logical1',
