From 44dd2a117c1be64bec72f570d152e3c525e043f0 Mon Sep 17 00:00:00 2001 From: Dinis Louseiro Date: Wed, 5 Jun 2024 20:20:54 +0200 Subject: [PATCH 1/3] Add support for browser SSO authentication #6 Add support for browser authentication. --- README.md | 81 +++++++++++++++++++------------------ setup.py | 2 +- target_snowflake/db_sync.py | 13 ++++-- tests/unit/test_db_sync.py | 10 +++++ 4 files changed, 61 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index 0a7d3674..46c27f54 100644 --- a/README.md +++ b/README.md 
@@ -138,46 +138,47 @@ Running the the target connector requires a `config.json` file. Example with the Full list of options in `config.json`: -| Property | Type | Required? | Description | -|-------------------------------------|---------|------------|---------------------------------------------------------------| -| account | String | Yes | Snowflake account name (i.e. rtXXXXX.eu-central-1) | -| dbname | String | Yes | Snowflake Database name | -| user | String | Yes | Snowflake User | -| password | String | Yes | Snowflake Password | -| warehouse | String | Yes | Snowflake virtual warehouse name | -| role | String | No | Snowflake role to use. If not defined then the user's default role will be used | -| aws_access_key_id | String | No | S3 Access Key Id. If not provided, `AWS_ACCESS_KEY_ID` environment variable or IAM role will be used | -| aws_secret_access_key | String | No | S3 Secret Access Key. If not provided, `AWS_SECRET_ACCESS_KEY` environment variable or IAM role will be used | -| aws_session_token | String | No | AWS Session token. If not provided, `AWS_SESSION_TOKEN` environment variable will be used | -| aws_profile | String | No | AWS profile name for profile based authentication. If not provided, `AWS_PROFILE` environment variable will be used. | -| s3_bucket | String | No | S3 Bucket name. Required if to use S3 External stage. When this is defined then `stage` has to be defined as well. | -| s3_key_prefix | String | No | (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket. | -| s3_endpoint_url | String | No | The complete URL to use for the constructed client. This is allowing to use non-native s3 account. | -| s3_region_name | String | No | Default region when creating new connections | -| s3_acl | String | No | S3 ACL name to set on the uploaded files | -| stage | String | No | Named external stage name created at pre-requirements section. 
Has to be a fully qualified name including the schema name. If not specified, table internal stage are used. When this is defined then `s3_bucket` has to be defined as well. | -| file_format | String | Yes | Named file format name created at pre-requirements section. Has to be a fully qualified name including the schema name. | -| batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. | -| batch_wait_limit_seconds | Integer | | (Default: None) Maximum time to wait for batch to reach `batch_size_rows`. | -| flush_all_streams | Boolean | | (Default: False) Flush and load every stream into Snowflake when one batch is full. Warning: This may trigger the COPY command to use files with low number of records, and may cause performance problems. | -| parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max. | -| parallelism_max | Integer | | (Default: 16) Max number of parallel threads to use when flushing tables. | -| default_target_schema | String | | Name of the schema where the tables will be created, **without** database prefix. If `schema_mapping` is not defined then every stream sent by the tap is loaded into this schema. | -| default_target_schema_select_permission | String | | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific role or a list of roles. If `schema_mapping` is not defined then every stream sent by the tap is granted accordingly. | -| schema_mapping | Object | | Useful if you want to load multiple streams from one tap to multiple Snowflake schemas.

If the tap sends the `stream_id` in `-` format then this option overwrites the `default_target_schema` value. Note, that using `schema_mapping` you can overwrite the `default_target_schema_select_permission` value to grant SELECT permissions to different groups per schemas or optionally you can create indices automatically for the replicated tables.

**Note**: This is an experimental feature and recommended to use via PipelineWise YAML files that will generate the object mapping in the right JSON format. For further info check a [PipelineWise YAML Example] -| disable_table_cache | Boolean | | (Default: False) By default the connector caches the available table structures in Snowflake at startup. In this way it doesn't need to run additional queries when ingesting data to check if altering the target tables is required. With `disable_table_cache` option you can turn off this caching. You will always see the most recent table structures but will cause an extra query runtime. | -| client_side_encryption_master_key | String | | (Default: None) When this is defined, Client-Side Encryption is enabled. The data in S3 will be encrypted, No third parties, including Amazon AWS and any ISPs, can see data in the clear. Snowflake COPY command will decrypt the data once it's in Snowflake. The master key must be 256-bit length and must be encoded as base64 string. | -| add_metadata_columns | Boolean | | (Default: False) Metadata columns add extra row level information about data ingestions, (i.e. when was the row read in source, when was inserted or deleted in snowflake etc.) Metadata columns are creating automatically by adding extra columns to the tables with a column prefix `_SDC_`. The column names are following the stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag the deleted rows by setting the `_SDC_DELETED_AT` metadata column. Without the `add_metadata_columns` option the deleted rows from singer taps will not be recongisable in Snowflake. | -| hard_delete | Boolean | | (Default: False) When `hard_delete` option is true then DELETE SQL commands will be performed in Snowflake to delete rows in tables. It's achieved by continuously checking the `_SDC_DELETED_AT` metadata column sent by the singer tap. 
Due to deleting rows requires metadata columns, `hard_delete` option automatically enables the `add_metadata_columns` option as well. | -| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.

When value is 0 (default) then flattening functionality is turned off. | -| primary_key_required | Boolean | | (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined. | -| validate_records | Boolean | | (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. | -| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. | -| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. | -| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. | -| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket` under the key `/{archive_load_files_s3_prefix}/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`. -| archive_load_files_s3_prefix | String | | (Default: "archive") When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix. 
-| archive_load_files_s3_bucket | String | | (Default: Value of `s3_bucket`) When `archive_load_files` is enabled, the archived files will be placed in this bucket. +| Property | Type | Required? | Description | +|-----------------------------------------|---------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| account | String | Yes | Snowflake account name (i.e. rtXXXXX.eu-central-1) | +| dbname | String | Yes | Snowflake Database name | +| user | String | Yes | Snowflake User | +| password | String | No | Snowflake Password. Not required if `use_browser_authentication` is set to `True`. Value is ignored if provided together with `use_browser_authentication` set to `True`. | +| warehouse | String | Yes | Snowflake virtual warehouse name | +| role | String | No | Snowflake role to use. If not defined then the user's default role will be used | +| aws_access_key_id | String | No | S3 Access Key Id. If not provided, `AWS_ACCESS_KEY_ID` environment variable or IAM role will be used | +| aws_secret_access_key | String | No | S3 Secret Access Key. If not provided, `AWS_SECRET_ACCESS_KEY` environment variable or IAM role will be used | +| aws_session_token | String | No | AWS Session token. 
If not provided, `AWS_SESSION_TOKEN` environment variable will be used | +| aws_profile | String | No | AWS profile name for profile based authentication. If not provided, `AWS_PROFILE` environment variable will be used. | +| s3_bucket | String | No | S3 Bucket name. Required if to use S3 External stage. When this is defined then `stage` has to be defined as well. | +| s3_key_prefix | String | No | (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket. | +| s3_endpoint_url | String | No | The complete URL to use for the constructed client. This is allowing to use non-native s3 account. | +| s3_region_name | String | No | Default region when creating new connections | +| s3_acl | String | No | S3 ACL name to set on the uploaded files | +| stage | String | No | Named external stage name created at pre-requirements section. Has to be a fully qualified name including the schema name. If not specified, table internal stage are used. When this is defined then `s3_bucket` has to be defined as well. | +| file_format | String | Yes | Named file format name created at pre-requirements section. Has to be a fully qualified name including the schema name. | +| batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. | +| batch_wait_limit_seconds | Integer | | (Default: None) Maximum time to wait for batch to reach `batch_size_rows`. | +| flush_all_streams | Boolean | | (Default: False) Flush and load every stream into Snowflake when one batch is full. Warning: This may trigger the COPY command to use files with low number of records, and may cause performance problems. | +| parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. 
Any other positive number will create that number of threads, up to parallelism_max. | +| parallelism_max | Integer | | (Default: 16) Max number of parallel threads to use when flushing tables. | +| default_target_schema | String | | Name of the schema where the tables will be created, **without** database prefix. If `schema_mapping` is not defined then every stream sent by the tap is loaded into this schema. | +| default_target_schema_select_permission | String | | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific role or a list of roles. If `schema_mapping` is not defined then every stream sent by the tap is granted accordingly. | +| schema_mapping | Object | | Useful if you want to load multiple streams from one tap to multiple Snowflake schemas.

If the tap sends the `stream_id` in `-` format then this option overwrites the `default_target_schema` value. Note, that using `schema_mapping` you can overwrite the `default_target_schema_select_permission` value to grant SELECT permissions to different groups per schemas or optionally you can create indices automatically for the replicated tables.

**Note**: This is an experimental feature and recommended to use via PipelineWise YAML files that will generate the object mapping in the right JSON format. For further info check a [PipelineWise YAML Example] +| disable_table_cache | Boolean | | (Default: False) By default the connector caches the available table structures in Snowflake at startup. In this way it doesn't need to run additional queries when ingesting data to check if altering the target tables is required. With `disable_table_cache` option you can turn off this caching. You will always see the most recent table structures but will cause an extra query runtime. | +| client_side_encryption_master_key | String | | (Default: None) When this is defined, Client-Side Encryption is enabled. The data in S3 will be encrypted, No third parties, including Amazon AWS and any ISPs, can see data in the clear. Snowflake COPY command will decrypt the data once it's in Snowflake. The master key must be 256-bit length and must be encoded as base64 string. | +| add_metadata_columns | Boolean | | (Default: False) Metadata columns add extra row level information about data ingestions, (i.e. when was the row read in source, when was inserted or deleted in snowflake etc.) Metadata columns are creating automatically by adding extra columns to the tables with a column prefix `_SDC_`. The column names are following the stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag the deleted rows by setting the `_SDC_DELETED_AT` metadata column. Without the `add_metadata_columns` option the deleted rows from singer taps will not be recongisable in Snowflake. | +| hard_delete | Boolean | | (Default: False) When `hard_delete` option is true then DELETE SQL commands will be performed in Snowflake to delete rows in tables. It's achieved by continuously checking the `_SDC_DELETED_AT` metadata column sent by the singer tap. 
Due to deleting rows requires metadata columns, `hard_delete` option automatically enables the `add_metadata_columns` option as well. | +| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.

When value is 0 (default) then flattening functionality is turned off. | +| primary_key_required | Boolean | | (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined. | +| validate_records | Boolean | | (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. | +| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. | +| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. | +| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. | +| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket` under the key `/{archive_load_files_s3_prefix}/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`. +| archive_load_files_s3_prefix | String | | (Default: "archive") When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix. 
+| archive_load_files_s3_bucket | String | | (Default: Value of `s3_bucket`) When `archive_load_files` is enabled, the archived files will be placed in this bucket. +| use_browser_authentication | Boolean | No | (Default: False) Use SSO authentication via external browser (authenticator=externalbrowser). Documented [here](https://docs.snowflake.com/en/developer-guide/node-js/nodejs-driver-authenticate#using-single-sign-on-sso-through-a-web-browser). ### To run tests: diff --git a/setup.py b/setup.py index 001bdf30..88ee9ad7 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ python_requires='>=3.7', install_requires=[ 'pipelinewise-singer-python==1.*', - 'snowflake-connector-python[pandas]==3.0.4', + 'snowflake-connector-python[pandas,secure-local-storage]==3.10.1', 'inflection==0.5.1', 'joblib==1.2.0', 'boto3==1.28.20', diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index ffac46c5..9e0559f3 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -22,7 +22,6 @@ def validate_config(config): 'account', 'dbname', 'user', - 'password', 'warehouse', 's3_bucket', 'stage', @@ -33,7 +32,6 @@ def validate_config(config): 'account', 'dbname', 'user', - 'password', 'warehouse', 'file_format' ] 
@@ -46,6 +44,12 @@ def validate_config(config): # Use table stage if none s3_bucket and stage defined elif not config.get('s3_bucket', None) and not config.get('stage', None): required_config_keys = snowflake_required_config_keys + elif not(config.get("use_browser_authentication")) and not(config.get("password")): + errors.append("'password' configuration was not provided and it is mandatory when " + "SSO browser authentication is not intended (" + "'use_browser_authentication' is 'False'). Please provide a value " + "for 'password' for basic authentication or " + "set 'use_browser_authentication' to True for SSO browser authentication.") else: errors.append("Only one of 's3_bucket' or 'stage' keys defined in config. " "Use both of them if you want to use an external stage when loading data into snowflake " @@ -293,7 +297,7 @@ def open_connection(self): return snowflake.connector.connect( user=self.connection_config['user'], - password=self.connection_config['password'], + password=self.connection_config.get('password') if not self.connection_config.get("use_browser_authentication") else None, account=self.connection_config['account'], database=self.connection_config['dbname'], warehouse=self.connection_config['warehouse'], @@ -306,7 +310,8 @@ def open_connection(self): database=self.connection_config['dbname'], schema=self.schema_name, table=self.table_name(stream, False, True)) - } + }, + authenticator="externalbrowser" if self.connection_config.get("use_browser_authentication") else "password" ) def query(self, query: Union[str, List[str]], params: Dict = None, max_records=0) -> List[Dict]: diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index e63c86a2..1c53e1fb 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py 
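Review bot: The new password check in validate_config is an `elif` inside the s3_bucket/stage chain, so it only runs when exactly one of `s3_bucket`/`stage` is set, i.e. in the branch that previously reported the "Only one of" error; for the two valid stage layouts (both set, or neither) a config with no `password` and no `use_browser_authentication` passes validation and the missing credential surfaces only at connect time, while in the remaining branch it now hides the stage error. Since the two earlier hunks drop `'password'` from both required-key lists, nothing else enforces it. Move the check out of the chain as a standalone statement after the `else` block, `if not config.get('use_browser_authentication') and not config.get('password'):`, keeping the same `errors.append(...)` message under it; the `not(...)` call-style parentheses can go at the same time. validate_config starts before and continues after this hunk.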
@@ -92,6 +92,16 @@ def test_config_validation(self): config_with_archive_load_files['archive_load_files'] = True self.assertGreater(len(validator(config_with_external_stage)), 0) + # Configuration without password nor use_browser_authentication=True + config_without_password_nor_browser_auth = minimal_config.copy() + config_without_password_nor_browser_auth.pop("password") + self.assertGreater(len(validator(minimal_config)), 0) + + # Configuration without password nor use_browser_authentication=True + config_without_password_with_browser_auth = minimal_config.copy() + config_without_password_with_browser_auth["use_browser_authentication"] = True + self.assertEqual(len(validator(minimal_config)), 0) + def test_column_type_mapping(self): """Test JSON type to Snowflake column type mappings""" mapper = db_sync.column_type From 958cc74cab25c99caa662918828084f8836b05af Mon Sep 17 00:00:00 2001 From: Dinis Louseiro Date: Wed, 5 Jun 2024 20:31:58 +0200 Subject: [PATCH 2/3] Update changelog and bump version (#7) --- CHANGELOG.md | 9 +++++++++ setup.py | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 152ab5e6..d1122c5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +2.4.0 (2024-06-05) +------------------- + +*Changes* +- Update dependencies: + - snowflake-connector-python[pandas,secure-local-storage] +- Add support for browser SSO authentication + + 2.3.0 (2023-08-08) ------------------- diff --git a/setup.py b/setup.py index 88ee9ad7..27d03c78 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ long_description = f.read() setup(name="pipelinewise-target-snowflake", - version="2.3.0", + version="2.4.0", description="Singer.io target for loading data to Snowflake - PipelineWise compatible", long_description=long_description, long_description_content_type='text/markdown', From 396f17b7be63b48a01498c44ada6d6ffa9d63ec5 Mon Sep 17 00:00:00 2001 
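Review bot: Both new assertions in test_config_validation call `validator(minimal_config)` instead of the configs the test just built, so `config_without_password_nor_browser_auth` and `config_without_password_with_browser_auth` are never validated, and `assertGreater(..., 0)` followed by `assertEqual(..., 0)` on the same unmodified dict cannot both hold. Change the two calls to `self.assertGreater(len(validator(config_without_password_nor_browser_auth)), 0)` and `self.assertEqual(len(validator(config_without_password_with_browser_auth)), 0)`. The comment above the second block is a copy of the first; it should read `# Configuration without password but with use_browser_authentication=True`. test_config_validation starts outside this hunk.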
From: Dinis Louseiro Date: Tue, 11 Jun 2024 11:36:38 +0200 Subject: [PATCH 3/3] Stop using invalid `password` authenticator (#8) Replace usage of invalid `password` authenticator by proper `snowflake`. --- target_snowflake/db_sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index 9e0559f3..c4e6ddcc 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -297,7 +297,7 @@ def open_connection(self): return snowflake.connector.connect( user=self.connection_config['user'], - password=self.connection_config.get('password') if not self.connection_config.get("use_browser_authentication") else None, + password=self.connection_config.get('password') if not self.connection_config.get('use_browser_authentication') else None, account=self.connection_config['account'], database=self.connection_config['dbname'], warehouse=self.connection_config['warehouse'], @@ -311,7 +311,7 @@ def open_connection(self): schema=self.schema_name, table=self.table_name(stream, False, True)) }, - authenticator="externalbrowser" if self.connection_config.get("use_browser_authentication") else "password" + authenticator='externalbrowser' if self.connection_config.get('use_browser_authentication') else 'snowflake' ) def query(self, query: Union[str, List[str]], params: Dict = None, max_records=0) -> List[Dict]:
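Review bot: The `use_browser_authentication` lookup is evaluated twice in the same connect() call with opposite polarity (the `password=` line in the first hunk and the `authenticator=` line in the second), which makes it easy for the two to drift apart; bind it once before the `return`, `use_browser = bool(self.connection_config.get('use_browser_authentication'))`, then `password=None if use_browser else self.connection_config.get('password')` and `authenticator='externalbrowser' if use_browser else 'snowflake'`. Separately, the first patch in this series adds the `secure-local-storage` extra to the connector dependency, which presumably is meant to cache the SSO token between connections; the connector only uses that store when `client_store_temporary_credential=True` is passed to connect(), so without it every open_connection() in externalbrowser mode will open a fresh browser prompt — worth confirming against how often this target reconnects. A short docstring on open_connection describing the two modes (password vs external-browser SSO) and stating that a configured `password` is ignored when SSO is enabled would help the next maintainer. open_connection starts before and ends after these hunks.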