Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
[AP-866] Add database token to query_tag (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti authored Dec 3, 2020
1 parent 195e7d7 commit 61afc3a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 20 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Full list of options in `config.json`:
| validate_records | Boolean | | (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation. |
| temp_dir | String | | (Default: platform-dependent) Directory of temporary CSV files with RECORD messages. |
| no_compression | Boolean | | (Default: False) Generate uncompressed CSV files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `schema` and `table` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |

### To run tests:

Expand Down
17 changes: 10 additions & 7 deletions target_snowflake/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def stream_name_to_dict(stream_name, separator='-'):
}


def create_query_tag(query_tag_pattern: str, schema: str = None, table: str = None) -> str:
def create_query_tag(query_tag_pattern: str, database: str = None, schema: str = None, table: str = None) -> str:
"""
Generate a string to tag executed queries in Snowflake.
Replaces tokens `schema` and `table` with the appropriate values.
Expand All @@ -226,8 +226,9 @@ def create_query_tag(query_tag_pattern: str, schema: str = None, table: str = No
Args:
query_tag_pattern:
schema: optional value to replace {schema} token in query_tag_pattern
table: optional value to replace {table} token in query_tag_pattern
database: optional value to replace {{database}} token in query_tag_pattern
schema: optional value to replace {{schema}} token in query_tag_pattern
table: optional value to replace {{table}} token in query_tag_pattern
Returns:
String if query_tag_patter defined otherwise None
Expand All @@ -237,13 +238,14 @@ def create_query_tag(query_tag_pattern: str, schema: str = None, table: str = No

query_tag = query_tag_pattern

# replace tokens
# replace tokens, taking care of json formatted value compatibility
for k, v in {
'{schema}': schema or 'unknown-schema',
'{table}': table or 'unknown-table'
'{{database}}': json.dumps(database.strip('"')).strip('"') if database else None,
'{{schema}}': json.dumps(schema.strip('"')).strip('"') if schema else None,
'{{table}}': json.dumps(table.strip('"')).strip('"') if table else None
}.items():
if k in query_tag:
query_tag = query_tag.replace(k, v)
query_tag = query_tag.replace(k, v or '')

return query_tag

Expand Down Expand Up @@ -376,6 +378,7 @@ def open_connection(self):
# Quoted identifiers should be case sensitive
'QUOTED_IDENTIFIERS_IGNORE_CASE': 'FALSE',
'QUERY_TAG': create_query_tag(self.connection_config.get('query_tag'),
database=self.connection_config['dbname'],
schema=self.schema_name,
table=self.table_name(stream, False, True))
}
Expand Down
18 changes: 10 additions & 8 deletions tests/integration/test_target_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,8 @@ def test_query_tagging(self):
current_time = datetime.datetime.now().strftime('%H:%M:%s')

# Tag queries with dynamic schema and table tokens
self.config['query_tag'] = f'PPW test tap run at {current_time}. Loading into {{schema}}.{{table}}'
self.config['query_tag'] = f'PPW test tap run at {current_time}. ' \
f'Loading into {{{{database}}}}.{{{{schema}}}}.{{{{table}}}}'
self.persist_lines_with_cache(tap_lines)

# Get query tags from QUERY_HISTORY
Expand All @@ -1059,22 +1060,23 @@ def test_query_tagging(self):
f"WHERE query_tag like '%PPW test tap run at {current_time}%'"
"GROUP BY query_tag "
"ORDER BY 1")
target_db = self.config['dbname']
target_schema = self.config['default_target_schema']
self.assertEqual(result, [{
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_schema}."TEST_TABLE_ONE"',
'QUERIES': 12
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}..',
'QUERIES': 4
},
{
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_schema}."TEST_TABLE_THREE"',
'QUERIES': 10
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}.{target_schema}.TEST_TABLE_ONE',
'QUERIES': 12
},
{
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_schema}."TEST_TABLE_TWO"',
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}.{target_schema}.TEST_TABLE_THREE',
'QUERIES': 10
},
{
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into unknown-schema.unknown-table',
'QUERIES': 4
'QUERY_TAG': f'PPW test tap run at {current_time}. Loading into {target_db}.{target_schema}.TEST_TABLE_TWO',
'QUERIES': 10
}
])

Expand Down
51 changes: 47 additions & 4 deletions tests/unit/test_db_sync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
import json

from target_snowflake import db_sync

Expand Down Expand Up @@ -371,9 +372,51 @@ def test_flatten_record_with_flatten_schema(self):
def test_create_query_tag(self):
assert db_sync.create_query_tag(None) is None
assert db_sync.create_query_tag('This is a test query tag') == 'This is a test query tag'
assert db_sync.create_query_tag('Loading into {schema}.{table}',
assert db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}',
database='test_database',
schema='test_schema',
table='test_table') == 'Loading into test_schema.test_table'
assert db_sync.create_query_tag('Loading into {schema}.{table}',
table='test_table') == 'Loading into test_database.test_schema.test_table'
assert db_sync.create_query_tag('Loading into {{database}}.{{schema}}.{{table}}',
database=None,
schema=None,
table=None) == 'Loading into unknown-schema.unknown-table'
table=None) == 'Loading into ..'

# JSON formatted query tags with variables
json_query_tag = db_sync.create_query_tag(
'{"database": "{{database}}", "schema": "{{schema}}", "table": "{{table}}"}',
database='test_database',
schema='test_schema',
table='test_table')
# Load the generated JSON formatted query tag to make sure it's a valid JSON
assert json.loads(json_query_tag) == {
'database': 'test_database',
'schema': 'test_schema',
'table': 'test_table'
}

# JSON formatted query tags with variables quotes in the middle
json_query_tag = db_sync.create_query_tag(
'{"database": "{{database}}", "schema": "{{schema}}", "table": "{{table}}"}',
database='test"database',
schema='test"schema',
table='test"table')

# Load the generated JSON formatted query tag to make sure it's a valid JSON
assert json.loads(json_query_tag) == {
'database': 'test"database',
'schema': 'test"schema',
'table': 'test"table'
}

# JSON formatted query tags with quoted variables
json_query_tag = db_sync.create_query_tag(
'{"database": "{{database}}", "schema": "{{schema}}", "table": "{{table}}"}',
database='"test_database"',
schema='"test_schema"',
table='"test_table"')
# Load the generated JSON formatted query tag to make sure it's a valid JSON
assert json.loads(json_query_tag) == {
'database': 'test_database',
'schema': 'test_schema',
'table': 'test_table'
}

0 comments on commit 61afc3a

Please sign in to comment.