diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index 822e5ab1..c203d0ca 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -405,8 +405,9 @@ def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None):
             csv_line = db_sync.record_to_csv_line(record)
             f.write(bytes(csv_line + '\n', 'UTF-8'))
 
+    size_bytes = os.path.getsize(csv_file)
     s3_key = db_sync.put_to_stage(csv_file, stream, row_count, temp_dir=temp_dir)
-    db_sync.load_csv(s3_key, row_count)
+    db_sync.load_csv(s3_key, row_count, size_bytes)
     os.remove(csv_file)
 
     db_sync.delete_from_stage(s3_key)
diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py
index 0da37b2e..ba22212a 100644
--- a/target_snowflake/db_sync.py
+++ b/target_snowflake/db_sync.py
@@ -319,7 +319,7 @@ def query(self, query, params=None, max_records=0):
                     queries = [query]
 
                 for q in queries:
-                    self.logger.debug("SNOWFLAKE - Running query: {}".format(q))
+                    self.logger.debug("Running query: {}".format(q))
                     cur.execute(q, params)
 
                     # Raise exception if returned rows greater than max allowed records
@@ -414,7 +414,7 @@ def delete_from_stage(self, s3_key):
         bucket = self.connection_config['s3_bucket']
         self.s3.delete_object(Bucket=bucket, Key=s3_key)
 
-    def load_csv(self, s3_key, count):
+    def load_csv(self, s3_key, count, size_bytes):
         stream_schema_message = self.stream_schema_message
         stream = stream_schema_message['stream']
         self.logger.info("Loading {} rows into '{}'".format(count, self.table_name(stream, False)))
@@ -430,6 +430,8 @@ def load_csv(self, s3_key, count):
 
         with self.open_connection() as connection:
             with connection.cursor(snowflake.connector.DictCursor) as cur:
+                inserts = 0
+                updates = 0
 
                 # Insert or Update with MERGE command if primary key defined
                 if len(self.stream_schema_message['key_properties']) > 0:
@@ -455,9 +457,15 @@ def load_csv(self, s3_key, count):
                         ', '.join([c['name'] for c in columns_with_trans]),
                         ', '.join(['s.{}'.format(c['name']) for c in columns_with_trans])
                     )
-                    self.logger.debug("SNOWFLAKE - {}".format(merge_sql))
+                    self.logger.debug("Running query: {}".format(merge_sql))
                     cur.execute(merge_sql)
 
+                    # Get number of inserted and updated records - MERGE does insert and update
+                    results = cur.fetchall()
+                    if len(results) > 0:
+                        inserts = results[0].get('number of rows inserted', 0)
+                        updates = results[0].get('number of rows updated', 0)
+
                 # Insert only with COPY command if no primary key
                 else:
                     copy_sql = """COPY INTO {} ({}) FROM @{}/{}
@@ -469,10 +477,17 @@ def load_csv(self, s3_key, count):
                         s3_key,
                         self.connection_config['file_format'],
                     )
-                    self.logger.debug("SNOWFLAKE - {}".format(copy_sql))
+                    self.logger.debug("Running query: {}".format(copy_sql))
                     cur.execute(copy_sql)
 
-                self.logger.info("SNOWFLAKE - Merge into {}: {}".format(self.table_name(stream, False), cur.fetchall()))
+                    # Get number of inserted records - COPY does insert only
+                    results = cur.fetchall()
+                    if len(results) > 0:
+                        inserts = results[0].get('rows_loaded', 0)
+
+                self.logger.info('Loading into {}: {}'.format(
+                    self.table_name(stream, False),
+                    json.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes})))
 
     def primary_key_merge_condition(self):
         stream_schema_message = self.stream_schema_message
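
For context on where the new counters come from: with snowflake.connector.DictCursor, the rows returned by cur.fetchall() after a MERGE carry the columns 'number of rows inserted' and 'number of rows updated', while a COPY INTO returns 'rows_loaded'. Below is a minimal, self-contained sketch of the extraction logic introduced in load_csv(); extract_load_metrics is a hypothetical helper name used only for illustration, and the hard-coded result dict stands in for real cursor output.

import json

def extract_load_metrics(results, has_primary_key, size_bytes):
    # Hypothetical helper mirroring the logic added to load_csv() above.
    inserts = 0
    updates = 0
    if len(results) > 0:
        if has_primary_key:
            # A MERGE result row reports both inserted and updated counts
            inserts = results[0].get('number of rows inserted', 0)
            updates = results[0].get('number of rows updated', 0)
        else:
            # A COPY result row reports loaded (inserted) rows only
            inserts = results[0].get('rows_loaded', 0)
    return json.dumps({'inserts': inserts, 'updates': updates, 'size_bytes': size_bytes})

# Example with a fabricated MERGE result row:
print(extract_load_metrics([{'number of rows inserted': 1, 'number of rows updated': 2}], True, 1024))
# {"inserts": 1, "updates": 2, "size_bytes": 1024}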