From 0235744d21b0e97f9bb99f0beeb7481a91500961 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 16 Feb 2022 14:56:57 +0000 Subject: [PATCH 01/20] Implement `split_large_files` for BigQuery FastSync --- pipelinewise/fastsync/mysql_to_bigquery.py | 12 ++++++++++-- pipelinewise/fastsync/postgres_to_bigquery.py | 11 +++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index b8529f39d..5ac3a385a 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -84,8 +84,16 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Exporting table data, get table definitions and close connection to avoid timeouts mysql.copy_table( - table, filepath, compress=False, max_num=MAX_NUM, date_type='datetime' + table, + filepath, + compress=False, + max_num=MAX_NUM, + date_type='datetime', + split_large_files=args.target.get('split_large_files'), + split_file_chunk_size_mb=args.target.get('split_file_chunk_size_mb'), + split_file_max_chunks=args.target.get('split_file_max_chunks'), ) + file_parts = glob.glob(f'{filepath}*') size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) bigquery_types = mysql.map_column_types_to_target(table) @@ -100,7 +108,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: for num, file_part in enumerate(file_parts): write_truncate = num == 0 bigquery.copy_to_table( - filepath, + file_part, target_schema, table, size_bytes, diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index 0539c91b7..f29ce3477 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -87,7 +87,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Exporting table data, get table definitions and close connection to avoid timeouts postgres.copy_table( - table, filepath, compress=False, max_num=MAX_NUM, date_type='timestamp' + table, + filepath, + compress=False, + max_num=MAX_NUM, + date_type='timestamp', + split_large_files=args.target.get('split_large_files'), + split_file_chunk_size_mb=args.target.get('split_file_chunk_size_mb'), + split_file_max_chunks=args.target.get('split_file_max_chunks'), ) file_parts = glob.glob(f'{filepath}*') size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) @@ -104,7 +111,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: for num, file_part in enumerate(file_parts): write_truncate = num == 0 bigquery.copy_to_table( - filepath, + file_part, target_schema, table, size_bytes, From 5d1799724f64eb9ac67d09ea06e8173cea0dd103 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 16 Feb 2022 16:21:17 +0000 Subject: [PATCH 02/20] Implement BigQuery loading from GCS --- .../fastsync/commons/target_bigquery.py | 94 ++++++++++++++++--- pipelinewise/fastsync/mongodb_to_bigquery.py | 21 +++-- pipelinewise/fastsync/mysql_to_bigquery.py | 30 +++--- pipelinewise/fastsync/postgres_to_bigquery.py | 30 +++--- pipelinewise/fastsync/s3_csv_to_bigquery.py | 22 +++-- 5 files changed, 147 insertions(+), 50 deletions(-) diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index aa5a10a1e..c596d6f19 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ 
b/pipelinewise/fastsync/commons/target_bigquery.py @@ -1,9 +1,11 @@ +import os import logging import json import re from typing import List, Dict from google.cloud import bigquery +from google.cloud import storage from google.api_core import exceptions from .transform_utils import TransformationHelper, SQLFlavor @@ -37,6 +39,9 @@ class FastSyncTargetBigquery: def __init__(self, connection_config, transformation_config=None): self.connection_config = connection_config self.transformation_config = transformation_config + self.gcs = storage.Client( + project=self.connection_config['project_id'], + ) def open_connection(self): project_id = self.connection_config['project_id'] @@ -78,6 +83,65 @@ def to_query_parameter(value): return query_job + def upload_to_gcs(self, file: str) -> storage.Blob: + bucket_name = self.connection_config['gcs_bucket'] + key_prefix = self.connection_config.get('gcs_key_prefix', '') + key = '{}{}'.format(key_prefix, os.path.basename(file)) + + LOGGER.info( + 'Uploading to GCS bucket: %s, local file: %s, GCS key: %s', + bucket_name, + file, + key, + ) + + # Upload to GCS + bucket = self.gcs.get_bucket(bucket_name) + blob = bucket.blob(key) + blob.upload_from_filename(file) + + return blob + + def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> None: + """Copy load file to archive folder with metadata added""" + table_dict = utils.tablename_to_dict(table) + archive_table = table_dict.get('table_name') + archive_schema = table_dict.get('schema_name', '') + + # Retain same filename + archive_file_basename = os.path.basename(blob.name) + + # Get archive GCS prefix from config, defaulting to 'archive' if not specified + archive_prefix = self.connection_config.get( + 'archive_load_files_gcs_prefix', 'archive' + ) + + source_bucket = self.gcs.get_bucket(self.connection_config.get('gcs_bucket')) + + # Get archive GCS bucket from config, defaulting to same bucket used for Snowflake imports if not specified + archive_bucket = self.gcs.get_bucket( + self.connection_config.get( + 'archive_load_files_gcs_bucket', source_bucket + ) + ) + + archive_blob_name =f'{archive_prefix}/{tap_id}/{archive_table}/{archive_file_basename}' + source_blob_name = f'{blob.bucket}/{blob.name}' + LOGGER.info('Archiving %s to %s', source_blob_name, archive_blob_name) + + source_bucket.copy_blob(blob, archive_bucket, new_name=archive_blob_name) + + # Combine existing metadata with archive related headers + archive_blob = archive_bucket.get_blob(archive_blob_name) + archive_blob.metadata = (blob.metadata or {}).update( + { + 'tap': tap_id, + 'schema': archive_schema, + 'table': archive_table, + 'archived-by': 'pipelinewise_fastsync', + } + ) + def create_schema(self, schema_name): temp_schema = self.connection_config.get('temp_schema', schema_name) @@ -147,19 +211,19 @@ def create_table( self.query(sql) - # pylint: disable=R0913,R0914 + # pylint: disable=too-many-locals def copy_to_table( self, - filepath, - target_schema, - table_name, - size_bytes, - is_temporary, - skip_csv_header=False, - allow_quoted_newlines=True, - write_truncate=True, + blobs: List[storage.Blob], + target_schema: str, + table_name: str, + size_bytes: int, + is_temporary: bool, + skip_csv_header: bool = False, + allow_quoted_newlines: bool = True, + write_truncate: bool = True, ): - LOGGER.info('BIGQUERY - Loading %s into Bigquery...', filepath) + LOGGER.info('BIGQUERY - Loading into Bigquery...') table_dict = utils.tablename_to_dict(table_name) target_table = safe_name( table_dict.get( @@ -172,6 +236,7 @@ 
def copy_to_table( dataset_ref = client.dataset(target_schema) table_ref = dataset_ref.table(target_table) table_schema = client.get_table(table_ref).schema + job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.CSV job_config.schema = table_schema @@ -180,10 +245,11 @@ def copy_to_table( ) job_config.allow_quoted_newlines = allow_quoted_newlines job_config.skip_leading_rows = 1 if skip_csv_header else 0 - with open(filepath, 'rb') as exported_data: - job = client.load_table_from_file( - exported_data, table_ref, job_config=job_config - ) + + uris = [f'gs://{blob.bucket}/{blob.name}' for blob in blobs] + job = client.load_table_from_uri( + source_uris=uris, destination=table_ref, job_config=job_config + ) try: job.result() except exceptions.BadRequest as exc: diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index c82dd3cbc..81bfcbb73 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -48,6 +48,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: """Sync one table""" mongodb = FastSyncTapMongoDB(args.tap, tap_type_to_target_type) bigquery = FastSyncTargetBigquery(args.target, args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: dbname = args.tap.get('dbname') @@ -72,20 +74,25 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery_columns = bigquery_types.get('columns', []) mongodb.close_connection() + # Uploading to GCS + gcs_blob = bigquery.upload_to_gcs(filepath) + os.remove(filepath) + # Creating temp table in Bigquery bigquery.create_schema(target_schema) bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) # Load into Bigquery table bigquery.copy_to_table( - filepath, - target_schema, - table, - size_bytes, - is_temporary=True, - skip_csv_header=True, + [gcs_blob], target_schema, table, size_bytes, is_temporary=True, ) - os.remove(filepath) + + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(gcs_blob, tap_id, table) + + # Delete all file parts from s3 + gcs_blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table) diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index 5ac3a385a..ad61b119a 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -68,6 +68,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: """Sync one table""" mysql = FastSyncTapMySql(args.tap, tap_type_to_target_type, target_quote='`') bigquery = FastSyncTargetBigquery(args.target, args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: filename = 'pipelinewise_fastsync_{}_{}.csv'.format( @@ -100,22 +102,28 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery_columns = bigquery_types.get('columns', []) mysql.close_connections() + # Uploading to GCS + gcs_blobs = [] + for file_part in file_parts: + gcs_blobs.append(bigquery.upload_to_gcs(file_part)) + os.remove(file_part) + # Creating temp table in Bigquery bigquery.create_schema(target_schema) bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) # Load into Bigquery table - for num, file_part in enumerate(file_parts): - write_truncate = num == 0 - bigquery.copy_to_table( - file_part, - 
target_schema, - table, - size_bytes, - is_temporary=True, - write_truncate=write_truncate, - ) - os.remove(file_part) + bigquery.copy_to_table( + gcs_blobs, target_schema, table, size_bytes, is_temporary=True, + ) + + for blob in gcs_blobs: + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(blob, tap_id, table) + + # Delete all file parts from s3 + blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table) diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index f29ce3477..5ee0e956c 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -68,6 +68,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: """Sync one table""" postgres = FastSyncTapPostgres(args.tap, tap_type_to_target_type, target_quote='`') bigquery = FastSyncTargetBigquery(args.target, args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: dbname = args.tap.get('dbname') @@ -103,22 +105,28 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery_columns = bigquery_types.get('columns', []) postgres.close_connection() + # Uploading to GCS + gcs_blobs = [] + for file_part in file_parts: + gcs_blobs.append(bigquery.upload_to_gcs(file_part)) + os.remove(file_part) + # Creating temp table in Bigquery bigquery.create_schema(target_schema) bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) # Load into Bigquery table - for num, file_part in enumerate(file_parts): - write_truncate = num == 0 - bigquery.copy_to_table( - file_part, - target_schema, - table, - size_bytes, - is_temporary=True, - write_truncate=write_truncate, - ) - os.remove(file_part) + bigquery.copy_to_table( + gcs_blobs, target_schema, table, size_bytes, is_temporary=True, + ) + + for blob in gcs_blobs: + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(blob, tap_id, table) + + # Delete all file parts from s3 + blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table) diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index 995c90017..733b1e65c 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -38,10 +38,13 @@ def tap_type_to_target_type(csv_type, *_): }.get(csv_type, 'STRING') +# pylint: disable=too-many-locals def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: """Sync one table""" s3_csv = FastSyncTapS3Csv(args.tap, tap_type_to_target_type, target_quote='`') bigquery = FastSyncTargetBigquery(args.target, args.transform) + tap_id = args.target.get('tap_id') + archive_load_files = args.target.get('archive_load_files', False) try: filename = utils.gen_export_filename( @@ -54,6 +57,10 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: s3_csv.copy_table(table_name, filepath) size_bytes = os.path.getsize(filepath) + # Uploading to GCS + gcs_blob = bigquery.upload_to_gcs(filepath) + os.remove(filepath) + bigquery_types = s3_csv.map_column_types_to_target(filepath, table_name) bigquery_columns = bigquery_types.get('columns', []) @@ -69,14 +76,15 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: # Load into Bigquery table bigquery.copy_to_table( - filepath, - target_schema, - table_name, - size_bytes, - is_temporary=True, - skip_csv_header=True, + 
[gcs_blob], target_schema, table_name, size_bytes, is_temporary=True, ) - os.remove(filepath) + + if archive_load_files: + # Copy load file to archive + bigquery.copy_to_archive(gcs_blob, tap_id, table_name) + + # Delete all file parts from s3 + gcs_blob.delete() # Obfuscate columns bigquery.obfuscate_columns(target_schema, table_name) From fec42537f249fc71dcb748eb59cfd7a75154196f Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 16 Feb 2022 16:23:08 +0000 Subject: [PATCH 03/20] Add `google-cloud-storage` dependency --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index cb8a05948..46ae72eee 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ 'psycopg2-binary==2.8.6', 'snowflake-connector-python[pandas]==2.4.6', 'google-cloud-bigquery==2.31.0', + 'google-cloud-storage==2.1.0', 'pipelinewise-singer-python==1.*', 'singer-encodings==0.0.*', 'messytables==0.15.*', From 68e255d1d0a4241271cbcbb5505e560df8915df5 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 16 Feb 2022 17:10:14 +0000 Subject: [PATCH 04/20] Update metadata correctly --- pipelinewise/fastsync/commons/target_bigquery.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index c596d6f19..5ad1c8f40 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -133,7 +133,8 @@ def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> None: # Combine existing metadata with archive related headers archive_blob = archive_bucket.get_blob(archive_blob_name) - archive_blob.metadata = (blob.metadata or {}).update( + new_metadata = blob.metadata or {} + new_metadata.update( { 'tap': tap_id, 'schema': archive_schema, @@ -141,6 +142,7 @@ def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> None: 'archived-by': 'pipelinewise_fastsync', } ) + archive_blob.metadata = new_metadata def create_schema(self, schema_name): temp_schema = self.connection_config.get('temp_schema', schema_name) From fb78d65647e5bf1633823f7b749854dd860136a1 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 16 Feb 2022 18:15:32 +0000 Subject: [PATCH 05/20] Fix bugs in `copy_table` GCS implementation --- pipelinewise/fastsync/commons/target_bigquery.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index 5ad1c8f40..74f7c8239 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -102,7 +102,7 @@ def upload_to_gcs(self, file: str) -> storage.Blob: return blob - def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> None: + def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> storage.Blob: """Copy load file to archive folder with metadata added""" table_dict = utils.tablename_to_dict(table) archive_table = table_dict.get('table_name') @@ -116,12 +116,12 @@ def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> None: 'archive_load_files_gcs_prefix', 'archive' ) - source_bucket = self.gcs.get_bucket(self.connection_config.get('gcs_bucket')) + source_bucket = blob.bucket # Get archive GCS bucket from config, defaulting to same bucket used 
for Snowflake imports if not specified archive_bucket = self.gcs.get_bucket( self.connection_config.get( - 'archive_load_files_gcs_bucket', source_bucket + 'archive_load_files_gcs_bucket', source_bucket.name ) ) @@ -133,7 +133,7 @@ def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> None: # Combine existing metadata with archive related headers archive_blob = archive_bucket.get_blob(archive_blob_name) - new_metadata = blob.metadata or {} + new_metadata = {} if blob.metadata is None else blob.metadata new_metadata.update( { 'tap': tap_id, @@ -143,6 +143,7 @@ def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> None: } ) archive_blob.metadata = new_metadata + return archive_blob def create_schema(self, schema_name): temp_schema = self.connection_config.get('temp_schema', schema_name) @@ -248,7 +249,7 @@ def copy_to_table( job_config.allow_quoted_newlines = allow_quoted_newlines job_config.skip_leading_rows = 1 if skip_csv_header else 0 - uris = [f'gs://{blob.bucket}/{blob.name}' for blob in blobs] + uris = [f'gs://{blob.bucket.name}/{blob.name}' for blob in blobs] job = client.load_table_from_uri( source_uris=uris, destination=table_ref, job_config=job_config ) From 8634f91c265e0c8248c6976e420d6b09812cdc71 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 16 Feb 2022 18:17:54 +0000 Subject: [PATCH 06/20] Update `TestFastSyncTargetBigquery::test_copy_to_table` --- .../commons/test_fastsync_target_bigquery.py | 85 ++++++++++--------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py index d82bbe2c9..bc355e6ad 100644 --- a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py +++ b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py @@ -177,57 +177,61 @@ def test_copy_to_table( ): """Validate if COPY command generated correctly""" # COPY table with standard table and column names - client().load_table_from_file.return_value = bigquery_job + client().load_table_from_uri.return_value = bigquery_job load_job_config.return_value = bigquery_job_config - mocked_open = mock_open() - with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): - self.bigquery.copy_to_table( - filepath='/path/to/dummy-file.csv.gz', - target_schema='test_schema', - table_name='test_table', - size_bytes=1000, - is_temporary=False, - skip_csv_header=False, - ) - mocked_open.assert_called_with('/path/to/dummy-file.csv.gz', 'rb') + + mock_bucket = Mock() + mock_bucket.name = 'some-bucket' + mock_blob = Mock() + mock_blob.bucket = mock_bucket + mock_blob.name = '/path/to/dummy-file.csv.gz' + + self.bigquery.copy_to_table( + blobs=[mock_blob], + target_schema='test_schema', + table_name='test_table', + size_bytes=1000, + is_temporary=False, + skip_csv_header=False, + ) assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' assert bigquery_job_config.allow_quoted_newlines is True assert bigquery_job_config.skip_leading_rows == 0 client().dataset.assert_called_with('test_schema') client().dataset().table.assert_called_with('test_table') - assert client().load_table_from_file.call_count == 1 + client().load_table_from_uri.assert_called_once() + client().load_table_from_uri.reset_mock() # COPY table with reserved word in table and column names in temp table - with 
patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): - self.bigquery.copy_to_table( - filepath='/path/to/full-file.csv.gz', - target_schema='test_schema', - table_name='full', - size_bytes=1000, - is_temporary=True, - skip_csv_header=False, - ) - mocked_open.assert_called_with('/path/to/full-file.csv.gz', 'rb') + self.bigquery.copy_to_table( + blobs=[mock_blob], + target_schema='test_schema', + table_name='full', + size_bytes=1000, + is_temporary=True, + skip_csv_header=True, + write_truncate=False, + allow_quoted_newlines=False, + ) assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV - assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - assert bigquery_job_config.allow_quoted_newlines is True - assert bigquery_job_config.skip_leading_rows == 0 + assert bigquery_job_config.write_disposition == 'WRITE_APPEND' + assert bigquery_job_config.allow_quoted_newlines is False + assert bigquery_job_config.skip_leading_rows == 1 client().dataset.assert_called_with('test_schema') client().dataset().table.assert_called_with('full_temp') - assert client().load_table_from_file.call_count == 2 - - # COPY table with space and uppercase in table name and s3 key - with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): - self.bigquery.copy_to_table( - filepath='/path/to/file with space.csv.gz', - target_schema='test_schema', - table_name='table with SPACE and UPPERCASE', - size_bytes=1000, - is_temporary=True, - skip_csv_header=False, - ) - mocked_open.assert_called_with('/path/to/file with space.csv.gz', 'rb') + client().load_table_from_uri.assert_called_once() + client().load_table_from_uri.reset_mock() + + # COPY table with space and uppercase in table name + self.bigquery.copy_to_table( + blobs=[mock_blob], + target_schema='test_schema', + table_name='table with SPACE and UPPERCASE', + size_bytes=1000, + is_temporary=True, + skip_csv_header=False, + ) assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' assert bigquery_job_config.allow_quoted_newlines is True @@ -236,7 +240,8 @@ def test_copy_to_table( client().dataset().table.assert_called_with( 'table_with_space_and_uppercase_temp' ) - assert client().load_table_from_file.call_count == 3 + client().load_table_from_uri.assert_called_once() + client().load_table_from_uri.reset_mock() @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') def test_grant_select_on_table(self, client, bigquery_job): From a02f65d33ccabb31213e487d7f4c0c9f3ddeee83 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 16 Feb 2022 18:18:15 +0000 Subject: [PATCH 07/20] Add additional BigQuery FastSync unittests --- .../commons/test_fastsync_target_bigquery.py | 316 +++++++++++++++++- 1 file changed, 315 insertions(+), 1 deletion(-) diff --git a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py index bc355e6ad..b978fae11 100644 --- a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py +++ b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py @@ -1,5 +1,5 @@ import pytest -from unittest.mock import Mock, patch, ANY, mock_open +from unittest.mock import Mock, call, patch, ANY from google.cloud import bigquery from pipelinewise.fastsync.commons.target_bigquery import FastSyncTargetBigquery @@ -345,3 +345,317 @@ def test_swap_tables( 
client().delete_table.assert_called_with( 'dummy-project.test_schema.table_with_space_and_uppercase_temp' ) + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_obfuscate_columns_case1(self, client): + """ + Test obfuscation where given transformations are emtpy + Test should pass with no executed queries + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + self.bigquery.transformation_config = {} + + self.bigquery.obfuscate_columns(target_schema, table_name) + client().query.assert_not_called() + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_obfuscate_columns_case2(self, client): + """ + Test obfuscation where given transformations has an unsupported transformation type + Test should fail + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + self.bigquery.transformation_config = { + 'transformations': [ + { + 'field_id': 'col_7', + 'tap_stream_name': 'public-my_table', + 'type': 'RANDOM', + } + ] + } + + with pytest.raises(ValueError): + self.bigquery.obfuscate_columns(target_schema, table_name) + + client().query.assert_not_called() + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_obfuscate_columns_case3(self, client): + """ + Test obfuscation where given transformations have no conditions + Test should pass + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + self.bigquery.transformation_config = { + 'transformations': [ + { + 'field_id': 'col_1', + 'tap_stream_name': 'public-my_table', + 'type': 'SET-NULL', + }, + { + 'field_id': 'col_2', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-HIDDEN', + }, + { + 'field_id': 'col_3', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-DATE', + }, + { + 'field_id': 'col_4', + 'tap_stream_name': 'public-my_table', + 'safe_field_id': '"COL_4"', + 'type': 'MASK-NUMBER', + }, + { + 'field_id': 'col_5', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH', + }, + { + 'field_id': 'col_6', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH-SKIP-FIRST-5', + }, + { + 'field_id': 'col_7', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-STRING-SKIP-ENDS-3', + }, + ] + } + + self.bigquery.obfuscate_columns(target_schema, table_name) + + client().query.assert_called_with( + "UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_2` = 'hidden', " + '`col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, ' + '1),TIME(`col_3`))), `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)), ' + '`col_6` = CONCAT(SUBSTRING(`col_6`, 1, 5), ' + 'TO_BASE64(SHA256(SUBSTRING(`col_6`, 5 + 1)))), `col_7` = CASE WHEN ' + "LENGTH(`col_7`) > 2 * 3 THEN CONCAT(SUBSTRING(`col_7`, 1, 3), REPEAT('*', " + 'LENGTH(`col_7`)-(2 * 3)), SUBSTRING(`col_7`, LENGTH(`col_7`)-3+1, 3)) ELSE ' + "REPEAT('*', LENGTH(`col_7`)) END WHERE true;", + job_config=ANY, + ) + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_obfuscate_columns_case4(self, client): + """ + Test obfuscation where given transformations have conditions + Test should pass + """ + target_schema = 'my_schema' + table_name = 'public.my_table' + + self.bigquery.transformation_config = { + 'transformations': [ + { + 'field_id': 'col_1', + 'tap_stream_name': 'public-my_table', + 'type': 'SET-NULL', + }, + { + 'field_id': 'col_2', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-HIDDEN', + 'when': [ + {'column': 'col_4', 'safe_column': '"COL_4"', 'equals': None}, + { + 'column': 
'col_1', + }, + ], + }, + { + 'field_id': 'col_3', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-DATE', + 'when': [{'column': 'col_5', 'equals': 'some_value'}], + }, + { + 'field_id': 'col_4', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-NUMBER', + }, + { + 'field_id': 'col_5', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH', + }, + { + 'field_id': 'col_6', + 'tap_stream_name': 'public-my_table', + 'type': 'HASH-SKIP-FIRST-5', + 'when': [ + {'column': 'col_1', 'equals': 30}, + {'column': 'col_2', 'regex_match': r'[0-9]{3}\.[0-9]{3}'}, + ], + }, + { + 'field_id': 'col_7', + 'tap_stream_name': 'public-my_table', + 'type': 'MASK-STRING-SKIP-ENDS-3', + 'when': [ + {'column': 'col_1', 'equals': 30}, + {'column': 'col_2', 'regex_match': r'[0-9]{3}\.[0-9]{3}'}, + {'column': 'col_4', 'equals': None}, + ], + }, + ] + } + + self.bigquery.obfuscate_columns(target_schema, table_name) + + expected_sql = [ + "UPDATE `my_schema`.`my_table_temp` SET `col_2` = 'hidden' WHERE (`col_4` IS NULL);", + "UPDATE `my_schema`.`my_table_temp` SET `col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, 1),TIME(`col_3`))) WHERE (`col_5` = 'some_value');", + "UPDATE `my_schema`.`my_table_temp` SET `col_6` = CONCAT(SUBSTRING(`col_6`, 1, 5), TO_BASE64(SHA256(SUBSTRING(`col_6`, 5 + 1)))) WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}');", + "UPDATE `my_schema`.`my_table_temp` SET `col_7` = CASE WHEN LENGTH(`col_7`) > 2 * 3 THEN CONCAT(SUBSTRING(`col_7`, 1, 3), REPEAT('*', LENGTH(`col_7`)-(2 * 3)), SUBSTRING(`col_7`, LENGTH(`col_7`)-3+1, 3)) ELSE REPEAT('*', LENGTH(`col_7`)) END WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}') AND (`col_4` IS NULL);", + "UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)) WHERE true;", + ] + + expected_calls = [] + for sql in expected_sql: + expected_calls.append(call(sql, job_config=ANY)) + expected_calls.append(call().result()) + + client().query.assert_has_calls(expected_calls) + + # pylint: disable=invalid-name + def test_default_archive_destination(self): + """ + Validate parameters passed to gcs copy_blob method when custom GCS bucket and folder are not defined + """ + mock_source_bucket = Mock() + mock_source_bucket.name = 'some_bucket' + + mock_buckets = {mock_source_bucket.name: mock_source_bucket} + + mock_blob = Mock() + mock_blob.bucket = mock_source_bucket + mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' + + mock_gcs_client = Mock() + mock_gcs_client.get_bucket.side_effect = lambda x: mock_buckets.get(x) + + self.bigquery.gcs = mock_gcs_client + + self.bigquery.connection_config['gcs_bucket'] = mock_source_bucket.name + + archive_blob = self.bigquery.copy_to_archive( + mock_blob, + 'some-tap', + 'some_schema.some_table', + ) + + mock_source_bucket.copy_blob.assert_called_with( + mock_blob, + mock_source_bucket, + new_name='archive/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz', + ) + archive_blob.metadata.update.assert_called_with( + { + 'tap': 'some-tap', + 'schema': 'some_schema', + 'table': 'some_table', + 'archived-by': 'pipelinewise_fastsync', + } + ) + + # pylint: disable=invalid-name + def test_custom_archive_destination(self): + """ + Validate parameters passed to s3 copy_object method when using custom s3 bucket and folder + """ + mock_source_bucket = Mock() + mock_source_bucket.name = 'some_bucket' + mock_archive_bucket = Mock() + mock_archive_bucket.name = 'archive_bucket' + + 
mock_buckets = { + mock_source_bucket.name: mock_source_bucket, + mock_archive_bucket.name: mock_archive_bucket, + } + + mock_blob = Mock() + mock_blob.bucket = mock_source_bucket + mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' + + mock_gcs_client = Mock() + mock_gcs_client.get_bucket.side_effect = lambda x: mock_buckets.get(x) + + self.bigquery.gcs = mock_gcs_client + + self.bigquery.connection_config['gcs_bucket'] = 'some_bucket' + self.bigquery.connection_config['archive_load_files_gcs_bucket'] = 'archive_bucket' + self.bigquery.connection_config['archive_load_files_gcs_prefix'] = 'archive_folder' + + archive_blob = self.bigquery.copy_to_archive( + mock_blob, + 'some-tap', + 'some_schema.some_table', + ) + + mock_source_bucket.copy_blob.assert_called_with( + mock_blob, + mock_archive_bucket, + new_name='archive_folder/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz', + ) + archive_blob.metadata.update.assert_called_with( + { + 'tap': 'some-tap', + 'schema': 'some_schema', + 'table': 'some_table', + 'archived-by': 'pipelinewise_fastsync', + } + ) + + # pylint: disable=invalid-name + def test_copied_archive_metadata(self): + """ + Validate parameters passed to s3 copy_object method when custom s3 bucket and folder are not defined + """ + mock_source_bucket = Mock() + mock_source_bucket.name = 'some_bucket' + + mock_buckets = {mock_source_bucket.name: mock_source_bucket} + + mock_blob = Mock() + mock_blob.bucket = mock_source_bucket + mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' + mock_blob.metadata = {'copied-old-key': 'copied-old-value'} + + mock_gcs_client = Mock() + mock_gcs_client.get_bucket.side_effect = lambda x: mock_buckets.get(x) + + self.bigquery.gcs = mock_gcs_client + + self.bigquery.connection_config['s3_bucket'] = 'some_bucket' + + archive_blob = self.bigquery.copy_to_archive( + mock_blob, + 'some-tap', + 'some_schema.some_table', + ) + + mock_source_bucket.copy_blob.assert_called_with( + mock_blob, + mock_source_bucket, + new_name='archive/some-tap/some_table/ppw_20210615115603_fastsync.csv.gz', + ) + assert archive_blob.metadata == { + 'copied-old-key': 'copied-old-value', + 'tap': 'some-tap', + 'schema': 'some_schema', + 'table': 'some_table', + 'archived-by': 'pipelinewise_fastsync', + } From f07bb121c4eb152c0aaee54466742ffdd19cd557 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 11:39:32 +0000 Subject: [PATCH 08/20] Use fixtures in Bigquery FastSync unittests --- .../commons/test_fastsync_target_bigquery.py | 316 ++++++++++-------- 1 file changed, 169 insertions(+), 147 deletions(-) diff --git a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py index b978fae11..ad46f91ac 100644 --- a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py +++ b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py @@ -4,21 +4,10 @@ from pipelinewise.fastsync.commons.target_bigquery import FastSyncTargetBigquery -@pytest.fixture(name='query_result') -def fixture_query_result(): - """ - Mocked Bigquery Run Query Results - """ - mocked_qr = Mock() - mocked_qr.return_value = [] - return mocked_qr - - -# pylint: disable=W0613 @pytest.fixture(name='bigquery_job') -def fixture_bigquery_job(query_result): +def fixture_bigquery_job(): """ - Mocked Bigquery Job Query + Mocked BigQuery Job Query """ mocked_qj = Mock() mocked_qj.output_rows = 0 @@ -36,87 +25,118 @@ 
def fixture_bigquery_job_config(): return mocked_qc -class FastSyncTargetBigqueryMock(FastSyncTargetBigquery): +@pytest.fixture(name='copy_job_config') +def fixture_copy_job_config(bigquery_job_config): + """ + Mocked BigQuery CopyJobConfig + """ + with patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.CopyJobConfig') as mock: + mock.return_value = bigquery_job_config + yield mock + + +@pytest.fixture(name='load_job_config') +def fixture_load_job_config(bigquery_job_config): + """ + Mocked BigQuery LoadJobConfig + """ + with patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.LoadJobConfig') as mock: + mock.return_value = bigquery_job_config + yield mock + + +@pytest.fixture(name='bigquery_client') +def fixture_bigquery_client(): + """ + Mocked BigQuery Client + """ + with patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') as mock: + yield mock + + +@pytest.fixture(name='gcs_client') +def fixture_gcs_client(): """ - Mocked FastSyncTargetBigquery class + Mocked GCS Client """ + with patch('pipelinewise.fastsync.commons.target_bigquery.storage.Client') as mock: + yield mock - def __init__(self, connection_config, transformation_config=None): - super().__init__(connection_config, transformation_config) - self.executed_queries = [] +# pylint: disable=unused-argument +@pytest.fixture(name='target_bigquery') +def fixture_target_bigquery(gcs_client): + """ + Mocked FastSyncTargetBigqueryMock + """ + yield FastSyncTargetBigquery( + connection_config={'project_id': 'dummy-project', 'location': 'EU'}, + transformation_config={}, + ) -# pylint: disable=attribute-defined-outside-init class TestFastSyncTargetBigquery: """ Unit tests for fastsync target bigquery """ - def setup_method(self): - """Initialise test FastSyncTargetPostgres object""" - self.bigquery = FastSyncTargetBigqueryMock( - connection_config={'project_id': 'dummy-project', 'location': 'EU'}, - transformation_config={}, - ) - - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_open_connection(self, client): + @staticmethod + def test_open_connection(target_bigquery, bigquery_client): """Validate if create schema queries generated correctly""" - self.bigquery.open_connection() - client.assert_called_with(project='dummy-project', location='EU') + target_bigquery.open_connection() + bigquery_client.assert_called_with(project='dummy-project', location='EU') - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_create_schema(self, client): + @staticmethod + def test_create_schema(target_bigquery, bigquery_client): """Validate if create schema queries generated correctly""" - self.bigquery.create_schema('new_schema') - client().create_dataset.assert_called_with('new_schema', exists_ok=True) + target_bigquery.create_schema('new_schema') + bigquery_client().create_dataset.assert_called_with('new_schema', exists_ok=True) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_drop_table(self, client): + @staticmethod + def test_drop_table(target_bigquery, bigquery_client): """Validate if drop table queries generated correctly""" - self.bigquery.drop_table('test_schema', 'test_table') - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'test_table') + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'test_table', is_temporary=True) - client().query.assert_called_with( + 
target_bigquery.drop_table('test_schema', 'test_table', is_temporary=True) + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table_temp`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'UPPERCASE_TABLE') - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'UPPERCASE_TABLE') + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`uppercase_table`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'UPPERCASE_TABLE', is_temporary=True) - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'UPPERCASE_TABLE', is_temporary=True) + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`uppercase_table_temp`', job_config=ANY ) - self.bigquery.drop_table('test_schema', 'test_table_with_space') - client().query.assert_called_with( + target_bigquery.drop_table('test_schema', 'test_table_with_space') + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table_with_space`', job_config=ANY ) - self.bigquery.drop_table( + target_bigquery.drop_table( 'test_schema', 'test table with space', is_temporary=True ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'DROP TABLE IF EXISTS test_schema.`test_table_with_space_temp`', job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_create_table(self, client): + @staticmethod + def test_create_table(target_bigquery, bigquery_client): """Validate if create table queries generated correctly""" # Create table with standard table and column names - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='test_table', columns=['`id` INTEGER', '`txt` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE test_schema.`test_table` (' '`id` integer,`txt` string,' '_sdc_extracted_at timestamp,' @@ -126,12 +146,12 @@ def test_create_table(self, client): ) # Create table with reserved words in table and column names - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='order', columns=['`id` INTEGER', '`txt` STRING', '`select` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE test_schema.`order` (' '`id` integer,`txt` string,`select` string,' '_sdc_extracted_at timestamp,' @@ -141,12 +161,12 @@ def test_create_table(self, client): ) # Create table with mixed lower and uppercase and space characters - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='TABLE with SPACE', columns=['`ID` INTEGER', '`COLUMN WITH SPACE` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE test_schema.`table_with_space` (' '`id` integer,`column with space` string,' '_sdc_extracted_at timestamp,' @@ -156,12 +176,12 @@ def test_create_table(self, client): ) # Create table with no primary key - self.bigquery.create_table( + target_bigquery.create_table( target_schema='test_schema', table_name='test_table_no_pk', columns=['`ID` INTEGER', '`TXT` STRING'], ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'CREATE OR REPLACE TABLE test_schema.`test_table_no_pk` (' '`id` integer,`txt` string,' '_sdc_extracted_at timestamp,' @@ -170,15 +190,14 @@ def 
test_create_table(self, client): job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.LoadJobConfig') - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + @staticmethod def test_copy_to_table( - self, client, load_job_config, bigquery_job_config, bigquery_job + target_bigquery, bigquery_client, load_job_config, bigquery_job ): """Validate if COPY command generated correctly""" # COPY table with standard table and column names - client().load_table_from_uri.return_value = bigquery_job - load_job_config.return_value = bigquery_job_config + bigquery_client().load_table_from_uri.return_value = bigquery_job + bigquery_job_config = load_job_config.return_value mock_bucket = Mock() mock_bucket.name = 'some-bucket' @@ -186,7 +205,7 @@ def test_copy_to_table( mock_blob.bucket = mock_bucket mock_blob.name = '/path/to/dummy-file.csv.gz' - self.bigquery.copy_to_table( + target_bigquery.copy_to_table( blobs=[mock_blob], target_schema='test_schema', table_name='test_table', @@ -198,13 +217,13 @@ def test_copy_to_table( assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' assert bigquery_job_config.allow_quoted_newlines is True assert bigquery_job_config.skip_leading_rows == 0 - client().dataset.assert_called_with('test_schema') - client().dataset().table.assert_called_with('test_table') - client().load_table_from_uri.assert_called_once() - client().load_table_from_uri.reset_mock() + bigquery_client().dataset.assert_called_with('test_schema') + bigquery_client().dataset().table.assert_called_with('test_table') + bigquery_client().load_table_from_uri.assert_called_once() + bigquery_client().load_table_from_uri.reset_mock() # COPY table with reserved word in table and column names in temp table - self.bigquery.copy_to_table( + target_bigquery.copy_to_table( blobs=[mock_blob], target_schema='test_schema', table_name='full', @@ -218,13 +237,13 @@ def test_copy_to_table( assert bigquery_job_config.write_disposition == 'WRITE_APPEND' assert bigquery_job_config.allow_quoted_newlines is False assert bigquery_job_config.skip_leading_rows == 1 - client().dataset.assert_called_with('test_schema') - client().dataset().table.assert_called_with('full_temp') - client().load_table_from_uri.assert_called_once() - client().load_table_from_uri.reset_mock() + bigquery_client().dataset.assert_called_with('test_schema') + bigquery_client().dataset().table.assert_called_with('full_temp') + bigquery_client().load_table_from_uri.assert_called_once() + bigquery_client().load_table_from_uri.reset_mock() # COPY table with space and uppercase in table name - self.bigquery.copy_to_table( + target_bigquery.copy_to_table( blobs=[mock_blob], target_schema='test_schema', table_name='table with SPACE and UPPERCASE', @@ -236,118 +255,117 @@ def test_copy_to_table( assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' assert bigquery_job_config.allow_quoted_newlines is True assert bigquery_job_config.skip_leading_rows == 0 - client().dataset.assert_called_with('test_schema') - client().dataset().table.assert_called_with( + bigquery_client().dataset.assert_called_with('test_schema') + bigquery_client().dataset().table.assert_called_with( 'table_with_space_and_uppercase_temp' ) - client().load_table_from_uri.assert_called_once() - client().load_table_from_uri.reset_mock() + bigquery_client().load_table_from_uri.assert_called_once() + bigquery_client().load_table_from_uri.reset_mock() - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def 
test_grant_select_on_table(self, client, bigquery_job): + @staticmethod + def test_grant_select_on_table(target_bigquery, bigquery_client, bigquery_job): """Validate if GRANT command generated correctly""" # GRANT table with standard table and column names - client().query.return_value = bigquery_job - self.bigquery.grant_select_on_table( + bigquery_client().query.return_value = bigquery_job + target_bigquery.grant_select_on_table( target_schema='test_schema', table_name='test_table', role='test_role', is_temporary=False, ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON test_schema.`test_table` TO ROLE test_role', job_config=ANY ) # GRANT table with reserved word in table and column names in temp table - self.bigquery.grant_select_on_table( + target_bigquery.grant_select_on_table( target_schema='test_schema', table_name='full', role='test_role', is_temporary=False, ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON test_schema.`full` TO ROLE test_role', job_config=ANY ) # GRANT table with with space and uppercase in table name and s3 key - self.bigquery.grant_select_on_table( + target_bigquery.grant_select_on_table( target_schema='test_schema', table_name='table with SPACE and UPPERCASE', role='test_role', is_temporary=False, ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON test_schema.`table_with_space_and_uppercase` TO ROLE test_role', job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_grant_usage_on_schema(self, client): + @staticmethod + def test_grant_usage_on_schema(target_bigquery, bigquery_client): """Validate if GRANT command generated correctly""" - self.bigquery.grant_usage_on_schema( + target_bigquery.grant_usage_on_schema( target_schema='test_schema', role='test_role' ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT USAGE ON SCHEMA test_schema TO ROLE test_role', job_config=ANY ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_grant_select_on_schema(self, client): + @staticmethod + def test_grant_select_on_schema(target_bigquery, bigquery_client): """Validate if GRANT command generated correctly""" - self.bigquery.grant_select_on_schema( + target_bigquery.grant_select_on_schema( target_schema='test_schema', role='test_role' ) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( 'GRANT SELECT ON ALL TABLES IN SCHEMA test_schema TO ROLE test_role', job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.CopyJobConfig') - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + @staticmethod def test_swap_tables( - self, client, copy_job_config, bigquery_job_config, bigquery_job + target_bigquery, bigquery_client, copy_job_config, bigquery_job ): """Validate if swap table commands generated correctly""" # Swap tables with standard table and column names - client().copy_table.return_value = bigquery_job - copy_job_config.return_value = bigquery_job_config - self.bigquery.swap_tables(schema='test_schema', table_name='test_table') + bigquery_client().copy_table.return_value = bigquery_job + bigquery_job_config = copy_job_config.return_value + target_bigquery.swap_tables(schema='test_schema', table_name='test_table') assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - client().copy_table.assert_called_with( + 
bigquery_client().copy_table.assert_called_with( 'dummy-project.test_schema.test_table_temp', 'dummy-project.test_schema.test_table', job_config=ANY, ) - client().delete_table.assert_called_with( + bigquery_client().delete_table.assert_called_with( 'dummy-project.test_schema.test_table_temp' ) # Swap tables with reserved word in table and column names in temp table - self.bigquery.swap_tables(schema='test_schema', table_name='full') + target_bigquery.swap_tables(schema='test_schema', table_name='full') assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - client().copy_table.assert_called_with( + bigquery_client().copy_table.assert_called_with( 'dummy-project.test_schema.full_temp', 'dummy-project.test_schema.full', job_config=ANY, ) - client().delete_table.assert_called_with('dummy-project.test_schema.full_temp') + bigquery_client().delete_table.assert_called_with('dummy-project.test_schema.full_temp') # Swap tables with with space and uppercase in table name and s3 key - self.bigquery.swap_tables( + target_bigquery.swap_tables( schema='test_schema', table_name='table with SPACE and UPPERCASE' ) assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' - client().copy_table.assert_called_with( + bigquery_client().copy_table.assert_called_with( 'dummy-project.test_schema.table_with_space_and_uppercase_temp', 'dummy-project.test_schema.table_with_space_and_uppercase', job_config=ANY, ) - client().delete_table.assert_called_with( + bigquery_client().delete_table.assert_called_with( 'dummy-project.test_schema.table_with_space_and_uppercase_temp' ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_obfuscate_columns_case1(self, client): + @staticmethod + def test_obfuscate_columns_case1(target_bigquery, bigquery_client): """ Test obfuscation where given transformations are emtpy Test should pass with no executed queries @@ -355,13 +373,13 @@ def test_obfuscate_columns_case1(self, client): target_schema = 'my_schema' table_name = 'public.my_table' - self.bigquery.transformation_config = {} + target_bigquery.transformation_config = {} - self.bigquery.obfuscate_columns(target_schema, table_name) - client().query.assert_not_called() + target_bigquery.obfuscate_columns(target_schema, table_name) + bigquery_client().query.assert_not_called() - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_obfuscate_columns_case2(self, client): + @staticmethod + def test_obfuscate_columns_case2(target_bigquery, bigquery_client): """ Test obfuscation where given transformations has an unsupported transformation type Test should fail @@ -369,7 +387,7 @@ def test_obfuscate_columns_case2(self, client): target_schema = 'my_schema' table_name = 'public.my_table' - self.bigquery.transformation_config = { + target_bigquery.transformation_config = { 'transformations': [ { 'field_id': 'col_7', @@ -380,12 +398,12 @@ def test_obfuscate_columns_case2(self, client): } with pytest.raises(ValueError): - self.bigquery.obfuscate_columns(target_schema, table_name) + target_bigquery.obfuscate_columns(target_schema, table_name) - client().query.assert_not_called() + bigquery_client().query.assert_not_called() - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_obfuscate_columns_case3(self, client): + @staticmethod + def test_obfuscate_columns_case3(target_bigquery, bigquery_client): """ Test obfuscation where given transformations have no conditions Test should pass @@ -393,7 +411,7 @@ def 
test_obfuscate_columns_case3(self, client): target_schema = 'my_schema' table_name = 'public.my_table' - self.bigquery.transformation_config = { + target_bigquery.transformation_config = { 'transformations': [ { 'field_id': 'col_1', @@ -434,9 +452,9 @@ def test_obfuscate_columns_case3(self, client): ] } - self.bigquery.obfuscate_columns(target_schema, table_name) + target_bigquery.obfuscate_columns(target_schema, table_name) - client().query.assert_called_with( + bigquery_client().query.assert_called_with( "UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_2` = 'hidden', " '`col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, ' '1),TIME(`col_3`))), `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)), ' @@ -448,8 +466,8 @@ def test_obfuscate_columns_case3(self, client): job_config=ANY, ) - @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') - def test_obfuscate_columns_case4(self, client): + @staticmethod + def test_obfuscate_columns_case4(target_bigquery, bigquery_client): """ Test obfuscation where given transformations have conditions Test should pass @@ -457,7 +475,7 @@ def test_obfuscate_columns_case4(self, client): target_schema = 'my_schema' table_name = 'public.my_table' - self.bigquery.transformation_config = { + target_bigquery.transformation_config = { 'transformations': [ { 'field_id': 'col_1', @@ -513,14 +531,15 @@ def test_obfuscate_columns_case4(self, client): ] } - self.bigquery.obfuscate_columns(target_schema, table_name) + target_bigquery.obfuscate_columns(target_schema, table_name) + # pylint: disable=line-too-long expected_sql = [ "UPDATE `my_schema`.`my_table_temp` SET `col_2` = 'hidden' WHERE (`col_4` IS NULL);", "UPDATE `my_schema`.`my_table_temp` SET `col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, 1),TIME(`col_3`))) WHERE (`col_5` = 'some_value');", "UPDATE `my_schema`.`my_table_temp` SET `col_6` = CONCAT(SUBSTRING(`col_6`, 1, 5), TO_BASE64(SHA256(SUBSTRING(`col_6`, 5 + 1)))) WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}');", "UPDATE `my_schema`.`my_table_temp` SET `col_7` = CASE WHEN LENGTH(`col_7`) > 2 * 3 THEN CONCAT(SUBSTRING(`col_7`, 1, 3), REPEAT('*', LENGTH(`col_7`)-(2 * 3)), SUBSTRING(`col_7`, LENGTH(`col_7`)-3+1, 3)) ELSE REPEAT('*', LENGTH(`col_7`)) END WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}') AND (`col_4` IS NULL);", - "UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)) WHERE true;", + 'UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)) WHERE true;', ] expected_calls = [] @@ -528,10 +547,11 @@ def test_obfuscate_columns_case4(self, client): expected_calls.append(call(sql, job_config=ANY)) expected_calls.append(call().result()) - client().query.assert_has_calls(expected_calls) + bigquery_client().query.assert_has_calls(expected_calls) # pylint: disable=invalid-name - def test_default_archive_destination(self): + @staticmethod + def test_default_archive_destination(target_bigquery): """ Validate parameters passed to gcs copy_blob method when custom GCS bucket and folder are not defined """ @@ -545,13 +565,13 @@ def test_default_archive_destination(self): mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' mock_gcs_client = Mock() - mock_gcs_client.get_bucket.side_effect = lambda x: mock_buckets.get(x) + mock_gcs_client.get_bucket.side_effect = mock_buckets.get - self.bigquery.gcs = mock_gcs_client + 
target_bigquery.gcs = mock_gcs_client - self.bigquery.connection_config['gcs_bucket'] = mock_source_bucket.name + target_bigquery.connection_config['gcs_bucket'] = mock_source_bucket.name - archive_blob = self.bigquery.copy_to_archive( + archive_blob = target_bigquery.copy_to_archive( mock_blob, 'some-tap', 'some_schema.some_table', @@ -572,7 +592,8 @@ def test_default_archive_destination(self): ) # pylint: disable=invalid-name - def test_custom_archive_destination(self): + @staticmethod + def test_custom_archive_destination(target_bigquery): """ Validate parameters passed to s3 copy_object method when using custom s3 bucket and folder """ @@ -591,15 +612,15 @@ def test_custom_archive_destination(self): mock_blob.name = 'some_bucket/snowflake-import/ppw_20210615115603_fastsync.csv.gz' mock_gcs_client = Mock() - mock_gcs_client.get_bucket.side_effect = lambda x: mock_buckets.get(x) + mock_gcs_client.get_bucket.side_effect = mock_buckets.get - self.bigquery.gcs = mock_gcs_client + target_bigquery.gcs = mock_gcs_client - self.bigquery.connection_config['gcs_bucket'] = 'some_bucket' - self.bigquery.connection_config['archive_load_files_gcs_bucket'] = 'archive_bucket' - self.bigquery.connection_config['archive_load_files_gcs_prefix'] = 'archive_folder' + target_bigquery.connection_config['gcs_bucket'] = 'some_bucket' + target_bigquery.connection_config['archive_load_files_gcs_bucket'] = 'archive_bucket' + target_bigquery.connection_config['archive_load_files_gcs_prefix'] = 'archive_folder' - archive_blob = self.bigquery.copy_to_archive( + archive_blob = target_bigquery.copy_to_archive( mock_blob, 'some-tap', 'some_schema.some_table', @@ -620,7 +641,8 @@ def test_custom_archive_destination(self): ) # pylint: disable=invalid-name - def test_copied_archive_metadata(self): + @staticmethod + def test_copied_archive_metadata(target_bigquery): """ Validate parameters passed to s3 copy_object method when custom s3 bucket and folder are not defined """ @@ -635,13 +657,13 @@ def test_copied_archive_metadata(self): mock_blob.metadata = {'copied-old-key': 'copied-old-value'} mock_gcs_client = Mock() - mock_gcs_client.get_bucket.side_effect = lambda x: mock_buckets.get(x) + mock_gcs_client.get_bucket.side_effect =mock_buckets.get - self.bigquery.gcs = mock_gcs_client + target_bigquery.gcs = mock_gcs_client - self.bigquery.connection_config['s3_bucket'] = 'some_bucket' + target_bigquery.connection_config['s3_bucket'] = 'some_bucket' - archive_blob = self.bigquery.copy_to_archive( + archive_blob = target_bigquery.copy_to_archive( mock_blob, 'some-tap', 'some_schema.some_table', From 6b5209267fa232d4b42114d0eca7192000a2c58a Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 12:36:57 +0000 Subject: [PATCH 09/20] Fix linting errors --- pipelinewise/fastsync/commons/target_bigquery.py | 2 +- .../fastsync/commons/test_fastsync_target_bigquery.py | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index 74f7c8239..dc2b4cc9a 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -125,7 +125,7 @@ def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> storag ) ) - archive_blob_name =f'{archive_prefix}/{tap_id}/{archive_table}/{archive_file_basename}' + archive_blob_name = f'{archive_prefix}/{tap_id}/{archive_table}/{archive_file_basename}' 
source_blob_name = f'{blob.bucket}/{blob.name}' LOGGER.info('Archiving %s to %s', source_blob_name, archive_blob_name) diff --git a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py index ad46f91ac..a23da57e2 100644 --- a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py +++ b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py @@ -62,6 +62,7 @@ def fixture_gcs_client(): with patch('pipelinewise.fastsync.commons.target_bigquery.storage.Client') as mock: yield mock + # pylint: disable=unused-argument @pytest.fixture(name='target_bigquery') def fixture_target_bigquery(gcs_client): @@ -536,10 +537,10 @@ def test_obfuscate_columns_case4(target_bigquery, bigquery_client): # pylint: disable=line-too-long expected_sql = [ "UPDATE `my_schema`.`my_table_temp` SET `col_2` = 'hidden' WHERE (`col_4` IS NULL);", - "UPDATE `my_schema`.`my_table_temp` SET `col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, 1),TIME(`col_3`))) WHERE (`col_5` = 'some_value');", - "UPDATE `my_schema`.`my_table_temp` SET `col_6` = CONCAT(SUBSTRING(`col_6`, 1, 5), TO_BASE64(SHA256(SUBSTRING(`col_6`, 5 + 1)))) WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}');", - "UPDATE `my_schema`.`my_table_temp` SET `col_7` = CASE WHEN LENGTH(`col_7`) > 2 * 3 THEN CONCAT(SUBSTRING(`col_7`, 1, 3), REPEAT('*', LENGTH(`col_7`)-(2 * 3)), SUBSTRING(`col_7`, LENGTH(`col_7`)-3+1, 3)) ELSE REPEAT('*', LENGTH(`col_7`)) END WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}') AND (`col_4` IS NULL);", - 'UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)) WHERE true;', + "UPDATE `my_schema`.`my_table_temp` SET `col_3` = TIMESTAMP(DATETIME(DATE(EXTRACT(YEAR FROM `col_3`), 1, 1),TIME(`col_3`))) WHERE (`col_5` = 'some_value');", # noqa: E501 + "UPDATE `my_schema`.`my_table_temp` SET `col_6` = CONCAT(SUBSTRING(`col_6`, 1, 5), TO_BASE64(SHA256(SUBSTRING(`col_6`, 5 + 1)))) WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}');", # noqa: E501 + "UPDATE `my_schema`.`my_table_temp` SET `col_7` = CASE WHEN LENGTH(`col_7`) > 2 * 3 THEN CONCAT(SUBSTRING(`col_7`, 1, 3), REPEAT('*', LENGTH(`col_7`)-(2 * 3)), SUBSTRING(`col_7`, LENGTH(`col_7`)-3+1, 3)) ELSE REPEAT('*', LENGTH(`col_7`)) END WHERE (`col_1` = 30) AND REGEXP_CONTAINS(`col_2`, '[0-9]{3}\\.[0-9]{3}') AND (`col_4` IS NULL);", # noqa: E501 + 'UPDATE `my_schema`.`my_table_temp` SET `col_1` = NULL, `col_4` = 0, `col_5` = TO_BASE64(SHA256(`col_5`)) WHERE true;', # noqa: E501 ] expected_calls = [] @@ -657,7 +658,7 @@ def test_copied_archive_metadata(target_bigquery): mock_blob.metadata = {'copied-old-key': 'copied-old-value'} mock_gcs_client = Mock() - mock_gcs_client.get_bucket.side_effect =mock_buckets.get + mock_gcs_client.get_bucket.side_effect = mock_buckets.get target_bigquery.gcs = mock_gcs_client From 999d5e303c7554b98ab50c41f075dcff7f21b760 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 13:11:33 +0000 Subject: [PATCH 10/20] Set up GCS tests --- dev-project/.env | 2 ++ tests/end_to_end/helpers/env.py | 2 ++ tests/end_to_end/test-project/target_bigquery.yml.template | 4 ++++ 3 files changed, 8 insertions(+) diff --git a/dev-project/.env b/dev-project/.env index 670d647bf..75426f8ea 100644 --- a/dev-project/.env +++ b/dev-project/.env @@ -109,4 +109,6 @@ TARGET_REDSHIFT_S3_ACL= # Please add real credentials otherwise the 
related tests will be ignored. # ------------------------------------------------------------------------------ TARGET_BIGQUERY_PROJECT= +TARGET_BIGQUERY_GCS_BUCKET= +TARGET_BIGQUERY_GCS_KEY_PREFIX= GOOGLE_APPLICATION_CREDENTIALS= diff --git a/tests/end_to_end/helpers/env.py b/tests/end_to_end/helpers/env.py index 2841454bd..3872efb24 100644 --- a/tests/end_to_end/helpers/env.py +++ b/tests/end_to_end/helpers/env.py @@ -220,6 +220,8 @@ def _load_env(self): 'template_patterns': ['target_bigquery', 'to_bq'], 'vars': { 'PROJECT': {'value': os.environ.get('TARGET_BIGQUERY_PROJECT')}, + 'GCS_BUCKET': {'value': os.environ.get('TARGET_BIGQUERY_GCS_BUCKET')}, + 'GCS_KEY_PREFIX': {'value': os.environ.get('TARGET_BIGQUERY_GCS_KEY_PREFIX')}, }, }, # ------------------------------------------------------------------ diff --git a/tests/end_to_end/test-project/target_bigquery.yml.template b/tests/end_to_end/test-project/target_bigquery.yml.template index d3167d734..a7e3360f3 100644 --- a/tests/end_to_end/test-project/target_bigquery.yml.template +++ b/tests/end_to_end/test-project/target_bigquery.yml.template @@ -13,3 +13,7 @@ type: "target-bigquery" # !! THIS SHOULD NOT CHANGE !! # ------------------------------------------------------------------------------ db_conn: project_id: "${TARGET_BIGQUERY_PROJECT}" # Bigquery account + + # We use an intermediate external stage on S3 to load data into Snowflake + gcs_bucket: "${TARGET_BIGQUERY_GCS_BUCKET}" # GCS external staging bucket name + gcs_key_prefix: "${TARGET_BIGQUERY_GCS_KEY_PREFIX}" # Optional: GCS key prefix From 38d289619ae12f0fa2e0226fc4678ec148752828 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 13:20:51 +0000 Subject: [PATCH 11/20] Add keys to `target_bigquery` schema --- tests/end_to_end/test-project/target_bigquery.yml.template | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/end_to_end/test-project/target_bigquery.yml.template b/tests/end_to_end/test-project/target_bigquery.yml.template index a7e3360f3..d26ca0e7c 100644 --- a/tests/end_to_end/test-project/target_bigquery.yml.template +++ b/tests/end_to_end/test-project/target_bigquery.yml.template @@ -14,6 +14,6 @@ type: "target-bigquery" # !! THIS SHOULD NOT CHANGE !! db_conn: project_id: "${TARGET_BIGQUERY_PROJECT}" # Bigquery account - # We use an intermediate external stage on S3 to load data into Snowflake + # We use an intermediate external stage on GCS to load data into BigQuery gcs_bucket: "${TARGET_BIGQUERY_GCS_BUCKET}" # GCS external staging bucket name gcs_key_prefix: "${TARGET_BIGQUERY_GCS_KEY_PREFIX}" # Optional: GCS key prefix From fa747ee2aca74a4af6940dffae0a7f8d032ae876 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 15:03:27 +0000 Subject: [PATCH 12/20] Make sure Target BigQuery is using uncompressed files --- pipelinewise/fastsync/commons/tap_s3_csv.py | 15 +++++++++++---- pipelinewise/fastsync/mongodb_to_bigquery.py | 2 +- pipelinewise/fastsync/s3_csv_to_bigquery.py | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_s3_csv.py b/pipelinewise/fastsync/commons/tap_s3_csv.py index 772982a17..f906e9552 100644 --- a/pipelinewise/fastsync/commons/tap_s3_csv.py +++ b/pipelinewise/fastsync/commons/tap_s3_csv.py @@ -20,6 +20,7 @@ from singer.utils import strptime_with_tz from singer_encodings import csv as singer_encodings_csv +from . 
import split_gzip from .utils import retry_pattern from ...utils import safe_column_name @@ -73,7 +74,7 @@ def _find_table_spec_by_name(self, table_name: str) -> Dict: ) ) - def copy_table(self, table_name: str, file_path: str) -> None: + def copy_table(self, table_name: str, file_path: str, compress=True) -> None: """ Copies data from all csv files that match the search_pattern and into the csv file in file_path :param table_name: Name of the table @@ -119,10 +120,16 @@ def copy_table(self, table_name: str, file_path: str) -> None: self.tables_last_modified[table_name] = max_last_modified # write to the given compressed csv file - with gzip.open(file_path, 'wt') as gzfile: - + gzip_splitter = split_gzip.open( + file_path, + mode='wt', + chunk_size_mb=split_gzip.DEFAULT_CHUNK_SIZE_MB, + max_chunks=0, + compress=compress, + ) + with gzip_splitter as split_gzip_files: writer = csv.DictWriter( - gzfile, + split_gzip_files, fieldnames=sorted(list(headers)), # we need to sort the headers so that copying into snowflake works delimiter=',', diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index 81bfcbb73..c230b357a 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -68,7 +68,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: ) # Exporting table data, get table definitions and close connection to avoid timeouts - mongodb.copy_table(table, filepath, args.temp_dir) + mongodb.copy_table(table, filepath, args.temp_dir, compress=False) size_bytes = os.path.getsize(filepath) bigquery_types = mongodb.map_column_types_to_target() bigquery_columns = bigquery_types.get('columns', []) diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index 733b1e65c..0d87dcbfe 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -54,7 +54,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: target_schema = utils.get_target_schema(args.target, table_name) - s3_csv.copy_table(table_name, filepath) + s3_csv.copy_table(table_name, filepath, compress=False) size_bytes = os.path.getsize(filepath) # Uploading to GCS From ec9b0a38bc9366319b6b19ee586265ec0bd48aa1 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 15:42:23 +0000 Subject: [PATCH 13/20] Remove CSV header when uploading to BigQuery --- pipelinewise/fastsync/commons/tap_mongodb.py | 5 +++-- pipelinewise/fastsync/commons/tap_s3_csv.py | 7 ++++--- pipelinewise/fastsync/mongodb_to_bigquery.py | 2 +- pipelinewise/fastsync/s3_csv_to_bigquery.py | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_mongodb.py b/pipelinewise/fastsync/commons/tap_mongodb.py index 1a6c951a8..6bfcf8038 100644 --- a/pipelinewise/fastsync/commons/tap_mongodb.py +++ b/pipelinewise/fastsync/commons/tap_mongodb.py @@ -244,6 +244,7 @@ def copy_table( split_file_chunk_size_mb=1000, split_file_max_chunks=20, compress=True, + include_header=True, ): """ Export data from table to a zipped csv @@ -286,8 +287,8 @@ def copy_table( quotechar='"', quoting=csv.QUOTE_MINIMAL, ) - - writer.writeheader() + if include_header: + writer.writeheader() rows = [] LOGGER.info('Starting data processing...') diff --git a/pipelinewise/fastsync/commons/tap_s3_csv.py b/pipelinewise/fastsync/commons/tap_s3_csv.py index f906e9552..91dfe7cd9 100644 --- 
a/pipelinewise/fastsync/commons/tap_s3_csv.py +++ b/pipelinewise/fastsync/commons/tap_s3_csv.py @@ -74,7 +74,7 @@ def _find_table_spec_by_name(self, table_name: str) -> Dict: ) ) - def copy_table(self, table_name: str, file_path: str, compress=True) -> None: + def copy_table(self, table_name: str, file_path: str, compress=True, include_header=True) -> None: """ Copies data from all csv files that match the search_pattern and into the csv file in file_path :param table_name: Name of the table @@ -136,8 +136,9 @@ def copy_table(self, table_name: str, file_path: str, compress=True) -> None: quotechar='"', quoting=csv.QUOTE_MINIMAL, ) - # write the header - writer.writeheader() + if include_header: + # write the header + writer.writeheader() # write all records at once writer.writerows(records) diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index c230b357a..adbec615e 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -68,7 +68,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: ) # Exporting table data, get table definitions and close connection to avoid timeouts - mongodb.copy_table(table, filepath, args.temp_dir, compress=False) + mongodb.copy_table(table, filepath, args.temp_dir, compress=False, include_header=False) size_bytes = os.path.getsize(filepath) bigquery_types = mongodb.map_column_types_to_target() bigquery_columns = bigquery_types.get('columns', []) diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index 0d87dcbfe..05706b885 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -54,7 +54,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: target_schema = utils.get_target_schema(args.target, table_name) - s3_csv.copy_table(table_name, filepath, compress=False) + s3_csv.copy_table(table_name, filepath, compress=False, include_header=False) size_bytes = os.path.getsize(filepath) # Uploading to GCS From 9c5e65ea90f684cc252948197fabd8c1f8edce91 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 16:44:09 +0000 Subject: [PATCH 14/20] Add test for archiving of files --- ...gres_to_bq_archive_load_files.yml.template | 62 ++++++++++++++++++ tests/end_to_end/test_target_bigquery.py | 63 +++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 tests/end_to_end/test-project/tap_postgres_to_bq_archive_load_files.yml.template diff --git a/tests/end_to_end/test-project/tap_postgres_to_bq_archive_load_files.yml.template b/tests/end_to_end/test-project/tap_postgres_to_bq_archive_load_files.yml.template new file mode 100644 index 000000000..d3136feb1 --- /dev/null +++ b/tests/end_to_end/test-project/tap_postgres_to_bq_archive_load_files.yml.template @@ -0,0 +1,62 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "postgres_to_bq_archive_load_files" +name: "PostgreSQL source test database" +type: "tap-postgres" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - PostgreSQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_POSTGRES_HOST}" # PostgreSQL host + 
logical_poll_total_seconds: 3 # Time out if no LOG_BASED changes received for 3 seconds + port: ${TAP_POSTGRES_PORT} # PostgreSQL port + user: "${TAP_POSTGRES_USER}" # PostgreSQL user + password: "${TAP_POSTGRES_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_POSTGRES_DB}" # PostgreSQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 1000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes +archive_load_files: True # Archive the load files in dedicated S3 folder +archive_load_files_gcs_prefix: archive_folder # Archive folder + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + + ### SOURCE SCHEMA 1: public + - source_schema: "public" + target_schema: "ppw_e2e_tap_postgres${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + + ### Table with INCREMENTAL replication + - table_name: "city" + replication_method: "INCREMENTAL" + replication_key: "id" + + ### Table with FULL_TABLE replication + - table_name: "country" + replication_method: "FULL_TABLE" + + ### SOURCE SCHEMA 2: public2 + - source_schema: "public2" + target_schema: "ppw_e2e_tap_postgres_public2${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + ### Table with FULL_TABLE replication + - table_name: "wearehere" + replication_method: "FULL_TABLE" \ No newline at end of file diff --git a/tests/end_to_end/test_target_bigquery.py b/tests/end_to_end/test_target_bigquery.py index 43c08b5fa..93ab80b0b 100644 --- a/tests/end_to_end/test_target_bigquery.py +++ b/tests/end_to_end/test_target_bigquery.py @@ -3,10 +3,12 @@ import uuid from datetime import datetime, timezone from random import randint +import tempfile import bson import pytest from bson import Timestamp +from google.cloud import storage from pipelinewise.fastsync import mysql_to_bigquery, postgres_to_bigquery from .helpers import tasks @@ -19,6 +21,7 @@ TAP_MARIADB_BUFFERED_STREAM_ID = 'mariadb_to_bq_buffered_stream' TAP_POSTGRES_ID = 'postgres_to_bq' TAP_POSTGRES_SPLIT_LARGE_FILES_ID = 'postgres_to_bq_split_large_files' +TAP_POSTGRES_ARCHIVE_LOAD_FILES_ID = 'postgres_to_bq_archive_load_files' TAP_MONGODB_ID = 'mongo_to_bq' TAP_S3_CSV_ID = 's3_csv_to_bq' TARGET_ID = 'bigquery' @@ -317,6 +320,66 @@ def test_resync_pg_to_bq_with_split_large_files( postgres_to_bigquery.tap_type_to_target_type, ) + # pylint: disable=invalid-name,too-many-locals + @pytest.mark.dependency(depends=['import_config']) + def test_replicate_pg_to_bq_with_archive_load_files(self): + """Fastsync tables from Postgres to Bigquery with archive load files enabled""" + archive_prefix = 'archive_folder' # Custom gcs prefix defined in TAP_POSTGRES_ARCHIVE_LOAD_FILES_ID + tap = 'postgres_to_bq_archive_load_files' + + client = storage.Client() + bucket_name = os.environ.get('TARGET_BIGQUERY_GCS_BUCKET') + bucket = client.get_bucket(bucket_name) + + # Delete any dangling files from archive + dangling_blobs = client.list_blobs( + bucket, prefix=f'{archive_prefix}/{tap}/' + ) + for blob in dangling_blobs: + 
blob.delete() + + # Run tap + assertions.assert_run_tap_success( + TAP_POSTGRES_ARCHIVE_LOAD_FILES_ID, TARGET_ID, ['fastsync', 'singer'] + ) + + # Assert expected files in archive folder + expected_archive_files_count = { + 'public.city': 1, # INCREMENTAL: fastsync and singer (but currently only fastsync archives) + 'public.country': 1, # FULL_TABLE : fastsync only + 'public2.wearehere': 1, # FULL_TABLE : fastsync only + } + for schema_table, expected_archive_files in expected_archive_files_count.items(): + schema, table = schema_table.split('.') + blobs_in_archive = list(client.list_blobs( + bucket, + prefix=f'{archive_prefix}/{tap}/{table}' + )) + + assert len(blobs_in_archive) == expected_archive_files + + # Assert expected metadata + expected_metadata = { + 'tap': tap, + 'schema': schema, + 'table': table, + 'archived-by': 'pipelinewise_fastsync', + } + for blob in blobs_in_archive: + assert blob.metadata == expected_metadata + + # Assert expected file contents + with tempfile.NamedTemporaryFile() as tmpfile: + blob.download_to_filename(tmpfile.name) + with open(tmpfile.name, 'rt', encoding='utf-8') as f: + rows_in_csv = len(f.readlines()) + + rows_in_table = self.run_query_tap_postgres( + 'SELECT COUNT(1) FROM {}'.format(schema_table) + )[0][0] + + assert rows_in_table == rows_in_csv + @pytest.mark.dependency(depends=['import_config']) def test_replicate_s3_to_bq(self): """Replicate csv files from s3 to Bigquery, check if return code is zero and success log file created""" From 96e30b0c83050abd0cc12ca0c6b961d74f6e3d04 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 17:12:13 +0000 Subject: [PATCH 15/20] Read GCS archive bucket in correctly --- pipelinewise/cli/config.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py index 5e31ce259..86310044e 100644 --- a/pipelinewise/cli/config.py +++ b/pipelinewise/cli/config.py @@ -458,6 +458,12 @@ def generate_inheritable_config(self, tap: Dict) -> Dict: 'archive_load_files_s3_prefix': tap.get( 'archive_load_files_s3_prefix', None ), + 'archive_load_files_gcs_bucket': tap.get( + 'archive_load_files_gcs_bucket', None + ), + 'archive_load_files_gcs_prefix': tap.get( + 'archive_load_files_gcs_prefix', None + ), } ) From e37d660a6c5909f29b8f10ac801887e30db39bb2 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 17 Feb 2022 18:05:07 +0000 Subject: [PATCH 16/20] Fix updating of GCS metadata --- pipelinewise/fastsync/commons/target_bigquery.py | 1 + tests/units/fastsync/commons/test_fastsync_target_bigquery.py | 1 + 2 files changed, 2 insertions(+) diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index dc2b4cc9a..045923abd 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -143,6 +143,7 @@ def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> storag } ) archive_blob.metadata = new_metadata + archive_blob.patch() return archive_blob def create_schema(self, schema_name): diff --git a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py index a23da57e2..e5427cc74 100644 --- a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py +++ b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py @@ -591,6 +591,7 @@ def 
test_default_archive_destination(target_bigquery): 'archived-by': 'pipelinewise_fastsync', } ) + archive_blob.patch.assert_called_once() # pylint: disable=invalid-name @staticmethod From 232c80cded6ab65b1b77b385668c46c37c0fdfb6 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 18 Feb 2022 14:08:29 +0000 Subject: [PATCH 17/20] Add new keys to schema --- pipelinewise/cli/schemas/target.json | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pipelinewise/cli/schemas/target.json b/pipelinewise/cli/schemas/target.json index 4c29c4198..e4fc3bde7 100644 --- a/pipelinewise/cli/schemas/target.json +++ b/pipelinewise/cli/schemas/target.json @@ -180,10 +180,17 @@ }, "dataset_id": { "type": "string" + }, + "gcs_bucket": { + "type": "string" + }, + "gcs_key_prefix": { + "type": "string" } }, "required": [ - "project_id" + "project_id", + "gcs_bucket" ] } }, From 46a2ad54e1bc353ff1d67d727d1a018bc8e7ac98 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 18 Feb 2022 14:18:22 +0000 Subject: [PATCH 18/20] Add `gcs_bucket` to required keys --- pipelinewise/fastsync/mongodb_to_bigquery.py | 2 +- pipelinewise/fastsync/mysql_to_bigquery.py | 1 + pipelinewise/fastsync/postgres_to_bigquery.py | 2 +- pipelinewise/fastsync/s3_csv_to_bigquery.py | 1 + 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index adbec615e..046f0e121 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -25,7 +25,7 @@ 'auth_database', 'dbname', ], - 'target': ['project_id'], + 'target': ['project_id', 'gcs_bucket'], } LOCK = multiprocessing.Lock() diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index ad61b119a..eabb0f171 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -22,6 +22,7 @@ 'tap': ['host', 'port', 'user', 'password'], 'target': [ 'project_id', + 'gcs_bucket', ], } diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index 5ee0e956c..18e42c222 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -20,7 +20,7 @@ REQUIRED_CONFIG_KEYS = { 'tap': ['host', 'port', 'user', 'password'], - 'target': ['project_id'], + 'target': ['project_id', 'gcs_bucket'], } LOCK = multiprocessing.Lock() diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index 05706b885..2c379075f 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -19,6 +19,7 @@ 'tap': ['bucket', 'start_date'], 'target': [ 'project_id', + 'gcs_bucket', ], } From fd474a02e54ac17a9217bd40cf77430af309a4b9 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 18 Feb 2022 14:40:45 +0000 Subject: [PATCH 19/20] Allow `FastSyncTargetBigquery` to upload to GCS in parallel --- pipelinewise/cli/schemas/target.json | 5 +++++ .../fastsync/commons/target_bigquery.py | 19 +++++++++++++++++++ pipelinewise/fastsync/mysql_to_bigquery.py | 3 +-- pipelinewise/fastsync/postgres_to_bigquery.py | 3 +-- .../test-project/target_bigquery.yml.template | 1 + 5 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pipelinewise/cli/schemas/target.json 
b/pipelinewise/cli/schemas/target.json index e4fc3bde7..ee5e6a13b 100644 --- a/pipelinewise/cli/schemas/target.json +++ b/pipelinewise/cli/schemas/target.json @@ -186,6 +186,11 @@ }, "gcs_key_prefix": { "type": "string" + }, + "gcs_parallelism": { + "type": "integer", + "minimum": 1, + "maximum": 1000 } }, "required": [ diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index 045923abd..d91c1d58e 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -1,3 +1,4 @@ +import multiprocessing import os import logging import json @@ -102,6 +103,24 @@ def upload_to_gcs(self, file: str) -> storage.Blob: return blob + def multi_upload_to_gcs(self, files: List[str]) -> List[storage.Blob]: + bucket_name = self.connection_config['gcs_bucket'] + key_prefix = self.connection_config.get('gcs_key_prefix', '') + key = '{}{}'.format( + key_prefix, + os.path.splitext(os.path.basename(files[0]))[0] + ) + LOGGER.info( + 'Uploading to GCS bucket: %s, %s local files, GCS key prefix: %s', + bucket_name, + len(files), + key, + ) + gcs_parallelism = self.connection_config.get('gcs_parallelism', 1) + with multiprocessing.Pool(gcs_parallelism) as proc: + gcs_blobs = proc.map(self.upload_to_gcs, files) + return gcs_blobs + def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> storage.Blob: """Copy load file to archive folder with metadata added""" table_dict = utils.tablename_to_dict(table) diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index eabb0f171..ccd3a44e1 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -104,9 +104,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: mysql.close_connections() # Uploading to GCS - gcs_blobs = [] + gcs_blobs = bigquery.multi_upload_to_gcs(file_parts) for file_part in file_parts: - gcs_blobs.append(bigquery.upload_to_gcs(file_part)) os.remove(file_part) # Creating temp table in Bigquery diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index 18e42c222..9f44c81a2 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -106,9 +106,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: postgres.close_connection() # Uploading to GCS - gcs_blobs = [] + gcs_blobs = bigquery.multi_upload_to_gcs(file_parts) for file_part in file_parts: - gcs_blobs.append(bigquery.upload_to_gcs(file_part)) os.remove(file_part) # Creating temp table in Bigquery diff --git a/tests/end_to_end/test-project/target_bigquery.yml.template b/tests/end_to_end/test-project/target_bigquery.yml.template index d26ca0e7c..de4970637 100644 --- a/tests/end_to_end/test-project/target_bigquery.yml.template +++ b/tests/end_to_end/test-project/target_bigquery.yml.template @@ -17,3 +17,4 @@ db_conn: # We use an intermediate external stage on GCS to load data into BigQuery gcs_bucket: "${TARGET_BIGQUERY_GCS_BUCKET}" # GCS external staging bucket name gcs_key_prefix: "${TARGET_BIGQUERY_GCS_KEY_PREFIX}" # Optional: GCS key prefix + gcs_parallelism: 1 # Optional: (Default: 1) From 64ffcdc7491f42ea97f65e9f44218a898fc8f398 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 18 Feb 2022 14:59:05 +0000 Subject: [PATCH 20/20] Use `ThreadPool` --- 
pipelinewise/fastsync/commons/target_bigquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index d91c1d58e..71499d239 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -1,4 +1,4 @@ -import multiprocessing +import multiprocessing.pool import os import logging import json @@ -117,7 +117,7 @@ def multi_upload_to_gcs(self, files: List[str]) -> List[storage.Blob]: key, ) gcs_parallelism = self.connection_config.get('gcs_parallelism', 1) - with multiprocessing.Pool(gcs_parallelism) as proc: + with multiprocessing.pool.ThreadPool(gcs_parallelism) as proc: gcs_blobs = proc.map(self.upload_to_gcs, files) return gcs_blobs
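
A minimal standalone sketch (not part of the patch series) of the upload pattern the last two commits converge on: local file parts fanned out to GCS with a thread pool before the BigQuery load job runs. The function name `upload_parts`, the bucket name `my-staging-bucket`, and the example part filenames are illustrative assumptions, not identifiers from the repository; only the `google.cloud.storage` and `multiprocessing.pool.ThreadPool` APIs are real.

    import multiprocessing.pool
    import os
    from typing import List

    from google.cloud import storage


    def upload_parts(files: List[str], bucket_name: str, key_prefix: str = '',
                     parallelism: int = 4) -> List[storage.Blob]:
        """Upload each local file part to GCS and return the resulting blobs."""
        client = storage.Client()
        bucket = client.bucket(bucket_name)

        def _upload(path: str) -> storage.Blob:
            # Object key mirrors the FastSync convention: optional prefix + local basename.
            blob = bucket.blob(f'{key_prefix}{os.path.basename(path)}')
            blob.upload_from_filename(path)
            return blob

        # A ThreadPool (rather than multiprocessing.Pool) is used because the work is
        # network/I/O-bound and the client and bucket objects never have to be pickled
        # across process boundaries.
        with multiprocessing.pool.ThreadPool(parallelism) as pool:
            return pool.map(_upload, files)


    if __name__ == '__main__':
        # Hypothetical file parts as produced by split_large_files, named for illustration only.
        parts = ['table.csv.00001', 'table.csv.00002']
        blobs = upload_parts(parts, bucket_name='my-staging-bucket', parallelism=2)
        print([blob.name for blob in blobs])

The design point behind the final commit is the same as in the comment above: switching from `multiprocessing.Pool` to `multiprocessing.pool.ThreadPool` keeps the parallel uploads in one process, which suits network-bound blob uploads and avoids serializing the GCS client state to worker processes.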