Bring BigQuery FastSync implementation into alignment with Snowflake #901

Open. Wants to merge 20 commits into master.
2 changes: 2 additions & 0 deletions dev-project/.env
@@ -109,4 +109,6 @@ TARGET_REDSHIFT_S3_ACL=
# Please add real credentials otherwise the related tests will be ignored.
# ------------------------------------------------------------------------------
TARGET_BIGQUERY_PROJECT=
TARGET_BIGQUERY_GCS_BUCKET=
TARGET_BIGQUERY_GCS_KEY_PREFIX=
GOOGLE_APPLICATION_CREDENTIALS=
6 changes: 6 additions & 0 deletions pipelinewise/cli/config.py
@@ -458,6 +458,12 @@ def generate_inheritable_config(self, tap: Dict) -> Dict:
'archive_load_files_s3_prefix': tap.get(
'archive_load_files_s3_prefix', None
),
'archive_load_files_gcs_bucket': tap.get(
'archive_load_files_gcs_bucket', None
),
'archive_load_files_gcs_prefix': tap.get(
'archive_load_files_gcs_prefix', None
),
}
)

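For context, a minimal sketch (not part of this diff) of how the two new keys behave: like the existing archive_load_files_s3_prefix, they are read from the tap YAML with tap.get() and fall back to None when the tap does not define them. The tap dict below is hypothetical.

    tap = {'archive_load_files_gcs_bucket': 'my-archive-bucket'}  # hypothetical tap settings
    inherited = {
        'archive_load_files_gcs_bucket': tap.get('archive_load_files_gcs_bucket', None),
        'archive_load_files_gcs_prefix': tap.get('archive_load_files_gcs_prefix', None),
    }
    # inherited['archive_load_files_gcs_bucket'] == 'my-archive-bucket'
    # inherited['archive_load_files_gcs_prefix'] is None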
14 changes: 13 additions & 1 deletion pipelinewise/cli/schemas/target.json
@@ -180,10 +180,22 @@
},
"dataset_id": {
"type": "string"
},
"gcs_bucket": {
"type": "string"
},
"gcs_key_prefix": {
"type": "string"
},
"gcs_parallelism": {
"type": "integer",
"minimum": 1,
"maximum": 1000
}
},
"required": [
"project_id"
"project_id",
"gcs_bucket"
]
}
},
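As a hedged illustration of the schema change, a BigQuery target connection now needs gcs_bucket alongside project_id, and gcs_parallelism, if set, must be between 1 and 1000 (the code falls back to 1 when it is omitted). All values below are placeholders, not defaults shipped with this PR.

    target_connection = {
        'project_id': 'my-gcp-project',      # required (unchanged)
        'dataset_id': 'analytics',
        'gcs_bucket': 'my-fastsync-bucket',  # newly required
        'gcs_key_prefix': 'pipelinewise/',   # optional
        'gcs_parallelism': 4,                # optional, 1-1000
    }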
5 changes: 3 additions & 2 deletions pipelinewise/fastsync/commons/tap_mongodb.py
@@ -244,6 +244,7 @@ def copy_table(
split_file_chunk_size_mb=1000,
split_file_max_chunks=20,
compress=True,
include_header=True,
):
"""
Export data from table to a zipped csv
@@ -286,8 +287,8 @@ def copy_table(
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)

writer.writeheader()
if include_header:
writer.writeheader()
rows = []

LOGGER.info('Starting data processing...')
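Usage sketch of the new flag, with illustrative arguments: the BigQuery sync exports MongoDB collections without a header row (and uncompressed), since the load job is given an explicit schema and matches CSV columns by position.

    mongodb = FastSyncTapMongoDB(tap_config, tap_type_to_target_type)  # tap_config: parsed tap connection dict
    mongodb.copy_table('my_collection', '/tmp/my_collection.csv', '/tmp',
                       compress=False, include_header=False)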
20 changes: 14 additions & 6 deletions pipelinewise/fastsync/commons/tap_s3_csv.py
@@ -20,6 +20,7 @@
from singer.utils import strptime_with_tz
from singer_encodings import csv as singer_encodings_csv

from . import split_gzip
from .utils import retry_pattern
from ...utils import safe_column_name

@@ -73,7 +74,7 @@ def _find_table_spec_by_name(self, table_name: str) -> Dict:
)
)

def copy_table(self, table_name: str, file_path: str) -> None:
def copy_table(self, table_name: str, file_path: str, compress=True, include_header=True) -> None:
"""
Copies data from all csv files that match the search_pattern into the csv file at file_path
:param table_name: Name of the table
@@ -119,18 +120,25 @@ def copy_table(self, table_name: str, file_path: str) -> None:
self.tables_last_modified[table_name] = max_last_modified

# write to the given compressed csv file
with gzip.open(file_path, 'wt') as gzfile:

gzip_splitter = split_gzip.open(
file_path,
mode='wt',
chunk_size_mb=split_gzip.DEFAULT_CHUNK_SIZE_MB,
max_chunks=0,
compress=compress,
)
with gzip_splitter as split_gzip_files:
writer = csv.DictWriter(
gzfile,
split_gzip_files,
fieldnames=sorted(list(headers)),
# we need to sort the headers so that copying into snowflake works
delimiter=',',
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)
# write the header
writer.writeheader()
if include_header:
# write the header
writer.writeheader()
# write all records at once
writer.writerows(records)

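The S3 CSV export now goes through the shared split_gzip helper instead of gzip.open, so it can honour compress=False (plain CSV, which is what the GCS-based BigQuery load expects) and, if ever needed, file splitting. A minimal usage sketch of the helper with illustrative values:

    from pipelinewise.fastsync.commons import split_gzip

    with split_gzip.open('/tmp/export.csv', mode='wt',
                         chunk_size_mb=split_gzip.DEFAULT_CHUNK_SIZE_MB,
                         max_chunks=0,  # 0 disables splitting into chunks
                         compress=False) as out_file:
        out_file.write('id,name\n1,alice\n')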
117 changes: 103 additions & 14 deletions pipelinewise/fastsync/commons/target_bigquery.py
@@ -1,9 +1,12 @@
import multiprocessing.pool
import os
import logging
import json
import re
from typing import List, Dict

from google.cloud import bigquery
from google.cloud import storage
from google.api_core import exceptions

from .transform_utils import TransformationHelper, SQLFlavor
@@ -37,6 +40,9 @@ class FastSyncTargetBigquery:
def __init__(self, connection_config, transformation_config=None):
self.connection_config = connection_config
self.transformation_config = transformation_config
self.gcs = storage.Client(
project=self.connection_config['project_id'],
)

def open_connection(self):
project_id = self.connection_config['project_id']
@@ -78,6 +84,87 @@ def to_query_parameter(value):

return query_job

def upload_to_gcs(self, file: str) -> storage.Blob:
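"""Upload a local file to the GCS bucket configured as gcs_bucket, under gcs_key_prefix"""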
bucket_name = self.connection_config['gcs_bucket']
key_prefix = self.connection_config.get('gcs_key_prefix', '')
key = '{}{}'.format(key_prefix, os.path.basename(file))

LOGGER.info(
'Uploading to GCS bucket: %s, local file: %s, GCS key: %s',
bucket_name,
file,
key,
)

# Upload to GCS
bucket = self.gcs.get_bucket(bucket_name)
blob = bucket.blob(key)
blob.upload_from_filename(file)

return blob

def multi_upload_to_gcs(self, files: List[str]) -> List[storage.Blob]:
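"""Upload multiple local files to GCS in parallel, using up to gcs_parallelism threads"""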
bucket_name = self.connection_config['gcs_bucket']
key_prefix = self.connection_config.get('gcs_key_prefix', '')
key = '{}{}'.format(
key_prefix,
os.path.splitext(os.path.basename(files[0]))[0]
)
LOGGER.info(
'Uploading to GCS bucket: %s, %s local files, GCS key prefix: %s',
bucket_name,
len(files),
key,
)
gcs_parallelism = self.connection_config.get('gcs_parallelism', 1)
with multiprocessing.pool.ThreadPool(gcs_parallelism) as proc:
gcs_blobs = proc.map(self.upload_to_gcs, files)
return gcs_blobs

def copy_to_archive(self, blob: storage.Blob, tap_id: str, table: str) -> storage.Blob:
"""Copy load file to archive folder with metadata added"""
table_dict = utils.tablename_to_dict(table)
archive_table = table_dict.get('table_name')
archive_schema = table_dict.get('schema_name', '')

# Retain same filename
archive_file_basename = os.path.basename(blob.name)

# Get archive GCS prefix from config, defaulting to 'archive' if not specified
archive_prefix = self.connection_config.get(
'archive_load_files_gcs_prefix', 'archive'
)

source_bucket = blob.bucket

# Get archive GCS bucket from config, defaulting to the bucket used for the load if not specified
archive_bucket = self.gcs.get_bucket(
self.connection_config.get(
'archive_load_files_gcs_bucket', source_bucket.name
)
)

archive_blob_name = f'{archive_prefix}/{tap_id}/{archive_table}/{archive_file_basename}'
source_blob_name = f'{blob.bucket.name}/{blob.name}'
LOGGER.info('Archiving %s to %s', source_blob_name, archive_blob_name)

source_bucket.copy_blob(blob, archive_bucket, new_name=archive_blob_name)

# Combine existing metadata with archive related headers
archive_blob = archive_bucket.get_blob(archive_blob_name)
new_metadata = {} if blob.metadata is None else blob.metadata
new_metadata.update(
{
'tap': tap_id,
'schema': archive_schema,
'table': archive_table,
'archived-by': 'pipelinewise_fastsync',
}
)
archive_blob.metadata = new_metadata
archive_blob.patch()
return archive_blob

def create_schema(self, schema_name):
temp_schema = self.connection_config.get('temp_schema', schema_name)

@@ -147,19 +234,19 @@ def create_table(

self.query(sql)

# pylint: disable=R0913,R0914
# pylint: disable=too-many-locals
def copy_to_table(
self,
filepath,
target_schema,
table_name,
size_bytes,
is_temporary,
skip_csv_header=False,
allow_quoted_newlines=True,
write_truncate=True,
blobs: List[storage.Blob],
target_schema: str,
table_name: str,
size_bytes: int,
is_temporary: bool,
skip_csv_header: bool = False,
allow_quoted_newlines: bool = True,
write_truncate: bool = True,
):
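"""Load one or more GCS blobs into the target BigQuery table"""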
LOGGER.info('BIGQUERY - Loading %s into Bigquery...', filepath)
LOGGER.info('BIGQUERY - Loading into Bigquery...')
table_dict = utils.tablename_to_dict(table_name)
target_table = safe_name(
table_dict.get(
@@ -172,6 +259,7 @@ def copy_to_table(
dataset_ref = client.dataset(target_schema)
table_ref = dataset_ref.table(target_table)
table_schema = client.get_table(table_ref).schema

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.schema = table_schema
@@ -180,10 +268,11 @@
)
job_config.allow_quoted_newlines = allow_quoted_newlines
job_config.skip_leading_rows = 1 if skip_csv_header else 0
with open(filepath, 'rb') as exported_data:
job = client.load_table_from_file(
exported_data, table_ref, job_config=job_config
)

uris = [f'gs://{blob.bucket.name}/{blob.name}' for blob in blobs]
job = client.load_table_from_uri(
source_uris=uris, destination=table_ref, job_config=job_config
)
try:
job.result()
except exceptions.BadRequest as exc:
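Hedged end-to-end sketch of the new load path as the FastSync scripts drive it (dataset, table and file names are placeholders): export to a local CSV, upload it to GCS, load from the GCS URI, optionally archive the load file, then delete it from the bucket.

    bigquery = FastSyncTargetBigquery(target_connection, transformation_config=None)

    blob = bigquery.upload_to_gcs('/tmp/my_table.csv')
    bigquery.copy_to_table([blob], 'my_dataset', 'public.my_table',
                           size_bytes=1024, is_temporary=True)
    if target_connection.get('archive_load_files', False):
        bigquery.copy_to_archive(blob, 'my_tap_id', 'public.my_table')
    blob.delete()  # remove the load file from GCS once it is loaded (and archived)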
25 changes: 16 additions & 9 deletions pipelinewise/fastsync/mongodb_to_bigquery.py
@@ -25,7 +25,7 @@
'auth_database',
'dbname',
],
'target': ['project_id'],
'target': ['project_id', 'gcs_bucket'],
}

LOCK = multiprocessing.Lock()
@@ -48,6 +48,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
"""Sync one table"""
mongodb = FastSyncTapMongoDB(args.tap, tap_type_to_target_type)
bigquery = FastSyncTargetBigquery(args.target, args.transform)
tap_id = args.target.get('tap_id')
archive_load_files = args.target.get('archive_load_files', False)

try:
dbname = args.tap.get('dbname')
@@ -66,26 +68,31 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
)

# Exporting table data, get table definitions and close connection to avoid timeouts
mongodb.copy_table(table, filepath, args.temp_dir)
mongodb.copy_table(table, filepath, args.temp_dir, compress=False, include_header=False)
size_bytes = os.path.getsize(filepath)
bigquery_types = mongodb.map_column_types_to_target()
bigquery_columns = bigquery_types.get('columns', [])
mongodb.close_connection()

# Uploading to GCS
gcs_blob = bigquery.upload_to_gcs(filepath)
os.remove(filepath)

# Creating temp table in Bigquery
bigquery.create_schema(target_schema)
bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True)

# Load into Bigquery table
bigquery.copy_to_table(
filepath,
target_schema,
table,
size_bytes,
is_temporary=True,
skip_csv_header=True,
[gcs_blob], target_schema, table, size_bytes, is_temporary=True,
)
os.remove(filepath)

if archive_load_files:
# Copy load file to archive
bigquery.copy_to_archive(gcs_blob, tap_id, table)

# Delete the uploaded load file from GCS
gcs_blob.delete()

# Obfuscate columns
bigquery.obfuscate_columns(target_schema, table)