From 0242eff027ddc7e9cae0d522b5c996da5121bcaa Mon Sep 17 00:00:00 2001 From: "nil.serra" Date: Tue, 30 Nov 2021 10:08:49 +0100 Subject: [PATCH 01/28] Added incremental sync option --- README.md | 5 +++ requirements.txt | 2 +- target_bigquery/__init__.py | 6 ++- target_bigquery/processhandler.py | 71 ++++++++++++++++++++++++------- 4 files changed, 66 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 6013167..f62afe3 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,11 @@ Create a file called **target-config.json** in your working directory, following - The data will be written to the dataset specified in your **target-config.json**. - If you do not have the dataset with this name yet, it will be created. - The table will be created. +- There's an optional parameter `replication_method` that can either be: + * `append`: Adding new rows to the table (Default value) + * `truncate`: Deleting all previous rows and uploading the new ones to the table + * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector + (if it finds an old row with same key, updates it. Otherwise it inserts the new row) Sample **target-config.json** file: ``` diff --git a/requirements.txt b/requirements.txt index fe052dd..5fe413b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ singer-python~=5.12.1 -google-cloud-bigquery==2.24.0 +google-cloud-bigquery==2.26.0 jsonschema~=2.6.0 setuptools~=57.4.0 \ No newline at end of file diff --git a/target_bigquery/__init__.py b/target_bigquery/__init__.py index f5b903b..1988bef 100644 --- a/target_bigquery/__init__.py +++ b/target_bigquery/__init__.py @@ -60,10 +60,13 @@ def main(): with open(flags.state) as f: state = json.load(f) - # determine replication method: append vs. 
truncate + # determine replication method: append, truncate or incremental truncate = False + incremental = False if config.get("replication_method", "append").lower() == "truncate": truncate = True + elif config.get("replication_method", "append").lower() == "incremental": + incremental = True # arguments supplied in target config table_prefix = config.get("table_prefix", "") @@ -117,6 +120,7 @@ def main(): dataset=dataset, location=location, truncate=truncate, + incremental=incremental, validate_records=validate_records, table_prefix=table_prefix, table_suffix=table_suffix, diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index a7eedc0..ec96a57 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -6,9 +6,10 @@ import singer from google.api_core import exceptions as google_exceptions from google.cloud import bigquery -from google.cloud.bigquery import LoadJobConfig, CopyJobConfig +from google.cloud.bigquery import LoadJobConfig, CopyJobConfig, QueryJobConfig from google.cloud.bigquery import WriteDisposition from google.cloud.bigquery.job import SourceFormat +from google.cloud.exceptions import NotFound from jsonschema import validate from target_bigquery.encoders import DecimalEncoder @@ -208,6 +209,13 @@ def on_stream_end(self): self._do_temp_table_based_load(self.rows) yield self.STATE + def primary_key_condition(self, stream): + self.logger.info(f"Primary keys: {', '.join(self.key_properties[stream])}") + keys = [f"t.{k}=s.{k}" for k in self.key_properties[stream]] + if len(keys) < 1: + raise Exception(f"No primary keys specified from the tap and Incremental option selected") + return " and ".join(keys) + def _do_temp_table_based_load(self, rows): assert isinstance(rows, dict) @@ -232,21 +240,52 @@ def _do_temp_table_based_load(self, rows): # copy tables to production tables for stream, tmp_table_name in loaded_tmp_tables: - truncate = self.truncate if stream not in self.partially_loaded_streams else False - - copy_config = CopyJobConfig() - if truncate: - copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE - self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by FULL_TABLE") - else: - copy_config.write_disposition = WriteDisposition.WRITE_APPEND - self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by APPEND") - - self.client.copy_table( - sources=self.dataset.table(tmp_table_name), - destination=self.dataset.table(self.tables[stream]), - job_config=copy_config - ).result() + incremental_success = False + if self.incremental: + self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL") + table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}" + try: + self.client.get_table(table_id) + column_names = [x.name for x in self.bq_schemas[stream]] + + query ="""MERGE `{table}` t + USING `{temp_table}` s + ON {primary_key_condition} + WHEN MATCHED THEN + UPDATE SET {set_values} + WHEN NOT MATCHED THEN + INSERT ({new_cols}) VALUES ({cols}) + """.format(table=table_id, + temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}", + primary_key_condition=self.primary_key_condition(stream), + set_values=', '.join(f'{c}=s.{c}' for c in column_names), + new_cols=', '.join(column_names), + cols=', '.join(f's.{c}' for c in column_names)) + + job_config = QueryJobConfig() + query_job = self.client.query(query, job_config=job_config) + query_job.result() + self.logger.info(f'LOADED {query_job.num_dml_affected_rows} rows') + 
incremental_success = True + + except NotFound: + self.logger.info(f"Table {table_id} is not found, proceeding to upload with TRUNCATE") + self.truncate = True + if not incremental_success: + truncate = self.truncate if stream not in self.partially_loaded_streams else False + copy_config = CopyJobConfig() + if truncate: + copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE + self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by FULL_TABLE") + else: + copy_config.write_disposition = WriteDisposition.WRITE_APPEND + self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by APPEND") + + self.client.copy_table( + sources=self.dataset.table(tmp_table_name), + destination=self.dataset.table(self.tables[stream]), + job_config=copy_config + ).result() self.partially_loaded_streams.add(stream) self.rows[stream].close() # erase the file From 5921ff3b53033c39392e6294cca906680531ba7f Mon Sep 17 00:00:00 2001 From: "nil.serra" Date: Tue, 30 Nov 2021 11:36:13 +0100 Subject: [PATCH 02/28] Fix --- target_bigquery/processhandler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index ec96a57..db0ff54 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -33,6 +33,7 @@ def __init__(self, logger, **kwargs): # LoadJobProcessHandler kwargs self.truncate = kwargs.get("truncate", False) + self.incremental = kwargs.get("incremental", False) self.add_metadata_columns = kwargs.get("add_metadata_columns", True) self.validate_records = kwargs.get("validate_records", True) self.table_configs = kwargs.get("table_configs", {}) or {} From 60ebe7543eb05f7c27c6d8967ee44a7ddd11973f Mon Sep 17 00:00:00 2001 From: "nil.serra" Date: Tue, 30 Nov 2021 11:45:46 +0100 Subject: [PATCH 03/28] Added GCP Workload Identity Federation as authentication option --- README.md | 9 ++++++ requirements.txt | 1 + target_bigquery/__init__.py | 28 +++++++++++++++++++ target_bigquery/get_token.py | 46 +++++++++++++++++++++++++++++++ target_bigquery/processhandler.py | 19 ++++++++++--- target_bigquery/utils.py | 9 ++++-- 6 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 target_bigquery/get_token.py diff --git a/README.md b/README.md index f62afe3..0be7737 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,15 @@ Create a service account credential: - [Creating an environment variable on a Windows 10 machine](https://www.architectryan.com/2018/08/31/how-to-change-environment-variables-on-windows-10/) - [Creating an environment variable on a Mac machine](https://medium.com/@himanshuagarwal1395/setting-up-environment-variables-in-macos-sierra-f5978369b255) +There's also the possibility of using [GCP Workload Identity Federation](https://cloud.google.com/iam/docs/workload-identity-federation) as authentication method. 
To use it you have to: + * Set as `True` the optional parameter `gcp_workload_identity_federation` + * Specify the following parameters in the config file: + * gcp_project_number + * gcp_workload_id + * gcp_workload_provider + * gcp_service_account_email + * aws_account_id + * aws_role_name ### Step 3: Configure #### Target config file diff --git a/requirements.txt b/requirements.txt index 5fe413b..02cc94c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ singer-python~=5.12.1 google-cloud-bigquery==2.26.0 +scalesec-gcp-workload-identity==1.0.7 jsonschema~=2.6.0 setuptools~=57.4.0 \ No newline at end of file diff --git a/target_bigquery/__init__.py b/target_bigquery/__init__.py index 1988bef..0d3061c 100644 --- a/target_bigquery/__init__.py +++ b/target_bigquery/__init__.py @@ -12,6 +12,7 @@ from target_bigquery.process import process from target_bigquery.utils import emit_state, ensure_dataset from target_bigquery.state import State, LiteralState +from target_bigquery.get_token import getToken logger = singer.get_logger() @@ -68,6 +69,33 @@ def main(): elif config.get("replication_method", "append").lower() == "incremental": incremental = True + # Get parameters and generate token when using gcp workload identity federation + use_gcp = config.get("gcp_workload_identity_federation") + if use_gcp: + gcp_project_number = config.get("gcp_project_number") + if gcp_project_number is None: + raise Exception("gcp_project_number not specified") + gcp_workload_id = config.get("gcp_workload_id") + if gcp_workload_id is None: + raise Exception("gcp_workload_id not specified") + gcp_workload_provider = config.get("gcp_workload_provider") + if gcp_workload_provider is None: + raise Exception("gcp_workload_provider not specified") + gcp_service_account_email = config.get("gcp_service_account_email") + if gcp_service_account_email is None: + raise Exception("gcp_service_account_email not specified") + aws_account_id = config.get("aws_account_id") + if aws_account_id is None: + raise Exception("aws_account_id not specified") + aws_role_name = config.get("aws_role_name") + if aws_role_name is None: + raise Exception("aws_role_name not specified") + aws_region = config.get("aws_region") + gcp_token_lifetime = config.get("gcp_token_lifetime") + gcp_token_scopes = config.get("gcp_token_scopes") + getToken(gcp_project_number, gcp_workload_id, gcp_workload_provider, gcp_service_account_email, + aws_account_id, aws_role_name, aws_region, gcp_token_lifetime, gcp_token_scopes) + # arguments supplied in target config table_prefix = config.get("table_prefix", "") table_suffix = config.get("table_suffix", "") diff --git a/target_bigquery/get_token.py b/target_bigquery/get_token.py new file mode 100644 index 0000000..885da1d --- /dev/null +++ b/target_bigquery/get_token.py @@ -0,0 +1,46 @@ +import logging +import os + +from scalesec_gcp_workload_identity.main import TokenService +import boto3 +import time + +logger = logging.getLogger(__name__) + +def getToken(gcp_project_number, gcp_workload_id, gcp_workload_provider, gcp_service_account_email, + aws_account_id, aws_role_name, aws_region, gcp_token_lifetime, gcp_token_scopes): + logger.info("Started Getting Token, params are:\n" + "gcp_project_number: " + gcp_project_number +", gcp_workload_id: " + gcp_workload_id + ", gcp_workload_provider: " + + gcp_workload_provider + ", gcp_service_account_email: " + gcp_service_account_email + ", aws_account_id: " + + aws_account_id + ", aws_role_name: " + aws_role_name + ", aws_region: " + aws_region + ", 
gcp_token_lifetime: " + + gcp_token_lifetime + ", gcp_token_scopes: " + gcp_token_scopes) + attempts = 6 + while True: + try: + client = boto3.client('sts') + identity = client.get_caller_identity() + logger.info(f"Using caller identity: {identity}\n") + break + except: + attempts = attempts - 1 + if attempts <= 0: + raise + else: + logger.info("AWS Credentials are not yet available. Trying again in a few seconds...") + time.sleep(10) + + token_service = TokenService( + gcp_project_number=gcp_project_number, + gcp_workload_id=gcp_workload_id, + gcp_workload_provider=gcp_workload_provider, + gcp_service_account_email=gcp_service_account_email, + aws_account_id=aws_account_id, + aws_role_name=aws_role_name, + aws_region=aws_region, + gcp_token_lifetime=gcp_token_lifetime, + gcp_token_scopes=gcp_token_scopes, + ) + + sa_token, expiry_date = token_service.get_token() + os.environ['GCP_AUTH_TOKEN'] = sa_token + os.environ['GCP_AUTH_TOKEN_EXPIRY_DATE'] = expiry_date diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index db0ff54..95ec01c 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -1,9 +1,11 @@ import json +import os import uuid from datetime import datetime from tempfile import TemporaryFile import singer +import google.oauth2.credentials from google.api_core import exceptions as google_exceptions from google.cloud import bigquery from google.cloud.bigquery import LoadJobConfig, CopyJobConfig, QueryJobConfig @@ -140,10 +142,19 @@ def __init__(self, logger, **kwargs): self.bq_schema_dicts = {} self.rows = {} - self.client = bigquery.Client( - project=self.project_id, - location=kwargs.get("location", "US") - ) + token = os.getenv('GCP_AUTH_TOKEN') + if token is not None: + credentials = google.oauth2.credentials.Credentials(token) + self.client = bigquery.Client( + credentials=credentials, + project=self.project_id, + location=kwargs.get("location", "US") + ) + else: + self.client = bigquery.Client( + project=self.project_id, + location=kwargs.get("location", "US") + ) def handle_schema_message(self, msg): for s in super(LoadJobProcessHandler, self).handle_schema_message(msg): diff --git a/target_bigquery/utils.py b/target_bigquery/utils.py index 577d2cb..ae5dd52 100644 --- a/target_bigquery/utils.py +++ b/target_bigquery/utils.py @@ -3,6 +3,7 @@ import sys import singer +import google.oauth2.credentials from google.api_core import exceptions from google.cloud import bigquery from google.cloud.bigquery import Dataset @@ -40,8 +41,12 @@ def ensure_dataset(project_id, dataset_id, location): :return: client (BigQuery Client Object) and Dataset (BigQuery dataset) """ from google.cloud.bigquery import DatasetReference - client = bigquery.Client(project=project_id, location=location) - + token = os.environ.get('GCP_AUTH_TOKEN', None) + if token is not None: + credentials = google.oauth2.credentials.Credentials(token) + client = bigquery.Client(credentials=credentials, project=project_id, location=location) + else: + client = bigquery.Client(project=project_id, location=location) dataset_ref = DatasetReference(project_id, dataset_id) try: client.create_dataset(dataset_ref) From b474e45f96cbe9c4a6e40af19606ff2209142292 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 27 Dec 2021 11:23:36 +0100 Subject: [PATCH 04/28] feat: log record to schema conversion issues this enables better debugging of schema issues --- target_bigquery/processhandler.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff 
--git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index a7eedc0..f95e0a2 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -174,9 +174,15 @@ def handle_record_message(self, msg): raise Exception(f"A record for stream {msg.stream} was encountered before a corresponding schema") schema = self.schemas[stream] - + bq_schema = self.bq_schema_dicts[stream] nr = cleanup_record(schema, msg.record) - nr = format_record_to_schema(nr, self.bq_schema_dicts[stream]) + + try: + nr = format_record_to_schema(nr, self.bq_schema_dicts[stream]) + except Exception as e: + extra={"record" : msg.record, "schema": schema, "bq_schema": bq_schema} + self.logger.info(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}") + raise e # schema validation may fail if data doesn't match schema in terms of data types # in this case, we validate schema again on data which has been forced to match schema From 7b7fb90ee4ca9de8479eb5c31d5c1fbf352442a4 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 3 Jan 2022 08:33:07 +0100 Subject: [PATCH 05/28] feat: log generated BigQuery schema to help debug schema conversion issues --- target_bigquery/processhandler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index f95e0a2..0ada32e 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -94,6 +94,8 @@ def handle_schema_message(self, msg): self.bq_schema_dicts[msg.stream] = self._build_bq_schema_dict(schema) self.bq_schemas[msg.stream] = schema + self.logger.info(f"{msg.stream} BigQuery schema {schema}") + yield from () def on_stream_end(self): From 87e7ca7fa41616409fdf460bdbf879c74d2dcf61 Mon Sep 17 00:00:00 2001 From: "nil.serra" Date: Tue, 30 Nov 2021 11:45:46 +0100 Subject: [PATCH 06/28] Revert "Added GCP Workload Identity Federation as authentication option" This reverts commit 60ebe7543eb05f7c27c6d8967ee44a7ddd11973f. --- README.md | 9 ------ requirements.txt | 1 - target_bigquery/__init__.py | 28 ------------------- target_bigquery/get_token.py | 46 ------------------------------- target_bigquery/processhandler.py | 19 +++---------- target_bigquery/utils.py | 9 ++---- 6 files changed, 6 insertions(+), 106 deletions(-) delete mode 100644 target_bigquery/get_token.py diff --git a/README.md b/README.md index 0be7737..f62afe3 100644 --- a/README.md +++ b/README.md @@ -128,15 +128,6 @@ Create a service account credential: - [Creating an environment variable on a Windows 10 machine](https://www.architectryan.com/2018/08/31/how-to-change-environment-variables-on-windows-10/) - [Creating an environment variable on a Mac machine](https://medium.com/@himanshuagarwal1395/setting-up-environment-variables-in-macos-sierra-f5978369b255) -There's also the possibility of using [GCP Workload Identity Federation](https://cloud.google.com/iam/docs/workload-identity-federation) as authentication method. 
To use it you have to: - * Set as `True` the optional parameter `gcp_workload_identity_federation` - * Specify the following parameters in the config file: - * gcp_project_number - * gcp_workload_id - * gcp_workload_provider - * gcp_service_account_email - * aws_account_id - * aws_role_name ### Step 3: Configure #### Target config file diff --git a/requirements.txt b/requirements.txt index d3ee147..ec4e47b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ singer-python~=5.12.1 google-cloud-bigquery==2.26.0 -scalesec-gcp-workload-identity==1.0.7 jsonschema~=2.6.0 setuptools~=60.3.1 \ No newline at end of file diff --git a/target_bigquery/__init__.py b/target_bigquery/__init__.py index 0d3061c..1988bef 100644 --- a/target_bigquery/__init__.py +++ b/target_bigquery/__init__.py @@ -12,7 +12,6 @@ from target_bigquery.process import process from target_bigquery.utils import emit_state, ensure_dataset from target_bigquery.state import State, LiteralState -from target_bigquery.get_token import getToken logger = singer.get_logger() @@ -69,33 +68,6 @@ def main(): elif config.get("replication_method", "append").lower() == "incremental": incremental = True - # Get parameters and generate token when using gcp workload identity federation - use_gcp = config.get("gcp_workload_identity_federation") - if use_gcp: - gcp_project_number = config.get("gcp_project_number") - if gcp_project_number is None: - raise Exception("gcp_project_number not specified") - gcp_workload_id = config.get("gcp_workload_id") - if gcp_workload_id is None: - raise Exception("gcp_workload_id not specified") - gcp_workload_provider = config.get("gcp_workload_provider") - if gcp_workload_provider is None: - raise Exception("gcp_workload_provider not specified") - gcp_service_account_email = config.get("gcp_service_account_email") - if gcp_service_account_email is None: - raise Exception("gcp_service_account_email not specified") - aws_account_id = config.get("aws_account_id") - if aws_account_id is None: - raise Exception("aws_account_id not specified") - aws_role_name = config.get("aws_role_name") - if aws_role_name is None: - raise Exception("aws_role_name not specified") - aws_region = config.get("aws_region") - gcp_token_lifetime = config.get("gcp_token_lifetime") - gcp_token_scopes = config.get("gcp_token_scopes") - getToken(gcp_project_number, gcp_workload_id, gcp_workload_provider, gcp_service_account_email, - aws_account_id, aws_role_name, aws_region, gcp_token_lifetime, gcp_token_scopes) - # arguments supplied in target config table_prefix = config.get("table_prefix", "") table_suffix = config.get("table_suffix", "") diff --git a/target_bigquery/get_token.py b/target_bigquery/get_token.py deleted file mode 100644 index 885da1d..0000000 --- a/target_bigquery/get_token.py +++ /dev/null @@ -1,46 +0,0 @@ -import logging -import os - -from scalesec_gcp_workload_identity.main import TokenService -import boto3 -import time - -logger = logging.getLogger(__name__) - -def getToken(gcp_project_number, gcp_workload_id, gcp_workload_provider, gcp_service_account_email, - aws_account_id, aws_role_name, aws_region, gcp_token_lifetime, gcp_token_scopes): - logger.info("Started Getting Token, params are:\n" - "gcp_project_number: " + gcp_project_number +", gcp_workload_id: " + gcp_workload_id + ", gcp_workload_provider: " - + gcp_workload_provider + ", gcp_service_account_email: " + gcp_service_account_email + ", aws_account_id: " - + aws_account_id + ", aws_role_name: " + aws_role_name + ", aws_region: " + aws_region + ", 
gcp_token_lifetime: " - + gcp_token_lifetime + ", gcp_token_scopes: " + gcp_token_scopes) - attempts = 6 - while True: - try: - client = boto3.client('sts') - identity = client.get_caller_identity() - logger.info(f"Using caller identity: {identity}\n") - break - except: - attempts = attempts - 1 - if attempts <= 0: - raise - else: - logger.info("AWS Credentials are not yet available. Trying again in a few seconds...") - time.sleep(10) - - token_service = TokenService( - gcp_project_number=gcp_project_number, - gcp_workload_id=gcp_workload_id, - gcp_workload_provider=gcp_workload_provider, - gcp_service_account_email=gcp_service_account_email, - aws_account_id=aws_account_id, - aws_role_name=aws_role_name, - aws_region=aws_region, - gcp_token_lifetime=gcp_token_lifetime, - gcp_token_scopes=gcp_token_scopes, - ) - - sa_token, expiry_date = token_service.get_token() - os.environ['GCP_AUTH_TOKEN'] = sa_token - os.environ['GCP_AUTH_TOKEN_EXPIRY_DATE'] = expiry_date diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index 95ec01c..db0ff54 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -1,11 +1,9 @@ import json -import os import uuid from datetime import datetime from tempfile import TemporaryFile import singer -import google.oauth2.credentials from google.api_core import exceptions as google_exceptions from google.cloud import bigquery from google.cloud.bigquery import LoadJobConfig, CopyJobConfig, QueryJobConfig @@ -142,19 +140,10 @@ def __init__(self, logger, **kwargs): self.bq_schema_dicts = {} self.rows = {} - token = os.getenv('GCP_AUTH_TOKEN') - if token is not None: - credentials = google.oauth2.credentials.Credentials(token) - self.client = bigquery.Client( - credentials=credentials, - project=self.project_id, - location=kwargs.get("location", "US") - ) - else: - self.client = bigquery.Client( - project=self.project_id, - location=kwargs.get("location", "US") - ) + self.client = bigquery.Client( + project=self.project_id, + location=kwargs.get("location", "US") + ) def handle_schema_message(self, msg): for s in super(LoadJobProcessHandler, self).handle_schema_message(msg): diff --git a/target_bigquery/utils.py b/target_bigquery/utils.py index ae5dd52..577d2cb 100644 --- a/target_bigquery/utils.py +++ b/target_bigquery/utils.py @@ -3,7 +3,6 @@ import sys import singer -import google.oauth2.credentials from google.api_core import exceptions from google.cloud import bigquery from google.cloud.bigquery import Dataset @@ -41,12 +40,8 @@ def ensure_dataset(project_id, dataset_id, location): :return: client (BigQuery Client Object) and Dataset (BigQuery dataset) """ from google.cloud.bigquery import DatasetReference - token = os.environ.get('GCP_AUTH_TOKEN', None) - if token is not None: - credentials = google.oauth2.credentials.Credentials(token) - client = bigquery.Client(credentials=credentials, project=project_id, location=location) - else: - client = bigquery.Client(project=project_id, location=location) + client = bigquery.Client(project=project_id, location=location) + dataset_ref = DatasetReference(project_id, dataset_id) try: client.create_dataset(dataset_ref) From 41081efac0ffebf502cf0c58f9dca87bf6360641 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Fri, 7 Jan 2022 15:11:35 -0700 Subject: [PATCH 07/28] test: CI/CD --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 
6149243..f7970f9 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -25,7 +25,7 @@ env: on: push: - branches: [development, master] + branches: [development, master, feature/incremental_sync] pull_request: branches: [ master ] From a06f45f3da99437f30ff2bb48e133ea002a56a09 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Fri, 7 Jan 2022 18:39:25 -0700 Subject: [PATCH 08/28] test: incremental sync --- .../simple_stream_incremental_load_1.json | 7 + .../simple_stream_incremental_load_2.json | 9 ++ tests/test_partialloads.py | 123 +++++++++++++++++- 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 tests/rsc/partial_load_streams/simple_stream_incremental_load_1.json create mode 100644 tests/rsc/partial_load_streams/simple_stream_incremental_load_2.json diff --git a/tests/rsc/partial_load_streams/simple_stream_incremental_load_1.json b/tests/rsc/partial_load_streams/simple_stream_incremental_load_1.json new file mode 100644 index 0000000..9e11dc3 --- /dev/null +++ b/tests/rsc/partial_load_streams/simple_stream_incremental_load_1.json @@ -0,0 +1,7 @@ +{ "type": "SCHEMA", "stream": "simple_stream", "schema": { "properties": { "id": { "type": [ "null", "string" ] }, "name": { "type": [ "null", "string" ] }, "value": { "type": [ "null", "integer" ] }, "ratio": { "type": [ "null", "number" ] }, "timestamp": { "type": "string", "format": "date-time" }, "date": { "type": "string", "format": "date" } }, "type": [ "null", "object" ] }, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] } +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "LOAD_1", "value": 1, "ratio": 0.1, "timestamp": "2020-01-09T00:00:00.000000Z", "date": "2020-01-09" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-09T00:00:00.000000Z"}}}} +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "002", "name": "LOAD_1", "value": 2, "ratio": 0.1, "timestamp": "2020-01-10T00:00:00.000000Z", "date": "2020-01-10" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-10T00:00:00.000000Z"}}}} +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "003", "name": "LOAD_1", "value": 3, "ratio": 0.1, "timestamp": "2020-01-11T00:00:00.000000Z", "date": "2020-01-11" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}} \ No newline at end of file diff --git a/tests/rsc/partial_load_streams/simple_stream_incremental_load_2.json b/tests/rsc/partial_load_streams/simple_stream_incremental_load_2.json new file mode 100644 index 0000000..66658f1 --- /dev/null +++ b/tests/rsc/partial_load_streams/simple_stream_incremental_load_2.json @@ -0,0 +1,9 @@ +{ "type": "SCHEMA", "stream": "simple_stream", "schema": { "properties": { "id": { "type": [ "null", "string" ] }, "name": { "type": [ "null", "string" ] }, "value": { "type": [ "null", "integer" ] }, "ratio": { "type": [ "null", "number" ] }, "timestamp": { "type": "string", "format": "date-time" }, "date": { "type": "string", "format": "date" } }, "type": [ "null", "object" ] }, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] } +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "UPDATED", "value": 1, "ratio": 0.1, "timestamp": "2020-01-09T00:00:00.000000Z", "date": 
"2020-01-09" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-09T00:00:00.000000Z"}}}} +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "002", "name": "UPDATED", "value": 2, "ratio": 0.1, "timestamp": "2020-01-10T00:00:00.000000Z", "date": "2020-01-10" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-10T00:00:00.000000Z"}}}} +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "003", "name": "UPDATED", "value": 3, "ratio": 0.1, "timestamp": "2020-01-11T00:00:00.000000Z", "date": "2020-01-11" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}} +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "004", "name": "INSERTED", "value": 3, "ratio": 0.1, "timestamp": "2020-01-12T00:00:00.000000Z", "date": "2020-01-11" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-12T00:00:00.000000Z"}}}} \ No newline at end of file diff --git a/tests/test_partialloads.py b/tests/test_partialloads.py index 8c00ceb..8c4ce7f 100644 --- a/tests/test_partialloads.py +++ b/tests/test_partialloads.py @@ -29,11 +29,24 @@ "dataset_id": "{your_dataset_id}" "max_cache": 0 } + + + - target_config_incremental.json: + + { + "project_id": "{your-project-id}", + "dataset_id": "{your_dataset_id}", + "replication_method": "incremental" + } + """ from tests import unittestcore +from google.cloud.bigquery import Client +import json import os - +from decimal import Decimal +import pandas as pd class TestPartialLoadsPartialLoadJob(unittestcore.BaseUnitTest): @@ -329,6 +342,114 @@ def test_simple_stream_load_twice_append(self): self.delete_temp_state() + def test_simple_stream_load_incremental(self): + + from target_bigquery import main + + for i in range(2): + self.set_cli_args( + stdin=os.path.join(os.path.join( + os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), + 'rsc'), + 'partial_load_streams'), 'simple_stream_incremental_load_1.json'), + config=os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_incremental.json'), + processhandler="partial-load-job", + ds_delete=i == 0 + ) + + ret = main() + state = self.get_state() + self.assertEqual(1, len(state)) + + self.assertEqual(ret, 0, msg="Exit code is not 0!") + self.assertDictEqual(state[-1], + {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}) + + table = self.client.get_table("{}.simple_stream".format(self.dataset_id)) + + self.assertEqual(3, table.num_rows, msg="Number of rows mismatch") + + self.delete_temp_state() + + # verify data + + config_file = os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target-config.json') + + config = json.load(open(config_file)) + project_id = config["project_id"] + dataset_id = config["dataset_id"] + stream = "simple_stream" + + bq_client = Client(project=project_id) + + query_string = f"SELECT id, name FROM `{project_id}.{dataset_id}.{stream}`" + + df_actual = ( + bq_client.query(query_string) + .result() + .to_dataframe() + ) + + data_expected = { + 'id': ['001', '002', '003'], + 'name': ["LOAD_1","LOAD_1","LOAD_1"] + } + + # creating a 
Dataframe object + df_expected = pd.DataFrame(data_expected) + + assert df_expected.equals(df_actual) + + + self.set_cli_args( + stdin=os.path.join(os.path.join( + os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), + 'rsc'), + 'partial_load_streams'), 'simple_stream_incremental_load_2.json'), + config=os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_incremental.json'), + processhandler="partial-load-job", + ds_delete=False + ) + + ret = main() + state = self.get_state() + self.assertEqual(1, len(state)) + + self.assertEqual(ret, 0, msg="Exit code is not 0!") + self.assertDictEqual(state[-1], + {"bookmarks": {"simple_stream": {"timestamp": "2020-01-12T00:00:00.000000Z"}}}) + + table = self.client.get_table("{}.simple_stream".format(self.dataset_id)) + + self.assertEqual(4, table.num_rows, msg="Number of rows mismatch") + + self.assertIsNone(table.clustering_fields) + self.assertIsNone(table.partitioning_type) + self.delete_temp_state() + + df_actual = ( + bq_client.query(query_string) + .result() + .to_dataframe() + ) + + data_expected = { + 'id': ['001', '002', '003', '004'], + 'name': ["UPDATED", "UPDATED", "UPDATED","INSERTED"] + } + + # creating a Dataframe object + df_expected = pd.DataFrame(data_expected) + + assert df_expected.equals(df_actual) + + class TestPartialLoadsBookmarksPartialLoadJob(unittestcore.BaseUnitTest): def test_simple_stream(self): From 666930faf969c65cf952bf9c84c572d60b6bfe6f Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Fri, 7 Jan 2022 18:39:50 -0700 Subject: [PATCH 09/28] refactor: README markdown formatting --- README.md | 246 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 153 insertions(+), 93 deletions(-) diff --git a/README.md b/README.md index f62afe3..31c2f81 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,19 @@ A [Singer](https://singer.io) target that writes data to Google BigQuery. [![Python package](https://github.com/adswerve/target-bigquery/actions/workflows/python-package.yml/badge.svg)](https://github.com/adswerve/target-bigquery/actions/workflows/python-package.yml) -`target-bigquery` works together with any other [Singer Tap] to move data from sources like [Braintree], [Freshdesk] and [Hubspot] to Google BigQuery. +`target-bigquery` works together with any other [Singer Tap] to move data from sources like [Braintree], [Freshdesk] +and [Hubspot] to Google BigQuery. ## Contents + - [Contact](#contact) - [Dependencies](#dependencies) - [How to use it](#how-to-use-it) - [Step 1: Enable Google BigQuery API](#step-1-enable-google-bigquery-api) - [Step 2: Authenticate with a service account](#step-2-authenticate-with-a-service-account) - [Step 3: Configure](#step-3-configure) - - [Target config file](#target-config-file) - - [Tap config files](#tap-config-files) + - [Target config file](#target-config-file) + - [Tap config files](#tap-config-files) - [Step 4: Install and run](#step-4-install-and-run) - [Step 5: target-tables-config file: set up partitioning and clustering](#step-5-target-tables-config-file-set-up-partitioning-and-clustering) - [Partitioning background](#partitioning-background) @@ -33,19 +35,24 @@ Email: `analytics-help@adswerve.com` Install requirements, using either of the two methods below. **Method 1** + ``` pip install -r requirements.txt ``` **Method 2** -Alternatively, you can run the following command. It runs *setup.py* and installs target-bigquery into the env like the user would. 
**-e** emulates how a user of the package would install requirements. +Alternatively, you can run the following command. It runs *setup.py* and installs target-bigquery into the env like the +user would. **-e** emulates how a user of the package would install requirements. + ``` pip install -e . ``` + **Additional development and testing requirements** -Install additional dependencies required for development and testing. +Install additional dependencies required for development and testing. + ``` pip install -r dev-requirements.txt ``` @@ -82,31 +89,28 @@ Create a service account credential: PI & Services -> Credentials -> Create Credentials -> Service account - - 2. Under **Service account details**, enter **Service account name**. Click **Create** Enter Service account name -3. Under **Grant this service account access to the project**, select **BigQuery Data Editor** and **BigQuery Job User** as the minimal set of permissions. Click **Done** +3. Under **Grant this service account access to the project**, select **BigQuery Data Editor** and **BigQuery Job User** + as the minimal set of permissions. Click **Done** - - **BigQuery Data Editor** permission allows the service account to access (and change) the data. - - **BigQuery Job User** permission allows the service account to actually run a load or select job. +- **BigQuery Data Editor** permission allows the service account to access (and change) the data. +- **BigQuery Job User** permission allows the service account to actually run a load or select job. Grant this service account access to the project - 4. On the **API & Services Credentials** screen, select the service account you just created. Select the service account - 5. Click **ADD KEY** -> **Create new key** -> **JSON key**. Download the service account credential JSON file. @@ -121,10 +125,12 @@ Create a service account credential: Download the service account credential JSON file +6. Name the file **client_secrets.json**. You can place the file where `target-bigquery` will be executed or provide a + path to the service account json file. -6. Name the file **client_secrets.json**. You can place the file where `target-bigquery` will be executed or provide a path to the service account json file. +7. Set a **GOOGLE_APPLICATION_CREDENTIALS** environment variable on the machine, where the value is the fully qualified + path to **client_secrets.json** file: -7. Set a **GOOGLE_APPLICATION_CREDENTIALS** environment variable on the machine, where the value is the fully qualified path to **client_secrets.json** file: - [Creating an environment variable on a Windows 10 machine](https://www.architectryan.com/2018/08/31/how-to-change-environment-variables-on-windows-10/) - [Creating an environment variable on a Mac machine](https://medium.com/@himanshuagarwal1395/setting-up-environment-variables-in-macos-sierra-f5978369b255) @@ -132,21 +138,24 @@ Create a service account credential: #### Target config file -Create a file called **target-config.json** in your working directory, following this sample [target-config.json](/sample_config/target-config-exchange-rates-api.json) file (or see the example below). +Create a file called **target-config.json** in your working directory, following this +sample [target-config.json](/sample_config/target-config-exchange-rates-api.json) file (or see the example below). -- Required parameters are the project name `project_id` and `dataset_id`. 
-- Optional parameters are `table_suffix`, `validate records`, `add_metadata_columns`, `location` and `table_config`. -- Default data location is "US" (if your location is not the US, you can indicate a different location in your **target-config.json** file). -- The data will be written to the dataset specified in your **target-config.json**. -- If you do not have the dataset with this name yet, it will be created. -- The table will be created. +- Required parameters are the project name `project_id` and `dataset_id`. +- Optional parameters are `table_suffix`, `validate records`, `add_metadata_columns`, `location` and `table_config`. +- Default data location is "US" (if your location is not the US, you can indicate a different location in your ** + target-config.json** file). +- The data will be written to the dataset specified in your **target-config.json**. +- If you do not have the dataset with this name yet, it will be created. +- The table will be created. - There's an optional parameter `replication_method` that can either be: - * `append`: Adding new rows to the table (Default value) - * `truncate`: Deleting all previous rows and uploading the new ones to the table - * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector - (if it finds an old row with same key, updates it. Otherwise it inserts the new row) + * `append`: Adding new rows to the table (Default value) + * `truncate`: Deleting all previous rows and uploading the new ones to the table + * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector + (if it finds an old row with same key, updates it. Otherwise it inserts the new row) Sample **target-config.json** file: + ``` { "project_id": "{your_GCP_project_id}", @@ -161,31 +170,40 @@ Sample **target-config.json** file: #### Tap config files -This is a little bit outside of the scope of this documentation, but let's quickly take a look at sample *tap* config files as well, to see how tap and target work together. +This is a little bit outside of the scope of this documentation, but let's quickly take a look at sample *tap* config +files as well, to see how tap and target work together. Sample [tap-config.json](/sample_config/tap-config-exchange-rates-api.json) file configures the data source: + ``` { "base": "USD", "start_date": "2021-01-01" } ``` -- Sample [state.json](/sample_config/state.json) file is now just a empty JSON file `{}`, and it will be written or updated when the tap runs. +- Sample [state.json](/sample_config/state.json) file is now just a empty JSON file `{}`, and it will be written or + updated when the tap runs. - This is an optional file. -- The tap will write the date into **state.json** file, indicating when the data loading stopped at. -- Next time you run the tap, it'll continue from this date in the state file. If **state.json** file is provided, then it takes presedence over the "start_date" in the tap config file. +- The tap will write the date into **state.json** file, indicating when the data loading stopped at. +- Next time you run the tap, it'll continue from this date in the state file. If **state.json** file is provided, then + it takes presedence over the "start_date" in the tap config file. Learn more: https://github.com/singer-io/getting-started ### Step 4: Install and run -1. First, make sure Python 3 is installed on your system or follow these installation instructions for [Mac](python-mac) or [Ubuntu](python-ubuntu). +1. 
First, make sure Python 3 is installed on your system or follow these installation instructions for [Mac](python-mac) + or [Ubuntu](python-ubuntu). -2. `target-bigquery` can be run with any [Singer Tap], but we'll use [tap-exchangeratesapi] - which pulls currency exchange rate data from a public data set - as an example. (Learn more about [Exchangeratesapi.io](http://exchangeratesapi.io/)) +2. `target-bigquery` can be run with any [Singer Tap], but we'll use [tap-exchangeratesapi] - which pulls currency + exchange rate data from a public data set - as an example. (Learn more + about [Exchangeratesapi.io](http://exchangeratesapi.io/)) -3. In the **target-config.json** file, enter the id of your GCP (Google Cloud Platform Project) - you can find it on the Home page of your [GCP web console](https://console.cloud.google.com). +3. In the **target-config.json** file, enter the id of your GCP (Google Cloud Platform Project) - you can find it on the + Home page of your [GCP web console](https://console.cloud.google.com). Sample **target-config.json** file: + ``` { "project_id": "{your project id}", @@ -193,11 +211,14 @@ Sample **target-config.json** file: } ``` -4. These commands will install `tap-exchangeratesapi` and `target-bigquery` with pip and then run them together, piping the output of `tap-exchangeratesapi` to `target-bigquery`. +4. These commands will install `tap-exchangeratesapi` and `target-bigquery` with pip and then run them together, piping + the output of `tap-exchangeratesapi` to `target-bigquery`. -We recommend that you **install tap and target in their own virtual environments.** It will be easier to manage requirements and avoid dependency conflicts. +We recommend that you **install tap and target in their own virtual environments.** It will be easier to manage +requirements and avoid dependency conflicts. -- The commands below are for running locally on a Windows machine. For a Mac or Linux machine, the syntax will be slightly different. +- The commands below are for running locally on a Windows machine. For a Mac or Linux machine, the syntax will be + slightly different. ```bash cd "{your project root directory}" @@ -236,15 +257,17 @@ py -m venv target # ^ on a Windows machine indicates a new line. On a Mac, use "\\". ``` - - -- If you're using a different tap, substitute `tap-exchangeratesapi` in the final command above to the command used to run your tap. +- If you're using a different tap, substitute `tap-exchangeratesapi` in the final command above to the command used to + run your tap. ### Step 5: target-tables-config file: set up partitioning and clustering ### Partitioning background -A [partitioned table](https://cloud.google.com/bigquery/docs/partitioned-tables) is a special table that is divided into segments, called partitions, that make it easier to manage and query your data. By dividing a large table into smaller partitions, you can: +A [partitioned table](https://cloud.google.com/bigquery/docs/partitioned-tables) is a special table that is divided into +segments, called partitions, that make it easier to manage and query your data. By dividing a large table into smaller +partitions, you can: + - improve query performance, - control costs by reducing the number of bytes read by a query. @@ -256,17 +279,16 @@ You can partition BigQuery tables by: - Integer range: Tables are partitioned based on an integer column. 
- - - ### Clustering background - - When you create a clustered table in BigQuery, the table data is automatically organized based on the contents of one or more columns in the table’s schema. - - The columns you specify are used to colocate related data. -- When you cluster a table using multiple columns, the order of columns you specify is important. The order of the specified columns determines the sort order of the data. - - Clustering can improve the performance of certain types of queries such as queries that use filter clauses and queries that aggregate data. - - You can cluster up to 4 columns in a table - +- When you create a clustered table in BigQuery, the table data is automatically organized based on the contents of one + or more columns in the table’s schema. +- The columns you specify are used to colocate related data. +- When you cluster a table using multiple columns, the order of columns you specify is important. The order of the + specified columns determines the sort order of the data. +- Clustering can improve the performance of certain types of queries such as queries that use filter clauses and queries + that aggregate data. +- You can cluster up to 4 columns in a table **Learn more about BigQuery partitioned and clustered tables:** @@ -282,7 +304,8 @@ https://medium.com/analytics-vidhya/bigquery-partitioning-clustering-9f84fc201e6 **Example 1: [tap-recharge] data** -This is not a follow-along example. Additional tap configuration would be required to run it. This example is just for illustration purposes. +This is not a follow-along example. Additional tap configuration would be required to run it. This example is just for +illustration purposes. If we were to load [tap-recharge] *charges* table into BigQuery, we could partition it by date. @@ -292,6 +315,7 @@ For clustering, we can selected: - columns likely to appear in `WHERE` and `GROUP BY` statements To configure partitioning and clustering in BigQuery destination tables, we create **target-tables-config.json**: + ``` { "streams": { @@ -309,22 +333,25 @@ We can verify in BigQuery web UI that partitioning and clustering worked: **Example 2: [tap-exchangeratesapi] data** -You can follow along and try this example on your own. We will continue where we left off in **Step 4: Install and Run** above. +You can follow along and try this example on your own. We will continue where we left off in **Step 4: Install and Run** +above. 1. Take a look at our [tap-exchangeratesapi] data. We have: + - dates - datetimes -- floats which show exchange rates +- floats which show exchange rates Download the service account credential JSON file Download the service account credential JSON file -In our [tap-exchangeratesapi] example, no columns are good candidates for clustering. +In our [tap-exchangeratesapi] example, no columns are good candidates for clustering. -You can only set up partitioning. +You can only set up partitioning. 2. Create your **target-tables-config.json** with partitioning configuration. Leave cluster fields blank: + ``` { "streams": { @@ -335,41 +362,53 @@ You can only set up partitioning. }} ``` -3. Clear you **state.json**, so it's an empty JSON `{}`, because we want to load all data again. Skip this step, if you didn't previously load this data in **Step 4** above. +3. Clear you **state.json**, so it's an empty JSON `{}`, because we want to load all data again. Skip this step, if you + didn't previously load this data in **Step 4** above. + +4. 
Delete your BigQuery destination table **exchangeratesapi**, because we want to re-load it again from scratch. Skip + this step, if you didn't previously load this data in **Step 4** above. -4. Delete your BigQuery destination table **exchangeratesapi**, because we want to re-load it again from scratch. Skip this step, if you didn't previously load this data in **Step 4** above. +3. Load data data into BigQuery, while configuring target tables. Pass **target-tables-config.json** as a command line + argument. -3. Load data data into BigQuery, while configuring target tables. Pass **target-tables-config.json** as a command line argument. ```bash {project_root_dir}\tap\Scripts\tap-exchangeratesapi --config sample_config/tap-config-exchange-rates-api.json | ^ {project_root_dir}\target\Scripts\target-bigquery --config sample_config/target-config-exchange-rates-api.json ^ -t sample_config/target-tables-config-exchange-rates-api.json > sample_config/state.json ``` -- "^" indicates a new line in Windows Command Prompt. In Mac terminal, use "\\". -- If you don't want to pass **target-tables-config.json** file as a CLI argument, you can add ```"table_config": "target-tables-config.json"``` to your **target-config.json** file. See **Step 3: Configure** above. +- "^" indicates a new line in Windows Command Prompt. In Mac terminal, use "\\". +- If you don't want to pass **target-tables-config.json** file as a CLI argument, you can + add ```"table_config": "target-tables-config.json"``` to your **target-config.json** file. See **Step 3: Configure** + above. -6. Verify in BigQuery web UI that partitioning and clustering worked (in our example below, we only set up partitioning): +6. Verify in BigQuery web UI that partitioning and clustering worked (in our example below, we only set up + partitioning): Download the service account credential JSON file - ### Step 6: target-tables-config file: force data types and modes #### Problem: + - Normally, tap catalog file governs schema of data which will be loaded into target-bigquery. - However, sometimes you can get a column of an undesired data type, which is not following your tap-catalog file. - + #### Solution: -- You can force that column to the desired data type by using `force_fields` flag inside your *target-tables-config.json* file. - + +- You can force that column to the desired data type by using `force_fields` flag inside your * + target-tables-config.json* file. + #### Example: -- We used this solution to fix `"date_start"` field from `"ads_insights_age_and_gender"` stream from tap-facebook. -- In tap catalog file, we said we wanted this column to be a **date**. -- However, the tap generates schema where this column is a **string**, despite our tap catalog file. -- Therefore, we used `force_fields` flag in target-tables-config.json to override what the tap generates and force the column to be a date. + +- We used this solution to fix `"date_start"` field from `"ads_insights_age_and_gender"` stream from tap-facebook. +- In tap catalog file, we said we wanted this column to be a **date**. +- However, the tap generates schema where this column is a **string**, despite our tap catalog file. +- Therefore, we used `force_fields` flag in target-tables-config.json to override what the tap generates and force the + column to be a date. - Example of *target-tables-config.json* file: + ``` { "streams": { @@ -388,6 +427,7 @@ You can only set up partitioning. 
## Unit tests set up Add the following files to *sandbox* directory under project root directory: + - **sa.json** with GCP credential - **target-config.json**: @@ -428,31 +468,31 @@ Add the following files to *sandbox* directory under project root directory: ``` - **target_config_contains_target_tables_config.json** - - - if you're running unit test from the unit test .py file: - - ``` - { - "project_id": "{your-project-id}", - "dataset_id": "{your_dataset_id}", - "table_config": "rsc/config/simple_stream_table_config.json" - } - ``` - - if you're running unit test from shell, for example: - - ```bash - pytest --verbose tests/test_simplestream.py::TestSimpleStreamLoadJob::test_simple_stream_with_tables_config_passed_inside_target_config_file - ``` + - if you're running unit test from the unit test .py file: - In this case, here's your config file, notice the difference in directory: - ``` - { - "project_id": "{your-project-id}", - "dataset_id": "{your_dataset_id}", - "table_config": "tests/rsc/config/simple_stream_table_config.json" - } - ``` + ``` + { + "project_id": "{your-project-id}", + "dataset_id": "{your_dataset_id}", + "table_config": "rsc/config/simple_stream_table_config.json" + } + ``` + + - if you're running unit test from shell, for example: + + ```bash + pytest --verbose tests/test_simplestream.py::TestSimpleStreamLoadJob::test_simple_stream_with_tables_config_passed_inside_target_config_file + ``` + + In this case, here's your config file, notice the difference in directory: + ``` + { + "project_id": "{your-project-id}", + "dataset_id": "{your_dataset_id}", + "table_config": "tests/rsc/config/simple_stream_table_config.json" + } + ``` - **malformed_target_config.json**: @@ -463,8 +503,8 @@ Add the following files to *sandbox* directory under project root directory: "validate_records": false } ``` - -- **target_config_merge_state_false_flag.json**: + +- **target_config_merge_state_false_flag.json**: ``` { "project_id": "{your-project-id}", @@ -472,20 +512,40 @@ Add the following files to *sandbox* directory under project root directory: "merge_state_messages": 0 } ``` + +- **target_config_incremental.json**: + + ``` + { + "project_id": "{your-project-id}", + "dataset_id": "{your_dataset_id}", + "replication_method": "incremental" + } + ``` + ## Config files in this project This project has three locations with config files: 1) **sample_config** - sample config files to illustrate points made in this README 2) **tests/rsc/config** - config files necessary for unit tests -3) **sandbox** - config files you create for unit tests. We didn't include them because they have sensitive info (e.g., GCP project names). Follow instructions in the section **Unit tests set up**, as well as comments in unit tests. +3) **sandbox** - config files you create for unit tests. We didn't include them because they have sensitive info (e.g., + GCP project names). Follow instructions in the section **Unit tests set up**, as well as comments in unit tests. 
+ --- [Singer Tap]: https://singer.io + [Braintree]: https://github.com/singer-io/tap-braintree + [Freshdesk]: https://github.com/singer-io/tap-freshdesk + [Hubspot]: https://github.com/singer-io/tap-hubspot + [tap-exchangeratesapi]: https://github.com/singer-io/tap-exchangeratesapi + [python-mac]: http://docs.python-guide.org/en/latest/starting/install3/osx/ + [python-ubuntu]: https://www.digitalocean.com/community/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-16-04 + [tap-recharge]: https://github.com/singer-io/tap-recharge \ No newline at end of file From 5761045be8a8d609d1a5966b8b476991fabb312f Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Fri, 7 Jan 2022 18:42:38 -0700 Subject: [PATCH 10/28] docs: comments --- tests/test_partialloads.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_partialloads.py b/tests/test_partialloads.py index 8c4ce7f..d679ae5 100644 --- a/tests/test_partialloads.py +++ b/tests/test_partialloads.py @@ -346,6 +346,7 @@ def test_simple_stream_load_incremental(self): from target_bigquery import main + # LOAD same data twice for i in range(2): self.set_cli_args( stdin=os.path.join(os.path.join( @@ -404,7 +405,7 @@ def test_simple_stream_load_incremental(self): assert df_expected.equals(df_actual) - + # Load new data with MERGE statement self.set_cli_args( stdin=os.path.join(os.path.join( os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), @@ -433,6 +434,7 @@ def test_simple_stream_load_incremental(self): self.assertIsNone(table.partitioning_type) self.delete_temp_state() + # verify data df_actual = ( bq_client.query(query_string) .result() From 06fd1dd2817b319ca4c017feec9e03e976eda42a Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 8 Jan 2022 17:37:53 -0700 Subject: [PATCH 11/28] fix: unit test --- .github/workflows/python-package.yml | 2 ++ tests/test_partialloads.py | 2 +- tests/unittestcore.py | 4 ++++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f7970f9..7055a10 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -22,6 +22,7 @@ env: TARGET_CONFIG_CONTAINS_TARGET_TABLES_CONFIG: ${{ secrets.TARGET_CONFIG_CONTAINS_TARGET_TABLES_CONFIG }} MALFORMED_TARGET_CONFIG: ${{ secrets.MALFORMED_TARGET_CONFIG }} TARGET_CONFIG_MERGE_STATE_FALSE_FLAG: ${{ secrets.TARGET_CONFIG_MERGE_STATE_FALSE_FLAG }} + TARGET_CONFIG_INCREMENTAL: ${{ secrets.TARGET_CONFIG_INCREMENTAL }} on: push: @@ -65,6 +66,7 @@ jobs: echo "$TARGET_CONFIG_CONTAINS_TARGET_TABLES_CONFIG" > "sandbox/target_config_contains_target_tables_config.json" echo "$MALFORMED_TARGET_CONFIG" > "sandbox/malformed_target_config.json" echo "$TARGET_CONFIG_MERGE_STATE_FALSE_FLAG" > "sandbox/target_config_merge_state_false_flag.json" + echo "TARGET_CONFIG_INCREMENTAL" > "sandbox/target_config_incremental.json" pip install -r dev-requirements.txt pytest --verbose diff --git a/tests/test_partialloads.py b/tests/test_partialloads.py index d679ae5..75e44f4 100644 --- a/tests/test_partialloads.py +++ b/tests/test_partialloads.py @@ -387,7 +387,7 @@ def test_simple_stream_load_incremental(self): bq_client = Client(project=project_id) - query_string = f"SELECT id, name FROM `{project_id}.{dataset_id}.{stream}`" + query_string = f"SELECT id, name FROM `{project_id}.{dataset_id}.{stream}` ORDER BY 1, 2" df_actual = ( bq_client.query(query_string) diff --git 
a/tests/unittestcore.py b/tests/unittestcore.py index 9f5c038..5b9f6bd 100644 --- a/tests/unittestcore.py +++ b/tests/unittestcore.py @@ -40,6 +40,10 @@ def setUp(self): os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), 'target_config_merge_state_false_flag.json') + os.environ["TARGET_CONFIG_INCREMENTAL"] = os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_incremental.json') + # TODO: make naming convention of target config files consistent "_" vs "-". Use "_" as it's easier to copy with a click # I think we would just need to rename target-config.json to target_config.json (also update it in README) self.client = None From d724e359e45cafa392101499c78da4b42f424c5a Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 8 Jan 2022 18:59:07 -0700 Subject: [PATCH 12/28] fix: unit test --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 7055a10..35cfe73 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -66,7 +66,7 @@ jobs: echo "$TARGET_CONFIG_CONTAINS_TARGET_TABLES_CONFIG" > "sandbox/target_config_contains_target_tables_config.json" echo "$MALFORMED_TARGET_CONFIG" > "sandbox/malformed_target_config.json" echo "$TARGET_CONFIG_MERGE_STATE_FALSE_FLAG" > "sandbox/target_config_merge_state_false_flag.json" - echo "TARGET_CONFIG_INCREMENTAL" > "sandbox/target_config_incremental.json" + echo "$TARGET_CONFIG_INCREMENTAL" > "sandbox/target_config_incremental.json" pip install -r dev-requirements.txt pytest --verbose From 9457193c2219504a4f57e53bd06949ce1b91008c Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Mon, 10 Jan 2022 18:08:58 -0700 Subject: [PATCH 13/28] test: truncate and append operations - verify data --- tests/test_partialloads.py | 101 ++++++++++++++++++++++++++++++++----- 1 file changed, 88 insertions(+), 13 deletions(-) diff --git a/tests/test_partialloads.py b/tests/test_partialloads.py index 75e44f4..df48eb4 100644 --- a/tests/test_partialloads.py +++ b/tests/test_partialloads.py @@ -45,8 +45,13 @@ from google.cloud.bigquery import Client import json import os -from decimal import Decimal import pandas as pd +import logging + +# logging.basicConfig(level=logging.DEBUG) +# logger = logging.getLogger(__name__) +from testfixtures import log_capture + class TestPartialLoadsPartialLoadJob(unittestcore.BaseUnitTest): @@ -280,15 +285,17 @@ def test_simple_stream_load_twice_truncate(self): from target_bigquery import main + config_file_path = os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_cache.json') + for i in range(2): # two truncate loops self.set_cli_args( stdin=os.path.join(os.path.join( os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'), 'partial_load_streams'), 'simple_stream.json'), - config=os.path.join( - os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), - 'target_config_cache.json'), + config=config_file_path, processhandler="partial-load-job", ds_delete=i == 0 ) @@ -309,19 +316,48 @@ def test_simple_stream_load_twice_truncate(self): self.assertIsNone(table.partitioning_type) self.delete_temp_state() + # verify data + + config = json.load(open(config_file_path)) + project_id = config["project_id"] + dataset_id = config["dataset_id"] + 
stream = "simple_stream" + + bq_client = Client(project=project_id) + + query_string = f"SELECT id, name FROM `{project_id}.{dataset_id}.{stream}` ORDER BY 1, 2" + + df_actual = ( + bq_client.query(query_string) + .result() + .to_dataframe() + ) + + data_expected = { + 'id': ['123', '123', '123'], + 'name': ["TEST_1", "TEST_2", "TEST_3"] + } + + # creating a Dataframe object + df_expected = pd.DataFrame(data_expected) + + assert df_expected.equals(df_actual) + def test_simple_stream_load_twice_append(self): from target_bigquery import main + config_file_path = os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_cache_append.json') + for i in range(2): # two append loops self.set_cli_args( stdin=os.path.join(os.path.join( os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'), 'partial_load_streams'), 'simple_stream.json'), - config=os.path.join( - os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), - 'target_config_cache_append.json'), + config=config_file_path, processhandler="partial-load-job", ds_delete=i == 0 ) @@ -341,10 +377,41 @@ def test_simple_stream_load_twice_append(self): self.assertIsNone(table.partitioning_type) self.delete_temp_state() + # verify data + config = json.load(open(config_file_path)) + project_id = config["project_id"] + dataset_id = config["dataset_id"] + stream = "simple_stream" + + bq_client = Client(project=project_id) - def test_simple_stream_load_incremental(self): + query_string = f"SELECT id, name FROM `{project_id}.{dataset_id}.{stream}` ORDER BY 1, 2" + + df_actual = ( + bq_client.query(query_string) + .result() + .to_dataframe() + ) + + data_expected = { + 'id': ['123', '123', '123', '123', '123', '123'], + 'name': ["TEST_1", "TEST_1", "TEST_2", "TEST_2", "TEST_3", "TEST_3"] + } + + # creating a Dataframe object + df_expected = pd.DataFrame(data_expected) + + assert df_expected.equals(df_actual) + + @log_capture() + def test_simple_stream_load_incremental(self, logcapture): from target_bigquery import main + # logger.warning("test") + + config_file = os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target-config.json') # LOAD same data twice for i in range(2): @@ -353,9 +420,7 @@ def test_simple_stream_load_incremental(self): os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'), 'partial_load_streams'), 'simple_stream_incremental_load_1.json'), - config=os.path.join( - os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), - 'target_config_incremental.json'), + config=config_file, processhandler="partial-load-job", ds_delete=i == 0 ) @@ -397,7 +462,7 @@ def test_simple_stream_load_incremental(self): data_expected = { 'id': ['001', '002', '003'], - 'name': ["LOAD_1","LOAD_1","LOAD_1"] + 'name': ["LOAD_1", "LOAD_1", "LOAD_1"] } # creating a Dataframe object @@ -443,7 +508,7 @@ def test_simple_stream_load_incremental(self): data_expected = { 'id': ['001', '002', '003', '004'], - 'name': ["UPDATED", "UPDATED", "UPDATED","INSERTED"] + 'name': ["UPDATED", "UPDATED", "UPDATED", "INSERTED"] } # creating a Dataframe object @@ -451,6 +516,16 @@ def test_simple_stream_load_incremental(self): assert df_expected.equals(df_actual) + # expected_log = ('root', 'INFO', + # "LOADED 4 rows") + # + # logcapture.check_present(expected_log, ) + # TODO: logging is not being captured in data 
load tests + + def verify_data(self): + pass + # TODO: use a function to avoid repetition + class TestPartialLoadsBookmarksPartialLoadJob(unittestcore.BaseUnitTest): From 1897466a2bb8d57d1fccbe3fa07fedb70a555ad3 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Tue, 11 Jan 2022 14:56:01 -0700 Subject: [PATCH 14/28] fix: unit tests --- target_bigquery/processhandler.py | 1 + tests/test_partialloads.py | 27 ++++----------------------- 2 files changed, 5 insertions(+), 23 deletions(-) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index db0ff54..1bcd9d3 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -240,6 +240,7 @@ def _do_temp_table_based_load(self, rows): loaded_tmp_tables.append((stream, tmp_table_name)) # copy tables to production tables + #TODO: what happens if MERGE fails because of dupe ids? for stream, tmp_table_name in loaded_tmp_tables: incremental_success = False if self.incremental: diff --git a/tests/test_partialloads.py b/tests/test_partialloads.py index df48eb4..0029152 100644 --- a/tests/test_partialloads.py +++ b/tests/test_partialloads.py @@ -46,11 +46,6 @@ import json import os import pandas as pd -import logging - -# logging.basicConfig(level=logging.DEBUG) -# logger = logging.getLogger(__name__) -from testfixtures import log_capture class TestPartialLoadsPartialLoadJob(unittestcore.BaseUnitTest): @@ -403,15 +398,9 @@ def test_simple_stream_load_twice_append(self): assert df_expected.equals(df_actual) - @log_capture() - def test_simple_stream_load_incremental(self, logcapture): + def test_simple_stream_load_incremental(self): from target_bigquery import main - # logger.warning("test") - - config_file = os.path.join( - os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), - 'target-config.json') # LOAD same data twice for i in range(2): @@ -420,7 +409,9 @@ def test_simple_stream_load_incremental(self, logcapture): os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'), 'partial_load_streams'), 'simple_stream_incremental_load_1.json'), - config=config_file, + config=os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_incremental.json'), processhandler="partial-load-job", ds_delete=i == 0 ) @@ -516,16 +507,6 @@ def test_simple_stream_load_incremental(self, logcapture): assert df_expected.equals(df_actual) - # expected_log = ('root', 'INFO', - # "LOADED 4 rows") - # - # logcapture.check_present(expected_log, ) - # TODO: logging is not being captured in data load tests - - def verify_data(self): - pass - # TODO: use a function to avoid repetition - class TestPartialLoadsBookmarksPartialLoadJob(unittestcore.BaseUnitTest): From 9939a7997a924a9b5543d3d7c67e457a28b6398a Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Tue, 11 Jan 2022 16:16:40 -0700 Subject: [PATCH 15/28] test: unit tests --- target_bigquery/processhandler.py | 13 ++++- .../simple_stream_incremental_load_3.json | 5 ++ tests/test_partialloads.py | 52 +++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 tests/rsc/partial_load_streams/simple_stream_incremental_load_3.json diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index 1bcd9d3..fd2c172 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -240,7 +240,18 @@ def _do_temp_table_based_load(self, rows): 
loaded_tmp_tables.append((stream, tmp_table_name)) # copy tables to production tables - #TODO: what happens if MERGE fails because of dupe ids? + # destination table can have dupe ids used in MERGE statement + # new data which being appended should have no dupes + + # if new data has dupes, then MERGE will fail with a similar error: + # INFO Primary keys: id + # CRITICAL 400 UPDATE/MERGE must match at most one source row for each target row + + # https://stackoverflow.com/questions/50504504/bigquery-error-update-merge-must-match-at-most-one-source-row-for-each-target-r + # https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax + + # If a row in the table to be updated joins with more than one row from the FROM clause, + # then the query generates the following runtime error: UPDATE/MERGE must match at most one source row for each target row. for stream, tmp_table_name in loaded_tmp_tables: incremental_success = False if self.incremental: diff --git a/tests/rsc/partial_load_streams/simple_stream_incremental_load_3.json b/tests/rsc/partial_load_streams/simple_stream_incremental_load_3.json new file mode 100644 index 0000000..9d41474 --- /dev/null +++ b/tests/rsc/partial_load_streams/simple_stream_incremental_load_3.json @@ -0,0 +1,5 @@ +{ "type": "SCHEMA", "stream": "simple_stream", "schema": { "properties": { "id": { "type": [ "null", "string" ] }, "name": { "type": [ "null", "string" ] }, "value": { "type": [ "null", "integer" ] }, "ratio": { "type": [ "null", "number" ] }, "timestamp": { "type": "string", "format": "date-time" }, "date": { "type": "string", "format": "date" } }, "type": [ "null", "object" ] }, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] } +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "DUPE", "value": 1, "ratio": 0.1, "timestamp": "2020-01-09T00:00:00.000000Z", "date": "2020-01-09" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-09T00:00:00.000000Z"}}}} +{ "type": "RECORD", "stream": "simple_stream", "record": { "id": "001", "name": "DUPE", "value": 2, "ratio": 0.1, "timestamp": "2020-01-10T00:00:00.000000Z", "date": "2020-01-10" }, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-10T00:00:00.000000Z"}}}} \ No newline at end of file diff --git a/tests/test_partialloads.py b/tests/test_partialloads.py index 0029152..bed7e33 100644 --- a/tests/test_partialloads.py +++ b/tests/test_partialloads.py @@ -46,6 +46,7 @@ import json import os import pandas as pd +import pytest class TestPartialLoadsPartialLoadJob(unittestcore.BaseUnitTest): @@ -507,6 +508,57 @@ def test_simple_stream_load_incremental(self): assert df_expected.equals(df_actual) + def test_simple_stream_load_incremental_error_expected(self): + + from target_bigquery import main + + # LOAD 1 + + self.set_cli_args( + stdin=os.path.join(os.path.join( + os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), + 'rsc'), + 'partial_load_streams'), 'simple_stream_incremental_load_1.json'), + config=os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_incremental.json'), + processhandler="partial-load-job", + ds_delete=True + ) + + ret = main() + + # LOAD 2 + # Load new data with MERGE statement + self.set_cli_args( + stdin=os.path.join(os.path.join( + 
os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), + 'rsc'), + 'partial_load_streams'), 'simple_stream_incremental_load_3.json'), + config=os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target_config_incremental.json'), + processhandler="partial-load-job", + ds_delete=False + ) + # with pytest.raises(ValueError, match="JSON schema is invalid/incomplete. It has empty properties"): + ret = main() + + # destination table can have dupe ids used in MERGE statement + # new data which being appended should have no dupes + + # if new data has dupes, then MERGE will fail with a similar error: + # INFO Primary keys: id + # CRITICAL 400 UPDATE/MERGE must match at most one source row for each target row + + # https://stackoverflow.com/questions/50504504/bigquery-error-update-merge-must-match-at-most-one-source-row-for-each-target-r + # https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax + + # If a row in the table to be updated joins with more than one row from the FROM clause, + # then the query generates the following runtime error: UPDATE/MERGE must match at most one source row for each target row. + + self.assertEqual(ret, 2, msg="Exit code is not 2!") # expected exit code is 2 - serious problem + class TestPartialLoadsBookmarksPartialLoadJob(unittestcore.BaseUnitTest): From 7a81f9e6105040e1a1ad4bbf91560c00ed759571 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Tue, 11 Jan 2022 16:18:48 -0700 Subject: [PATCH 16/28] docs: comments --- tests/test_partialloads.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_partialloads.py b/tests/test_partialloads.py index bed7e33..3ef55d1 100644 --- a/tests/test_partialloads.py +++ b/tests/test_partialloads.py @@ -513,7 +513,6 @@ def test_simple_stream_load_incremental_error_expected(self): from target_bigquery import main # LOAD 1 - self.set_cli_args( stdin=os.path.join(os.path.join( os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), @@ -528,7 +527,7 @@ def test_simple_stream_load_incremental_error_expected(self): ret = main() - # LOAD 2 + # LOAD 2: data has dupes, which cause MERGE query to break # Load new data with MERGE statement self.set_cli_args( stdin=os.path.join(os.path.join( From dc382e442a6dba16b553b807a387f32ac0f95a0e Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Wed, 12 Jan 2022 12:41:14 -0700 Subject: [PATCH 17/28] refactor: logging --- target_bigquery/processhandler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index 28ffddf..d6ca882 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -185,7 +185,7 @@ def handle_record_message(self, msg): nr = format_record_to_schema(nr, self.bq_schema_dicts[stream]) except Exception as e: extra={"record" : msg.record, "schema": schema, "bq_schema": bq_schema} - self.logger.info(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}") + self.logger.critical(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. 
Details: {extra}") raise e # schema validation may fail if data doesn't match schema in terms of data types From 0e4e98a953bf25d46ab9e045fad7bdc9edb669e5 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Wed, 12 Jan 2022 12:48:14 -0700 Subject: [PATCH 18/28] test: CI/CD --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 35cfe73..6babdc5 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -26,7 +26,7 @@ env: on: push: - branches: [development, master, feature/incremental_sync] + branches: [development, master, feature/incremental_sync_and_schema_logging] pull_request: branches: [ master ] From 8cea24ac77efbf97d6b0c282fdb48bab2ac97096 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Wed, 12 Jan 2022 15:35:03 -0700 Subject: [PATCH 19/28] test: logging --- .../stream_format_record_to_schema_fails.json | 3 +++ tests/test_complexstream.py | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+) create mode 100644 tests/rsc/data/stream_format_record_to_schema_fails.json diff --git a/tests/rsc/data/stream_format_record_to_schema_fails.json b/tests/rsc/data/stream_format_record_to_schema_fails.json new file mode 100644 index 0000000..aa96cc8 --- /dev/null +++ b/tests/rsc/data/stream_format_record_to_schema_fails.json @@ -0,0 +1,3 @@ +{ "type": "SCHEMA", "stream": "simple_stream", "schema": {"properties": {"id": {"type": ["string"]}, "name": {"type": ["null", "string"]}, "orderindex": {"type": ["null", "integer"]}, "override_statuses": {"type": ["null", "boolean"]}, "hidden": {"type": ["null", "boolean"]}, "space": {"properties": {"id": {"type": ["null", "string"]}, "name": {"type": ["null", "string"]}}, "type": "object"}, "task_count": {"type": ["null", "string", "integer"]}, "statuses": {"items": {}, "type": ["array", "null"]}, "lists": {"items": {}, "type": ["array", "null"]}, "archived": {"type": ["null", "boolean"]}, "permission_level": {"type": ["null", "string"]}}, "type": "object"}, "key_properties": [ "id" ], "bookmark_properties": [ "date" ] } +{ "type": "RECORD", "stream": "simple_stream", "record": {"id": "12933951", "name": "Milestone and Project Plan", "orderindex": 17, "override_statuses": "false", "hidden": "false", "space": {"id": "2577684", "name": "meshDelivery"}, "task_count": "10", "archived": "true", "statuses": [], "lists": [{"id": "25670974", "name": "POC ", "orderindex": 0, "status": "null", "priority": "null", "assignee": "null", "task_count": 10, "due_date": "null", "start_date": "null", "space": {"id": "2577684", "name": "meshDelivery", "access": "true"}, "archived": "false", "override_statuses": "null", "statuses": [{"id": "p2577684_eDZ87cTk", "status": "Open", "orderindex": 0, "color": "#d3d3d3", "type": "open"}, {"id": "p2577684_Sf8kB74x", "status": "planned", "orderindex": 1, "color": "#82CB11", "type": "custom"}, {"id": "p2577684_yG5b2doG", "status": "in progress", "orderindex": 2, "color": "#4194f6", "type": "custom"}, {"id": "p2577684_BZKpph7f", "status": "review", "orderindex": 3, "color": "#A875FF", "type": "custom"}, {"id": "p2577684_ouoISXPV", "status": "Closed", "orderindex": 4, "color": "#6bc950", "type": "closed"}], "permission_level": "create"}], "permission_level": "create"}, "time_extracted": "2020-07-14T22:21:35.098374Z" } +{"type": "STATE", "value": {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}}, "time_extracted": 
"2020-07-14T22:21:35.098374Z" } \ No newline at end of file diff --git a/tests/test_complexstream.py b/tests/test_complexstream.py index d12fc1c..70ab49b 100644 --- a/tests/test_complexstream.py +++ b/tests/test_complexstream.py @@ -371,3 +371,26 @@ def test_misformed_complex_stream(self): print(state) self.assertEqual(ret, 0, msg="Exit code is not 0!") + + + def test_schema_logging(self): + + from target_bigquery import main + + self.set_cli_args( + stdin=os.path.join(os.path.join( + os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'), + 'data'), 'stream_format_record_to_schema_fails.json'), + config=os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target-config.json'), + processhandler="load-job", + ) + + ret = main() + + self.assertEqual(ret, 2, msg="Exit code is not 2!") + #TODO: test logging. + # I had an issue trying to do that with pytest logcapture - no logging was captured in a data load test + + + From 91e28bf396f332940df102959d14fc6c9a39eaa7 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Thu, 13 Jan 2022 15:41:53 -0700 Subject: [PATCH 20/28] docs: comments --- target_bigquery/processhandler.py | 2 ++ tests/test_complexstream.py | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index d6ca882..4e8a42b 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -264,6 +264,8 @@ def _do_temp_table_based_load(self, rows): incremental_success = False if self.incremental: self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL") + #TODO: reword the warning about this replication method + self.logger.warning(f"INCREMENTAL replication method might result in data loss because we are editing the production data during the sync operation") table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}" try: self.client.get_table(table_id) diff --git a/tests/test_complexstream.py b/tests/test_complexstream.py index 70ab49b..ba8d8e0 100644 --- a/tests/test_complexstream.py +++ b/tests/test_complexstream.py @@ -14,6 +14,12 @@ from decimal import Decimal import pandas as pd +import logging + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) +from testfixtures import log_capture + class TestComplexStreamLoadJob(unittestcore.BaseUnitTest): def test_klaviyo_stream(self): @@ -355,6 +361,8 @@ def test_misformed_complex_stream(self): } """ + logging.info("jake test") + from target_bigquery import main self.set_cli_args( @@ -372,11 +380,13 @@ def test_misformed_complex_stream(self): self.assertEqual(ret, 0, msg="Exit code is not 0!") - - def test_schema_logging(self): + @log_capture() + def test_schema_logging(self, logcapture): from target_bigquery import main + logger.info("Jake test") + self.set_cli_args( stdin=os.path.join(os.path.join( os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'), @@ -389,6 +399,11 @@ def test_schema_logging(self): ret = main() self.assertEqual(ret, 2, msg="Exit code is not 2!") + + expected_log = ('root', 'INFO', "Jake test") + + logcapture.check_present(expected_log, ) + #TODO: test logging. 
# I had an issue trying to do that with pytest logcapture - no logging was captured in a data load test From 636883130b421c5f4e3a26d054bf12fd4bde3ebb Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Thu, 20 Jan 2022 12:38:29 -0700 Subject: [PATCH 21/28] docs: TODO --- target_bigquery/processhandler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index 4e8a42b..ec2b1a8 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -224,6 +224,8 @@ def primary_key_condition(self, stream): if len(keys) < 1: raise Exception(f"No primary keys specified from the tap and Incremental option selected") return " and ".join(keys) + #TODO: test it with multiple ids + def _do_temp_table_based_load(self, rows): assert isinstance(rows, dict) From 185bb6a4cdee0cf52a2f97ec44f97f30114c37ce Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Thu, 20 Jan 2022 12:40:55 -0700 Subject: [PATCH 22/28] docs: README warning about using incremental (MERGE) replication method --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 31c2f81..4c86021 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,7 @@ sample [target-config.json](/sample_config/target-config-exchange-rates-api.json * `truncate`: Deleting all previous rows and uploading the new ones to the table * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector (if it finds an old row with same key, updates it. Otherwise it inserts the new row) + - WARNING: we do not recommend using `incremental` option as it might result in loss of production data. We recommend using `append` option instead which will preserve historical data. Sample **target-config.json** file: From 8675bc1e35012101c4690ec01b4bace369df5032 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Thu, 20 Jan 2022 12:46:30 -0700 Subject: [PATCH 23/28] docs: logging warning about using incremental (MERGE) replication method --- target_bigquery/processhandler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index ec2b1a8..de36f36 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -267,7 +267,7 @@ def _do_temp_table_based_load(self, rows): if self.incremental: self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL") #TODO: reword the warning about this replication method - self.logger.warning(f"INCREMENTAL replication method might result in data loss because we are editing the production data during the sync operation") + self.logger.warning(f"INCREMENTAL replication method might result in data loss because we are editing the production data during the sync operation. 
We recommend that you use APPEND target-bigquery replication instead.") table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}" try: self.client.get_table(table_id) From 5129cf31e0c0536381bdeb8b7fa12d8ac0ee649f Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Thu, 20 Jan 2022 18:16:25 -0700 Subject: [PATCH 24/28] docs: TODO --- target_bigquery/processhandler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index de36f36..620bc76 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -224,8 +224,8 @@ def primary_key_condition(self, stream): if len(keys) < 1: raise Exception(f"No primary keys specified from the tap and Incremental option selected") return " and ".join(keys) - #TODO: test it with multiple ids - + #TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema) + #TODO: test it with dupe ids in the data def _do_temp_table_based_load(self, rows): assert isinstance(rows, dict) From 75bc145c575977a402b710d22f6b7e0dc5774efe Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Thu, 20 Jan 2022 18:18:03 -0700 Subject: [PATCH 25/28] test: logging check removed from sync unit test --- tests/test_complexstream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_complexstream.py b/tests/test_complexstream.py index ba8d8e0..3574fd1 100644 --- a/tests/test_complexstream.py +++ b/tests/test_complexstream.py @@ -402,8 +402,7 @@ def test_schema_logging(self, logcapture): expected_log = ('root', 'INFO', "Jake test") - logcapture.check_present(expected_log, ) - + # logcapture.check_present(expected_log, ) # this fails for now #TODO: test logging. 
# I had an issue trying to do that with pytest logcapture - no logging was captured in a data load test From 5f9dd8973463710c398dec2e11cab9fbbe4afadb Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Mon, 24 Jan 2022 17:36:09 -0700 Subject: [PATCH 26/28] test: logging --- tests/test_complexstream.py | 78 ++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 19 deletions(-) diff --git a/tests/test_complexstream.py b/tests/test_complexstream.py index 3574fd1..620a86f 100644 --- a/tests/test_complexstream.py +++ b/tests/test_complexstream.py @@ -15,10 +15,8 @@ import pandas as pd import logging +from testfixtures import LogCapture -logging.basicConfig(level=logging.DEBUG) -logger = logging.getLogger(__name__) -from testfixtures import log_capture class TestComplexStreamLoadJob(unittestcore.BaseUnitTest): @@ -215,10 +213,9 @@ def test_complex_stream_decimal_schema_valid(self): try: test_field = bq_schemas_dict[stream][7] except: - stream = stream+"_dev" + stream = stream + "_dev" test_field = bq_schemas_dict[stream][7] - assert test_field.name == "budget_remaining" assert test_field.field_type in ["NUMERIC", "DECIMAL"] # NUMERIC is the same as DECIMAL assert test_field.precision == 32 @@ -230,8 +227,8 @@ def test_complex_stream_decimal_schema_valid(self): dataframe = ( bq_client.query(query_string) - .result() - .to_dataframe() + .result() + .to_dataframe() ) actual = dataframe["budget_remaining"] expected = pd.Series([Decimal('2450980'), Decimal('2450980'), Decimal('5000000.1'), @@ -361,7 +358,6 @@ def test_misformed_complex_stream(self): } """ - logging.info("jake test") from target_bigquery import main @@ -380,19 +376,19 @@ def test_misformed_complex_stream(self): self.assertEqual(ret, 0, msg="Exit code is not 0!") - @log_capture() - def test_schema_logging(self, logcapture): - from target_bigquery import main + def test_schema_error(self): - logger.info("Jake test") + from target_bigquery import main self.set_cli_args( stdin=os.path.join(os.path.join( - os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'), + os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), + 'rsc'), 'data'), 'stream_format_record_to_schema_fails.json'), - config=os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), - 'target-config.json'), + config=os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target-config.json'), processhandler="load-job", ) @@ -400,11 +396,55 @@ def test_schema_logging(self, logcapture): self.assertEqual(ret, 2, msg="Exit code is not 2!") - expected_log = ('root', 'INFO', "Jake test") + def test_schema_logging(self): - # logcapture.check_present(expected_log, ) # this fails for now - #TODO: test logging. 
- # I had an issue trying to do that with pytest logcapture - no logging was captured in a data load test + """ + Test logging as part of this pull request QA: https://github.com/adswerve/target-bigquery/pull/27 + # Feature/improve schema logging #27 + About testing logging: + https://testfixtures.readthedocs.io/en/latest/logging.html + https://stackoverflow.com/questions/13733552/logger-configuration-to-log-to-file-and-print-to-stdout + """ + from target_bigquery import main, logger + + with LogCapture() as actual_logs: + # make sure logs are displayed during local testing in console + logFormatter = logging.Formatter("%(levelname)s %(message)s") + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(logFormatter) + logger.addHandler(consoleHandler) + + # test log + logger.info("unit test starts") + + # run sync + self.set_cli_args( + stdin=os.path.join(os.path.join( + os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), + 'rsc'), + 'data'), 'stream_format_record_to_schema_fails.json'), + config=os.path.join( + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'), + 'target-config.json'), + processhandler="load-job", + ) + + ret = main() + + # verify that sync did not succeed + self.assertEqual(ret, 2, msg="Exit code is not 2!") + # test log + logger.info("unit test ends") + + # verify logs + actual_logs.check_present( + ('root', 'INFO', "unit test starts"), + ('root', 'INFO', + "simple_stream schema: {'properties': {'id': {'type': ['string']}, 'name': {'type': ['null', 'string']}, 'orderindex': {'type': ['null', 'integer']}, 'override_statuses': {'type': ['null', 'boolean']}, 'hidden': {'type': ['null', 'boolean']}, 'space': {'properties': {'id': {'type': ['null', 'string']}, 'name': {'type': ['null', 'string']}}, 'type': 'object'}, 'task_count': {'type': ['null', 'string', 'integer']}, 'statuses': {'items': {}, 'type': ['array', 'null']}, 'lists': {'items': {}, 'type': ['array', 'null']}, 'archived': {'type': ['null', 'boolean']}, 'permission_level': {'type': ['null', 'string']}}, 'type': 'object'}"), + ('root', 'CRITICAL', + "Cannot format a record for stream simple_stream to its corresponding BigQuery schema. 
Details: {'record': {'id': '12933951', 'name': 'Milestone and Project Plan', 'orderindex': 17, 'override_statuses': 'false', 'hidden': 'false', 'space': {'id': '2577684', 'name': 'meshDelivery'}, 'task_count': '10', 'archived': 'true', 'statuses': [], 'lists': [{'id': '25670974', 'name': 'POC ', 'orderindex': 0, 'status': 'null', 'priority': 'null', 'assignee': 'null', 'task_count': 10, 'due_date': 'null', 'start_date': 'null', 'space': {'id': '2577684', 'name': 'meshDelivery', 'access': 'true'}, 'archived': 'false', 'override_statuses': 'null', 'statuses': [{'id': 'p2577684_eDZ87cTk', 'status': 'Open', 'orderindex': 0, 'color': '#d3d3d3', 'type': 'open'}, {'id': 'p2577684_Sf8kB74x', 'status': 'planned', 'orderindex': 1, 'color': '#82CB11', 'type': 'custom'}, {'id': 'p2577684_yG5b2doG', 'status': 'in progress', 'orderindex': 2, 'color': '#4194f6', 'type': 'custom'}, {'id': 'p2577684_BZKpph7f', 'status': 'review', 'orderindex': 3, 'color': '#A875FF', 'type': 'custom'}, {'id': 'p2577684_ouoISXPV', 'status': 'Closed', 'orderindex': 4, 'color': '#6bc950', 'type': 'closed'}], 'permission_level': 'create'}], 'permission_level': 'create'}, 'schema': {'properties': {'id': {'type': ['string']}, 'name': {'type': ['null', 'string']}, 'orderindex': {'type': ['null', 'integer']}, 'override_statuses': {'type': ['null', 'boolean']}, 'hidden': {'type': ['null', 'boolean']}, 'space': {'properties': {'id': {'type': ['null', 'string']}, 'name': {'type': ['null', 'string']}}, 'type': 'object'}, 'task_count': {'type': ['null', 'string', 'integer']}, 'statuses': {'items': {}, 'type': ['array', 'null']}, 'lists': {'items': {}, 'type': ['array', 'null']}, 'archived': {'type': ['null', 'boolean']}, 'permission_level': {'type': ['null', 'string']}}, 'type': 'object'}, 'bq_schema': {'id': {'type': 'STRING', 'mode': 'REQUIRED', 'policyTags': {'names': []}}, 'name': {'type': 'STRING', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, 'orderindex': {'type': 'INTEGER', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, 'override_statuses': {'type': 'BOOLEAN', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, 'hidden': {'type': 'BOOLEAN', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, 'space': {'type': 'RECORD', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'fields': {'id': {'type': 'STRING', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, 'name': {'type': 'STRING', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}}}, 'task_count': {'type': 'STRING', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, 'statuses': {'type': 'RECORD', 'mode': 'REPEATED', 'precision': None, 'scale': None, 'fields': []}, 'lists': {'type': 'RECORD', 'mode': 'REPEATED', 'precision': None, 'scale': None, 'fields': []}, 'archived': {'type': 'BOOLEAN', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, 'permission_level': {'type': 'STRING', 'mode': 'NULLABLE', 'precision': None, 'scale': None, 'policyTags': {'names': []}}, '_time_extracted': {'type': 'timestamp', 'mode': 'NULLABLE', 'policyTags': {'names': []}}, '_time_loaded': {'type': 'timestamp', 'mode': 'NULLABLE', 'policyTags': {'names': []}}}}"), + ('root', 'INFO', "unit test ends") + ) From f3eb383cc7a9621d0c0cf5a38b357a3d41166b56 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Mon, 24 Jan 2022 17:38:37 -0700 
Subject: [PATCH 27/28] test: logging --- tests/test_complexstream.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/test_complexstream.py b/tests/test_complexstream.py index 620a86f..6901e34 100644 --- a/tests/test_complexstream.py +++ b/tests/test_complexstream.py @@ -411,10 +411,14 @@ def test_schema_logging(self): with LogCapture() as actual_logs: # make sure logs are displayed during local testing in console - logFormatter = logging.Formatter("%(levelname)s %(message)s") - consoleHandler = logging.StreamHandler() - consoleHandler.setFormatter(logFormatter) - logger.addHandler(consoleHandler) + # make sure unit test logs are in the same format as what we see during the sync + log_formatter = logging.Formatter("%(levelname)s %(message)s") + console_handler = logging.StreamHandler() + console_handler.setFormatter(log_formatter) + logger.addHandler(console_handler) + + # set level + console_handler.setLevel(logging.INFO) # test log logger.info("unit test starts") From 67ac76cecbb4c799ec3a61bbbb7f194624d72359 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Mon, 24 Jan 2022 17:57:40 -0700 Subject: [PATCH 28/28] docs: warnings about risks of using incremental (MERGE) replication method --- README.md | 2 +- target_bigquery/processhandler.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4c86021..19aae79 100644 --- a/README.md +++ b/README.md @@ -153,7 +153,7 @@ sample [target-config.json](/sample_config/target-config-exchange-rates-api.json * `truncate`: Deleting all previous rows and uploading the new ones to the table * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector (if it finds an old row with same key, updates it. Otherwise it inserts the new row) - - WARNING: we do not recommend using `incremental` option as it might result in loss of production data. We recommend using `append` option instead which will preserve historical data. + - WARNING: We do not recommend using `incremental` option (which uses `MERGE` SQL statement). It might result in loss of production data, because historical records get updated. Instead, we recommend using the `append` replication method, which will preserve historical data. Sample **target-config.json** file: diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py index 620bc76..16f2e96 100644 --- a/target_bigquery/processhandler.py +++ b/target_bigquery/processhandler.py @@ -266,8 +266,7 @@ def _do_temp_table_based_load(self, rows): incremental_success = False if self.incremental: self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL") - #TODO: reword the warning about this replication method - self.logger.warning(f"INCREMENTAL replication method might result in data loss because we are editing the production data during the sync operation. We recommend that you use APPEND target-bigquery replication instead.") + self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.") table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}" try: self.client.get_table(table_id)
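To make the duplicate-key failure mode described in these last patches concrete (the scenario exercised by `simple_stream_incremental_load_3.json` and `test_simple_stream_load_incremental_error_expected`), here is a minimal hand-written sketch. It is not the SQL the target itself generates (the target builds its MERGE from the stream schema and `key_properties`), and the project, dataset and table names are placeholders. It assumes the destination table already contains a row with id `'001'` from an earlier incremental load; a MERGE whose source holds two rows with that same key then fails with the 400 error quoted in the comments above, which is why the unit test asserts exit code 2:

```python
from google.api_core import exceptions as google_exceptions
from google.cloud import bigquery

# Placeholder project; substitute a project/dataset/table you can write to.
client = bigquery.Client(project="your-project-id")

# The inline source deliberately contains two rows with id '001',
# mirroring simple_stream_incremental_load_3.json.
merge_sql = """
MERGE `your-project-id.your_dataset.simple_stream` t
USING (
  SELECT '001' AS id, 'DUPE' AS name
  UNION ALL
  SELECT '001' AS id, 'DUPE' AS name
) s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET name = s.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)
"""

try:
    client.query(merge_sql).result()
except google_exceptions.BadRequest as err:
    # Expected: "UPDATE/MERGE must match at most one source row for each target row"
    print(err)
```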