diff --git a/dev-project/.env b/dev-project/.env index 670d647bf..75426f8ea 100644 --- a/dev-project/.env +++ b/dev-project/.env @@ -109,4 +109,6 @@ TARGET_REDSHIFT_S3_ACL= # Please add real credentials otherwise the related tests will be ignored. # ------------------------------------------------------------------------------ TARGET_BIGQUERY_PROJECT= +TARGET_BIGQUERY_GCS_BUCKET= +TARGET_BIGQUERY_GCS_KEY_PREFIX= GOOGLE_APPLICATION_CREDENTIALS= diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py index 5e31ce259..86310044e 100644 --- a/pipelinewise/cli/config.py +++ b/pipelinewise/cli/config.py @@ -458,6 +458,12 @@ def generate_inheritable_config(self, tap: Dict) -> Dict: 'archive_load_files_s3_prefix': tap.get( 'archive_load_files_s3_prefix', None ), + 'archive_load_files_gcs_bucket': tap.get( + 'archive_load_files_gcs_bucket', None + ), + 'archive_load_files_gcs_prefix': tap.get( + 'archive_load_files_gcs_prefix', None + ), } ) diff --git a/pipelinewise/cli/schemas/target.json b/pipelinewise/cli/schemas/target.json index 4c29c4198..ee5e6a13b 100644 --- a/pipelinewise/cli/schemas/target.json +++ b/pipelinewise/cli/schemas/target.json @@ -180,10 +180,22 @@ }, "dataset_id": { "type": "string" + }, + "gcs_bucket": { + "type": "string" + }, + "gcs_key_prefix": { + "type": "string" + }, + "gcs_parallelism": { + "type": "integer", + "minimum": 1, + "maximum": 1000 } }, "required": [ - "project_id" + "project_id", + "gcs_bucket" ] } }, diff --git a/pipelinewise/fastsync/commons/tap_mongodb.py b/pipelinewise/fastsync/commons/tap_mongodb.py index 1a6c951a8..6bfcf8038 100644 --- a/pipelinewise/fastsync/commons/tap_mongodb.py +++ b/pipelinewise/fastsync/commons/tap_mongodb.py @@ -244,6 +244,7 @@ def copy_table( split_file_chunk_size_mb=1000, split_file_max_chunks=20, compress=True, + include_header=True, ): """ Export data from table to a zipped csv @@ -286,8 +287,8 @@ def copy_table( quotechar='"', quoting=csv.QUOTE_MINIMAL, ) - - writer.writeheader() + if include_header: + writer.writeheader() rows = [] LOGGER.info('Starting data processing...') diff --git a/pipelinewise/fastsync/commons/tap_s3_csv.py b/pipelinewise/fastsync/commons/tap_s3_csv.py index 772982a17..91dfe7cd9 100644 --- a/pipelinewise/fastsync/commons/tap_s3_csv.py +++ b/pipelinewise/fastsync/commons/tap_s3_csv.py @@ -20,6 +20,7 @@ from singer.utils import strptime_with_tz from singer_encodings import csv as singer_encodings_csv +from . 
import split_gzip from .utils import retry_pattern from ...utils import safe_column_name @@ -73,7 +74,7 @@ def _find_table_spec_by_name(self, table_name: str) -> Dict: ) ) - def copy_table(self, table_name: str, file_path: str) -> None: + def copy_table(self, table_name: str, file_path: str, compress=True, include_header=True) -> None: """ Copies data from all csv files that match the search_pattern and into the csv file in file_path :param table_name: Name of the table @@ -119,18 +120,25 @@ def copy_table(self, table_name: str, file_path: str) -> None: self.tables_last_modified[table_name] = max_last_modified # write to the given compressed csv file - with gzip.open(file_path, 'wt') as gzfile: - + gzip_splitter = split_gzip.open( + file_path, + mode='wt', + chunk_size_mb=split_gzip.DEFAULT_CHUNK_SIZE_MB, + max_chunks=0, + compress=compress, + ) + with gzip_splitter as split_gzip_files: writer = csv.DictWriter( - gzfile, + split_gzip_files, fieldnames=sorted(list(headers)), # we need to sort the headers so that copying into snowflake works delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL, ) - # write the header - writer.writeheader() + if include_header: + # write the header + writer.writeheader() # write all records at once writer.writerows(records) diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index aa5a10a1e..71499d239 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -1,9 +1,12 @@ +import multiprocessing.pool +import os import logging import json import re from typing import List, Dict from google.cloud import bigquery +from google.cloud import storage from google.api_core import exceptions from .transform_utils import TransformationHelper, SQLFlavor @@ -37,6 +40,9 @@ class FastSyncTargetBigquery: def __init__(self, connection_config, transformation_config=None): self.connection_config = connection_config self.transformation_config = transformation_config + self.gcs = storage.Client( + project=self.connection_config['project_id'], + ) def open_connection(self): project_id = self.connection_config['project_id'] @@ -78,6 +84,87 @@ def to_query_parameter(value): return query_job + def upload_to_gcs(self, file: str) -> storage.Blob: + bucket_name = self.connection_config['gcs_bucket'] + key_prefix = self.connection_config.get('gcs_key_prefix', '') + key = '{}{}'.format(key_prefix, os.path.basename(file)) + + LOGGER.info( + 'Uploading to GCS bucket: %s, local file: %s, GCS key: %s', + bucket_name, + file, + key, + ) + + # Upload to GCS + bucket = self.gcs.get_bucket(bucket_name) + blob = bucket.blob(key) + blob.upload_from_filename(file) + + return blob + + def multi_upload_to_gcs(self, files: List[str]) -> List[storage.Blob]: + bucket_name = self.connection_config['gcs_bucket'] + key_prefix = self.connection_config.get('gcs_key_prefix', '') + key = '{}{}'.format( + key_prefix, + os.path.splitext(os.path.basename(files[0]))[0] + ) + LOGGER.info( + 'Uploading to GCS bucket: %s, %s local files, GCS key prefix: %s', + bucket_name, + len(files), + key, + ) + gcs_parallelism = self.connection_config.get('gcs_parallelism', 1) + with multiprocessing.pool.ThreadPool(gcs_parallelism) as proc: + gcs_blobs = proc.map(self.upload_to_gcs, files) + return gcs_blobs + + def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> storage.Blob: + """Copy load file to archive folder with metadata added""" + table_dict = utils.tablename_to_dict(table) + 
archive_table = table_dict.get('table_name') + archive_schema = table_dict.get('schema_name', '') + + # Retain same filename + archive_file_basename = os.path.basename(blob.name) + + # Get archive GCS prefix from config, defaulting to 'archive' if not specified + archive_prefix = self.connection_config.get( + 'archive_load_files_gcs_prefix', 'archive' + ) + + source_bucket = blob.bucket + + # Get archive GCS bucket from config, defaulting to the same bucket used for BigQuery imports if not specified + archive_bucket = self.gcs.get_bucket( + self.connection_config.get( + 'archive_load_files_gcs_bucket', source_bucket.name + ) + ) + + archive_blob_name = f'{archive_prefix}/{tap_id}/{archive_table}/{archive_file_basename}' + source_blob_name = f'{blob.bucket.name}/{blob.name}' + LOGGER.info('Archiving %s to %s', source_blob_name, archive_blob_name) + + source_bucket.copy_blob(blob, archive_bucket, new_name=archive_blob_name) + + # Combine existing metadata with archive related headers + archive_blob = archive_bucket.get_blob(archive_blob_name) + new_metadata = {} if blob.metadata is None else blob.metadata + new_metadata.update( + { + 'tap': tap_id, + 'schema': archive_schema, + 'table': archive_table, + 'archived-by': 'pipelinewise_fastsync', + } + ) + archive_blob.metadata = new_metadata + archive_blob.patch() + return archive_blob + def create_schema(self, schema_name): temp_schema = self.connection_config.get('temp_schema', schema_name) @@ -147,19 +234,19 @@ def create_table( self.query(sql) - # pylint: disable=R0913,R0914 + # pylint: disable=too-many-locals def copy_to_table( self, - filepath, - target_schema, - table_name, - size_bytes, - is_temporary, - skip_csv_header=False, - allow_quoted_newlines=True, - write_truncate=True, + blobs: List[storage.Blob], + target_schema: str, + table_name: str, + size_bytes: int, + is_temporary: bool, + skip_csv_header: bool = False, + allow_quoted_newlines: bool = True, + write_truncate: bool = True, ): - LOGGER.info('BIGQUERY - Loading %s into Bigquery...', filepath) + LOGGER.info('BIGQUERY - Loading into Bigquery...') table_dict = utils.tablename_to_dict(table_name) target_table = safe_name( table_dict.get( @@ -172,6 +259,7 @@ def copy_to_table( dataset_ref = client.dataset(target_schema) table_ref = dataset_ref.table(target_table) table_schema = client.get_table(table_ref).schema + job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.CSV job_config.schema = table_schema @@ -180,10 +268,11 @@ def copy_to_table( ) job_config.allow_quoted_newlines = allow_quoted_newlines job_config.skip_leading_rows = 1 if skip_csv_header else 0 - with open(filepath, 'rb') as exported_data: - job = client.load_table_from_file( - exported_data, table_ref, job_config=job_config - ) + + uris = [f'gs://{blob.bucket.name}/{blob.name}' for blob in blobs] + job = client.load_table_from_uri( + source_uris=uris, destination=table_ref, job_config=job_config + ) try: job.result() except exceptions.BadRequest as exc: diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index c82dd3cbc..046f0e121 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -25,7 +25,7 @@ 'auth_database', 'dbname', ], - 'target': ['project_id'], + 'target': ['project_id', 'gcs_bucket'], } LOCK = multiprocessing.Lock() @@ -48,6 +48,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: """Sync one table""" mongodb = FastSyncTapMongoDB(args.tap,
tap_type_to_target_type) bigquery = FastSyncTargetBigquery(args.target, args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: dbname = args.tap.get('dbname') @@ -66,26 +68,31 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: ) # Exporting table data, get table definitions and close connection to avoid timeouts - mongodb.copy_table(table, filepath, args.temp_dir) + mongodb.copy_table(table, filepath, args.temp_dir, compress=False, include_header=False) size_bytes = os.path.getsize(filepath) bigquery_types = mongodb.map_column_types_to_target() bigquery_columns = bigquery_types.get('columns', []) mongodb.close_connection() + # Uploading to GCS + gcs_blob = bigquery.upload_to_gcs(filepath) + os.remove(filepath) + # Creating temp table in Bigquery bigquery.create_schema(target_schema) bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) # Load into Bigquery table bigquery.copy_to_table( - filepath, - target_schema, - table, - size_bytes, - is_temporary=True, - skip_csv_header=True, + [gcs_blob], target_schema, table, size_bytes, is_temporary=True, ) - os.remove(filepath) + + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(gcs_blob, tap_id, table) + + # Delete the load file from GCS + gcs_blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table) diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index b8529f39d..ccd3a44e1 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -22,6 +22,7 @@ 'tap': ['host', 'port', 'user', 'password'], 'target': [ 'project_id', + 'gcs_bucket', ], } @@ -68,6 +69,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: """Sync one table""" mysql = FastSyncTapMySql(args.tap, tap_type_to_target_type, target_quote='`') bigquery = FastSyncTargetBigquery(args.target, args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: filename = 'pipelinewise_fastsync_{}_{}.csv'.format( @@ -84,30 +87,43 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Exporting table data, get table definitions and close connection to avoid timeouts mysql.copy_table( - table, filepath, compress=False, max_num=MAX_NUM, date_type='datetime' + table, + filepath, + compress=False, + max_num=MAX_NUM, + date_type='datetime', + split_large_files=args.target.get('split_large_files'), + split_file_chunk_size_mb=args.target.get('split_file_chunk_size_mb'), + split_file_max_chunks=args.target.get('split_file_max_chunks'), ) + file_parts = glob.glob(f'{filepath}*') size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) bigquery_types = mysql.map_column_types_to_target(table) bigquery_columns = bigquery_types.get('columns', []) mysql.close_connections() + # Uploading to GCS + gcs_blobs = bigquery.multi_upload_to_gcs(file_parts) + for file_part in file_parts: + os.remove(file_part) + # Creating temp table in Bigquery bigquery.create_schema(target_schema) bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) # Load into Bigquery table - for num, file_part in enumerate(file_parts): - write_truncate = num == 0 - bigquery.copy_to_table( - filepath, - target_schema, - table, - size_bytes, - is_temporary=True, - write_truncate=write_truncate, - ) - os.remove(file_part) + bigquery.copy_to_table( + 
gcs_blobs, target_schema, table, size_bytes, is_temporary=True, + ) + + for blob in gcs_blobs: + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(blob, tap_id, table) + + # Delete the file part from GCS + blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table) diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index 0539c91b7..9f44c81a2 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -20,7 +20,7 @@ REQUIRED_CONFIG_KEYS = { 'tap': ['host', 'port', 'user', 'password'], - 'target': ['project_id'], + 'target': ['project_id', 'gcs_bucket'], } LOCK = multiprocessing.Lock() @@ -68,6 +68,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: """Sync one table""" postgres = FastSyncTapPostgres(args.tap, tap_type_to_target_type, target_quote='`') bigquery = FastSyncTargetBigquery(args.target, args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: dbname = args.tap.get('dbname') @@ -87,7 +89,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Exporting table data, get table definitions and close connection to avoid timeouts postgres.copy_table( - table, filepath, compress=False, max_num=MAX_NUM, date_type='timestamp' + table, + filepath, + compress=False, + max_num=MAX_NUM, + date_type='timestamp', + split_large_files=args.target.get('split_large_files'), + split_file_chunk_size_mb=args.target.get('split_file_chunk_size_mb'), + split_file_max_chunks=args.target.get('split_file_max_chunks'), ) file_parts = glob.glob(f'{filepath}*') size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) @@ -96,22 +105,27 @@ bigquery_columns = bigquery_types.get('columns', []) postgres.close_connection() + # Uploading to GCS + gcs_blobs = bigquery.multi_upload_to_gcs(file_parts) + for file_part in file_parts: + os.remove(file_part) + # Creating temp table in Bigquery bigquery.create_schema(target_schema) bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) # Load into Bigquery table - for num, file_part in enumerate(file_parts): - write_truncate = num == 0 - bigquery.copy_to_table( - filepath, - target_schema, - table, - size_bytes, - is_temporary=True, - write_truncate=write_truncate, - ) - os.remove(file_part) + bigquery.copy_to_table( + gcs_blobs, target_schema, table, size_bytes, is_temporary=True, + ) + + for blob in gcs_blobs: + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(blob, tap_id, table) + + # Delete the file part from GCS + blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table) diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index 995c90017..2c379075f 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -19,6 +19,7 @@ 'tap': ['bucket', 'start_date'], 'target': [ 'project_id', + 'gcs_bucket', ], } @@ -38,10 +39,13 @@ def tap_type_to_target_type(csv_type, *_): }.get(csv_type, 'STRING') +# pylint: disable=too-many-locals def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: """Sync one table""" s3_csv = FastSyncTapS3Csv(args.tap, tap_type_to_target_type, target_quote='`') bigquery = FastSyncTargetBigquery(args.target, 
args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: filename = utils.gen_export_filename( @@ -51,9 +55,13 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: target_schema = utils.get_target_schema(args.target, table_name) - s3_csv.copy_table(table_name, filepath) + s3_csv.copy_table(table_name, filepath, compress=False, include_header=False) size_bytes = os.path.getsize(filepath) + # Uploading to GCS + gcs_blob = bigquery.upload_to_gcs(filepath) + os.remove(filepath) + bigquery_types = s3_csv.map_column_types_to_target(filepath, table_name) bigquery_columns = bigquery_types.get('columns', []) @@ -69,14 +77,15 @@ # Load into Bigquery table bigquery.copy_to_table( - filepath, - target_schema, - table_name, - size_bytes, - is_temporary=True, - skip_csv_header=True, + [gcs_blob], target_schema, table_name, size_bytes, is_temporary=True, ) - os.remove(filepath) + + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(gcs_blob, tap_id, table_name) + + # Delete the load file from GCS + gcs_blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table_name) diff --git a/setup.py b/setup.py index cb8a05948..46ae72eee 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ 'psycopg2-binary==2.8.6', 'snowflake-connector-python[pandas]==2.4.6', 'google-cloud-bigquery==2.31.0', + 'google-cloud-storage==2.1.0', 'pipelinewise-singer-python==1.*', 'singer-encodings==0.0.*', 'messytables==0.15.*', diff --git a/tests/end_to_end/helpers/env.py b/tests/end_to_end/helpers/env.py index 2841454bd..3872efb24 100644 --- a/tests/end_to_end/helpers/env.py +++ b/tests/end_to_end/helpers/env.py @@ -220,6 +220,8 @@ def _load_env(self): 'template_patterns': ['target_bigquery', 'to_bq'], 'vars': { 'PROJECT': {'value': os.environ.get('TARGET_BIGQUERY_PROJECT')}, + 'GCS_BUCKET': {'value': os.environ.get('TARGET_BIGQUERY_GCS_BUCKET')}, + 'GCS_KEY_PREFIX': {'value': os.environ.get('TARGET_BIGQUERY_GCS_KEY_PREFIX')}, }, }, # ------------------------------------------------------------------ diff --git a/tests/end_to_end/test-project/tap_postgres_to_bq_archive_load_files.yml.template b/tests/end_to_end/test-project/tap_postgres_to_bq_archive_load_files.yml.template new file mode 100644 index 000000000..d3136feb1 --- /dev/null +++ b/tests/end_to_end/test-project/tap_postgres_to_bq_archive_load_files.yml.template @@ -0,0 +1,62 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "postgres_to_bq_archive_load_files" +name: "PostgreSQL source test database" +type: "tap-postgres" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - PostgreSQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_POSTGRES_HOST}" # PostgreSQL host + logical_poll_total_seconds: 3 # Time out if no LOG_BASED changes received for 3 seconds + port: ${TAP_POSTGRES_PORT} # PostgreSQL port + user: "${TAP_POSTGRES_USER}" # PostgreSQL user + password: "${TAP_POSTGRES_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_POSTGRES_DB}" # PostgreSQL database name + + +# ------------------------------------------------------------------------------ +# 
Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 1000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes +archive_load_files: True # Archive the load files in dedicated GCS folder +archive_load_files_gcs_prefix: archive_folder # Archive folder + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + + ### SOURCE SCHEMA 1: public + - source_schema: "public" + target_schema: "ppw_e2e_tap_postgres${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + + ### Table with INCREMENTAL replication + - table_name: "city" + replication_method: "INCREMENTAL" + replication_key: "id" + + ### Table with FULL_TABLE replication + - table_name: "country" + replication_method: "FULL_TABLE" + + ### SOURCE SCHEMA 2: public2 + - source_schema: "public2" + target_schema: "ppw_e2e_tap_postgres_public2${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + ### Table with FULL_TABLE replication + - table_name: "wearehere" + replication_method: "FULL_TABLE" \ No newline at end of file diff --git a/tests/end_to_end/test-project/target_bigquery.yml.template b/tests/end_to_end/test-project/target_bigquery.yml.template index d3167d734..de4970637 100644 --- a/tests/end_to_end/test-project/target_bigquery.yml.template +++ b/tests/end_to_end/test-project/target_bigquery.yml.template @@ -13,3 +13,8 @@ type: "target-bigquery" # !! THIS SHOULD NOT CHANGE !! 
# ------------------------------------------------------------------------------ db_conn: project_id: "${TARGET_BIGQUERY_PROJECT}" # Bigquery account + + # We use an intermediate external stage on GCS to load data into BigQuery + gcs_bucket: "${TARGET_BIGQUERY_GCS_BUCKET}" # GCS external staging bucket name + gcs_key_prefix: "${TARGET_BIGQUERY_GCS_KEY_PREFIX}" # Optional: GCS key prefix + gcs_parallelism: 1 # Optional: (Default: 1) diff --git a/tests/end_to_end/test_target_bigquery.py b/tests/end_to_end/test_target_bigquery.py index 43c08b5fa..93ab80b0b 100644 --- a/tests/end_to_end/test_target_bigquery.py +++ b/tests/end_to_end/test_target_bigquery.py @@ -3,10 +3,12 @@ import uuid from datetime import datetime, timezone from random import randint +import tempfile import bson import pytest from bson import Timestamp +from google.cloud import storage from pipelinewise.fastsync import mysql_to_bigquery, postgres_to_bigquery from .helpers import tasks @@ -19,6 +21,7 @@ TAP_MARIADB_BUFFERED_STREAM_ID = 'mariadb_to_bq_buffered_stream' TAP_POSTGRES_ID = 'postgres_to_bq' TAP_POSTGRES_SPLIT_LARGE_FILES_ID = 'postgres_to_bq_split_large_files' +TAP_POSTGRES_ARCHIVE_LOAD_FILES_ID = 'postgres_to_bq_archive_load_files' TAP_MONGODB_ID = 'mongo_to_bq' TAP_S3_CSV_ID = 's3_csv_to_bq' TARGET_ID = 'bigquery' @@ -317,6 +320,66 @@ def test_resync_pg_to_bq_with_split_large_files( postgres_to_bigquery.tap_type_to_target_type, ) + # pylint: disable=invalid-name,too-many-locals + @pytest.mark.dependency(depends=['import_config']) + def test_replicate_pg_to_bq_with_archive_load_files(self): + """Fastsync tables from Postgres to Bigquery with archive load files enabled""" + archive_prefix = 'archive_folder' # Custom gcs prefix defined in TAP_POSTGRES_ARCHIVE_LOAD_FILES_ID + tap = 'postgres_to_bq_archive_load_files' + + client = storage.Client() + bucket_name = os.environ.get('TARGET_BIGQUERY_GCS_BUCKET') + bucket = client.get_bucket(bucket_name) + + # Delete any dangling files from archive + dangling_blobs = client.list_blobs( + bucket, prefix=f'{archive_prefix}/{tap}/' + ) + for blob in dangling_blobs: + blob.delete() + + # Run tap + assertions.assert_run_tap_success( + TAP_POSTGRES_ARCHIVE_LOAD_FILES_ID, TARGET_ID, ['fastsync', 'singer'] + ) + + # Assert expected files in archive folder + expected_archive_files_count = { + 'public.city': 1, # INCREMENTAL: fastsync and singer (but currently only fastsync archives) + 'public.country': 1, # FULL_TABLE : fastsync only + 'public2.wearehere': 1, # FULL_TABLE : fastsync only + } + for schema_table, expected_archive_files in expected_archive_files_count.items(): + schema, table = schema_table.split('.') + blobs_in_archive = list(client.list_blobs( + bucket, + prefix=f'{archive_prefix}/{tap}/{table}' + )) + + assert len(blobs_in_archive) == expected_archive_files + + # Assert expected metadata + expected_metadata = { + 'tap': tap, + 'schema': schema, + 'table': table, + 'archived-by': 'pipelinewise_fastsync', + } + for blob in blobs_in_archive: + assert blob.metadata == expected_metadata + + # Assert expected file contents + with tempfile.NamedTemporaryFile() as tmpfile: + blob.download_to_filename(tmpfile.name) + with open(tmpfile.name, 'rt', encoding='utf-8') as f: + rows_in_csv = len(f.readlines()) + + rows_in_table = self.run_query_tap_postgres( + 'SELECT COUNT(1) FROM {}'.format(schema_table) + )[0][0] + + assert rows_in_table == rows_in_csv + @pytest.mark.dependency(depends=['import_config']) def test_replicate_s3_to_bq(self): """Replicate csv files from s3 to 
Bigquery, check if return code is zero and success log file created""" diff --git a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py index d82bbe2c9..e5427cc74 100644 --- a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py +++ b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py @@ -1,24 +1,13 @@ import pytest -from unittest.mock import Mock, patch, ANY, mock_open +from unittest.mock import Mock, call, patch, ANY from google.cloud import bigquery from pipelinewise.fastsync.commons.target_bigquery import FastSyncTargetBigquery -@pytest.fixture(name='query_result') -def fixture_query_result(): - """ - Mocked Bigquery Run Query Results - """ - mocked_qr = Mock() - mocked_qr.return_value = [] - return mocked_qr - - -# pylint: disable=W0613 @pytest.fixture(name='bigquery_job') -def fixture_bigquery_job(query_result): +def fixture_bigquery_job(): """ - Mocked Bigquery Job Query + Mocked BigQuery Job Query """ mocked_qj = Mock() mocked_qj.output_rows = 0 @@ -36,87 +25,119 @@ def fixture_bigquery_job_config(): return mocked_qc -class FastSyncTargetBigqueryMock(FastSyncTargetBigquery): +@pytest.fixture(name='copy_job_config') +def fixture_copy_job_config(bigquery_job_config): + """ + Mocked BigQuery CopyJobConfig + """ + with patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.CopyJobConfig') as mock: + mock.return_value = bigquery_job_config + yield mock + + +@pytest.fixture(name='load_job_config') +def fixture_load_job_config(bigquery_job_config): + """ + Mocked BigQuery LoadJobConfig + """ + with patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.LoadJobConfig') as mock: + mock.return_value = bigquery_job_config + yield mock + + +@pytest.fixture(name='bigquery_client') +def fixture_bigquery_client(): """ - Mocked FastSyncTargetBigquery class + Mocked BigQuery Client """ + with patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') as mock: + yield mock + - def __init__(self, connection_config, transformation_config=None): - super().__init__(connection_config, transformation_config) - self.executed_queries = [] +@pytest.fixture(name='gcs_client') +def fixture_gcs_client(): + """ + Mocked GCS Client + """ + with patch('pipelinewise.fastsync.commons.target_bigquery.storage.Client') as mock: + yield mock + + +# pylint: disable=unused-argument +@pytest.fixture(name='target_bigquery') +def fixture_target_bigquery(gcs_client): + """ + Mocked FastSyncTargetBigqueryMock + """ + yield FastSyncTargetBigquery( + connection_config={'project_id': 'dummy-project', 'location': 'EU'}, + transformation_config={}, + ) -# pylint: disable=attribute-defined-outside-init class TestFastSyncTargetBigquery: """ Unit tests for fastsync target bigquery """ - def setup_method(self): - """Initialise test FastSyncTargetPostgres object""" - self.bigquery = FastSyncTargetBigqueryMock( - connection_config={'project_id': 'dummy-project', 'location': 'EU'}, - transformation_config={}, - ) - - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_open_connection(self, client): + @staticmethod + def test_open_connection(target_bigquery, bigquery_client): """Validate if create schema queries generated correctly""" - self.bigquery.open_connection() - client.assert_called_with(project='dummy-project', location='EU') + target_bigquery.open_connection() + bigquery_client.assert_called_with(project='dummy-project', location='EU') - 
@patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_create_schema(self, client): + @staticmethod + def test_create_schema(target_bigquery, bigquery_client): """Validate if create schema queries generated correctly""" - self.bigquery.create_schema('new_schema') - client().create_dataset.assert_called_with('new_schema', exists_ok=True) + target_bigquery.create_schema('new_schema') + bigquery_client().create_dataset.assert_called_with('new_schema', exists_ok=True) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_drop_table(self, client): + @staticmethod + def test_drop_table(target_bigquery, bigquery_client): """Validate if drop table queries generated correctly""" - self.bigquery.drop_table('test_schema', 'test_table') - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'test_table') + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'test_table', is_temporary=True) - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'test_table', is_temporary=True) + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table_temp`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'UPPERCASE_TABLE') - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'UPPERCASE_TABLE') + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`uppercase_table`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'UPPERCASE_TABLE', is_temporary=True) - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'UPPERCASE_TABLE', is_temporary=True) + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`uppercase_table_temp`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'test_table_with_space') - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'test_table_with_space') + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table_with_space`', job_config=ANY ) - self.bigquery.drop_table( + target_bigquery.drop_table( 'test_schema', 'test table with space', is_temporary=True ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table_with_space_temp`', job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_create_table(self, client): + @staticmethod + def test_create_table(target_bigquery, bigquery_client): """Validate if create table queries generated correctly""" # Create table with standard table and column names - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='test_table', columns=['`id` INTEGER', '`txt` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE test_schema.`test_table` (' '`id` integer,`txt` string,' '_sdc_extracted_at timestamp,' @@ -126,12 +147,12 @@ def test_create_table(self, client): ) # Create table with reserved words in table and column names - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='order', columns=['`id` INTEGER', '`txt` STRING', '`select` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE 
test_schema.`order` (' '`id` integer,`txt` string,`select` string,' '_sdc_extracted_at timestamp,' @@ -141,12 +162,12 @@ def test_create_table(self, client): ) # Create table with mixed lower and uppercase and space characters - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='TABLE with SPACE', columns=['`ID` INTEGER', '`COLUMN WITH SPACE` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE test_schema.`table_with_space` (' '`id` integer,`column with space` string,' '_sdc_extracted_at timestamp,' @@ -156,12 +177,12 @@ def test_create_table(self, client): ) # Create table with no primary key - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='test_table_no_pk', columns=['`ID` INTEGER', '`TXT` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE test_schema.`test_table_no_pk` (' '`id` integer,`txt` string,' '_sdc_extracted_at timestamp,' @@ -170,173 +191,495 @@ def test_create_table(self, client): job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.LoadJobConfig') - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + @staticmethod def test_copy_to_table( - self, client, load_job_config, bigquery_job_config, bigquery_job + target_bigquery, bigquery_client, load_job_config, bigquery_job ): """Validate if COPY command generated correctly""" # COPY table with standard table and column names - client().load_table_from_file.return_value = bigquery_job - load_job_config.return_value = bigquery_job_config - mocked_open = mock_open() - with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): - self.bigquery.copy_to_table( - filepath='/path/to/dummy-file.csv.gz', - target_schema='test_schema', - table_name='test_table', - size_bytes=1000, - is_temporary=False, - skip_csv_header=False, - ) - mocked_open.assert_called_with('/path/to/dummy-file.csv.gz', 'rb') + bigquery_client().load_table_from_uri.return_value = bigquery_job + bigquery_job_config = load_job_config.return_value + + mock_bucket = Mock() + mock_bucket.name = 'some-bucket' + mock_blob = Mock() + mock_blob.bucket = mock_bucket + mock_blob.name = '/path/to/dummy-file.csv.gz' + + target_bigquery.copy_to_table( + blobs=[mock_blob], + target_schema='test_schema', + table_name='test_table', + size_bytes=1000, + is_temporary=False, + skip_csv_header=False, + ) assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' assert bigquery_job_config.allow_quoted_newlines is True assert bigquery_job_config.skip_leading_rows == 0 - client().dataset.assert_called_with('test_schema') - client().dataset().table.assert_called_with('test_table') - assert client().load_table_from_file.call_count == 1 + bigquery_client().dataset.assert_called_with('test_schema') + bigquery_client().dataset().table.assert_called_with('test_table') + bigquery_client().load_table_from_uri.assert_called_once() + bigquery_client().load_table_from_uri.reset_mock() # COPY table with reserved word in table and column names in temp table - with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): - self.bigquery.copy_to_table( - filepath='/path/to/full-file.csv.gz', - target_schema='test_schema', - table_name='full', - size_bytes=1000, - is_temporary=True, - skip_csv_header=False, - 
) - mocked_open.assert_called_with('/path/to/full-file.csv.gz', 'rb') + target_bigquery.copy_to_table( + blobs=[mock_blob], + target_schema='test_schema', + table_name='full', + size_bytes=1000, + is_temporary=True, + skip_csv_header=True, + write_truncate=False, + allow_quoted_newlines=False, + ) assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV - assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - assert bigquery_job_config.allow_quoted_newlines is True - assert bigquery_job_config.skip_leading_rows == 0 - client().dataset.assert_called_with('test_schema') - client().dataset().table.assert_called_with('full_temp') - assert client().load_table_from_file.call_count == 2 - - # COPY table with space and uppercase in table name and s3 key - with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): - self.bigquery.copy_to_table( - filepath='/path/to/file with space.csv.gz', - target_schema='test_schema', - table_name='table with SPACE and UPPERCASE', - size_bytes=1000, - is_temporary=True, - skip_csv_header=False, - ) - mocked_open.assert_called_with('/path/to/file with space.csv.gz', 'rb') + assert bigquery_job_config.write_disposition == 'WRITE_APPEND' + assert bigquery_job_config.allow_quoted_newlines is False + assert bigquery_job_config.skip_leading_rows == 1 + bigquery_client().dataset.assert_called_with('test_schema') + bigquery_client().dataset().table.assert_called_with('full_temp') + bigquery_client().load_table_from_uri.assert_called_once() + bigquery_client().load_table_from_uri.reset_mock() + + # COPY table with space and uppercase in table name + target_bigquery.copy_to_table( + blobs=[mock_blob], + target_schema='test_schema', + table_name='table with SPACE and UPPERCASE', + size_bytes=1000, + is_temporary=True, + skip_csv_header=False, + ) assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' assert bigquery_job_config.allow_quoted_newlines is True assert bigquery_job_config.skip_leading_rows == 0 - client().dataset.assert_called_with('test_schema') - client().dataset().table.assert_called_with( + bigquery_client().dataset.assert_called_with('test_schema') + bigquery_client().dataset().table.assert_called_with( 'table_with_space_and_uppercase_temp' ) - assert client().load_table_from_file.call_count == 3 + bigquery_client().load_table_from_uri.assert_called_once() + bigquery_client().load_table_from_uri.reset_mock() - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_grant_select_on_table(self, client, bigquery_job): + @staticmethod + def test_grant_select_on_table(target_bigquery, bigquery_client, bigquery_job): """Validate if GRANT command generated correctly""" # GRANT table with standard table and column names - client().query.return_value = bigquery_job - self.bigquery.grant_select_on_table( + bigquery_client().query.return_value = bigquery_job + target_bigquery.grant_select_on_table( target_schema='test_schema', table_name='test_table', role='test_role', is_temporary=False, ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON test_schema.`test_table` TO ROLE test_role', job_config=ANY ) # GRANT table with reserved word in table and column names in temp table - self.bigquery.grant_select_on_table( + target_bigquery.grant_select_on_table( target_schema='test_schema', table_name='full', role='test_role', is_temporary=False, ) - 
client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON test_schema.`full` TO ROLE test_role', job_config=ANY ) # GRANT table with with space and uppercase in table name and s3 key - self.bigquery.grant_select_on_table( + target_bigquery.grant_select_on_table( target_schema='test_schema', table_name='table with SPACE and UPPERCASE', role='test_role', is_temporary=False, ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON test_schema.`table_with_space_and_uppercase` TO ROLE test_role', job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_grant_usage_on_schema(self, client): + @staticmethod + def test_grant_usage_on_schema(target_bigquery, bigquery_client): """Validate if GRANT command generated correctly""" - self.bigquery.grant_usage_on_schema( + target_bigquery.grant_usage_on_schema( target_schema='test_schema', role='test_role' ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT USAGE ON SCHEMA test_schema TO ROLE test_role', job_config=ANY ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_grant_select_on_schema(self, client): + @staticmethod + def test_grant_select_on_schema(target_bigquery, bigquery_client): """Validate if GRANT command generated correctly""" - self.bigquery.grant_select_on_schema( + target_bigquery.grant_select_on_schema( target_schema='test_schema', role='test_role' ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON ALL TABLES IN SCHEMA test_schema TO ROLE test_role', job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.CopyJobConfig') - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + @staticmethod def test_swap_tables( - self, client, copy_job_config, bigquery_job_config, bigquery_job + target_bigquery, bigquery_client, copy_job_config, bigquery_job ): """Validate if swap table commands generated correctly""" # Swap tables with standard table and column names - client().copy_table.return_value = bigquery_job - copy_job_config.return_value = bigquery_job_config - self.bigquery.swap_tables(schema='test_schema', table_name='test_table') + bigquery_client().copy_table.return_value = bigquery_job + bigquery_job_config = copy_job_config.return_value + target_bigquery.swap_tables(schema='test_schema', table_name='test_table') assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - client().copy_table.assert_called_with( + bigquery_client().copy_table.assert_called_with( 'dummy-project.test_schema.test_table_temp', 'dummy-project.test_schema.test_table', job_config=ANY, ) - client().delete_table.assert_called_with( + bigquery_client().delete_table.assert_called_with( 'dummy-project.test_schema.test_table_temp' ) # Swap tables with reserved word in table and column names in temp table - self.bigquery.swap_tables(schema='test_schema', table_name='full') + target_bigquery.swap_tables(schema='test_schema', table_name='full') assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - client().copy_table.assert_called_with( + bigquery_client().copy_table.assert_called_with( 'dummy-project.test_schema.full_temp', 'dummy-project.test_schema.full', job_config=ANY, ) - client().delete_table.assert_called_with('dummy-project.test_schema.full_temp') + bigquery_client().delete_table.assert_called_with('dummy-project.test_schema.full_temp') # Swap 
tables with with space and uppercase in table name and s3 key - self.bigquery.swap_tables( + target_bigquery.swap_tables( schema='test_schema', table_name='table with SPACE and UPPERCASE' ) assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - client().copy_table.assert_called_with( + bigquery_client().copy_table.assert_called_with( 'dummy-project.test_schema.table_with_space_and_uppercase_temp', 'dummy-project.test_schema.table_with_space_and_uppercase', job_config=ANY, ) - client().delete_table.assert_called_with( + bigquery_client().delete_table.assert_called_with( 'dummy-project.test_schema.table_with_space_and_uppercase_temp' ) + + @staticmethod + def test_obfuscate_columns_case1(target_bigquery, bigquery_client): + """ + Test obfuscation where given transformations are emtpy + Test should pass with no executed queries + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + target_bigquery.transformation_config = {} + + target_bigquery.obfuscate_columns(target_schema, table_name) + bigquery_client().query.assert_not_called() + + @staticmethod + def test_obfuscate_columns_case2(target_bigquery, bigquery_client): + """ + Test obfuscation where given transformations has an unsupported transformation type + Test should fail + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + target_bigquery.transformation_config = { + 'transformations': [ + { + 'field_id': 'col_7', + 'tap_stream_name': 'public-my_table', + 'type': 'RANDOM', + } + ] + } + + with pytest.raises(ValueError): + target_bigquery.obfuscate_columns(target_schema, table_name) + + bigquery_client().query.assert_not_called() + + @staticmethod + def test_obfuscate_columns_case3(target_bigquery, bigquery_client): + """ + Test obfuscation where given transformations have no conditions + Test should pass + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + target_bigquery.transformation_config = { + 'transformations': [ + { + 'field_id': 'col_1', + 'tap_stream_name': 'public-my_table', + 'type': 'SET-NULL', + }, + { + 'field_id': 'col_2', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-HIDDEN', + }, + { + 'field_id': 'col_3', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-DATE', + }, + { + 'field_id': 'col_4', + 'tap_stream_name': 'public-my_table', + 'safe_field_id': '"COL_4"', + 'type': 'MASK-NUMBER', + }, + { + 'field_id': 'col_5', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH', + }, + { + 'field_id': 'col_6', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH-SKIP-FIRST-5', + }, + { + 'field_id': 'col_7', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-STRING-SKIP-ENDS-3', + }, + ] + } + + target_bigquery.obfuscate_columns(target_schema, table_name) + + bigquery_client().query.assert_called_with( + "UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_2` = 'hidden', " + '`col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, ' + '1),TIME(`col_3`))), `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)), ' + '`col_6` = CONCAT(SUBSTRING(`col_6`, 1, 5), ' + 'TO_BASE64(SHA256(SUBSTRING(`col_6`, 5 + 1)))), `col_7` = CASE WHEN ' + "LENGTH(`col_7`) > 2 * 3 THEN CONCAT(SUBSTRING(`col_7`, 1, 3), REPEAT('*', " + 'LENGTH(`col_7`)-(2 * 3)), SUBSTRING(`col_7`, LENGTH(`col_7`)-3+1, 3)) ELSE ' + "REPEAT('*', LENGTH(`col_7`)) END WHERE true;", + job_config=ANY, + ) + + @staticmethod + def test_obfuscate_columns_case4(target_bigquery, bigquery_client): + """ + Test obfuscation where given transformations have conditions + Test 
should pass + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + target_bigquery.transformation_config = { + 'transformations': [ + { + 'field_id': 'col_1', + 'tap_stream_name': 'public-my_table', + 'type': 'SET-NULL', + }, + { + 'field_id': 'col_2', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-HIDDEN', + 'when': [ + {'column': 'col_4', 'safe_column': '"COL_4"', 'equals': None}, + { + 'column': 'col_1', + }, + ], + }, + { + 'field_id': 'col_3', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-DATE', + 'when': [{'column': 'col_5', 'equals': 'some_value'}], + }, + { + 'field_id': 'col_4', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-NUMBER', + }, + { + 'field_id': 'col_5', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH', + }, + { + 'field_id': 'col_6', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH-SKIP-FIRST-5', + 'when': [ + {'column': 'col_1', 'equals': 30}, + {'column': 'col_2', 'regex_match': r'[0-9]{3}\.[0-9]{3}'}, + ], + }, + { + 'field_id': 'col_7', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-STRING-SKIP-ENDS-3', + 'when': [ + {'column': 'col_1', 'equals': 30}, + {'column': 'col_2', 'regex_match': r'[0-9]{3}\.[0-9]{3}'}, + {'column': 'col_4', 'equals': None}, + ], + }, + ] + } + + target_bigquery.obfuscate_columns(target_schema, table_name) + + # pylint: disable=line-too-long + expected_sql = [ + "UPDATE `my_schema`.`my_table_temp` SET `col_2` = 'hidden' WHERE (`col_4` IS NULL);", + "UPDATE `my_schema`.`my_table_temp` SET `col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, 1),TIME(`col_3`))) WHERE (`col_5` = 'some_value');", # noqa: E501 + "UPDATE `my_schema`.`my_table_temp` SET `col_6` = CONCAT(SUBSTRING(`col_6`, 1, 5), TO_BASE64(SHA256(SUBSTRING(`col_6`, 5 + 1)))) WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}');", # noqa: E501 + "UPDATE `my_schema`.`my_table_temp` SET `col_7` = CASE WHEN LENGTH(`col_7`) > 2 * 3 THEN CONCAT(SUBSTRING(`col_7`, 1, 3), REPEAT('*', LENGTH(`col_7`)-(2 * 3)), SUBSTRING(`col_7`, LENGTH(`col_7`)-3+1, 3)) ELSE REPEAT('*', LENGTH(`col_7`)) END WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}') AND (`col_4` IS NULL);", # noqa: E501 + 'UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)) WHERE true;', # noqa: E501 + ] + + expected_calls = [] + for sql in expected_sql: + expected_calls.append(call(sql, job_config=ANY)) + expected_calls.append(call().result()) + + bigquery_client().query.assert_has_calls(expected_calls) + + # pylint: disable=invalid-name + @staticmethod + def test_default_archive_destination(target_bigquery): + """ + Validate parameters passed to gcs copy_blob method when custom GCS bucket and folder are not defined + """ + mock_source_bucket = Mock() + mock_source_bucket.name = 'some_bucket' + + mock_buckets = {mock_source_bucket.name: mock_source_bucket} + + mock_blob = Mock() + mock_blob.bucket = mock_source_bucket + mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' + + mock_gcs_client = Mock() + mock_gcs_client.get_bucket.side_effect = mock_buckets.get + + target_bigquery.gcs = mock_gcs_client + + target_bigquery.connection_config['gcs_bucket'] = mock_source_bucket.name + + archive_blob = target_bigquery.copy_to_archive( + mock_blob, + 'some-tap', + 'some_schema.some_table', + ) + + mock_source_bucket.copy_blob.assert_called_with( + mock_blob, + mock_source_bucket, + 
new_name='archive/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz', + ) + archive_blob.metadata.update.assert_called_with( + { + 'tap': 'some-tap', + 'schema': 'some_schema', + 'table': 'some_table', + 'archived-by': 'pipelinewise_fastsync', + } + ) + archive_blob.patch.assert_called_once() + + # pylint: disable=invalid-name + @staticmethod + def test_custom_archive_destination(target_bigquery): + """ + Validate parameters passed to GCS copy_blob method when using custom GCS bucket and folder + """ + mock_source_bucket = Mock() + mock_source_bucket.name = 'some_bucket' + mock_archive_bucket = Mock() + mock_archive_bucket.name = 'archive_bucket' + + mock_buckets = { + mock_source_bucket.name: mock_source_bucket, + mock_archive_bucket.name: mock_archive_bucket, + } + + mock_blob = Mock() + mock_blob.bucket = mock_source_bucket + mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' + + mock_gcs_client = Mock() + mock_gcs_client.get_bucket.side_effect = mock_buckets.get + + target_bigquery.gcs = mock_gcs_client + + target_bigquery.connection_config['gcs_bucket'] = 'some_bucket' + target_bigquery.connection_config['archive_load_files_gcs_bucket'] = 'archive_bucket' + target_bigquery.connection_config['archive_load_files_gcs_prefix'] = 'archive_folder' + + archive_blob = target_bigquery.copy_to_archive( + mock_blob, + 'some-tap', + 'some_schema.some_table', + ) + + mock_source_bucket.copy_blob.assert_called_with( + mock_blob, + mock_archive_bucket, + new_name='archive_folder/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz', + ) + archive_blob.metadata.update.assert_called_with( + { + 'tap': 'some-tap', + 'schema': 'some_schema', + 'table': 'some_table', + 'archived-by': 'pipelinewise_fastsync', + } + ) + + # pylint: disable=invalid-name + @staticmethod + def test_copied_archive_metadata(target_bigquery): + """ + Validate that existing blob metadata is preserved and merged with the archive metadata when archiving + """ + mock_source_bucket = Mock() + mock_source_bucket.name = 'some_bucket' + + mock_buckets = {mock_source_bucket.name: mock_source_bucket} + + mock_blob = Mock() + mock_blob.bucket = mock_source_bucket + mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' + mock_blob.metadata = {'copied-old-key': 'copied-old-value'} + + mock_gcs_client = Mock() + mock_gcs_client.get_bucket.side_effect = mock_buckets.get + + target_bigquery.gcs = mock_gcs_client + + target_bigquery.connection_config['gcs_bucket'] = 'some_bucket' + + archive_blob = target_bigquery.copy_to_archive( + mock_blob, + 'some-tap', + 'some_schema.some_table', + ) + + mock_source_bucket.copy_blob.assert_called_with( + mock_blob, + mock_source_bucket, + new_name='archive/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz', + ) + assert archive_blob.metadata == { + 'copied-old-key': 'copied-old-value', + 'tap': 'some-tap', + 'schema': 'some_schema', + 'table': 'some_table', + 'archived-by': 'pipelinewise_fastsync', + }