Bump pylint from 2.8.3 to 2.11.1 (#216)
koszti authored Oct 14, 2021
1 parent 4c035a6 commit b9439f0
Showing 8 changed files with 90 additions and 107 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -27,7 +27,7 @@
],
extras_require={
"test": [
"pylint==2.8.3",
"pylint==2.11.1",
'pytest==6.2.5',
'pytest-cov==3.0.0',
"python-dotenv==0.19.1"
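Note: the refactors in the files below appear to line up with checks added between pylint 2.8.3 and 2.11.1 — consider-using-from-import (2.9), unspecified-encoding (2.10) and consider-using-f-string (2.11). A minimal sketch, not from the repo, of the pattern the last of these flags, with made-up values:

    # Hedged sketch: the str.format() call that consider-using-f-string (C0209)
    # reports under pylint >= 2.11, and the equivalent f-string this commit switches to.
    state = '{"bookmarks": {}}'
    old_style = "Emitting state {}\n".format(state)  # flagged as C0209
    new_style = f"Emitting state {state}\n"          # preferred form
    assert old_style == new_style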
16 changes: 8 additions & 8 deletions target_snowflake/__init__.py
@@ -14,9 +14,9 @@
from singer import get_logger
from datetime import datetime, timedelta

import target_snowflake.file_formats.csv as csv
import target_snowflake.file_formats.parquet as parquet
import target_snowflake.stream_utils as stream_utils
from target_snowflake.file_formats import csv
from target_snowflake.file_formats import parquet
from target_snowflake import stream_utils

from target_snowflake.db_sync import DbSync
from target_snowflake.file_format import FileFormatTypes
@@ -57,7 +57,7 @@ def emit_state(state):
if state is not None:
line = json.dumps(state)
LOGGER.info('Emitting state %s', line)
sys.stdout.write("{}\n".format(line))
sys.stdout.write(f"{line}\n")
sys.stdout.flush()


@@ -157,7 +157,7 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT

primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
if not primary_key_string:
primary_key_string = 'RID-{}'.format(total_row_count[stream])
primary_key_string = f'RID-{total_row_count[stream]}'

if stream not in records_to_load:
records_to_load[stream] = {}
@@ -470,7 +470,7 @@ def flush_records(stream: str,
if archive_load_files:
stream_name_parts = stream_utils.stream_name_to_dict(stream)
if 'schema_name' not in stream_name_parts or 'table_name' not in stream_name_parts:
raise Exception("Failed to extract schema and table names from stream '{}'".format(stream))
raise Exception(f"Failed to extract schema and table names from stream '{stream}'")

archive_schema = stream_name_parts['schema_name']
archive_table = stream_name_parts['table_name']
@@ -492,7 +492,7 @@ def flush_records(stream: str,

# Use same file name as in import
archive_file = os.path.basename(s3_key)
archive_key = "{}/{}/{}".format(archive_tap, archive_table, archive_file)
archive_key = f"{archive_tap}/{archive_table}/{archive_file}"

db_sync.copy_to_archive(s3_key, archive_key, archive_metadata)

@@ -507,7 +507,7 @@ def main():
args = arg_parser.parse_args()

if args.config:
with open(args.config) as config_input:
with open(args.config, encoding="utf8") as config_input:
config = json.load(config_input)
else:
config = {}
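A short hedged sketch of the config-loading change above: passing encoding explicitly keeps behaviour unchanged on UTF-8 systems while satisfying pylint's unspecified-encoding (W1514) check; the helper name and file name are assumptions.

    import json

    def load_config(path: str) -> dict:
        # Mirrors the open(args.config, encoding="utf8") change in main() above.
        with open(path, encoding="utf8") as config_input:
            return json.load(config_input)

    # config = load_config('config.json')  # assumed file name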
67 changes: 35 additions & 32 deletions target_snowflake/db_sync.py
@@ -7,8 +7,8 @@
import time

from singer import get_logger
import target_snowflake.flattening as flattening
import target_snowflake.stream_utils as stream_utils
from target_snowflake import flattening
from target_snowflake import stream_utils
from target_snowflake.file_format import FileFormat, FileFormatTypes

from target_snowflake.exceptions import TooManyRecordsException, PrimaryKeyNotFoundException
@@ -55,7 +55,7 @@ def validate_config(config):
# Check if mandatory keys exist
for k in required_config_keys:
if not config.get(k, None):
errors.append("Required key is missing from config: [{}]".format(k))
errors.append(f"Required key is missing from config: [{k}]")

# Check target schema config
config_default_target_schema = config.get('default_target_schema', None)
@@ -114,15 +114,15 @@ def column_trans(schema_property):

def safe_column_name(name):
"""Generate SQL friendly column name"""
return '"{}"'.format(name).upper()
return f'"{name}"'.upper()

def json_element_name(name):
"""Generate SQL friendly semi structured element reference name"""
return '"{}"'.format(name)
return f'"{name}"'

def column_clause(name, schema_property):
"""Generate DDL column name with column type string"""
return '{} {}'.format(safe_column_name(name), column_type(schema_property))
return f'{safe_column_name(name)} {column_type(schema_property)}'


def primary_column_names(stream_schema_message):
@@ -250,7 +250,7 @@ def __init__(self, connection_config, stream_schema_message=None, table_cache=No
raise Exception(
"Target schema name not defined in config. "
"Neither 'default_target_schema' (string) nor 'schema_mapping' (object) defines "
"target schema for {} stream.".format(stream_name))
f"target schema for {stream_name} stream.")

# Define grantees
# ---------------
@@ -373,7 +373,7 @@ def table_name(self, stream_name, is_temporary, without_schema=False):
sf_table_name = table_name.replace('.', '_').replace('-', '_').lower()

if is_temporary:
sf_table_name = '{}_temp'.format(sf_table_name)
sf_table_name = f'{sf_table_name}_temp'

if without_schema:
return f'"{sf_table_name.upper()}"'
@@ -388,9 +388,10 @@ def record_primary_key_string(self, record):
try:
key_props = [str(flatten[p]) for p in self.stream_schema_message['key_properties']]
except Exception as exc:
raise PrimaryKeyNotFoundException('Cannot find {} primary key(s) in record. Available fields: {}'.format(
self.stream_schema_message['key_properties'],
list(flatten.keys()))) from exc
pks = self.stream_schema_message['key_properties']
fields = list(flatten.keys())
raise PrimaryKeyNotFoundException(f"Cannot find {pks} primary key(s) in record. "
f"Available fields: {fields}") from exc
return ','.join(key_props)

def put_to_stage(self, file, stream, count, temp_dir=None):
@@ -426,9 +427,9 @@ def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata):
# Determine prefix to use in archive s3 bucket
default_archive_prefix = 'archive'
archive_prefix = self.connection_config.get('archive_load_files_s3_prefix', default_archive_prefix)
prefixed_archive_key = '{}/{}'.format(archive_prefix, s3_archive_key)
prefixed_archive_key = f'{archive_prefix}/{s3_archive_key}'

copy_source = '{}/{}'.format(source_bucket, s3_source_key)
copy_source = f'{source_bucket}/{s3_source_key}'

self.logger.info('Copying %s to archive location %s', copy_source, prefixed_archive_key)
self.upload_client.copy_object(copy_source, archive_bucket, prefixed_archive_key, s3_archive_metadata)
@@ -506,7 +507,7 @@ def primary_key_merge_condition(self):
"""Generate SQL join condition on primary keys for merge SQL statements"""
stream_schema_message = self.stream_schema_message
names = primary_column_names(stream_schema_message)
return ' AND '.join(['s.{0} = t.{0}'.format(c) for c in names])
return ' AND '.join([f's.{c} = t.{c}' for c in names])

def column_names(self):
"""Get list of columns in the schema"""
@@ -523,26 +524,27 @@ def create_table_query(self, is_temporary=False):
for (name, schema) in self.flatten_schema.items()
]

primary_key = ["PRIMARY KEY ({})".format(', '.join(primary_column_names(stream_schema_message)))] \
if len(stream_schema_message.get('key_properties', [])) > 0 else []
primary_key = []
if len(stream_schema_message.get('key_properties', [])) > 0:
pk_list = ', '.join(primary_column_names(stream_schema_message))
primary_key = [f"PRIMARY KEY({pk_list})"]

return 'CREATE {}TABLE IF NOT EXISTS {} ({}) {}'.format(
'TEMP ' if is_temporary else '',
self.table_name(stream_schema_message['stream'], is_temporary),
', '.join(columns + primary_key),
'data_retention_time_in_days = 0 ' if is_temporary else 'data_retention_time_in_days = 1 '
)
p_temp = 'TEMP ' if is_temporary else ''
p_table_name = self.table_name(stream_schema_message['stream'], is_temporary)
p_columns = ', '.join(columns + primary_key)
p_extra = 'data_retention_time_in_days = 0 ' if is_temporary else 'data_retention_time_in_days = 1 '
return f'CREATE {p_temp}TABLE IF NOT EXISTS {p_table_name} ({p_columns}) {p_extra}'

def grant_usage_on_schema(self, schema_name, grantee):
"""Grant usage on schema"""
query = "GRANT USAGE ON SCHEMA {} TO ROLE {}".format(schema_name, grantee)
query = f"GRANT USAGE ON SCHEMA {schema_name} TO ROLE {grantee}"
self.logger.info("Granting USAGE privilege on '%s' schema to '%s'... %s", schema_name, grantee, query)
self.query(query)

# pylint: disable=invalid-name
def grant_select_on_all_tables_in_schema(self, schema_name, grantee):
"""Grant select on all tables in schema"""
query = "GRANT SELECT ON ALL TABLES IN SCHEMA {} TO ROLE {}".format(schema_name, grantee)
query = f"GRANT SELECT ON ALL TABLES IN SCHEMA {schema_name} TO ROLE {grantee}"
self.logger.info(
"Granting SELECT ON ALL TABLES privilege on '%s' schema to '%s'... %s", schema_name, grantee, query)
self.query(query)
@@ -559,7 +561,7 @@ def grant_privilege(cls, schema, grantees, grant_method):
def delete_rows(self, stream):
"""Hard delete rows from target table"""
table = self.table_name(stream, False)
query = "DELETE FROM {} WHERE _sdc_deleted_at IS NOT NULL".format(table)
query = f"DELETE FROM {table} WHERE _sdc_deleted_at IS NOT NULL"
self.logger.info("Deleting rows from '%s' table... %s", table, query)
self.logger.info('DELETE %d', len(self.query(query)))

@@ -576,7 +578,7 @@ def create_schema_if_not_exists(self):
schema_rows = self.query(f"SHOW SCHEMAS LIKE '{schema_name.upper()}'")

if len(schema_rows) == 0:
query = "CREATE SCHEMA IF NOT EXISTS {}".format(schema_name)
query = f"CREATE SCHEMA IF NOT EXISTS {schema_name}"
self.logger.info("Schema '%s' does not exist. Creating... %s", schema_name, query)
self.query(query)

Expand Down Expand Up @@ -738,22 +740,23 @@ def update_columns(self):

def drop_column(self, column_name, stream):
"""Drops column from an existing table"""
drop_column = "ALTER TABLE {} DROP COLUMN {}".format(self.table_name(stream, False), column_name)
drop_column = f"ALTER TABLE {self.table_name(stream, False)} DROP COLUMN {column_name}"
self.logger.info('Dropping column: %s', drop_column)
self.query(drop_column)

def version_column(self, column_name, stream):
"""Versions a column in an existing table"""
version_column = "ALTER TABLE {} RENAME COLUMN {} TO \"{}_{}\"".format(self.table_name(stream, False),
column_name,
column_name.replace("\"", ""),
time.strftime("%Y%m%d_%H%M"))
p_table_name = self.table_name(stream, False)
p_column_name = column_name.replace("\"", "")
p_ver_time = time.strftime("%Y%m%d_%H%M")

version_column = f"ALTER TABLE {p_table_name} RENAME COLUMN {column_name} TO \"{p_column_name}_{p_ver_time}\""
self.logger.info('Versioning column: %s', version_column)
self.query(version_column)

def add_column(self, column, stream):
"""Adds a new column to an existing table"""
add_column = "ALTER TABLE {} ADD COLUMN {}".format(self.table_name(stream, False), column)
add_column = f"ALTER TABLE {self.table_name(stream, False)} ADD COLUMN {column}"
self.logger.info('Adding column: %s', add_column)
self.query(add_column)

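For reference, a standalone sketch (assumed table and column names, not the class method itself) of the DDL string that the refactored create_table_query assembles from its f-string parts:

    # Assumed inputs; mirrors the p_temp / p_table_name / p_columns / p_extra assembly above.
    columns = ['"ID" NUMBER', '"NAME" TEXT']
    primary_key = ['PRIMARY KEY("ID")']
    p_temp = 'TEMP '
    p_table_name = '"MY_SCHEMA"."MY_STREAM_TEMP"'
    p_columns = ', '.join(columns + primary_key)
    p_extra = 'data_retention_time_in_days = 0 '
    print(f'CREATE {p_temp}TABLE IF NOT EXISTS {p_table_name} ({p_columns}) {p_extra}')
    # CREATE TEMP TABLE IF NOT EXISTS "MY_SCHEMA"."MY_STREAM_TEMP" ("ID" NUMBER, "NAME" TEXT, PRIMARY KEY("ID")) data_retention_time_in_days = 0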
45 changes: 19 additions & 26 deletions target_snowflake/file_formats/csv.py
@@ -6,7 +6,7 @@
from typing import Callable, Dict, List
from tempfile import mkstemp

import target_snowflake.flattening as flattening
from target_snowflake import flattening


def create_copy_sql(table_name: str,
@@ -15,14 +15,11 @@ def create_copy_sql(table_name: str,
file_format_name: str,
columns: List):
"""Generate a CSV compatible snowflake COPY INTO command"""
return "COPY INTO {} ({}) " \
"FROM '@{}/{}' " \
"FILE_FORMAT = (format_name='{}')".format(
table_name,
', '.join([c['name'] for c in columns]),
stage_name,
s3_key,
file_format_name)
p_columns = ', '.join([c['name'] for c in columns])

return f"COPY INTO {table_name} ({p_columns}) " \
f"FROM '@{stage_name}/{s3_key}' " \
f"FILE_FORMAT = (format_name='{file_format_name}')"


def create_merge_sql(table_name: str,
@@ -32,24 +29,20 @@ def create_merge_sql(table_name: str,
columns: List,
pk_merge_condition: str) -> str:
"""Generate a CSV compatible snowflake MERGE INTO command"""
return "MERGE INTO {} t USING (" \
"SELECT {} " \
"FROM '@{}/{}' " \
"(FILE_FORMAT => '{}')) s " \
"ON {} " \
"WHEN MATCHED THEN UPDATE SET {} " \
p_source_columns = ', '.join([f"{c['trans']}(${i + 1}) {c['name']}" for i, c in enumerate(columns)])
p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns])
p_insert_cols = ', '.join([c['name'] for c in columns])
p_insert_values = ', '.join([f"s.{c['name']}" for c in columns])

return f"MERGE INTO {table_name} t USING (" \
f"SELECT {p_source_columns} " \
f"FROM '@{stage_name}/{s3_key}' " \
f"(FILE_FORMAT => '{file_format_name}')) s " \
f"ON {pk_merge_condition} " \
f"WHEN MATCHED THEN UPDATE SET {p_update} " \
"WHEN NOT MATCHED THEN " \
"INSERT ({}) " \
"VALUES ({})".format(
table_name,
', '.join(["{}(${}) {}".format(c['trans'], i + 1, c['name']) for i, c in enumerate(columns)]),
stage_name,
s3_key,
file_format_name,
pk_merge_condition,
', '.join(['{0}=s.{0}'.format(c['name']) for c in columns]),
', '.join([c['name'] for c in columns]),
', '.join(['s.{}'.format(c['name']) for c in columns]))
f"INSERT ({p_insert_cols}) " \
f"VALUES ({p_insert_values})"


def record_to_csv_line(record: dict,
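A hedged usage sketch of the rewritten csv.create_copy_sql: every argument value is made up, and the stage_name/s3_key keyword names are inferred from the function body shown above.

    from target_snowflake.file_formats import csv

    copy_sql = csv.create_copy_sql(
        table_name='"MY_SCHEMA"."MY_TABLE"',
        stage_name='my_stage',                        # assumed stage
        s3_key='pipelinewise_my_table.csv.gz',        # assumed object key
        file_format_name='my_schema.my_file_format',  # assumed file format
        columns=[{'name': '"ID"'}, {'name': '"NAME"'}],
    )
    # copy_sql:
    # COPY INTO "MY_SCHEMA"."MY_TABLE" ("ID", "NAME") FROM '@my_stage/pipelinewise_my_table.csv.gz'
    # FILE_FORMAT = (format_name='my_schema.my_file_format')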
55 changes: 22 additions & 33 deletions target_snowflake/file_formats/parquet.py
@@ -5,7 +5,7 @@
from typing import Dict, List
from tempfile import mkstemp

import target_snowflake.flattening as flattening
from target_snowflake import flattening


def create_copy_sql(table_name: str,
@@ -14,18 +14,13 @@ def create_copy_sql(table_name: str,
file_format_name: str,
columns: List):
"""Generate a Parquet compatible snowflake COPY INTO command"""
return "COPY INTO {} ({}) " \
"FROM (SELECT {} FROM '@{}/{}') " \
"FILE_FORMAT = (format_name='{}')".format(
table_name,
', '.join([c['name'] for c in columns]),
', '.join(["{}($1:{}) {}".format(c['trans'],
c['json_element_name'],
c['name'])
for i, c in enumerate(columns)]),
stage_name,
s3_key,
file_format_name)
p_target_columns = ', '.join([c['name'] for c in columns])
p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}"
for i, c in enumerate(columns)])

return f"COPY INTO {table_name} ({p_target_columns}) " \
f"FROM (SELECT {p_source_columns} FROM '@{stage_name}/{s3_key}') " \
f"FILE_FORMAT = (format_name='{file_format_name}')"


def create_merge_sql(table_name: str,
@@ -35,27 +30,21 @@ def create_merge_sql(table_name: str,
columns: List,
pk_merge_condition: str) -> str:
"""Generate a Parquet compatible snowflake MERGE INTO command"""
return "MERGE INTO {} t USING (" \
"SELECT {} " \
"FROM '@{}/{}' " \
"(FILE_FORMAT => '{}')) s " \
"ON {} " \
"WHEN MATCHED THEN UPDATE SET {} " \
p_source_columns = ', '.join([f"{c['trans']}($1:{c['json_element_name']}) {c['name']}"
for i, c in enumerate(columns)])
p_update = ', '.join([f"{c['name']}=s.{c['name']}" for c in columns])
p_insert_cols = ', '.join([c['name'] for c in columns])
p_insert_values = ', '.join([f"s.{c['name']}" for c in columns])

return f"MERGE INTO {table_name} t USING (" \
f"SELECT {p_source_columns} " \
f"FROM '@{stage_name}/{s3_key}' " \
f"(FILE_FORMAT => '{file_format_name}')) s " \
f"ON {pk_merge_condition} " \
f"WHEN MATCHED THEN UPDATE SET {p_update} " \
"WHEN NOT MATCHED THEN " \
"INSERT ({}) " \
"VALUES ({})".format(
table_name,
', '.join(["{}($1:{}) {}".format(c['trans'],
c['json_element_name'],
c['name'])
for i, c in enumerate(columns)]),
stage_name,
s3_key,
file_format_name,
pk_merge_condition,
', '.join(['{0}=s.{0}'.format(c['name']) for c in columns]),
', '.join([c['name'] for c in columns]),
', '.join(['s.{}'.format(c['name']) for c in columns]))
f"INSERT ({p_insert_cols}) " \
f"VALUES ({p_insert_values})"


def records_to_dataframe(records: Dict,
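The Parquet variant differs from the CSV one mainly in how it selects source columns: each value is pulled out of the staged record as $1:<json_element_name> and wrapped in the column's trans function. A sketch of the p_source_columns expression with assumed column metadata (the empty and 'parse_json' trans values are assumptions):

    # Assumed column metadata; mirrors the p_source_columns comprehension above.
    columns = [
        {'name': '"ID"', 'trans': '', 'json_element_name': 'id'},
        {'name': '"NAME"', 'trans': 'parse_json', 'json_element_name': 'name'},
    ]
    p_source_columns = ', '.join(f"{c['trans']}($1:{c['json_element_name']}) {c['name']}"
                                 for c in columns)
    print(p_source_columns)
    # ($1:id) "ID", parse_json($1:name) "NAME"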
2 changes: 1 addition & 1 deletion target_snowflake/flattening.py
@@ -68,7 +68,7 @@ def flatten_schema(d, parent_key=None, sep='__', level=0, max_level=0):
sorted_items = sorted(items, key=key_func)
for k, g in itertools.groupby(sorted_items, key=key_func):
if len(list(g)) > 1:
raise ValueError('Duplicate column name produced in schema: {}'.format(k))
raise ValueError(f'Duplicate column name produced in schema: {k}')

return dict(sorted_items)

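A standalone sketch of the duplicate-name check the reworded ValueError above belongs to; the flattened column names are made up:

    import itertools

    flattened = ['id', 'data__x', 'data__x']  # assumed flattened column names
    for name, group in itertools.groupby(sorted(flattened)):
        if len(list(group)) > 1:
            raise ValueError(f'Duplicate column name produced in schema: {name}')
    # raises ValueError: Duplicate column name produced in schema: data__x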