diff --git a/setup.py b/setup.py
index a1c2bdfa..566e1bef 100644
--- a/setup.py
+++ b/setup.py
@@ -27,7 +27,7 @@
     ],
     extras_require={
         "test": [
-            "pylint==2.8.3",
+            "pylint==2.11.1",
             'pytest==6.2.5',
             'pytest-cov==3.0.0',
             "python-dotenv==0.19.1"
diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index 1ecf7e21..4bc55c04 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -14,9 +14,9 @@
 from singer import get_logger
 from datetime import datetime, timedelta

-import target_snowflake.file_formats.csv as csv
-import target_snowflake.file_formats.parquet as parquet
-import target_snowflake.stream_utils as stream_utils
+from target_snowflake.file_formats import csv
+from target_snowflake.file_formats import parquet
+from target_snowflake import stream_utils
 from target_snowflake.db_sync import DbSync
 from target_snowflake.file_format import FileFormatTypes
@@ -57,7 +57,7 @@ def emit_state(state):
     if state is not None:
         line = json.dumps(state)
         LOGGER.info('Emitting state %s', line)
-        sys.stdout.write("{}\n".format(line))
+        sys.stdout.write(f"{line}\n")
         sys.stdout.flush()
@@ -157,7 +157,7 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
                 primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
                 if not primary_key_string:
-                    primary_key_string = 'RID-{}'.format(total_row_count[stream])
+                    primary_key_string = f'RID-{total_row_count[stream]}'

                 if stream not in records_to_load:
                     records_to_load[stream] = {}
@@ -470,7 +470,7 @@ def flush_records(stream: str,
     if archive_load_files:
         stream_name_parts = stream_utils.stream_name_to_dict(stream)
         if 'schema_name' not in stream_name_parts or 'table_name' not in stream_name_parts:
-            raise Exception("Failed to extract schema and table names from stream '{}'".format(stream))
+            raise Exception(f"Failed to extract schema and table names from stream '{stream}'")

         archive_schema = stream_name_parts['schema_name']
         archive_table = stream_name_parts['table_name']
@@ -492,7 +492,7 @@
         # Use same file name as in import
         archive_file = os.path.basename(s3_key)
-        archive_key = "{}/{}/{}".format(archive_tap, archive_table, archive_file)
+        archive_key = f"{archive_tap}/{archive_table}/{archive_file}"

         db_sync.copy_to_archive(s3_key, archive_key, archive_metadata)
@@ -507,7 +507,7 @@ def main():
     args = arg_parser.parse_args()

     if args.config:
-        with open(args.config) as config_input:
+        with open(args.config, encoding="utf8") as config_input:
             config = json.load(config_input)
     else:
         config = {}
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index f7853ed8..82ce696a 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -7,8 +7,8 @@
 import time

 from singer import get_logger
-import target_snowflake.flattening as flattening
-import target_snowflake.stream_utils as stream_utils
+from target_snowflake import flattening
+from target_snowflake import stream_utils
 from target_snowflake.file_format import FileFormat, FileFormatTypes
 from target_snowflake.exceptions import TooManyRecordsException, PrimaryKeyNotFoundException
@@ -55,7 +55,7 @@ def validate_config(config):
     # Check if mandatory keys exist
     for k in required_config_keys:
         if not config.get(k, None):
-            errors.append("Required key is missing from config: [{}]".format(k))
+            errors.append(f"Required key is missing from config: [{k}]")

     # Check target schema config
     config_default_target_schema = config.get('default_target_schema', None)
@@ -114,15 +114,15 @@ def column_trans(schema_property):
 def safe_column_name(name):
     """Generate SQL friendly column name"""
-    return '"{}"'.format(name).upper()
+    return f'"{name}"'.upper()


 def json_element_name(name):
     """Generate SQL friendly semi structured element reference name"""
-    return '"{}"'.format(name)
+    return f'"{name}"'


 def column_clause(name, schema_property):
     """Generate DDL column name with column type string"""
-    return '{} {}'.format(safe_column_name(name), column_type(schema_property))
+    return f'{safe_column_name(name)} {column_type(schema_property)}'


 def primary_column_names(stream_schema_message):
@@ -250,7 +250,7 @@ def __init__(self, connection_config, stream_schema_message=None, table_cache=No
                 raise Exception(
                     "Target schema name not defined in config. "
                     "Neither 'default_target_schema' (string) nor 'schema_mapping' (object) defines "
-                    "target schema for {} stream.".format(stream_name))
+                    f"target schema for {stream_name} stream.")

         # Define grantees
         # ---------------
@@ -373,7 +373,7 @@ def table_name(self, stream_name, is_temporary, without_schema=False):
         sf_table_name = table_name.replace('.', '_').replace('-', '_').lower()

         if is_temporary:
-            sf_table_name = '{}_temp'.format(sf_table_name)
+            sf_table_name = f'{sf_table_name}_temp'

         if without_schema:
             return f'"{sf_table_name.upper()}"'
@@ -388,9 +388,10 @@ def record_primary_key_string(self, record):
         try:
             key_props = [str(flatten[p]) for p in self.stream_schema_message['key_properties']]
         except Exception as exc:
-            raise PrimaryKeyNotFoundException('Cannot find {} primary key(s) in record. Available fields: {}'.format(
-                self.stream_schema_message['key_properties'],
-                list(flatten.keys()))) from exc
+            pks = self.stream_schema_message['key_properties']
+            fields = list(flatten.keys())
+            raise PrimaryKeyNotFoundException(f"Cannot find {pks} primary key(s) in record. "
" + f"Available fields: {fields}") from exc return ','.join(key_props) def put_to_stage(self, file, stream, count, temp_dir=None): @@ -426,9 +427,9 @@ def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata): # Determine prefix to use in archive s3 bucket default_archive_prefix = 'archive' archive_prefix = self.connection_config.get('archive_load_files_s3_prefix', default_archive_prefix) - prefixed_archive_key = '{}/{}'.format(archive_prefix, s3_archive_key) + prefixed_archive_key = f'{archive_prefix}/{s3_archive_key}' - copy_source = '{}/{}'.format(source_bucket, s3_source_key) + copy_source = f'{source_bucket}/{s3_source_key}' self.logger.info('Copying %s to archive location %s', copy_source, prefixed_archive_key) self.upload_client.copy_object(copy_source, archive_bucket, prefixed_archive_key, s3_archive_metadata) @@ -506,7 +507,7 @@ def primary_key_merge_condition(self): """Generate SQL join condition on primary keys for merge SQL statements""" stream_schema_message = self.stream_schema_message names = primary_column_names(stream_schema_message) - return ' AND '.join(['s.{0} = t.{0}'.format(c) for c in names]) + return ' AND '.join([f's.{c} = t.{c}' for c in names]) def column_names(self): """Get list of columns in the schema""" @@ -523,26 +524,27 @@ def create_table_query(self, is_temporary=False): for (name, schema) in self.flatten_schema.items() ] - primary_key = ["PRIMARY KEY ({})".format(', '.join(primary_column_names(stream_schema_message)))] \ - if len(stream_schema_message.get('key_properties', [])) > 0 else [] + primary_key = [] + if len(stream_schema_message.get('key_properties', [])) > 0: + pk_list = ', '.join(primary_column_names(stream_schema_message)) + primary_key = [f"PRIMARY KEY({pk_list})"] - return 'CREATE {}TABLE IF NOT EXISTS {} ({}) {}'.format( - 'TEMP ' if is_temporary else '', - self.table_name(stream_schema_message['stream'], is_temporary), - ', '.join(columns + primary_key), - 'data_retention_time_in_days = 0 ' if is_temporary else 'data_retention_time_in_days = 1 ' - ) + p_temp = 'TEMP ' if is_temporary else '' + p_table_name = self.table_name(stream_schema_message['stream'], is_temporary) + p_columns = ', '.join(columns + primary_key) + p_extra = 'data_retention_time_in_days = 0 ' if is_temporary else 'data_retention_time_in_days = 1 ' + return f'CREATE {p_temp}TABLE IF NOT EXISTS {p_table_name} ({p_columns}) {p_extra}' def grant_usage_on_schema(self, schema_name, grantee): """Grant usage on schema""" - query = "GRANT USAGE ON SCHEMA {} TO ROLE {}".format(schema_name, grantee) + query = f"GRANT USAGE ON SCHEMA {schema_name} TO ROLE {grantee}" self.logger.info("Granting USAGE privilege on '%s' schema to '%s'... %s", schema_name, grantee, query) self.query(query) # pylint: disable=invalid-name def grant_select_on_all_tables_in_schema(self, schema_name, grantee): """Grant select on all tables in schema""" - query = "GRANT SELECT ON ALL TABLES IN SCHEMA {} TO ROLE {}".format(schema_name, grantee) + query = f"GRANT SELECT ON ALL TABLES IN SCHEMA {schema_name} TO ROLE {grantee}" self.logger.info( "Granting SELECT ON ALL TABLES privilege on '%s' schema to '%s'... 
%s", schema_name, grantee, query) self.query(query) @@ -559,7 +561,7 @@ def grant_privilege(cls, schema, grantees, grant_method): def delete_rows(self, stream): """Hard delete rows from target table""" table = self.table_name(stream, False) - query = "DELETE FROM {} WHERE _sdc_deleted_at IS NOT NULL".format(table) + query = f"DELETE FROM {table} WHERE _sdc_deleted_at IS NOT NULL" self.logger.info("Deleting rows from '%s' table... %s", table, query) self.logger.info('DELETE %d', len(self.query(query))) @@ -576,7 +578,7 @@ def create_schema_if_not_exists(self): schema_rows = self.query(f"SHOW SCHEMAS LIKE '{schema_name.upper()}'") if len(schema_rows) == 0: - query = "CREATE SCHEMA IF NOT EXISTS {}".format(schema_name) + query = f"CREATE SCHEMA IF NOT EXISTS {schema_name}" self.logger.info("Schema '%s' does not exist. Creating... %s", schema_name, query) self.query(query) @@ -738,22 +740,23 @@ def update_columns(self): def drop_column(self, column_name, stream): """Drops column from an existing table""" - drop_column = "ALTER TABLE {} DROP COLUMN {}".format(self.table_name(stream, False), column_name) + drop_column = f"ALTER TABLE {self.table_name(stream, False)} DROP COLUMN {column_name}" self.logger.info('Dropping column: %s', drop_column) self.query(drop_column) def version_column(self, column_name, stream): """Versions a column in an existing table""" - version_column = "ALTER TABLE {} RENAME COLUMN {} TO \"{}_{}\"".format(self.table_name(stream, False), - column_name, - column_name.replace("\"", ""), - time.strftime("%Y%m%d_%H%M")) + p_table_name = self.table_name(stream, False) + p_column_name = column_name.replace("\"", "") + p_ver_time = time.strftime("%Y%m%d_%H%M") + + version_column = f"ALTER TABLE {p_table_name} RENAME COLUMN {column_name} TO \"{p_column_name}_{p_ver_time}\"" self.logger.info('Versioning column: %s', version_column) self.query(version_column) def add_column(self, column, stream): """Adds a new column to an existing table""" - add_column = "ALTER TABLE {} ADD COLUMN {}".format(self.table_name(stream, False), column) + add_column = f"ALTER TABLE {self.table_name(stream, False)} ADD COLUMN {column}" self.logger.info('Adding column: %s', add_column) self.query(add_column) diff --git a/target_snowflake/file_formats/csv.py b/target_snowflake/file_formats/csv.py index 189905e1..42d76ee4 100644 --- a/target_snowflake/file_formats/csv.py +++ b/target_snowflake/file_formats/csv.py @@ -6,7 +6,7 @@ from typing import Callable, Dict, List from tempfile import mkstemp -import target_snowflake.flattening as flattening +from target_snowflake import flattening def create_copy_sql(table_name: str, @@ -15,14 +15,11 @@ def create_copy_sql(table_name: str, file_format_name: str, columns: List): """Generate a CSV compatible snowflake COPY INTO command""" - return "COPY INTO {} ({}) " \ - "FROM '@{}/{}' " \ - "FILE_FORMAT = (format_name='{}')".format( - table_name, - ', '.join([c['name'] for c in columns]), - stage_name, - s3_key, - file_format_name) + p_columns = ', '.join([c['name'] for c in columns]) + + return f"COPY INTO {table_name} ({p_columns}) " \ + f"FROM '@{stage_name}/{s3_key}' " \ + f"FILE_FORMAT = (format_name='{file_format_name}')" def create_merge_sql(table_name: str, @@ -32,24 +29,20 @@ def create_merge_sql(table_name: str, columns: List, pk_merge_condition: str) -> str: """Generate a CSV compatible snowflake MERGE INTO command""" - return "MERGE INTO {} t USING (" \ - "SELECT {} " \ - "FROM '@{}/{}' " \ - "(FILE_FORMAT => '{}')) s " \ - "ON {} " \ - "WHEN MATCHED THEN 
UPDATE SET {} " \ + p_source_columns = ', '.join([f"{c['trans']}(${i + 1}) {c['name']}" for i, c in enumerate(columns)]) + p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns]) + p_insert_cols = ', '.join([c['name'] for c in columns]) + p_insert_values = ', '.join([f"s.{c['name']}" for c in columns]) + + return f"MERGE INTO {table_name} t USING (" \ + f"SELECT {p_source_columns} " \ + f"FROM '@{stage_name}/{s3_key}' " \ + f"(FILE_FORMAT => '{file_format_name}')) s " \ + f"ON {pk_merge_condition} " \ + f"WHEN MATCHED THEN UPDATE SET {p_update} " \ "WHEN NOT MATCHED THEN " \ - "INSERT ({}) " \ - "VALUES ({})".format( - table_name, - ', '.join(["{}(${}) {}".format(c['trans'], i + 1, c['name']) for i, c in enumerate(columns)]), - stage_name, - s3_key, - file_format_name, - pk_merge_condition, - ', '.join(['{0}=s.{0}'.format(c['name']) for c in columns]), - ', '.join([c['name'] for c in columns]), - ', '.join(['s.{}'.format(c['name']) for c in columns])) + f"INSERT ({p_insert_cols}) " \ + f"VALUES ({p_insert_values})" def record_to_csv_line(record: dict, diff --git a/target_snowflake/file_formats/parquet.py b/target_snowflake/file_formats/parquet.py index b16fa462..ad02e6a5 100644 --- a/target_snowflake/file_formats/parquet.py +++ b/target_snowflake/file_formats/parquet.py @@ -5,7 +5,7 @@ from typing import Dict, List from tempfile import mkstemp -import target_snowflake.flattening as flattening +from target_snowflake import flattening def create_copy_sql(table_name: str, @@ -14,18 +14,13 @@ def create_copy_sql(table_name: str, file_format_name: str, columns: List): """Generate a Parquet compatible snowflake COPY INTO command""" - return "COPY INTO {} ({}) " \ - "FROM (SELECT {} FROM '@{}/{}') " \ - "FILE_FORMAT = (format_name='{}')".format( - table_name, - ', '.join([c['name'] for c in columns]), - ', '.join(["{}($1:{}) {}".format(c['trans'], - c['json_element_name'], - c['name']) - for i, c in enumerate(columns)]), - stage_name, - s3_key, - file_format_name) + p_target_columns = ', '.join([c['name'] for c in columns]) + p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}" + for i, c in enumerate(columns)]) + + return f"COPY INTO {table_name} ({p_target_columns}) " \ + f"FROM (SELECT {p_source_columns} FROM '@{stage_name}/{s3_key}') " \ + f"FILE_FORMAT = (format_name='{file_format_name}')" def create_merge_sql(table_name: str, @@ -35,27 +30,21 @@ def create_merge_sql(table_name: str, columns: List, pk_merge_condition: str) -> str: """Generate a Parquet compatible snowflake MERGE INTO command""" - return "MERGE INTO {} t USING (" \ - "SELECT {} " \ - "FROM '@{}/{}' " \ - "(FILE_FORMAT => '{}')) s " \ - "ON {} " \ - "WHEN MATCHED THEN UPDATE SET {} " \ + p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}" + for i, c in enumerate(columns)]) + p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns]) + p_insert_cols = ', '.join([c['name'] for c in columns]) + p_insert_values = ', '.join([f"s.{c['name']}" for c in columns]) + + return f"MERGE INTO {table_name} t USING (" \ + f"SELECT {p_source_columns} " \ + f"FROM '@{stage_name}/{s3_key}' " \ + f"(FILE_FORMAT => '{file_format_name}')) s " \ + f"ON {pk_merge_condition} " \ + f"WHEN MATCHED THEN UPDATE SET {p_update} " \ "WHEN NOT MATCHED THEN " \ - "INSERT ({}) " \ - "VALUES ({})".format( - table_name, - ', '.join(["{}($1:{}) {}".format(c['trans'], - c['json_element_name'], - c['name']) - for i, c in enumerate(columns)]), - stage_name, - s3_key, - 
-               file_format_name,
-               pk_merge_condition,
-               ', '.join(['{0}=s.{0}'.format(c['name']) for c in columns]),
-               ', '.join([c['name'] for c in columns]),
-               ', '.join(['s.{}'.format(c['name']) for c in columns]))
+           f"INSERT ({p_insert_cols}) " \
+           f"VALUES ({p_insert_values})"


 def records_to_dataframe(records: Dict,
diff --git a/target_snowflake/flattening.py b/target_snowflake/flattening.py
index 33cc1cde..40ab48ec 100644
--- a/target_snowflake/flattening.py
+++ b/target_snowflake/flattening.py
@@ -68,7 +68,7 @@ def flatten_schema(d, parent_key=None, sep='__', level=0, max_level=0):
     sorted_items = sorted(items, key=key_func)
     for k, g in itertools.groupby(sorted_items, key=key_func):
         if len(list(g)) > 1:
-            raise ValueError('Duplicate column name produced in schema: {}'.format(k))
+            raise ValueError(f'Duplicate column name produced in schema: {k}')

     return dict(sorted_items)
diff --git a/target_snowflake/stream_utils.py b/target_snowflake/stream_utils.py
index c100cfdd..be7f6297 100644
--- a/target_snowflake/stream_utils.py
+++ b/target_snowflake/stream_utils.py
@@ -120,7 +120,7 @@ def stream_name_to_dict(stream_name, separator='-'):
 def get_incremental_key(singer_msg: Dict):
     """Derive incremental key from a Singer message dictionary"""
     if singer_msg['type'] != "SCHEMA":
-        raise UnexpectedMessageTypeException("Expecting type SCHEMA, got {}".format(singer_msg['type']))
+        raise UnexpectedMessageTypeException(f"Expecting type SCHEMA, got {singer_msg['type']}")

     if 'bookmark_properties' in singer_msg and len(singer_msg['bookmark_properties']) > 0:
         col = singer_msg['bookmark_properties'][0]
diff --git a/target_snowflake/upload_clients/s3_upload_client.py b/target_snowflake/upload_clients/s3_upload_client.py
index 10801e59..83479cec 100644
--- a/target_snowflake/upload_clients/s3_upload_client.py
+++ b/target_snowflake/upload_clients/s3_upload_client.py
@@ -50,11 +50,9 @@ def upload_file(self, file, stream, temp_dir=None):
         bucket = self.connection_config['s3_bucket']
         s3_acl = self.connection_config.get('s3_acl')
         s3_key_prefix = self.connection_config.get('s3_key_prefix', '')
-        s3_key = "{}pipelinewise_{}_{}_{}".format(s3_key_prefix,
-                                                  stream,
-                                                  datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f"),
-                                                  os.path.basename(file))
+        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f")
+        s3_key = f"{s3_key_prefix}pipelinewise_{stream}_{timestamp}_{os.path.basename(file)}"

         self.logger.info('Target S3 bucket: %s, local file: %s, S3 key: %s', bucket, file, s3_key)

         # Encrypt csv if client side encryption enabled
@@ -73,7 +71,7 @@ def upload_file(self, file, stream, temp_dir=None):
             )

             # Upload to s3
-            extra_args = {'ACL': s3_acl} if s3_acl else dict()
+            extra_args = {'ACL': s3_acl} if s3_acl else {}

             # Send key and iv in the metadata, that will be required to decrypt and upload the encrypted file
             extra_args['Metadata'] = {
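
As a quick sanity check of the refactored f-string SQL builders above, the snippet below calls the two csv helpers directly and prints the statements they generate. It is an illustrative sketch only, not part of the change: the table, stage, S3 key, file format name and columns are made-up placeholders, and the column dicts follow the {'name': ..., 'trans': ...} shape that db_sync.py passes to these functions.

# Illustrative sanity check (not part of the diff); all identifiers below are placeholders.
from target_snowflake.file_formats import csv

columns = [
    {'name': '"ID"', 'trans': ''},          # 'trans' carries an optional SQL transform, empty here
    {'name': '"UPDATED_AT"', 'trans': ''},
]

# Prints: COPY INTO "MY_SCHEMA"."MY_TABLE" ("ID", "UPDATED_AT") FROM '@my_stage/batch.csv' FILE_FORMAT = (format_name='my_csv_format')
print(csv.create_copy_sql(table_name='"MY_SCHEMA"."MY_TABLE"',
                          stage_name='my_stage',
                          s3_key='batch.csv',
                          file_format_name='my_csv_format',
                          columns=columns))

# Prints: MERGE INTO "MY_SCHEMA"."MY_TABLE" t USING (SELECT ($1) "ID", ($2) "UPDATED_AT" FROM '@my_stage/batch.csv' ...) s ON s."ID" = t."ID" ...
print(csv.create_merge_sql(table_name='"MY_SCHEMA"."MY_TABLE"',
                           stage_name='my_stage',
                           s3_key='batch.csv',
                           file_format_name='my_csv_format',
                           columns=columns,
                           pk_merge_condition='s."ID" = t."ID"'))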