Skip to content

Commit

Permalink
Fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
judahrand committed Feb 22, 2022
1 parent 53effff commit 19e3531
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 20 deletions.
22 changes: 15 additions & 7 deletions target_bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
import io
import json
import logging
import os
import sys
from multiprocessing.pool import ThreadPool as Pool

from jsonschema import Draft7Validator, FormatChecker
from singer import get_logger

from target_bigquery import stream_utils
from target_bigquery.db_sync import DbSync
from target_bigquery.db_sync import DbSync, flatten_record
from target_bigquery.exceptions import (
RecordValidationException,
InvalidValidationOperationException
Expand Down Expand Up @@ -119,7 +118,19 @@ def persist_lines(config, lines) -> None:
"or more) Try removing 'multipleOf' methods from JSON schema.")
raise RecordValidationException(f"Record does not pass schema validation. RECORD: {o['record']}")

primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete):
record = stream_utils.add_metadata_values_to_record(o)
else:
record = stream_utils.remove_metadata_values_from_record(o)

# Flatten record
record = flatten_record(
record,
stream_to_sync[stream].stream_schema_message['schema'],
max_level=stream_to_sync[stream].data_flattening_max_level
)

primary_key_string = stream_to_sync[stream].record_primary_key_string(record)
if not primary_key_string:
primary_key_string = 'RID-{}'.format(total_row_count[stream])

Expand All @@ -129,10 +140,7 @@ def persist_lines(config, lines) -> None:
total_row_count[stream] += 1

# append record
if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete):
records_to_load[stream][primary_key_string] = stream_utils.add_metadata_values_to_record(o)
else:
records_to_load[stream][primary_key_string] = o['record']
records_to_load[stream][primary_key_string] = record

flush = False
if row_count[stream] >= batch_size_rows:
Expand Down
11 changes: 4 additions & 7 deletions target_bigquery/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def flatten_record(d, schema, parent_key=[], sep='__', level=0, max_level=0):
items = []
for k, v in d.items():
safe_key = safe_column_name(k, quotes=False)
new_key = flatten_key(k, parent_key, sep)
new_key = flatten_key(safe_key, parent_key, sep)
key_schema = schema['properties'][k]
if isinstance(v, MutableMapping) and level < max_level and 'properties' in key_schema:
items.extend(flatten_record(
Expand Down Expand Up @@ -384,19 +384,16 @@ def table_name(self, stream_name, is_temporary=False, without_schema=False):
def record_primary_key_string(self, record):
if len(self.stream_schema_message['key_properties']) == 0:
return None
flatten = flatten_record(record, self.stream_schema_message['schema'], max_level=self.data_flattening_max_level)
try:
key_props = [str(flatten[p.lower()]) for p in self.stream_schema_message['key_properties']]
key_props = [str(record[p.lower()]) for p in self.stream_schema_message['key_properties']]
except Exception as exc:
logger.info("Cannot find {} primary key(s) in record: {}".format(self.stream_schema_message['key_properties'], flatten))
logger.info("Cannot find {} primary key(s) in record: {}".format(self.stream_schema_message['key_properties'], record))
raise exc
return ','.join(key_props)

def records_to_json(self, records, schema):
field_map = {field.name: field for field in schema}
singer_schema = self.stream_schema_message['schema']
flattened = (flatten_record(record, singer_schema, max_level=self.data_flattening_max_level) for record in records)
for record in flattened:
for record in records:
yield json.dumps(
record,
cls=BigQueryJSONEncoder,
Expand Down
12 changes: 6 additions & 6 deletions target_bigquery/stream_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ def float_to_decimal(value):
return {k: float_to_decimal(v) for k, v in value.items()}
return value

def parse_datetime(dt):

def add_metadata_values_to_record(record_message):
"""Populate metadata _sdc columns from incoming record message
The location of the required attributes are fixed in the stream
"""
def parse_datetime(dt):
try:
# TODO: figure out why we can get _sdc_deleted_at as both datetime and string objects
if isinstance(dt, date):
Expand All @@ -99,11 +104,6 @@ def parse_datetime(dt):
except TypeError:
return None


def add_metadata_values_to_record(record_message):
"""Populate metadata _sdc columns from incoming record message
The location of the required attributes are fixed in the stream
"""
extended_record = record_message['record']
extended_record['_sdc_extracted_at'] = parse_datetime(record_message['time_extracted'])
extended_record['_sdc_batched_at'] = datetime.now()
Expand Down

0 comments on commit 19e3531

Please sign in to comment.