
Commit 7ee3db9

AP-1110 Handle change to key properties and remove non-nullability constraint (#254)
Samira-El authored Mar 28, 2022
1 parent 83761af commit 7ee3db9
Showing 6 changed files with 466 additions and 109 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -10,7 +10,7 @@ pylint:

unit_test:
. ./venv/bin/activate ;\
-	pytest tests/unit -vv --cov target_snowflake --cov-fail-under=62
+	pytest tests/unit -vv --cov target_snowflake --cov-fail-under=67

integration_test:
. ./venv/bin/activate ;\
91 changes: 77 additions & 14 deletions target_snowflake/db_sync.py
@@ -1,11 +1,10 @@
import json
import sys
-from typing import List, Dict, Union, Tuple

import snowflake.connector
import re
import time

+from typing import List, Dict, Union, Tuple, Set
from singer import get_logger
from target_snowflake import flattening
from target_snowflake import stream_utils
@@ -116,10 +115,12 @@ def safe_column_name(name):
"""Generate SQL friendly column name"""
return f'"{name}"'.upper()


def json_element_name(name):
"""Generate SQL friendly semi structured element reference name"""
return f'"{name}"'


def column_clause(name, schema_property):
"""Generate DDL column name with column type string"""
return f'{safe_column_name(name)} {column_type(schema_property)}'
@@ -215,7 +216,7 @@ def __init__(self, connection_config, stream_schema_message=None, table_cache=No
self.file_format = FileFormat(self.connection_config['file_format'], self.query, file_format_type)

if not self.connection_config.get('stage') and self.file_format.file_format_type == FileFormatTypes.PARQUET:
self.logger.error("Table stages with Parquet file format is not suppported. "
self.logger.error("Table stages with Parquet file format is not supported. "
"Use named stages with Parquet file format or table stages with CSV files format")
sys.exit(1)

@@ -324,7 +325,7 @@ def query(self, query: Union[str, List[str]], params: Dict = None, max_records=0

# Run every query in one transaction if query is a list of SQL
if isinstance(query, list):
-                    self.logger.info('Starting Transaction')
+                    self.logger.debug('Starting Transaction')
cur.execute("START TRANSACTION")
queries = query
else:
@@ -338,7 +339,7 @@ def query(self, query: Union[str, List[str]], params: Dict = None, max_records=0
# update the LAST_QID
params['LAST_QID'] = qid

self.logger.info("Running query: '%s' with Params %s", q, params)
self.logger.debug("Running query: '%s' with Params %s", q, params)

cur.execute(q, params)
qid = cur.sfqid
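When query() receives a list, every statement runs in a single transaction, as the branch above shows. A minimal sketch of a call site (the db_sync instance, table, and statements are hypothetical):

# The list form opens one transaction for all statements (the
# 'START TRANSACTION' branch above), so the DDL is applied together.
db_sync.query([
    'alter table my_schema."TEST_TABLE" drop primary key;',
    'alter table my_schema."TEST_TABLE" add primary key("ID");',
])

# A plain string runs as a single standalone statement.
rows = db_sync.query('select current_timestamp()')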
@@ -374,13 +375,16 @@ def record_primary_key_string(self, record):
if len(self.stream_schema_message['key_properties']) == 0:
return None
flatten = flattening.flatten_record(record, self.flatten_schema, max_level=self.data_flattening_max_level)
-        try:
-            key_props = [str(flatten[p]) for p in self.stream_schema_message['key_properties']]
-        except Exception as exc:
-            pks = self.stream_schema_message['key_properties']
-            fields = list(flatten.keys())
-            raise PrimaryKeyNotFoundException(f"Cannot find {pks} primary key(s) in record. "
-                                              f"Available fields: {fields}") from exc

+        key_props = []
+        for key_prop in self.stream_schema_message['key_properties']:
+            if not flatten.get(key_prop):
+                raise PrimaryKeyNotFoundException(
+                    f"Primary key '{key_prop}' does not exist in record or is null. "
+                    f"Available fields: {list(flatten.keys())}")
+
+            key_props.append(str(flatten[key_prop]))

return ','.join(key_props)

def put_to_stage(self, file, stream, count, temp_dir=None):
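The loop above replaces the old broad try/except: a missing or null primary key value now fails fast with a named column. A small sketch of the new behavior (the records and key_properties are made up):

# key_properties = ['id']
# {'id': 1, 'results': 'xyz1'}    -> record_primary_key_string returns '1'
# {'id': None, 'results': 'xyz3'} -> raises PrimaryKeyNotFoundException
# Note that flatten.get(key_prop) is also falsy for 0 and '', so such
# primary key values are rejected by this check as well.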
@@ -390,7 +394,6 @@ def put_to_stage(self, file, stream, count, temp_dir=None):

def delete_from_stage(self, stream, s3_key):
"""Delete file from snowflake stage"""
-        self.logger.info('Deleting %s from stage', format(s3_key))
self.upload_client.delete_object(stream, s3_key)

def copy_to_archive(self, s3_source_key, s3_archive_key, s3_archive_metadata):
@@ -531,6 +534,7 @@ def _load_file_copy(self, s3_key, stream, columns_with_trans) -> int:
if len(results) > 0:
inserts = results[0].get('rows_loaded', 0)
return inserts

def primary_key_merge_condition(self):
"""Generate SQL join condition on primary keys for merge SQL statements"""
stream_schema_message = self.stream_schema_message
@@ -807,7 +811,6 @@ def sync_table(self):
query = self.create_table_query()
self.logger.info('Table %s does not exist. Creating...', table_name_with_schema)
self.query(query)

self.grant_privilege(self.schema_name, self.grantees, self.grant_select_on_all_tables_in_schema)

# Refresh columns cache if required
@@ -816,3 +819,63 @@
else:
self.logger.info('Table %s exists', table_name_with_schema)
self.update_columns()

self._refresh_table_pks()

def _refresh_table_pks(self):
"""
Refresh table PK constraints by either dropping or adding PK based on changes to `key_properties` of the
stream schema.
        The non-nullability constraint on the PK columns is also dropped.
"""
table_name = self.table_name(self.stream_schema_message['stream'], False)
current_pks = self._get_current_pks()
new_pks = set(pk.upper() for pk in self.stream_schema_message.get('key_properties', []))

queries = []

self.logger.debug('Table: %s, Current PKs: %s | New PKs: %s ',
self.stream_schema_message['stream'],
current_pks,
new_pks
)

if not new_pks and current_pks:
            self.logger.info('Table "%s" currently has a PK constraint, but we need to drop it.', table_name)
queries.append(f'alter table {table_name} drop primary key;')

elif new_pks != current_pks:
            self.logger.info('Changes detected in PK columns of table "%s", need to refresh the PK.', table_name)
pk_list = ', '.join([safe_column_name(col) for col in new_pks])
queries.extend([
f'alter table {table_name} drop primary key;',
f'alter table {table_name} add primary key({pk_list});'
])

# For now, we don't wish to enforce non-nullability on the pk columns
for pk in current_pks.union(new_pks):
queries.append(f'alter table {table_name} alter column {safe_column_name(pk)} drop not null;')

self.query(queries)
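A hedged illustration of the statements _refresh_table_pks queues up when `key_properties` changes, say from ['id'] to ['id', 'name'] (the table name is made up, and column order is illustrative since the PKs are sets):

# With current_pks == {'ID'} and new_pks == {'ID', 'NAME'},
# queries ends up as:
queries = [
    'alter table my_schema."TEST_TABLE" drop primary key;',
    'alter table my_schema."TEST_TABLE" add primary key("ID", "NAME");',
    'alter table my_schema."TEST_TABLE" alter column "ID" drop not null;',
    'alter table my_schema."TEST_TABLE" alter column "NAME" drop not null;',
]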

def _get_current_pks(self) -> Set[str]:
"""
        Finds the stream's current PK in Snowflake.
        Returns: Set of PK columns, in upper case. An empty set means the table has no PK.
"""
table_name = self.table_name(self.stream_schema_message['stream'], False)

show_query = f"show primary keys in table {self.connection_config['dbname']}.{table_name};"

columns = set()
try:
columns = self.query(show_query)

        # Catch the ProgrammingError raised by SHOW PRIMARY KEYS when the schema does not exist;
        # the regexp below matches the Snowflake error code and message in the exception text.
        # Do nothing if the schema does not exist.
except snowflake.connector.errors.ProgrammingError as exc:
            if not re.match(r'002043 \(02000\):.*\n.*does not exist.*', str(exc)):
raise exc

return set(col['column_name'] for col in columns)
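For reference, self.query() returns the SHOW PRIMARY KEYS rows as a list of dicts, of which only column_name is used here. A sketch of the shape this method consumes (the values are illustrative, not a verbatim Snowflake response):

columns = [
    {'schema_name': 'MY_SCHEMA', 'table_name': 'TEST_TABLE', 'column_name': 'ID'},
    {'schema_name': 'MY_SCHEMA', 'table_name': 'TEST_TABLE', 'column_name': 'NAME'},
]
set(col['column_name'] for col in columns)  # -> {'ID', 'NAME'}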
13 changes: 13 additions & 0 deletions tests/integration/resources/messages-with-changing-pk.json
@@ -0,0 +1,13 @@
{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id"]}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 1, "version": 1}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_simple_table", "version": 1}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 1, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 2, "results": "xyz2", "time_created": "2019-12-03T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 2, "version": 1}}}}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 3, "results": "xyz3", "time_created": "2019-12-09T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 4, "results": "xyz4", "time_created": "2019-12-11T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 4, "version": 1}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "name": {"type": "string"}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id", "name"]}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 5, "name": "A", "results": "xyz5", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 6, "name": "B", "results": "xyz6", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 6, "version": 1}}}}
8 changes: 8 additions & 0 deletions tests/integration/resources/messages-with-null-pk.json
@@ -0,0 +1,8 @@
{"type": "SCHEMA", "stream": "tap_mysql_test-test_simple_table", "schema": {"properties": {"id": {"type": ["integer"]}, "results": {"type": ["null", "string"]}, "time_created": {"type": ["null", "string"]}}, "type": "date-time"}, "key_properties": ["id"]}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 1, "version": 1}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_simple_table", "version": 1}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 1, "results": "xyz1", "time_created": "2019-12-01T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": 2, "results": "xyz2", "time_created": "2019-12-03T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "STATE", "value": {"bookmarks": {"tap_mysql_test-test_simple_table": {"replication_key": "id", "replication_key_value": 2, "version": 1}}}}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": null, "results": "xyz3", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_simple_table", "record": {"id": null, "results": "xyz4", "time_created": "2019-12-17T19:12:12.006049Z"}, "version": 1, "time_extracted": "2019-12-17T19:12:12.006049Z"}
