diff --git a/target_bigquery/__init__.py b/target_bigquery/__init__.py
index ab3c1d5..7337f04 100644
--- a/target_bigquery/__init__.py
+++ b/target_bigquery/__init__.py
@@ -6,16 +6,14 @@
 import io
 import json
 import logging
-import os
 import sys
-from tempfile import mkstemp
 
 from joblib import Parallel, delayed, parallel_backend
 from jsonschema import Draft7Validator, FormatChecker
 from singer import get_logger
 
 from target_bigquery import stream_utils
-from target_bigquery.db_sync import DbSync
+from target_bigquery.db_sync import DbSync, flatten_record
 from target_bigquery.exceptions import (
     RecordValidationException,
     InvalidValidationOperationException
@@ -120,7 +118,19 @@ def persist_lines(config, lines) -> None:
                            "or more) Try removing 'multipleOf' methods from JSON schema.")
                    raise RecordValidationException(f"Record does not pass schema validation. RECORD: {o['record']}")
 
-            primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
+            if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete):
+                record = stream_utils.add_metadata_values_to_record(o)
+            else:
+                record = stream_utils.remove_metadata_values_from_record(o)
+
+            # Flatten record
+            record = flatten_record(
+                record,
+                stream_to_sync[stream].stream_schema_message['schema'],
+                max_level=stream_to_sync[stream].data_flattening_max_level
+            )
+
+            primary_key_string = stream_to_sync[stream].record_primary_key_string(record)
 
             if not primary_key_string:
                 primary_key_string = 'RID-{}'.format(total_row_count[stream])
@@ -130,10 +140,7 @@ def persist_lines(config, lines) -> None:
             total_row_count[stream] += 1
 
             # append record
-            if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete):
-                records_to_load[stream][primary_key_string] = stream_utils.add_metadata_values_to_record(o)
-            else:
-                records_to_load[stream][primary_key_string] = o['record']
+            records_to_load[stream][primary_key_string] = record
 
             flush = False
             if row_count[stream] >= batch_size_rows:
diff --git a/target_bigquery/db_sync.py b/target_bigquery/db_sync.py
index 4216592..d7ba7f7 100644
--- a/target_bigquery/db_sync.py
+++ b/target_bigquery/db_sync.py
@@ -211,7 +211,7 @@ def flatten_record(d, schema, parent_key=[], sep='__', level=0, max_level=0):
     items = []
     for k, v in d.items():
         safe_key = safe_column_name(k, quotes=False)
-        new_key = flatten_key(k, parent_key, sep)
+        new_key = flatten_key(safe_key, parent_key, sep)
         key_schema = schema['properties'][k]
         if isinstance(v, MutableMapping) and level < max_level and 'properties' in key_schema:
             items.extend(flatten_record(
@@ -384,9 +384,8 @@ def table_name(self, stream_name, is_temporary=False, without_schema=False):
     def record_primary_key_string(self, record):
         if len(self.stream_schema_message['key_properties']) == 0:
             return None
-        flatten = flatten_record(record, self.stream_schema_message['schema'], max_level=self.data_flattening_max_level)
         try:
-            key_props = [str(flatten[p.lower()]) for p in self.stream_schema_message['key_properties']]
+            key_props = [str(record[p.lower()]) for p in self.stream_schema_message['key_properties']]
         except Exception as exc:
-            logger.info("Cannot find {} primary key(s) in record: {}".format(self.stream_schema_message['key_properties'], flatten))
+            logger.info("Cannot find {} primary key(s) in record: {}".format(self.stream_schema_message['key_properties'], record))
             raise exc
@@ -394,9 +393,7 @@ def records_to_json(self, records, schema):
         field_map = {field.name: field for field in schema}
-        singer_schema = self.stream_schema_message['schema']
-        flattened = (flatten_record(record, singer_schema, max_level=self.data_flattening_max_level) for record in records)
-        for record in flattened:
+        for record in records:
             yield json.dumps(
                 record,
                 cls=BigQueryJSONEncoder,
diff --git a/target_bigquery/stream_utils.py b/target_bigquery/stream_utils.py
index d5b9c7d..b31e3bd 100644
--- a/target_bigquery/stream_utils.py
+++ b/target_bigquery/stream_utils.py
@@ -86,7 +86,12 @@ def float_to_decimal(value):
         return {k: float_to_decimal(v) for k, v in value.items()}
     return value
 
-def parse_datetime(dt):
+
+def add_metadata_values_to_record(record_message):
+    """Populate metadata _sdc columns from incoming record message
+    The location of the required attributes are fixed in the stream
+    """
+    def parse_datetime(dt):
     try:
         # TODO: figure out why we can get _sdc_deleted_at as both datetime and string objects
         if isinstance(dt, date):
@@ -99,11 +104,6 @@
             return dt
         return parser.parse(dt)
     except TypeError:
         return None
-
-def add_metadata_values_to_record(record_message):
-    """Populate metadata _sdc columns from incoming record message
-    The location of the required attributes are fixed in the stream
-    """
     extended_record = record_message['record']
     extended_record['_sdc_extracted_at'] = parse_datetime(record_message['time_extracted'])
     extended_record['_sdc_batched_at'] = datetime.now()