Add a Bigquery Target with fastsync (#445)

jmriego authored Jun 29, 2021
1 parent a0600ba commit 01613ee
Showing 39 changed files with 2,631 additions and 52 deletions.
9 changes: 9 additions & 0 deletions dev-project/.env
@@ -90,3 +90,12 @@ TARGET_REDSHIFT_COPY_ROLE_ARN=
TARGET_REDSHIFT_S3_BUCKET=
TARGET_REDSHIFT_S3_KEY_PREFIX=
TARGET_REDSHIFT_S3_ACL=

# ------------------------------------------------------------------------------
# Test BigQuery credentials used as target DWH
# IMPORTANT:
# Google BigQuery is not open source and is not included in the docker test env.
# Please add real credentials, otherwise the related tests will be ignored.
# ------------------------------------------------------------------------------
TARGET_BIGQUERY_PROJECT=
GOOGLE_APPLICATION_CREDENTIALS=
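
For reference, a minimal sketch (not part of this commit) of how these two variables are typically consumed, assuming the google-cloud-bigquery client library: GOOGLE_APPLICATION_CREDENTIALS is picked up automatically by the Google client, so only the project id needs to be passed explicitly.

    import os
    from google.cloud import bigquery  # assumed dependency for this sketch

    # The client reads GOOGLE_APPLICATION_CREDENTIALS from the environment on its own;
    # the project id comes from the TARGET_BIGQUERY_PROJECT variable set above.
    client = bigquery.Client(project=os.environ['TARGET_BIGQUERY_PROJECT'])

    # Quick connectivity check: list a few datasets in the target project.
    for dataset in client.list_datasets(max_results=5):
        print(dataset.dataset_id)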
4 changes: 2 additions & 2 deletions dev-project/entrypoint.sh
@@ -27,7 +27,7 @@ cd dev-project
../tests/db/target_postgres.sh

# Install PipelineWise in the container
../install.sh --acceptlicenses --nousage --connectors=target-snowflake,target-postgres,tap-mysql,tap-postgres,tap-mongodb,transform-field,tap-s3-csv
../install.sh --acceptlicenses --nousage --connectors=target-snowflake,target-postgres,target-bigquery,tap-mysql,tap-postgres,tap-mongodb,transform-field,tap-s3-csv
if [[ $? != 0 ]]; then
echo
echo "ERROR: Docker container not started. Failed to install one or more PipelineWise components."
@@ -59,4 +59,4 @@ echo " $ pipelinewise status"
echo "=========================================================================="

# Continue running the container
tail -f /dev/null
tail -f /dev/null
22 changes: 22 additions & 0 deletions docs/user_guide/schema_changes.rst
@@ -65,9 +65,31 @@ data only after the change.
| text | text | | 555-DEF |
+----------------+----------------+--------------------------------+------------------+

BigQuery versioning works differently because it is not possible to modify existing columns.
After the data type change ``COLUMN_THREE`` remains ``INTEGER`` with
the old data, and a new ``COLUMN_THREE__st`` column is created with ``VARCHAR`` type that keeps
data only after the change.
The suffix added to the new column indicates the new column type.

+----------------+----------------+------------------+----------------------+
| **COLUMN_ONE** | **COLUMN_TWO** | **COLUMN_THREE** | **COLUMN_THREE__st** |
| | | (INTEGER) | (VARCHAR) |
+----------------+----------------+------------------+----------------------+
| text | text | 111 | |
+----------------+----------------+------------------+----------------------+
| text | text | 222 | |
+----------------+----------------+------------------+----------------------+
| text | text | 333 | |
+----------------+----------------+------------------+----------------------+
| text | text | | 444-ABC |
+----------------+----------------+------------------+----------------------+
| text | text | | 555-DEF |
+----------------+----------------+------------------+----------------------+

.. warning::

Please note the ``NULL`` values in the ``COLUMN_THREE_20190812`` and ``COLUMN_THREE`` columns
(or ``COLUMN_THREE`` and ``COLUMN_THREE__st`` for BigQuery).
**Historical values are not converted to the new data types!**
If you need the actual representation of the table after data type changes then
you need to :ref:`resync` the table.
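To make the versioning behaviour concrete, here is a hedged sketch (not part of the commit) of how the two columns can be merged at query time with the Python BigQuery client; the table name is illustrative:

    from google.cloud import bigquery  # assumed dependency for this sketch

    client = bigquery.Client()

    # COLUMN_THREE (INTEGER) holds rows loaded before the type change,
    # COLUMN_THREE__st (VARCHAR/STRING) holds rows loaded after it;
    # COALESCE presents them as a single string column.
    query = """
        SELECT COLUMN_ONE,
               COLUMN_TWO,
               COALESCE(COLUMN_THREE__st, CAST(COLUMN_THREE AS STRING)) AS column_three
        FROM `my_project.my_dataset.my_table`  -- illustrative table name
    """
    for row in client.query(query).result():
        print(row.column_three)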
1 change: 1 addition & 0 deletions install.sh
@@ -183,6 +183,7 @@ DEFAULT_CONNECTORS=(
target-snowflake
target-redshift
target-postgres
target-bigquery
transform-field
)
EXTRA_CONNECTORS=(
1 change: 1 addition & 0 deletions pipelinewise-target-bigquery
Submodule pipelinewise-target-bigquery added at 363efe
2 changes: 1 addition & 1 deletion pipelinewise/cli/pipelinewise.py
@@ -978,7 +978,7 @@ def run_tap(self):
tap_properties,
tap_state, {
'selected': True,
'target_type': ['target-snowflake', 'target-redshift', 'target-postgres'],
'target_type': ['target-snowflake', 'target-redshift', 'target-postgres', 'target-bigquery'],
'tap_type': ['tap-mysql', 'tap-postgres', 'tap-s3-csv', 'tap-mongodb'],
'initial_sync_required': True
},
20 changes: 19 additions & 1 deletion pipelinewise/cli/schemas/target.json
@@ -171,6 +171,20 @@
"required": [
"s3_bucket"
]
},
"db_conn_target_bigquery": {
"type": "object",
"properties": {
"project_id": {
"type": "string"
},
"dataset_id": {
"type": "string"
}
},
"required": [
"project_id"
]
}
},
"type": "object",
@@ -193,7 +207,8 @@
"target-snowflake",
"target-redshift",
"target-postgres",
"target-s3-csv"
"target-s3-csv",
"target-bigquery"
]
},
"db_conn": {
@@ -209,6 +224,9 @@
},
{
"$ref": "#/definitions/db_conn_target_s3_csv"
},
{
"$ref": "#/definitions/db_conn_target_bigquery"
}
]
}
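
The new db_conn_target_bigquery definition only makes project_id mandatory; dataset_id is optional. A small sketch (not part of the commit) validating a connection dict against that sub-schema with the jsonschema package:

    from jsonschema import ValidationError, validate  # assumed dependency for this sketch

    # The db_conn_target_bigquery definition from target.json, reproduced standalone.
    bigquery_db_conn_schema = {
        'type': 'object',
        'properties': {
            'project_id': {'type': 'string'},
            'dataset_id': {'type': 'string'},
        },
        'required': ['project_id'],
    }

    # Only project_id is mandatory, so this passes.
    validate({'project_id': 'my-gcp-project'}, bigquery_db_conn_schema)

    # Missing project_id is rejected.
    try:
        validate({'dataset_id': 'analytics'}, bigquery_db_conn_schema)
    except ValidationError as exc:
        print('rejected:', exc.message)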
21 changes: 15 additions & 6 deletions pipelinewise/fastsync/commons/split_gzip.py
@@ -2,6 +2,7 @@
import io
import logging
import gzip
import builtins

LOGGER = logging.getLogger(__name__)

@@ -14,8 +15,8 @@
EST_COMPR_RATE = 0.12


# pylint: disable=W0622
def open(base_filename, mode='wb', chunk_size_mb=None, max_chunks=None, est_compr_rate=None):
# pylint: disable=W0622,R1732
def open(base_filename, mode='wb', chunk_size_mb=None, max_chunks=None, est_compr_rate=None, compress=True):
"""Open a gzip-compressed file in binary or text mode.
Args:
@@ -39,7 +40,7 @@ def open(base_filename, mode='wb', chunk_size_mb=None, max_chunks=None, est_comp
raise ValueError('Invalid chunk_size_mb: %d' % (chunk_size_mb,))
if max_chunks is not None and max_chunks < 0:
raise ValueError('Invalid max_chunks: %d' % (max_chunks,))
return SplitGzipFile(base_filename, mode, chunk_size_mb, max_chunks, est_compr_rate)
return SplitGzipFile(base_filename, mode, chunk_size_mb, max_chunks, est_compr_rate, compress)


# pylint: disable=R0902
@@ -53,14 +54,19 @@ def __init__(self,
mode: str = None,
chunk_size_mb: int = None,
max_chunks: int = None,
est_compr_rate: float = None):
est_compr_rate: float = None,
compress=True):
super().__init__()

self.base_filename = base_filename
self.mode = mode
self.chunk_size_mb = chunk_size_mb or DEFAULT_CHUNK_SIZE_MB
self.max_chunks = max_chunks if max_chunks is not None else DEFAULT_MAX_CHUNKS
self.est_compr_rate = est_compr_rate if est_compr_rate is not None else EST_COMPR_RATE
self.compress = compress
if compress:
self.est_compr_rate = est_compr_rate if est_compr_rate is not None else EST_COMPR_RATE
else:
self.est_compr_rate = 1.0
self.chunk_seq = 1
self.current_chunk_size_mb = 0
self.chunk_filename = None
@@ -100,7 +106,10 @@ def _activate_chunk_file(self):

# Open the actual chunk file with gzip data writer
self.chunk_filename = chunk_filename
self.chunk_file = gzip.open(self.chunk_filename, self.mode)
if self.compress:
self.chunk_file = gzip.open(self.chunk_filename, self.mode)
else:
self.chunk_file = builtins.open(self.chunk_filename, self.mode)

@staticmethod
def _bytes_to_megabytes(size: int) -> float:
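The new compress flag makes split_gzip.open write plain files through builtins.open instead of gzip.open, and switches the estimated compression rate to 1.0 so chunk sizing stays accurate for uncompressed output. A usage sketch (not part of the commit), assuming the splitter is used as a writable context manager as it is elsewhere in this change; the output path is illustrative:

    from pipelinewise.fastsync.commons import split_gzip

    # Write uncompressed chunks of roughly 100 MB each, splitting into at most 5 files.
    # With compress=True (the default) the same call would produce gzip chunks instead.
    with split_gzip.open('/tmp/export.csv', mode='wt',
                         chunk_size_mb=100, max_chunks=5, compress=False) as out:
        out.write('id,name\n')
        out.write('1,alpha\n')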
9 changes: 6 additions & 3 deletions pipelinewise/fastsync/commons/tap_mongodb.py
@@ -140,14 +140,16 @@ def close_connection(self):
"""
self.database.client.close()

# pylint: disable=R0914
# pylint: disable=R0914,R0913
def copy_table(self,
table_name: str,
filepath: str,
temp_dir: str,
split_large_files=False,
split_file_chunk_size_mb=1000,
split_file_max_chunks=20):
split_file_max_chunks=20,
compress=True
):
"""
Export data from table to a zipped csv
Args:
@@ -174,7 +176,8 @@ def copy_table(self,
gzip_splitter = split_gzip.open(filepath,
mode='wt',
chunk_size_mb=split_file_chunk_size_mb,
max_chunks=split_file_max_chunks if split_large_files else 0)
max_chunks=split_file_max_chunks if split_large_files else 0,
compress=compress)
with gzip.open(export_file_path, 'rb') as export_file, gzip_splitter as gzfile:
writer = csv.DictWriter(gzfile,
fieldnames=[elem[0] for elem in self._get_collection_columns()],
60 changes: 45 additions & 15 deletions pipelinewise/fastsync/commons/tap_mysql.py
@@ -24,13 +24,14 @@ class FastSyncTapMySql:
Common functions for fastsync from a MySQL database
"""

def __init__(self, connection_config, tap_type_to_target_type):
def __init__(self, connection_config, tap_type_to_target_type, target_quote=None):
self.connection_config = connection_config
self.connection_config['charset'] = connection_config.get('charset', DEFAULT_CHARSET)
self.connection_config['export_batch_rows'] = connection_config.get('export_batch_rows',
DEFAULT_EXPORT_BATCH_ROWS)
self.connection_config['session_sqls'] = connection_config.get('session_sqls', DEFAULT_SESSION_SQLS)
self.tap_type_to_target_type = tap_type_to_target_type
self.target_quote = target_quote
self.conn = None
self.conn_unbuffered = None

@@ -186,16 +187,34 @@ def get_primary_keys(self, table_name):
table_dict['table_name'])
pk_specs = self.query(sql)
if len(pk_specs) > 0:
return [safe_column_name(k.get('Column_name')) for k in pk_specs]
return [safe_column_name(k.get('Column_name'), self.target_quote) for k in pk_specs]

return None

def get_table_columns(self, table_name):
def get_table_columns(self, table_name, max_num=None, date_type='date'):
"""
Get MySQL table column details from information_schema
"""
table_dict = utils.tablename_to_dict(table_name)
sql = """

if max_num:
decimals = len(max_num.split('.')[1]) if '.' in max_num else 0
decimal_format = f"""
CONCAT('GREATEST(LEAST({max_num}, ROUND(`', column_name, '`, {decimals})), -{max_num})')
"""
integer_format = """
CONCAT('`', column_name, '`')
"""
else:
decimal_format = """
CONCAT('`', column_name, '`')
"""
integer_format = decimal_format

schema_name = table_dict.get('schema_name')
table_name = table_dict.get('table_name')

sql = f"""
SELECT column_name,
data_type,
column_type,
@@ -207,26 +226,32 @@ def get_table_columns(self, table_name):
WHEN data_type IN ('blob', 'tinyblob', 'mediumblob', 'longblob')
THEN CONCAT('REPLACE(hex(`', column_name, '`)', ", '\n', ' ')")
WHEN data_type IN ('binary', 'varbinary')
THEN concat('REPLACE(hex(trim(trailing CHAR(0x00) from `',COLUMN_NAME,'`))', ", '\n', ' ')")
THEN concat('REPLACE(REPLACE(hex(trim(trailing CHAR(0x00) from `',COLUMN_NAME,'`))', ", '\n', ' '), '\r', '')")
WHEN data_type IN ('bit')
THEN concat('cast(`', column_name, '` AS unsigned)')
WHEN data_type IN ('datetime', 'timestamp', 'date')
WHEN data_type IN ('date')
THEN concat('nullif(CAST(`', column_name, '` AS {date_type}),STR_TO_DATE("0000-00-00 00:00:00", "%Y-%m-%d %T"))')
WHEN data_type IN ('datetime', 'timestamp')
THEN concat('nullif(`', column_name, '`,STR_TO_DATE("0000-00-00 00:00:00", "%Y-%m-%d %T"))')
WHEN column_type IN ('tinyint(1)')
THEN concat('CASE WHEN `' , column_name , '` is null THEN null WHEN `' , column_name , '` = 0 THEN 0 ELSE 1 END')
WHEN column_type IN ('geometry', 'point', 'linestring', 'polygon', 'multipoint', 'multilinestring', 'multipolygon', 'geometrycollection')
THEN concat('ST_AsGeoJSON(', column_name, ')')
WHEN column_name = 'raw_data_hash'
THEN concat('REPLACE(hex(`', column_name, '`)', ", '\n', ' ')")
ELSE concat('REPLACE(REPLACE(cast(`', column_name, '` AS char CHARACTER SET utf8)', ", '\n', ' '), '\0', '')")
THEN concat('REPLACE(REPLACE(hex(`', column_name, '`)', ", '\n', ' '), '\r', '')")
WHEN data_type IN ('double', 'numeric', 'float', 'decimal', 'real')
THEN {decimal_format}
WHEN data_type IN ('smallint', 'integer', 'bigint', 'mediumint', 'int')
THEN {integer_format}
ELSE concat('REPLACE(REPLACE(REPLACE(cast(`', column_name, '` AS char CHARACTER SET utf8)', ", '\n', ' '), '\r', ''), '\0', '')")
END AS safe_sql_value,
ordinal_position
FROM information_schema.columns
WHERE table_schema = '{}'
AND table_name = '{}') x
WHERE table_schema = '{schema_name}'
AND table_name = '{table_name}') x
ORDER BY
ordinal_position
""".format(table_dict.get('schema_name'), table_dict.get('table_name'))
"""
return self.query(sql)

def map_column_types_to_target(self, table_name):
@@ -235,7 +260,7 @@ def map_column_types_to_target(self, table_name):
"""
mysql_columns = self.get_table_columns(table_name)
mapped_columns = [
'{} {}'.format(safe_column_name(pc.get('column_name')),
'{} {}'.format(safe_column_name(pc.get('column_name'), self.target_quote),
self.tap_type_to_target_type(pc.get('data_type'), pc.get('column_type')))
for pc in mysql_columns]

@@ -248,9 +273,13 @@ def copy_table(self,
def copy_table(self,
table_name,
path,
max_num=None,
date_type='date',
split_large_files=False,
split_file_chunk_size_mb=1000,
split_file_max_chunks=20):
split_file_max_chunks=20,
compress=True,
):
"""
Export data from table to a zipped csv
Args:
Expand All @@ -261,7 +290,7 @@ def copy_table(self,
split_file_chunk_size_mb: File chunk sizes if `split_large_files` enabled. (Default: 1000)
split_file_max_chunks: Max number of chunks if `split_large_files` enabled. (Default: 20)
"""
table_columns = self.get_table_columns(table_name)
table_columns = self.get_table_columns(table_name, max_num, date_type)
column_safe_sql_values = [c.get('safe_sql_value') for c in table_columns]

# If self.get_table_columns returns zero row then table not exist
Expand All @@ -284,7 +313,8 @@ def copy_table(self,
gzip_splitter = split_gzip.open(path,
mode='wt',
chunk_size_mb=split_file_chunk_size_mb,
max_chunks=split_file_max_chunks if split_large_files else 0)
max_chunks=split_file_max_chunks if split_large_files else 0,
compress=compress)

with gzip_splitter as split_gzip_files:
writer = csv.writer(split_gzip_files,
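get_table_columns now accepts optional max_num and date_type arguments: when max_num is given, decimal-like columns are rounded to the precision implied by its fractional part and clamped into the ±max_num range (useful for targets with bounded numeric precision such as BigQuery). In the commit this expression is assembled inside the information_schema query via CONCAT; the sketch below (with illustrative values) just reproduces the resulting per-column SQL in plain Python:

    # Illustrative values; the commit derives them per column inside the CONCAT expression.
    max_num = '99999999999999999999999999999.999999999'
    column_name = 'amount'

    # Number of decimal places comes from the fractional part of max_num.
    decimals = len(max_num.split('.')[1]) if '.' in max_num else 0

    # The clamping/rounding expression emitted for decimal, numeric, float, double and real columns.
    safe_sql_value = f'GREATEST(LEAST({max_num}, ROUND(`{column_name}`, {decimals})), -{max_num})'
    print(safe_sql_value)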