Commit
Merge branch 'feature/incremental_sync_and_schema_logging' into development
RuslanBergenov committed Apr 11, 2022
2 parents 9490fec + 67ac76c commit 842923d
Showing 12 changed files with 595 additions and 121 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/python-package.yml
@@ -22,10 +22,11 @@ env:
TARGET_CONFIG_CONTAINS_TARGET_TABLES_CONFIG: ${{ secrets.TARGET_CONFIG_CONTAINS_TARGET_TABLES_CONFIG }}
MALFORMED_TARGET_CONFIG: ${{ secrets.MALFORMED_TARGET_CONFIG }}
TARGET_CONFIG_MERGE_STATE_FALSE_FLAG: ${{ secrets.TARGET_CONFIG_MERGE_STATE_FALSE_FLAG }}
TARGET_CONFIG_INCREMENTAL: ${{ secrets.TARGET_CONFIG_INCREMENTAL }}

on:
push:
branches: [development, master]
branches: [development, master, feature/incremental_sync_and_schema_logging]
pull_request:
branches: [ master ]

@@ -65,6 +66,7 @@ jobs:
echo "$TARGET_CONFIG_CONTAINS_TARGET_TABLES_CONFIG" > "sandbox/target_config_contains_target_tables_config.json"
echo "$MALFORMED_TARGET_CONFIG" > "sandbox/malformed_target_config.json"
echo "$TARGET_CONFIG_MERGE_STATE_FALSE_FLAG" > "sandbox/target_config_merge_state_false_flag.json"
echo "$TARGET_CONFIG_INCREMENTAL" > "sandbox/target_config_incremental.json"
pip install -r dev-requirements.txt
pytest --verbose
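For local runs, the CI step above can be approximated by writing the incremental target config to the same sandbox path before invoking pytest. The config body shown here is an assumption; the actual contents of the TARGET_CONFIG_INCREMENTAL secret are not part of this commit.

import json
import pathlib

# Assumed minimal content; the real TARGET_CONFIG_INCREMENTAL secret is not shown here.
pathlib.Path("sandbox").mkdir(exist_ok=True)
pathlib.Path("sandbox/target_config_incremental.json").write_text(
    json.dumps({"replication_method": "incremental"})
)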
246 changes: 156 additions & 90 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
singer-python~=5.12.1
google-cloud-bigquery==2.24.0
google-cloud-bigquery==2.26.0
jsonschema~=2.6.0
setuptools~=60.3.1
6 changes: 5 additions & 1 deletion target_bigquery/__init__.py
@@ -60,10 +60,13 @@ def main():
with open(flags.state) as f:
state = json.load(f)

# determine replication method: append vs. truncate
# determine replication method: append, truncate or incremental
truncate = False
incremental = False
if config.get("replication_method", "append").lower() == "truncate":
truncate = True
elif config.get("replication_method", "append").lower() == "incremental":
incremental = True

# arguments supplied in target config
table_prefix = config.get("table_prefix", "")
@@ -117,6 +120,7 @@ def main():
dataset=dataset,
location=location,
truncate=truncate,
incremental=incremental,
validate_records=validate_records,
table_prefix=table_prefix,
table_suffix=table_suffix,
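For context, below is a minimal sketch of a target config that selects the new incremental mode, mirroring the dispatch logic in main() above. Only "replication_method" is taken from this commit; the other keys are illustrative assumptions, not a documented config reference.

# Sketch only: "project_id" and "dataset_id" are assumed keys for illustration.
config = {
    "project_id": "my-gcp-project",
    "dataset_id": "my_dataset",
    "replication_method": "incremental",  # "append" (default), "truncate", or "incremental"
}

# Same dispatch as in main() above.
truncate = config.get("replication_method", "append").lower() == "truncate"
incremental = config.get("replication_method", "append").lower() == "incremental"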
99 changes: 81 additions & 18 deletions target_bigquery/processhandler.py
@@ -6,9 +6,10 @@
import singer
from google.api_core import exceptions as google_exceptions
from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig, CopyJobConfig
from google.cloud.bigquery import LoadJobConfig, CopyJobConfig, QueryJobConfig
from google.cloud.bigquery import WriteDisposition
from google.cloud.bigquery.job import SourceFormat
from google.cloud.exceptions import NotFound
from jsonschema import validate

from target_bigquery.encoders import DecimalEncoder
@@ -32,6 +33,7 @@ def __init__(self, logger, **kwargs):

# LoadJobProcessHandler kwargs
self.truncate = kwargs.get("truncate", False)
self.incremental = kwargs.get("incremental", False)
self.add_metadata_columns = kwargs.get("add_metadata_columns", True)
self.validate_records = kwargs.get("validate_records", True)
self.table_configs = kwargs.get("table_configs", {}) or {}
@@ -94,6 +96,8 @@ def handle_schema_message(self, msg):
self.bq_schema_dicts[msg.stream] = self._build_bq_schema_dict(schema)
self.bq_schemas[msg.stream] = schema

self.logger.info(f"{msg.stream} BigQuery schema {schema}")

yield from ()

def on_stream_end(self):
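To illustrate the schema logging line added in handle_schema_message above: for the simple_stream fixture used in the tests, the logged BigQuery schema would be a list of SchemaField objects along these lines. The JSON-to-BigQuery type mapping happens elsewhere in the target, so the exact types shown here are assumptions.

from google.cloud import bigquery

# Assumed mapping for the simple_stream test schema; the real conversion is
# done by the target's schema builder, which this commit does not change.
schema = [
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("value", "INTEGER"),
    bigquery.SchemaField("ratio", "FLOAT"),
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("date", "DATE"),
]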
@@ -174,9 +178,15 @@ def handle_record_message(self, msg):
raise Exception(f"A record for stream {msg.stream} was encountered before a corresponding schema")

schema = self.schemas[stream]

bq_schema = self.bq_schema_dicts[stream]
nr = cleanup_record(schema, msg.record)
nr = format_record_to_schema(nr, self.bq_schema_dicts[stream])

try:
nr = format_record_to_schema(nr, self.bq_schema_dicts[stream])
except Exception as e:
extra={"record" : msg.record, "schema": schema, "bq_schema": bq_schema}
self.logger.critical(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}")
raise e

# schema validation may fail if data doesn't match schema in terms of data types
# in this case, we validate schema again on data which has been forced to match schema
Expand Down Expand Up @@ -208,6 +218,15 @@ def on_stream_end(self):
self._do_temp_table_based_load(self.rows)
yield self.STATE

def primary_key_condition(self, stream):
self.logger.info(f"Primary keys: {', '.join(self.key_properties[stream])}")
keys = [f"t.{k}=s.{k}" for k in self.key_properties[stream]]
if len(keys) < 1:
raise Exception(f"No primary keys specified from the tap and Incremental option selected")
return " and ".join(keys)
#TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
#TODO: test it with dupe ids in the data

def _do_temp_table_based_load(self, rows):
assert isinstance(rows, dict)

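As a standalone illustration of the primary_key_condition method above (the stream name and keys here are hypothetical):

key_properties = {"simple_stream": ["id"]}

def primary_key_condition(stream):
    # Same logic as the method above, without the logging and self references.
    keys = [f"t.{k}=s.{k}" for k in key_properties[stream]]
    if len(keys) < 1:
        raise Exception("No primary keys specified from the tap and Incremental option selected")
    return " and ".join(keys)

print(primary_key_condition("simple_stream"))
# -> t.id=s.id
# With key_properties ["id", "date"] it would yield: t.id=s.id and t.date=s.date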
@@ -231,22 +250,66 @@ def _do_temp_table_based_load(self, rows):
loaded_tmp_tables.append((stream, tmp_table_name))

# copy tables to production tables
# the destination table can have dupe ids used in the MERGE statement
# new data which is being appended should have no dupes

# if new data has dupes, then MERGE will fail with a similar error:
# INFO Primary keys: id
# CRITICAL 400 UPDATE/MERGE must match at most one source row for each target row

# https://stackoverflow.com/questions/50504504/bigquery-error-update-merge-must-match-at-most-one-source-row-for-each-target-r
# https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax

# If a row in the table to be updated joins with more than one row from the FROM clause,
# then the query generates the following runtime error: UPDATE/MERGE must match at most one source row for each target row.
for stream, tmp_table_name in loaded_tmp_tables:
truncate = self.truncate if stream not in self.partially_loaded_streams else False

copy_config = CopyJobConfig()
if truncate:
copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE
self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by FULL_TABLE")
else:
copy_config.write_disposition = WriteDisposition.WRITE_APPEND
self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by APPEND")

self.client.copy_table(
sources=self.dataset.table(tmp_table_name),
destination=self.dataset.table(self.tables[stream]),
job_config=copy_config
).result()
incremental_success = False
if self.incremental:
self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL")
self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}"
try:
self.client.get_table(table_id)
column_names = [x.name for x in self.bq_schemas[stream]]

query ="""MERGE `{table}` t
USING `{temp_table}` s
ON {primary_key_condition}
WHEN MATCHED THEN
UPDATE SET {set_values}
WHEN NOT MATCHED THEN
INSERT ({new_cols}) VALUES ({cols})
""".format(table=table_id,
temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
primary_key_condition=self.primary_key_condition(stream),
set_values=', '.join(f'{c}=s.{c}' for c in column_names),
new_cols=', '.join(column_names),
cols=', '.join(f's.{c}' for c in column_names))

job_config = QueryJobConfig()
query_job = self.client.query(query, job_config=job_config)
query_job.result()
self.logger.info(f'LOADED {query_job.num_dml_affected_rows} rows')
incremental_success = True

except NotFound:
self.logger.info(f"Table {table_id} is not found, proceeding to upload with TRUNCATE")
self.truncate = True
if not incremental_success:
truncate = self.truncate if stream not in self.partially_loaded_streams else False
copy_config = CopyJobConfig()
if truncate:
copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE
self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by FULL_TABLE")
else:
copy_config.write_disposition = WriteDisposition.WRITE_APPEND
self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by APPEND")

self.client.copy_table(
sources=self.dataset.table(tmp_table_name),
destination=self.dataset.table(self.tables[stream]),
job_config=copy_config
).result()

self.partially_loaded_streams.add(stream)
self.rows[stream].close() # erase the file
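For reference, below is a sketch of how the MERGE template above renders for a stream keyed on "id" with the columns used in the test fixtures that follow. The project, dataset, and temp-table names are placeholders, not values from this commit.

table_id = "my-project.my_dataset.simple_stream"            # placeholder
temp_table = "my-project.my_dataset.simple_stream_tmp_123"  # placeholder
column_names = ["id", "name", "value", "ratio", "timestamp", "date"]

query = """MERGE `{table}` t
USING `{temp_table}` s
ON {primary_key_condition}
WHEN MATCHED THEN
    UPDATE SET {set_values}
WHEN NOT MATCHED THEN
    INSERT ({new_cols}) VALUES ({cols})
""".format(table=table_id,
           temp_table=temp_table,
           primary_key_condition="t.id=s.id",
           set_values=', '.join(f'{c}=s.{c}' for c in column_names),
           new_cols=', '.join(column_names),
           cols=', '.join(f's.{c}' for c in column_names))

print(query)
# MERGE `my-project.my_dataset.simple_stream` t
# USING `my-project.my_dataset.simple_stream_tmp_123` s
# ON t.id=s.id
# WHEN MATCHED THEN
#     UPDATE SET id=s.id, name=s.name, value=s.value, ratio=s.ratio, timestamp=s.timestamp, date=s.date
# WHEN NOT MATCHED THEN
#     INSERT (id, name, value, ratio, timestamp, date) VALUES (s.id, s.name, s.value, s.ratio, s.timestamp, s.date)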
3 changes: 3 additions & 0 deletions tests/rsc/data/stream_format_record_to_schema_fails.json
@@ -0,0 +1,3 @@
{ "type": "SCHEMA", "stream": "simple_stream", "schema": {"properties": {"id": {"type": ["string"]}, "name": {"type": ["null", "string"]}, "orderindex": {"type": ["null", "integer"]}, "override_statuses": {"type": ["null", "boolean"]}, "hidden": {"type": ["null", "boolean"]}, "space": {"properties": {"id": {"type": ["null", "string"]}, "name": {"type": ["null", "string"]}}, "type": "object"}, "task_count": {"type": ["null", "string", "integer"]}, "statuses": {"items": {}, "type": ["array", "null"]}, "lists": {"items": {}, "type": ["array", "null"]}, "archived": {"type": ["null", "boolean"]}, "permission_level": {"type": ["null", "string"]}}, "type": "object"}, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] }
{ "type": "RECORD", "stream": "simple_stream", "record": {"id": "12933951", "name": "Milestone and Project Plan", "orderindex": 17, "override_statuses": "false", "hidden": "false", "space": {"id": "2577684", "name": "meshDelivery"}, "task_count": "10", "archived": "true", "statuses": [], "lists": [{"id": "25670974", "name": "POC <customer/department>", "orderindex": 0, "status": "null", "priority": "null", "assignee": "null", "task_count": 10, "due_date": "null", "start_date": "null", "space": {"id": "2577684", "name": "meshDelivery", "access": "true"}, "archived": "false", "override_statuses": "null", "statuses": [{"id": "p2577684_eDZ87cTk", "status": "Open", "orderindex": 0, "color": "#d3d3d3", "type": "open"}, {"id": "p2577684_Sf8kB74x", "status": "planned", "orderindex": 1, "color": "#82CB11", "type": "custom"}, {"id": "p2577684_yG5b2doG", "status": "in progress", "orderindex": 2, "color": "#4194f6", "type": "custom"}, {"id": "p2577684_BZKpph7f", "status": "review", "orderindex": 3, "color": "#A875FF", "type": "custom"}, {"id": "p2577684_ouoISXPV", "status": "Closed", "orderindex": 4, "color": "#6bc950", "type": "closed"}], "permission_level": "create"}], "permission_level": "create"}, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}, "time_extracted": "2020-07-14T22:21:35.098374Z" }
@@ -0,0 +1,7 @@
{ "type": "SCHEMA", "stream": "simple_stream", "schema": { "properties": { "id": { "type": [ "null", "string" ] }, "name": { "type": [ "null", "string" ] }, "value": { "type": [ "null", "integer" ] }, "ratio": { "type": [ "null", "number" ] }, "timestamp": { "type": "string", "format": "date-time" }, "date": { "type": "string", "format": "date" } }, "type": [ "null", "object" ] }, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] }
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "LOAD_1", "value": 1, "ratio": 0.1, "timestamp": "2020-01-09T00:00:00.000000Z", "date": "2020-01-09" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-09T00:00:00.000000Z"}}}}
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "002", "name": "LOAD_1", "value": 2, "ratio": 0.1, "timestamp": "2020-01-10T00:00:00.000000Z", "date": "2020-01-10" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-10T00:00:00.000000Z"}}}}
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "003", "name": "LOAD_1", "value": 3, "ratio": 0.1, "timestamp": "2020-01-11T00:00:00.000000Z", "date": "2020-01-11" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}}
@@ -0,0 +1,9 @@
{ "type": "SCHEMA", "stream": "simple_stream", "schema": { "properties": { "id": { "type": [ "null", "string" ] }, "name": { "type": [ "null", "string" ] }, "value": { "type": [ "null", "integer" ] }, "ratio": { "type": [ "null", "number" ] }, "timestamp": { "type": "string", "format": "date-time" }, "date": { "type": "string", "format": "date" } }, "type": [ "null", "object" ] }, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] }
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "UPDATED", "value": 1, "ratio": 0.1, "timestamp": "2020-01-09T00:00:00.000000Z", "date": "2020-01-09" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-09T00:00:00.000000Z"}}}}
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "002", "name": "UPDATED", "value": 2, "ratio": 0.1, "timestamp": "2020-01-10T00:00:00.000000Z", "date": "2020-01-10" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-10T00:00:00.000000Z"}}}}
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "003", "name": "UPDATED", "value": 3, "ratio": 0.1, "timestamp": "2020-01-11T00:00:00.000000Z", "date": "2020-01-11" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}}
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "004", "name": "INSERTED", "value": 3, "ratio": 0.1, "timestamp": "2020-01-12T00:00:00.000000Z", "date": "2020-01-11" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-12T00:00:00.000000Z"}}}}
@@ -0,0 +1,5 @@
{ "type": "SCHEMA", "stream": "simple_stream", "schema": { "properties": { "id": { "type": [ "null", "string" ] }, "name": { "type": [ "null", "string" ] }, "value": { "type": [ "null", "integer" ] }, "ratio": { "type": [ "null", "number" ] }, "timestamp": { "type": "string", "format": "date-time" }, "date": { "type": "string", "format": "date" } }, "type": [ "null", "object" ] }, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] }
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "DUPE", "value": 1, "ratio": 0.1, "timestamp": "2020-01-09T00:00:00.000000Z", "date": "2020-01-09" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-09T00:00:00.000000Z"}}}}
{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "DUPE", "value": 2, "ratio": 0.1, "timestamp": "2020-01-10T00:00:00.000000Z", "date": "2020-01-10" }, "time_extracted": "2020-07-14T22:21:35.098374Z" }
{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-10T00:00:00.000000Z"}}}}