This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

[AP-608] Log inserts, updates and csv size_bytes (#69)
koszti authored Mar 20, 2020
1 parent e20855b · commit 18ac93b
Showing 2 changed files with 22 additions and 6 deletions.
target_snowflake/__init__.py (3 changes: 2 additions & 1 deletion)

@@ -405,8 +405,9 @@ def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None):
             csv_line = db_sync.record_to_csv_line(record)
             f.write(bytes(csv_line + '\n', 'UTF-8'))
 
+    size_bytes = os.path.getsize(csv_file)
     s3_key = db_sync.put_to_stage(csv_file, stream, row_count, temp_dir=temp_dir)
-    db_sync.load_csv(s3_key, row_count)
+    db_sync.load_csv(s3_key, row_count, size_bytes)
 
     os.remove(csv_file)
     db_sync.delete_from_stage(s3_key)
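The new line reads the CSV size before the temp file is uploaded and deleted, so the figure can be handed to load_csv for logging. A minimal standalone sketch of the same write, then os.path.getsize, then clean-up ordering (the records and file name below are made up for illustration):

    import os
    from tempfile import mkstemp

    # Fake CSV lines standing in for db_sync.record_to_csv_line() output
    records = ['1,alice,2020-03-20', '2,bob,2020-03-20']

    csv_fd, csv_file = mkstemp(suffix='.csv')
    with open(csv_fd, 'wb') as f:
        for csv_line in records:
            f.write(bytes(csv_line + '\n', 'UTF-8'))

    # The size has to be taken while the file still exists, i.e. before os.remove()
    size_bytes = os.path.getsize(csv_file)
    print('size_bytes:', size_bytes)

    os.remove(csv_file)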
target_snowflake/db_sync.py (25 changes: 20 additions & 5 deletions)

@@ -319,7 +319,7 @@ def query(self, query, params=None, max_records=0):
                     queries = [query]
 
                 for q in queries:
-                    self.logger.debug("SNOWFLAKE - Running query: {}".format(q))
+                    self.logger.debug("Running query: {}".format(q))
                     cur.execute(q, params)
 
                     # Raise exception if returned rows greater than max allowed records
@@ -414,7 +414,7 @@ def delete_from_stage(self, s3_key):
         bucket = self.connection_config['s3_bucket']
         self.s3.delete_object(Bucket=bucket, Key=s3_key)
 
-    def load_csv(self, s3_key, count):
+    def load_csv(self, s3_key, count, size_bytes):
         stream_schema_message = self.stream_schema_message
         stream = stream_schema_message['stream']
         self.logger.info("Loading {} rows into '{}'".format(count, self.table_name(stream, False)))
@@ -430,6 +430,8 @@ def load_csv(self, s3_key, count):
 
         with self.open_connection() as connection:
             with connection.cursor(snowflake.connector.DictCursor) as cur:
+                inserts = 0
+                updates = 0
 
                 # Insert or Update with MERGE command if primary key defined
                 if len(self.stream_schema_message['key_properties']) > 0:
@@ -455,9 +457,15 @@
                         ', '.join([c['name'] for c in columns_with_trans]),
                         ', '.join(['s.{}'.format(c['name']) for c in columns_with_trans])
                     )
-                    self.logger.debug("SNOWFLAKE - {}".format(merge_sql))
+                    self.logger.debug("Running query: {}".format(merge_sql))
                     cur.execute(merge_sql)
 
+                    # Get number of inserted and updated records - MERGE does insert and update
+                    results = cur.fetchall()
+                    if len(results) > 0:
+                        inserts = results[0].get('number of rows inserted', 0)
+                        updates = results[0].get('number of rows updated', 0)
+
                 # Insert only with COPY command if no primary key
                 else:
                     copy_sql = """COPY INTO {} ({}) FROM @{}/{}
@@ -469,10 +477,17 @@
                         s3_key,
                         self.connection_config['file_format'],
                     )
-                    self.logger.debug("SNOWFLAKE - {}".format(copy_sql))
+                    self.logger.debug("Running query: {}".format(copy_sql))
                     cur.execute(copy_sql)
 
-                self.logger.info("SNOWFLAKE - Merge into {}: {}".format(self.table_name(stream, False), cur.fetchall()))
+                    # Get number of inserted records - COPY does insert only
+                    results = cur.fetchall()
+                    if len(results) > 0:
+                        inserts = results[0].get('rows_loaded', 0)
+
+                self.logger.info('Loading into {}: {}'.format(
+                    self.table_name(stream, False),
+                    json.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes})))
 
     def primary_key_merge_condition(self):
         stream_schema_message = self.stream_schema_message
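The insert and update counts come from the statement's result set rather than cur.rowcount: the MERGE result row carries the 'number of rows inserted' and 'number of rows updated' columns and the COPY INTO result row carries 'rows_loaded', which is why both branches call cur.fetchall() and read the first row returned by the DictCursor. A rough sketch of the same parsing and of the log line it produces, with the cursor results faked so it runs without a Snowflake connection (the table name, row counts and size are illustrative):

    import json
    import logging

    logging.basicConfig(level=logging.INFO, format='%(levelname)s %(message)s')
    logger = logging.getLogger('target_snowflake')

    # Faked DictCursor rows, shaped like what cur.fetchall() returns after the
    # MERGE (streams with a primary key) and COPY INTO (no primary key) statements.
    merge_results = [{'number of rows inserted': 120, 'number of rows updated': 35}]
    copy_results = [{'file': 's3://some-bucket/some-key.csv', 'rows_loaded': 200}]

    inserts = 0
    updates = 0

    has_primary_key = True  # pretend the stream defines key_properties
    if has_primary_key:
        results = merge_results
        if len(results) > 0:
            inserts = results[0].get('number of rows inserted', 0)
            updates = results[0].get('number of rows updated', 0)
    else:
        results = copy_results
        if len(results) > 0:
            inserts = results[0].get('rows_loaded', 0)

    size_bytes = 10240  # flush_records() passes the real value from os.path.getsize()
    logger.info('Loading into {}: {}'.format(
        'MY_SCHEMA."MY_TABLE"',
        json.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes})))
    # -> INFO Loading into MY_SCHEMA."MY_TABLE": {"inserts": 120, "updates": 35, "size_bytes": 10240}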
