From 61afc3a7ab34d625b866ec7d98ab3166f444a09a Mon Sep 17 00:00:00 2001 From: Peter Kosztolanyi Date: Thu, 3 Dec 2020 13:41:44 +0100 Subject: [PATCH] [AP-866] Add database token to query_tag (#117) --- README.md | 2 +- target_snowflake/db_sync.py | 17 +++++--- tests/integration/test_target_snowflake.py | 18 ++++---- tests/unit/test_db_sync.py | 51 ++++++++++++++++++++-- 4 files changed, 68 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index b1c1081c..c3f03326 100644 --- a/README.md +++ b/README.md @@ -165,7 +165,7 @@ Full list of options in `config.json`: | validate_records | Boolean | | (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. | | temp_dir | String | | (Default: platform-dependent) Directory of temporary CSV files with RECORD messages. | | no_compression | Boolean | | (Default: False) Generate uncompressed CSV files when loading to Snowflake. Normally, by default GZIP compressed files are generated. | -| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `schema` and `table` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. | +| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. | ### To run tests: diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index 28b8ba88..ad6fcc41 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -216,7 +216,7 @@ def stream_name_to_dict(stream_name, separator='-'): } -def create_query_tag(query_tag_pattern: str, schema: str = None, table: str = None) -> str: +def create_query_tag(query_tag_pattern: str, database: str = None, schema: str = None, table: str = None) -> str: """ Generate a string to tag executed queries in Snowflake. Replaces tokens `schema` and `table` with the appropriate values. @@ -226,8 +226,9 @@ def create_query_tag(query_tag_pattern: str, schema: str = None, table: str = No Args: query_tag_pattern: - schema: optional value to replace {schema} token in query_tag_pattern - table: optional value to replace {table} token in query_tag_pattern + database: optional value to replace {{database}} token in query_tag_pattern + schema: optional value to replace {{schema}} token in query_tag_pattern + table: optional value to replace {{table}} token in query_tag_pattern Returns: String if query_tag_patter defined otherwise None @@ -237,13 +238,14 @@ def create_query_tag(query_tag_pattern: str, schema: str = None, table: str = No query_tag = query_tag_pattern - # replace tokens + # replace tokens, taking care of json formatted value compatibility for k, v in { - '{schema}': schema or 'unknown-schema', - '{table}': table or 'unknown-table' + '{{database}}': json.dumps(database.strip('"')).strip('"') if database else None, + '{{schema}}': json.dumps(schema.strip('"')).strip('"') if schema else None, + '{{table}}': json.dumps(table.strip('"')).strip('"') if table else None }.items(): if k in query_tag: - query_tag = query_tag.replace(k, v) + query_tag = query_tag.replace(k, v or '') return query_tag @@ -376,6 +378,7 @@ def open_connection(self): # Quoted identifiers should be case sensitive 'QUOTED_IDENTIFIERS_IGNORE_CASE': 'FALSE', 'QUERY_TAG': create_query_tag(self.connection_config.get('query_tag'), + database=self.connection_config['dbname'], schema=self.schema_name, table=self.table_name(stream, False, True)) } diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py index 9b12a673..93fb0751 100644 --- a/tests/integration/test_target_snowflake.py +++ b/tests/integration/test_target_snowflake.py @@ -1050,7 +1050,8 @@ def test_query_tagging(self): current_time = datetime.datetime.now().strftime('%H:%M:%s') # Tag queries with dynamic schema and table tokens - self.config['query_tag'] = f'PPW test tap run at {current_time}. Loading into {{schema}}.{{table}}' + self.config['query_tag'] = f'PPW test tap run at {current_time}. ' \ + f'Loading into {{{{database}}}}.{{{{schema}}}}.{{{{table}}}}' self.persist_lines_with_cache(tap_lines) # Get query tags from QUERY_HISTORY @@ -1059,22 +1060,23 @@ def test_query_tagging(self): f"WHERE query_tag like '%PPW test tap run at {current_time}%'" "GROUP BY query_tag " "ORDER BY 1") + target_db = self.config['dbname'] target_schema = self.config['default_target_schema'] self.assertEqual(result, [{ - 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_schema}."TEST_TABLE_ONE"', - 'QUERIES': 12 + 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}..', + 'QUERIES': 4 }, { - 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_schema}."TEST_TABLE_THREE"', - 'QUERIES': 10 + 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}.{target_schema}.TEST_TABLE_ONE', + 'QUERIES': 12 }, { - 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_schema}."TEST_TABLE_TWO"', + 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}.{target_schema}.TEST_TABLE_THREE', 'QUERIES': 10 }, { - 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into unknown-schema.unknown-table', - 'QUERIES': 4 + 'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}.{target_schema}.TEST_TABLE_TWO', + 'QUERIES': 10 } ]) diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index ed11d96a..56ea626f 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py @@ -1,4 +1,5 @@ import unittest +import json from target_snowflake import db_sync @@ -371,9 +372,51 @@ def test_flatten_record_with_flatten_schema(self): def test_create_query_tag(self): assert db_sync.create_query_tag(None) is None assert db_sync.create_query_tag('This is a test query tag') == 'This is a test query tag' - assert db_sync.create_query_tag('Loading into {schema}.{table}', + assert db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}', + database='test_database', schema='test_schema', - table='test_table') == 'Loading into test_schema.test_table' - assert db_sync.create_query_tag('Loading into {schema}.{table}', + table='test_table') == 'Loading into test_database.test_schema.test_table' + assert db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}', + database=None, schema=None, - table=None) == 'Loading into unknown-schema.unknown-table' + table=None) == 'Loading into ..' + + # JSON formatted query tags with variables + json_query_tag = db_sync.create_query_tag( + '{"database": "{{database}}", "schema": "{{schema}}", "table": "{{table}}"}', + database='test_database', + schema='test_schema', + table='test_table') + # Load the generated JSON formatted query tag to make sure it's a valid JSON + assert json.loads(json_query_tag) == { + 'database': 'test_database', + 'schema': 'test_schema', + 'table': 'test_table' + } + + # JSON formatted query tags with variables quotes in the middle + json_query_tag = db_sync.create_query_tag( + '{"database": "{{database}}", "schema": "{{schema}}", "table": "{{table}}"}', + database='test"database', + schema='test"schema', + table='test"table') + + # Load the generated JSON formatted query tag to make sure it's a valid JSON + assert json.loads(json_query_tag) == { + 'database': 'test"database', + 'schema': 'test"schema', + 'table': 'test"table' + } + + # JSON formatted query tags with quoted variables + json_query_tag = db_sync.create_query_tag( + '{"database": "{{database}}", "schema": "{{schema}}", "table": "{{table}}"}', + database='"test_database"', + schema='"test_schema"', + table='"test_table"') + # Load the generated JSON formatted query tag to make sure it's a valid JSON + assert json.loads(json_query_tag) == { + 'database': 'test_database', + 'schema': 'test_schema', + 'table': 'test_table' + }