Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Refresh columns cache in one transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti committed Aug 6, 2019
1 parent 2018d90 commit 52ab820
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions target_snowflake/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,18 +293,26 @@ def open_connection(self):
)

def query(self, query, params=None):
    """Run one SQL statement, or a list of statements in one transaction.

    Args:
        query: A single SQL string, or a list of SQL strings. A list is
            prefixed with ``START TRANSACTION`` and every statement is
            executed on the same connection and cursor.
        params: Optional bind parameters passed to ``cur.execute`` for each
            caller-supplied statement.

    Returns:
        The rows fetched after the last statement that reported a positive
        ``rowcount``, or an empty list when no statement did.
    """
    # Pair each statement with the bind parameters it should receive. The
    # injected START TRANSACTION has no placeholders, so it never gets the
    # caller's params.
    if isinstance(query, list):
        statements = [("START TRANSACTION", None)]
        statements.extend((sql, params) for sql in query)
    else:
        statements = [(query, params)]

    result = []
    with self.open_connection() as connection:
        with connection.cursor(snowflake.connector.DictCursor) as cur:
            for sql, sql_params in statements:
                logger.info("SNOWFLAKE - Running query: {}".format(sql))
                cur.execute(sql, sql_params)

                # Keep only the most recent non-empty result set.
                if cur.rowcount > 0:
                    result = cur.fetchall()

            # NOTE(review): no explicit COMMIT is issued after the list of
            # statements; presumably the transaction is finalized when the
            # connection closes -- TODO confirm against the connector's
            # autocommit / close behavior.

    return result

def table_name(self, stream_name, is_temporary, without_schema = False):
stream_dict = stream_name_to_dict(stream_name)
Expand Down Expand Up @@ -406,20 +414,19 @@ def cache_information_schema_columns(self, table_schemas=[], create_only=False):

if not create_only and table_schemas:
# Delete existing data about the current schema
sql = """
delete = """
DELETE FROM {}.columns
""".format(self.pipelinewise_schema)
sql = sql + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
self.query(sql)
delete = delete + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))

# Insert the latest data from information_schema into the cache table
sql = """
insert = """
INSERT INTO {}.columns
SELECT table_schema, table_name, column_name, data_type
FROM information_schema.columns
""".format(self.pipelinewise_schema)
sql = sql + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
self.query(sql)
insert = insert + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
self.query([delete, insert])


def load_csv(self, s3_key, count):
Expand Down

0 comments on commit 52ab820

Please sign in to comment.