From 52ab820494376b0d748c3e32001b638d3dd59e26 Mon Sep 17 00:00:00 2001
From: Peter Kosztolanyi
Date: Mon, 5 Aug 2019 20:26:51 +0100
Subject: [PATCH] Refresh columns cache in one transaction

---
 target_snowflake/db_sync.py | 35 +++++++++++++++++++++--------------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index 7aba1fdf..e1098952 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -293,18 +293,26 @@ def open_connection(self):
         )
 
     def query(self, query, params=None):
-        logger.info("SNOWFLAKE - Running query: {}".format(query))
+        result = []
         with self.open_connection() as connection:
             with connection.cursor(snowflake.connector.DictCursor) as cur:
-                cur.execute(
-                    query,
-                    params
-                )
+                queries = []
 
-                if cur.rowcount > 0:
-                    return cur.fetchall()
+                # Run every query in one transaction if query is a list of SQL
+                if type(query) is list:
+                    queries.append("START TRANSACTION")
+                    queries.extend(query)
+                else:
+                    queries = [query]
+
+                for q in queries:
+                    logger.info("SNOWFLAKE - Running query: {}".format(q))
+                    cur.execute(q, params)
+
+                    if cur.rowcount > 0:
+                        result = cur.fetchall()
 
-                return []
+        return result
 
     def table_name(self, stream_name, is_temporary, without_schema = False):
         stream_dict = stream_name_to_dict(stream_name)
@@ -406,20 +414,19 @@ def cache_information_schema_columns(self, table_schemas=[], create_only=False):
 
         if not create_only and table_schemas:
             # Delete existing data about the current schema
-            sql = """
+            delete = """
                 DELETE FROM {}.columns
             """.format(self.pipelinewise_schema)
-            sql = sql + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
-            self.query(sql)
+            delete = delete + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
 
             # Insert the latest data from information_schema into the cache table
-            sql = """
+            insert = """
                 INSERT INTO {}.columns
                 SELECT table_schema, table_name, column_name, data_type
                 FROM information_schema.columns
             """.format(self.pipelinewise_schema)
-            sql = sql + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
-            self.query(sql)
+            insert = insert + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
+            self.query([delete, insert])
 
     def load_csv(self, s3_key, count):
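
Usage note (not part of the patch): after this change, query() in target_snowflake/db_sync.py accepts either a single SQL string, which behaves as before, or a list of SQL statements. A list is prefixed with START TRANSACTION, every statement runs on the same cursor and connection, and the rows fetched from the last statement that produced results are returned. A minimal sketch of the call pattern, assuming the surrounding class is instantiated as db_sync and using illustrative schema and table names:

    # Single statement: unchanged behaviour
    rows = db_sync.query("SELECT table_name FROM information_schema.tables")

    # List of statements: executed in one transaction on the same connection,
    # mirroring what cache_information_schema_columns() now does with [delete, insert]
    rows = db_sync.query([
        "DELETE FROM pipelinewise.columns WHERE LOWER(table_schema) IN ('my_schema')",
        "INSERT INTO pipelinewise.columns "
        "SELECT table_schema, table_name, column_name, data_type "
        "FROM information_schema.columns "
        "WHERE LOWER(table_schema) IN ('my_schema')",
    ])

Running the DELETE and INSERT as one unit is what keeps the columns cache consistent: readers never observe the window between the cache being emptied and being repopulated from information_schema.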