From 52a8315f50e5164de62061496db21fc5689e9e60 Mon Sep 17 00:00:00 2001
From: "Aaron (\"AJ\") Steers"
Date: Fri, 15 Oct 2021 11:01:52 -0700
Subject: [PATCH 1/3] make 'file_format' optional

---
 target_snowflake/db_sync.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index 82ce696a..b9af3d2b 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -27,7 +27,6 @@ def validate_config(config):
         'warehouse',
         's3_bucket',
         'stage',
-        'file_format'
     ]
 
     snowflake_required_config_keys = [
@@ -36,7 +35,6 @@
         'user',
         'password',
         'warehouse',
-        'file_format'
     ]
 
     required_config_keys = []

From 91d0b043d7748a65ea34d9bfcc34ba7585413372 Mon Sep 17 00:00:00 2001
From: "Aaron (\"AJ\") Steers"
Date: Fri, 15 Oct 2021 12:13:48 -0700
Subject: [PATCH 2/3] add support for inline CSV declaration

---
 README.md                                |  7 +-
 target_snowflake/db_sync.py              | 13 ++--
 target_snowflake/file_format.py          | 96 ++++++++++++++++++++----
 target_snowflake/file_formats/csv.py     | 10 +--
 target_snowflake/file_formats/parquet.py |  9 ++-
 5 files changed, 105 insertions(+), 30 deletions(-)

diff --git a/README.md b/README.md
index b72d40fe..d648e175 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,12 @@ You need to create a few objects in snowflake in one schema before start using t
 
 1. Create a named file format. This will be used by the MERGE/COPY commands to parse the files correctly from S3. You can use CSV or Parquet file formats.
 
-To use CSV files:
+To use the default (CSV) file format option:
+
+Omit `file_format` from the settings to use the default CSV format options.
+
+To use a named CSV file format:
+
 ```
 CREATE FILE FORMAT {database}.{schema}.{file_format_name}
 TYPE = 'CSV' ESCAPE='\\' FIELD_OPTIONALLY_ENCLOSED_BY='"';
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index b9af3d2b..47935e5d 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -9,7 +9,7 @@
 from singer import get_logger
 
 from target_snowflake import flattening
 from target_snowflake import stream_utils
-from target_snowflake.file_format import FileFormat, FileFormatTypes
+from target_snowflake.file_format import FileFormat, FileFormatTypes, InlineFileFormat, NamedFileFormat
 from target_snowflake.exceptions import TooManyRecordsException, PrimaryKeyNotFoundException
 from target_snowflake.upload_clients.s3_upload_client import S3UploadClient
@@ -210,7 +210,10 @@ def __init__(self, connection_config, stream_schema_message=None, table_cache=No
         self.schema_name = None
         self.grantees = None
 
-        self.file_format = FileFormat(self.connection_config['file_format'], self.query, file_format_type)
+        if 'file_format' in self.connection_config:
+            self.file_format = NamedFileFormat(self.connection_config['file_format'], self.query, file_format_type)
+        else:
+            self.file_format = InlineFileFormat(file_format_type or FileFormatTypes.CSV)
 
         if not self.connection_config.get('stage') and self.file_format.file_format_type == FileFormatTypes.PARQUET:
             self.logger.error("Table stages with Parquet file format is not suppported. "
@@ -467,8 +470,7 @@ def load_file(self, s3_key, count, size_bytes):
                 merge_sql = self.file_format.formatter.create_merge_sql(table_name=self.table_name(stream, False),
                                                                         stage_name=self.get_stage_name(stream),
                                                                         s3_key=s3_key,
-                                                                        file_format_name=
-                                                                        self.connection_config['file_format'],
+                                                                        file_format=self.file_format,
                                                                         columns=columns_with_trans,
                                                                         pk_merge_condition=
                                                                         self.primary_key_merge_condition())
@@ -486,8 +488,7 @@
             copy_sql = self.file_format.formatter.create_copy_sql(table_name=self.table_name(stream, False),
                                                                   stage_name=self.get_stage_name(stream),
                                                                   s3_key=s3_key,
-                                                                  file_format_name=
-                                                                  self.connection_config['file_format'],
+                                                                  file_format=self.file_format,
                                                                   columns=columns_with_trans)
             self.logger.debug('Running query: %s', copy_sql)
             cur.execute(copy_sql)
diff --git a/target_snowflake/file_format.py b/target_snowflake/file_format.py
index 3231212a..badc5d28 100644
--- a/target_snowflake/file_format.py
+++ b/target_snowflake/file_format.py
@@ -1,4 +1,5 @@
 """Enums used by pipelinewise-target-snowflake"""
+import abc
 from enum import Enum, unique
 from types import ModuleType
 from typing import Callable
@@ -20,20 +21,13 @@ def list():
     return list(map(lambda c: c.value, FileFormatTypes))
 
 
-# pylint: disable=too-few-public-methods
-class FileFormat:
-    """File Format class"""
+class FileFormat(abc.ABC):
+    """File Format class (abstract)"""
 
-    def __init__(self, file_format: str, query_fn: Callable, file_format_type: FileFormatTypes=None):
-        """Find the file format in Snowflake, detect its type and
-        initialise file format specific functions"""
-        if file_format_type:
-            self.file_format_type = file_format_type
-        else:
-            # Detect file format type by querying it from Snowflake
-            self.file_format_type = self._detect_file_format_type(file_format, query_fn)
-
-        self.formatter = self._get_formatter(self.file_format_type)
+    def __init__(self, file_format_type: FileFormatTypes):
+        """Initialize type and file format specific functions."""
+        self.file_format_type = file_format_type
+        self.formatter = self._get_formatter(file_format_type)
 
     @classmethod
     def _get_formatter(cls, file_format_type: FileFormatTypes) -> ModuleType:
@@ -44,7 +38,7 @@ def _get_formatter(cls, file_format_type: FileFormatTypes) -> ModuleType:
             file_format_type: FileFormatTypes enum item
 
         Returns:
-            ModuleType implementation of the file ormatter
+            ModuleType implementation of the file formatter
         """
         formatter = None
 
@@ -57,6 +51,36 @@ def _get_formatter(cls, file_format_type: FileFormatTypes) -> ModuleType:
         return formatter
 
+    @property
+    @abc.abstractmethod
+    def declaration_for_copy(self) -> str:
+        """Return the format declaration text for a COPY INTO statement."""
+
+    @property
+    @abc.abstractmethod
+    def declaration_for_merge(self) -> str:
+        """Return the format declaration text for a MERGE statement."""
+
+
+# pylint: disable=too-few-public-methods
+class NamedFileFormat(FileFormat):
+    """Named File Format class"""
+
+    def __init__(
+        self,
+        file_format: str,
+        query_fn: Callable,
+        file_format_type: FileFormatTypes = None,
+    ):
+        """Find the file format in Snowflake, detect its type and
+        initialise file format specific functions"""
+        self.qualified_format_name = file_format
+        if not file_format_type:
+            # Detect file format type by querying it from Snowflake
+            file_format_type = self._detect_file_format_type(file_format, query_fn)
+
+        super().__init__(file_format_type)
+
     @classmethod
     def _detect_file_format_type(cls, file_format: str, query_fn: Callable) -> FileFormatTypes:
         """Detect the type of an existing snowflake file format object
 
@@ -84,3 +108,52 @@ def _detect_file_format_type(cls, file_format: str, query_fn: Callable) -> FileF
                 f"Named file format not found: {file_format}")
 
         return file_format_type
+
+    @property
+    def declaration_for_copy(self) -> str:
+        """Return the format declaration text for a COPY INTO statement."""
+        return f"FILE_FORMAT = (format_name='{self.qualified_format_name}')"
+
+    @property
+    def declaration_for_merge(self) -> str:
+        """Return the format declaration text for a MERGE statement."""
+        return f"FILE_FORMAT => '{self.qualified_format_name}'"
+
+
+class InlineFileFormat(FileFormat):
+    """Inline File Format class"""
+
+    def __init__(
+        self,
+        file_format_type: FileFormatTypes = FileFormatTypes.CSV,
+    ):
+        """Initialize the inline file format and its formatter functions."""
+        if file_format_type != FileFormatTypes.CSV:
+            raise NotImplementedError("Only CSV is supported as an inline format type.")
+
+        super().__init__(file_format_type)
+
+    @property
+    def declaration_for_copy(self) -> str:
+        """Return the format declaration text for a COPY INTO statement."""
+        if self.file_format_type == FileFormatTypes.CSV:
+            return (
+                "FILE_FORMAT = (\n"
+                "    TYPE = CSV\n"
+                "    EMPTY_FIELD_AS_NULL = FALSE\n"
+                "    FIELD_OPTIONALLY_ENCLOSED_BY = '\"'\n"
+                ")\n"
+            )
+
+        raise NotImplementedError("Only CSV is supported as an inline format type.")
+
+    @property
+    def declaration_for_merge(self) -> str:
+        """Return the format declaration text for a MERGE statement."""
+        return (
+            "FILE_FORMAT => (\n"
+            "    TYPE = CSV\n"
+            "    EMPTY_FIELD_AS_NULL = FALSE\n"
+            "    FIELD_OPTIONALLY_ENCLOSED_BY = '\"'\n"
+            ")"
+        )
diff --git a/target_snowflake/file_formats/csv.py b/target_snowflake/file_formats/csv.py
index 42d76ee4..0dbb6f04 100644
--- a/target_snowflake/file_formats/csv.py
+++ b/target_snowflake/file_formats/csv.py
@@ -7,25 +7,28 @@
 from tempfile import mkstemp
 
 from target_snowflake import flattening
-
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from target_snowflake.file_format import FileFormat
 
 def create_copy_sql(table_name: str,
                     stage_name: str,
                     s3_key: str,
-                    file_format_name: str,
+                    file_format: 'FileFormat',
                     columns: List):
     """Generate a CSV compatible snowflake COPY INTO command"""
     p_columns = ', '.join([c['name'] for c in columns])
 
     return f"COPY INTO {table_name} ({p_columns}) " \
            f"FROM '@{stage_name}/{s3_key}' " \
-           f"FILE_FORMAT = (format_name='{file_format_name}')"
+           f"{file_format.declaration_for_copy}"
 
 
 def create_merge_sql(table_name: str,
                      stage_name: str,
                      s3_key: str,
-                     file_format_name: str,
+                     file_format: 'FileFormat',
                      columns: List,
                      pk_merge_condition: str) -> str:
     """Generate a CSV compatible snowflake MERGE INTO command"""
@@ -37,7 +40,7 @@ def create_merge_sql(table_name: str,
     return f"MERGE INTO {table_name} t USING (" \
            f"SELECT {p_source_columns} " \
            f"FROM '@{stage_name}/{s3_key}' " \
-           f"(FILE_FORMAT => '{file_format_name}')) s " \
+           f"({file_format.declaration_for_merge})) s " \
            f"ON {pk_merge_condition} " \
            f"WHEN MATCHED THEN UPDATE SET {p_update} " \
            "WHEN NOT MATCHED THEN " \
diff --git a/target_snowflake/file_formats/parquet.py b/target_snowflake/file_formats/parquet.py
index ad02e6a5..2b5bf0f1 100644
--- a/target_snowflake/file_formats/parquet.py
+++ b/target_snowflake/file_formats/parquet.py
@@ -6,12 +6,16 @@
 from tempfile import mkstemp
 
 from target_snowflake import flattening
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from target_snowflake.file_format import FileFormat
 
 
 def create_copy_sql(table_name: str,
                     stage_name: str,
                     s3_key: str,
-                    file_format_name: str,
+                    file_format: 'FileFormat',
                     columns: List):
     """Generate a Parquet compatible snowflake COPY INTO command"""
     p_target_columns = ', '.join([c['name'] for c in columns])
@@ -20,13 +24,13 @@ def create_copy_sql(table_name: str,
 
     return f"COPY INTO {table_name} ({p_target_columns}) " \
            f"FROM (SELECT {p_source_columns} FROM '@{stage_name}/{s3_key}') " \
-           f"FILE_FORMAT = (format_name='{file_format_name}')"
+           f"{file_format.declaration_for_copy}"
 
 
 def create_merge_sql(table_name: str,
                      stage_name: str,
                      s3_key: str,
-                     file_format_name: str,
+                     file_format: 'FileFormat',
                      columns: List,
                      pk_merge_condition: str) -> str:
     """Generate a Parquet compatible snowflake MERGE INTO command"""
@@ -39,7 +43,7 @@ def create_merge_sql(table_name: str,
     return f"MERGE INTO {table_name} t USING (" \
            f"SELECT {p_source_columns} " \
            f"FROM '@{stage_name}/{s3_key}' " \
-           f"(FILE_FORMAT => '{file_format_name}')) s " \
+           f"({file_format.declaration_for_merge})) s " \
            f"ON {pk_merge_condition} " \
            f"WHEN MATCHED THEN UPDATE SET {p_update} " \
            "WHEN NOT MATCHED THEN " \

From 52b8531d6b0b9765cffd4190876636027b7a5ed0 Mon Sep 17 00:00:00 2001
From: "Aaron (\"AJ\") Steers"
Date: Fri, 15 Oct 2021 12:14:01 -0700
Subject: [PATCH 3/3] ignore mypy cache in .gitignore

---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index a005a21d..76319ea7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@ __pycache__/
 *~
 dist/
 .coverage
+.mypy_cache
 
 # Singer JSON files
 properties.json
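
---

Reviewer note, not part of the patches: a minimal sketch of the fallback behaviour these changes enable, assuming the patched `target_snowflake` package is importable. When the connection config has no `file_format` key, `DbSync` now constructs an `InlineFileFormat`, whose declaration is spliced into the generated COPY statement in place of a named format:

```python
from target_snowflake.file_format import FileFormatTypes, InlineFileFormat

# With no 'file_format' key in the config, db_sync falls back to an
# inline CSV declaration instead of requiring a named Snowflake object.
fmt = InlineFileFormat(FileFormatTypes.CSV)

# The COPY INTO statement embeds the format options directly:
print(fmt.declaration_for_copy)
# FILE_FORMAT = (
#     TYPE = CSV
#     EMPTY_FIELD_AS_NULL = FALSE
#     FIELD_OPTIONALLY_ENCLOSED_BY = '"'
# )
```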