From 01613ee3cf8421d8231946b24c2be91a33c6f268 Mon Sep 17 00:00:00 2001 From: jmriego Date: Tue, 29 Jun 2021 14:33:25 +0100 Subject: [PATCH] Add a Bigquery Target with fastsync (#445) --- dev-project/.env | 9 + dev-project/entrypoint.sh | 4 +- docs/user_guide/schema_changes.rst | 22 ++ install.sh | 1 + pipelinewise-target-bigquery | 1 + pipelinewise/cli/pipelinewise.py | 2 +- pipelinewise/cli/schemas/target.json | 20 +- pipelinewise/fastsync/commons/split_gzip.py | 21 +- pipelinewise/fastsync/commons/tap_mongodb.py | 9 +- pipelinewise/fastsync/commons/tap_mysql.py | 60 +++- pipelinewise/fastsync/commons/tap_postgres.py | 63 +++- pipelinewise/fastsync/commons/tap_s3_csv.py | 9 +- .../fastsync/commons/target_bigquery.py | 284 ++++++++++++++++ .../fastsync/commons/transform_utils.py | 22 +- pipelinewise/fastsync/mongodb_to_bigquery.py | 157 +++++++++ pipelinewise/fastsync/mysql_to_bigquery.py | 192 +++++++++++ pipelinewise/fastsync/postgres_to_bigquery.py | 199 +++++++++++ pipelinewise/fastsync/s3_csv_to_bigquery.py | 154 +++++++++ pipelinewise/utils.py | 8 +- setup.py | 5 + .../target-bigquery/requirements.txt | 1 + tests/end_to_end/helpers/assertions.py | 32 +- tests/end_to_end/helpers/db.py | 66 ++++ tests/end_to_end/helpers/env.py | 33 ++ .../tap_mongodb_to_bq.yml.template | 46 +++ .../test-project/tap_mysql_to_bq.yml.template | 103 ++++++ ...p_mysql_to_bq_buffered_stream.yml.template | 103 ++++++ ...mysql_to_bq_split_large_files.yml.template | 45 +++ .../tap_postgres_to_bq.yml.template | 107 ++++++ ...tgres_to_bq_split_large_files.yml.template | 46 +++ .../tap_s3_csv_to_bq.yml.template | 67 ++++ .../test-project/target_bigquery.yml.template | 15 + tests/end_to_end/test_target_bigquery.py | 309 ++++++++++++++++++ .../commons/test_fastsync_target_bigquery.py | 284 ++++++++++++++++ .../fastsync/test_mongodb_to_bigquery.py | 40 +++ .../units/fastsync/test_mysql_to_bigquery.py | 48 +++ .../fastsync/test_postgres_to_bigquery.py | 40 +++ .../units/fastsync/test_s3_csv_to_bigquery.py | 40 +++ tests/units/test_utils.py | 16 + 39 files changed, 2631 insertions(+), 52 deletions(-) create mode 160000 pipelinewise-target-bigquery create mode 100644 pipelinewise/fastsync/commons/target_bigquery.py create mode 100644 pipelinewise/fastsync/mongodb_to_bigquery.py create mode 100644 pipelinewise/fastsync/mysql_to_bigquery.py create mode 100644 pipelinewise/fastsync/postgres_to_bigquery.py create mode 100644 pipelinewise/fastsync/s3_csv_to_bigquery.py create mode 100644 singer-connectors/target-bigquery/requirements.txt create mode 100644 tests/end_to_end/test-project/tap_mongodb_to_bq.yml.template create mode 100644 tests/end_to_end/test-project/tap_mysql_to_bq.yml.template create mode 100644 tests/end_to_end/test-project/tap_mysql_to_bq_buffered_stream.yml.template create mode 100644 tests/end_to_end/test-project/tap_mysql_to_bq_split_large_files.yml.template create mode 100644 tests/end_to_end/test-project/tap_postgres_to_bq.yml.template create mode 100644 tests/end_to_end/test-project/tap_postgres_to_bq_split_large_files.yml.template create mode 100644 tests/end_to_end/test-project/tap_s3_csv_to_bq.yml.template create mode 100644 tests/end_to_end/test-project/target_bigquery.yml.template create mode 100644 tests/end_to_end/test_target_bigquery.py create mode 100644 tests/units/fastsync/commons/test_fastsync_target_bigquery.py create mode 100644 tests/units/fastsync/test_mongodb_to_bigquery.py create mode 100644 tests/units/fastsync/test_mysql_to_bigquery.py create mode 100644 
tests/units/fastsync/test_postgres_to_bigquery.py create mode 100644 tests/units/fastsync/test_s3_csv_to_bigquery.py diff --git a/dev-project/.env b/dev-project/.env index 83f15508b..c2856e7dc 100644 --- a/dev-project/.env +++ b/dev-project/.env @@ -90,3 +90,12 @@ TARGET_REDSHIFT_COPY_ROLE_ARN= TARGET_REDSHIFT_S3_BUCKET= TARGET_REDSHIFT_S3_KEY_PREFIX= TARGET_REDSHIFT_S3_ACL= + +# ------------------------------------------------------------------------------ +# Test BigQuery credentials used as target DWH +# IMPORTANT: +# Google BigQuery is not open source and is not included in the docker test env. +# Please add real credentials, otherwise the related tests will be ignored. +# ------------------------------------------------------------------------------ +TARGET_BIGQUERY_PROJECT= +GOOGLE_APPLICATION_CREDENTIALS= diff --git a/dev-project/entrypoint.sh b/dev-project/entrypoint.sh index 7b0d3c1bb..a90b2a450 100755 --- a/dev-project/entrypoint.sh +++ b/dev-project/entrypoint.sh @@ -27,7 +27,7 @@ cd dev-project ../tests/db/target_postgres.sh # Install PipelineWise in the container -../install.sh --acceptlicenses --nousage --connectors=target-snowflake,target-postgres,tap-mysql,tap-postgres,tap-mongodb,transform-field,tap-s3-csv +../install.sh --acceptlicenses --nousage --connectors=target-snowflake,target-postgres,target-bigquery,tap-mysql,tap-postgres,tap-mongodb,transform-field,tap-s3-csv if [[ $? != 0 ]]; then echo echo "ERROR: Docker container not started. Failed to install one or more PipelineWise components." @@ -59,4 +59,4 @@ echo " $ pipelinewise status" echo "==========================================================================" # Continue running the container -tail -f /dev/null \ No newline at end of file +tail -f /dev/null diff --git a/docs/user_guide/schema_changes.rst b/docs/user_guide/schema_changes.rst index 832d65ecf..b002c13ed 100644 --- a/docs/user_guide/schema_changes.rst +++ b/docs/user_guide/schema_changes.rst @@ -65,9 +65,31 @@ data only after the change. | text | text | | 555-DEF | +----------------+----------------+--------------------------------+------------------+ +BigQuery versioning acts differently because it is not possible to modify existing columns. +After the data type change ``COLUMN_THREE`` remains ``INTEGER`` with +the old data, and a new ``COLUMN_THREE__st`` column is created with ``VARCHAR`` type that keeps +data only after the change. +The suffix added to the new column indicates the new column type. + ++----------------+----------------+------------------+----------------------+ +| **COLUMN_ONE** | **COLUMN_TWO** | **COLUMN_THREE** | **COLUMN_THREE__st** | +| | | (INTEGER) | (VARCHAR) | ++----------------+----------------+------------------+----------------------+ +| text | text | 111 | | ++----------------+----------------+------------------+----------------------+ +| text | text | 222 | | ++----------------+----------------+------------------+----------------------+ +| text | text | 333 | | ++----------------+----------------+------------------+----------------------+ +| text | text | | 444-ABC | ++----------------+----------------+------------------+----------------------+ +| text | text | | 555-DEF | ++----------------+----------------+------------------+----------------------+ + .. warning:: Please note the ``NULL`` values in ``COLUMN_THREE_20190812`` and ``COLUMN_THREE`` tables.
+ (or ``COLUMN_THREE`` and ``COLUMN_THREE__st`` for BigQuery) **Historical values are not converted to the new data types!** If you need the actual representation of the table after data type changes then you need to :ref:`resync` the table. diff --git a/install.sh b/install.sh index cfa07d488..1bb5ca109 100755 --- a/install.sh +++ b/install.sh @@ -183,6 +183,7 @@ DEFAULT_CONNECTORS=( target-snowflake target-redshift target-postgres + target-bigquery transform-field ) EXTRA_CONNECTORS=( diff --git a/pipelinewise-target-bigquery b/pipelinewise-target-bigquery new file mode 160000 index 000000000..363efee6b --- /dev/null +++ b/pipelinewise-target-bigquery @@ -0,0 +1 @@ +Subproject commit 363efee6bbe102081bb6cd7f8664720fabcedd1c diff --git a/pipelinewise/cli/pipelinewise.py b/pipelinewise/cli/pipelinewise.py index 1a44a496f..5db67826f 100644 --- a/pipelinewise/cli/pipelinewise.py +++ b/pipelinewise/cli/pipelinewise.py @@ -978,7 +978,7 @@ def run_tap(self): tap_properties, tap_state, { 'selected': True, - 'target_type': ['target-snowflake', 'target-redshift', 'target-postgres'], + 'target_type': ['target-snowflake', 'target-redshift', 'target-postgres', 'target-bigquery'], 'tap_type': ['tap-mysql', 'tap-postgres', 'tap-s3-csv', 'tap-mongodb'], 'initial_sync_required': True }, diff --git a/pipelinewise/cli/schemas/target.json b/pipelinewise/cli/schemas/target.json index bb9dc40ce..4c29c4198 100644 --- a/pipelinewise/cli/schemas/target.json +++ b/pipelinewise/cli/schemas/target.json @@ -171,6 +171,20 @@ "required": [ "s3_bucket" ] + }, + "db_conn_target_bigquery": { + "type": "object", + "properties": { + "project_id": { + "type": "string" + }, + "dataset_id": { + "type": "string" + } + }, + "required": [ + "project_id" + ] } }, "type": "object", @@ -193,7 +207,8 @@ "target-snowflake", "target-redshift", "target-postgres", - "target-s3-csv" + "target-s3-csv", + "target-bigquery" ] }, "db_conn": { @@ -209,6 +224,9 @@ }, { "$ref": "#/definitions/db_conn_target_s3_csv" + }, + { + "$ref": "#/definitions/db_conn_target_bigquery" } ] } diff --git a/pipelinewise/fastsync/commons/split_gzip.py b/pipelinewise/fastsync/commons/split_gzip.py index 0518df514..9905d5eb9 100644 --- a/pipelinewise/fastsync/commons/split_gzip.py +++ b/pipelinewise/fastsync/commons/split_gzip.py @@ -2,6 +2,7 @@ import io import logging import gzip +import builtins LOGGER = logging.getLogger(__name__) @@ -14,8 +15,8 @@ EST_COMPR_RATE = 0.12 -# pylint: disable=W0622 -def open(base_filename, mode='wb', chunk_size_mb=None, max_chunks=None, est_compr_rate=None): +# pylint: disable=W0622,R1732 +def open(base_filename, mode='wb', chunk_size_mb=None, max_chunks=None, est_compr_rate=None, compress=True): """Open a gzip-compressed file in binary or text mode. 
Args: @@ -39,7 +40,7 @@ def open(base_filename, mode='wb', chunk_size_mb=None, max_chunks=None, est_comp raise ValueError('Invalid chunk_size_mb: %d' % (chunk_size_mb,)) if max_chunks is not None and max_chunks < 0: raise ValueError('Invalid max_chunks: %d' % (max_chunks,)) - return SplitGzipFile(base_filename, mode, chunk_size_mb, max_chunks, est_compr_rate) + return SplitGzipFile(base_filename, mode, chunk_size_mb, max_chunks, est_compr_rate, compress) # pylint: disable=R0902 @@ -53,14 +54,19 @@ def __init__(self, mode: str = None, chunk_size_mb: int = None, max_chunks: int = None, - est_compr_rate: float = None): + est_compr_rate: float = None, + compress=True): super().__init__() self.base_filename = base_filename self.mode = mode self.chunk_size_mb = chunk_size_mb or DEFAULT_CHUNK_SIZE_MB self.max_chunks = max_chunks if max_chunks is not None else DEFAULT_MAX_CHUNKS - self.est_compr_rate = est_compr_rate if est_compr_rate is not None else EST_COMPR_RATE + self.compress= compress + if compress: + self.est_compr_rate = est_compr_rate if est_compr_rate is not None else EST_COMPR_RATE + else: + self.est_compr_rate = 1.0 self.chunk_seq = 1 self.current_chunk_size_mb = 0 self.chunk_filename = None @@ -100,7 +106,10 @@ def _activate_chunk_file(self): # Open the actual chunk file with gzip data writer self.chunk_filename = chunk_filename - self.chunk_file = gzip.open(self.chunk_filename, self.mode) + if self.compress: + self.chunk_file = gzip.open(self.chunk_filename, self.mode) + else: + self.chunk_file = builtins.open(self.chunk_filename, self.mode) @staticmethod def _bytes_to_megabytes(size: int) -> float: diff --git a/pipelinewise/fastsync/commons/tap_mongodb.py b/pipelinewise/fastsync/commons/tap_mongodb.py index 0757c8744..5b908b144 100644 --- a/pipelinewise/fastsync/commons/tap_mongodb.py +++ b/pipelinewise/fastsync/commons/tap_mongodb.py @@ -140,14 +140,16 @@ def close_connection(self): """ self.database.client.close() - # pylint: disable=R0914 + # pylint: disable=R0914,R0913 def copy_table(self, table_name: str, filepath: str, temp_dir: str, split_large_files=False, split_file_chunk_size_mb=1000, - split_file_max_chunks=20): + split_file_max_chunks=20, + compress=True + ): """ Export data from table to a zipped csv Args: @@ -174,7 +176,8 @@ def copy_table(self, gzip_splitter = split_gzip.open(filepath, mode='wt', chunk_size_mb=split_file_chunk_size_mb, - max_chunks=split_file_max_chunks if split_large_files else 0) + max_chunks=split_file_max_chunks if split_large_files else 0, + compress=compress) with gzip.open(export_file_path, 'rb') as export_file, gzip_splitter as gzfile: writer = csv.DictWriter(gzfile, fieldnames=[elem[0] for elem in self._get_collection_columns()], diff --git a/pipelinewise/fastsync/commons/tap_mysql.py b/pipelinewise/fastsync/commons/tap_mysql.py index caad8093f..ac9628646 100644 --- a/pipelinewise/fastsync/commons/tap_mysql.py +++ b/pipelinewise/fastsync/commons/tap_mysql.py @@ -24,13 +24,14 @@ class FastSyncTapMySql: Common functions for fastsync from a MySQL database """ - def __init__(self, connection_config, tap_type_to_target_type): + def __init__(self, connection_config, tap_type_to_target_type, target_quote=None): self.connection_config = connection_config self.connection_config['charset'] = connection_config.get('charset', DEFAULT_CHARSET) self.connection_config['export_batch_rows'] = connection_config.get('export_batch_rows', DEFAULT_EXPORT_BATCH_ROWS) self.connection_config['session_sqls'] = connection_config.get('session_sqls', 
DEFAULT_SESSION_SQLS) self.tap_type_to_target_type = tap_type_to_target_type + self.target_quote = target_quote self.conn = None self.conn_unbuffered = None @@ -186,16 +187,34 @@ def get_primary_keys(self, table_name): table_dict['table_name']) pk_specs = self.query(sql) if len(pk_specs) > 0: - return [safe_column_name(k.get('Column_name')) for k in pk_specs] + return [safe_column_name(k.get('Column_name'), self.target_quote) for k in pk_specs] return None - def get_table_columns(self, table_name): + def get_table_columns(self, table_name, max_num=None, date_type='date'): """ Get MySQL table column details from information_schema """ table_dict = utils.tablename_to_dict(table_name) - sql = """ + + if max_num: + decimals = len(max_num.split('.')[1]) if '.' in max_num else 0 + decimal_format = f""" + CONCAT('GREATEST(LEAST({max_num}, ROUND(`', column_name, '`, {decimals})), -{max_num})') + """ + integer_format = """ + CONCAT('`', column_name, '`') + """ + else: + decimal_format = """ + CONCAT('`', column_name, '`') + """ + integer_format = decimal_format + + schema_name = table_dict.get('schema_name') + table_name = table_dict.get('table_name') + + sql = f""" SELECT column_name, data_type, column_type, @@ -207,26 +226,32 @@ def get_table_columns(self, table_name): WHEN data_type IN ('blob', 'tinyblob', 'mediumblob', 'longblob') THEN CONCAT('REPLACE(hex(`', column_name, '`)', ", '\n', ' ')") WHEN data_type IN ('binary', 'varbinary') - THEN concat('REPLACE(hex(trim(trailing CHAR(0x00) from `',COLUMN_NAME,'`))', ", '\n', ' ')") + THEN concat('REPLACE(REPLACE(hex(trim(trailing CHAR(0x00) from `',COLUMN_NAME,'`))', ", '\n', ' '), '\r', '')") WHEN data_type IN ('bit') THEN concat('cast(`', column_name, '` AS unsigned)') - WHEN data_type IN ('datetime', 'timestamp', 'date') + WHEN data_type IN ('date') + THEN concat('nullif(CAST(`', column_name, '` AS {date_type}),STR_TO_DATE("0000-00-00 00:00:00", "%Y-%m-%d %T"))') + WHEN data_type IN ('datetime', 'timestamp') THEN concat('nullif(`', column_name, '`,STR_TO_DATE("0000-00-00 00:00:00", "%Y-%m-%d %T"))') WHEN column_type IN ('tinyint(1)') THEN concat('CASE WHEN `' , column_name , '` is null THEN null WHEN `' , column_name , '` = 0 THEN 0 ELSE 1 END') WHEN column_type IN ('geometry', 'point', 'linestring', 'polygon', 'multipoint', 'multilinestring', 'multipolygon', 'geometrycollection') THEN concat('ST_AsGeoJSON(', column_name, ')') WHEN column_name = 'raw_data_hash' - THEN concat('REPLACE(hex(`', column_name, '`)', ", '\n', ' ')") - ELSE concat('REPLACE(REPLACE(cast(`', column_name, '` AS char CHARACTER SET utf8)', ", '\n', ' '), '\0', '')") + THEN concat('REPLACE(REPLACE(hex(`', column_name, '`)', ", '\n', ' '), '\r', '')") + WHEN data_type IN ('double', 'numeric', 'float', 'decimal', 'real') + THEN {decimal_format} + WHEN data_type IN ('smallint', 'integer', 'bigint', 'mediumint', 'int') + THEN {integer_format} + ELSE concat('REPLACE(REPLACE(REPLACE(cast(`', column_name, '` AS char CHARACTER SET utf8)', ", '\n', ' '), '\r', ''), '\0', '')") END AS safe_sql_value, ordinal_position FROM information_schema.columns - WHERE table_schema = '{}' - AND table_name = '{}') x + WHERE table_schema = '{schema_name}' + AND table_name = '{table_name}') x ORDER BY ordinal_position - """.format(table_dict.get('schema_name'), table_dict.get('table_name')) + """ return self.query(sql) def map_column_types_to_target(self, table_name): @@ -235,7 +260,7 @@ def map_column_types_to_target(self, table_name): """ mysql_columns = self.get_table_columns(table_name) 
mapped_columns = [ - '{} {}'.format(safe_column_name(pc.get('column_name')), + '{} {}'.format(safe_column_name(pc.get('column_name'), self.target_quote), self.tap_type_to_target_type(pc.get('data_type'), pc.get('column_type'))) for pc in mysql_columns] @@ -248,9 +273,13 @@ def map_column_types_to_target(self, table_name): def copy_table(self, table_name, path, + max_num=None, + date_type='date', split_large_files=False, split_file_chunk_size_mb=1000, - split_file_max_chunks=20): + split_file_max_chunks=20, + compress=True, + ): """ Export data from table to a zipped csv Args: @@ -261,7 +290,7 @@ def copy_table(self, split_file_chunk_size_mb: File chunk sizes if `split_large_files` enabled. (Default: 1000) split_file_max_chunks: Max number of chunks if `split_large_files` enabled. (Default: 20) """ - table_columns = self.get_table_columns(table_name) + table_columns = self.get_table_columns(table_name, max_num, date_type) column_safe_sql_values = [c.get('safe_sql_value') for c in table_columns] # If self.get_table_columns returns zero row then table not exist @@ -284,7 +313,8 @@ def copy_table(self, gzip_splitter = split_gzip.open(path, mode='wt', chunk_size_mb=split_file_chunk_size_mb, - max_chunks=split_file_max_chunks if split_large_files else 0) + max_chunks=split_file_max_chunks if split_large_files else 0, + compress=compress) with gzip_splitter as split_gzip_files: writer = csv.writer(split_gzip_files, diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index 427c3abf7..b5e7285d1 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -20,9 +20,10 @@ class FastSyncTapPostgres: Common functions for fastsync from a Postgres database """ - def __init__(self, connection_config, tap_type_to_target_type): + def __init__(self, connection_config, tap_type_to_target_type, target_quote=None): self.connection_config = connection_config self.tap_type_to_target_type = tap_type_to_target_type + self.target_quote = target_quote self.conn = None self.curr = None self.primary_host_conn = None @@ -341,39 +342,62 @@ def get_primary_keys(self, table): AND indisprimary""".format(schema_name, table_name) pk_specs = self.query(sql) if len(pk_specs) > 0: - return [safe_column_name(k[0]) for k in pk_specs] + return [safe_column_name(k[0], self.target_quote) for k in pk_specs] return None - def get_table_columns(self, table_name): + def get_table_columns(self, table_name, max_num=None, date_type='date'): """ Get PG table column details from information_schema """ table_dict = utils.tablename_to_dict(table_name) - sql = """ + + if max_num: + decimals = len(max_num.split('.')[1]) if '.' 
in max_num else 0 + decimal_format = f""" + 'CASE WHEN "' || column_name || '" IS NULL THEN NULL ELSE GREATEST(LEAST({max_num}, ROUND("' || column_name || '"::numeric , {decimals})), -{max_num}) END' + """ + integer_format = """ + '"' || column_name || '"' + """ + else: + decimal_format = """ + '"' || column_name || '"' + """ + integer_format = decimal_format + + schema_name = table_dict.get('schema_name') + table_name = table_dict.get('table_name') + + sql = f""" SELECT column_name ,data_type ,safe_sql_value + ,character_maximum_length FROM (SELECT column_name, data_type, CASE WHEN data_type = 'ARRAY' THEN 'array_to_json("' || column_name || '") AS ' || column_name + WHEN data_type = 'date' THEN column_name || '::{date_type} AS ' || column_name WHEN udt_name = 'time' THEN 'replace("' || column_name || E'"::varchar,\\\'24:00:00\\\',\\\'00:00:00\\\') AS ' || column_name WHEN udt_name = 'timetz' THEN 'replace(("' || column_name || E'" at time zone \'\'UTC\'\')::time::varchar,\\\'24:00:00\\\',\\\'00:00:00\\\') AS ' || column_name WHEN udt_name in ('timestamp', 'timestamptz') THEN 'CASE WHEN "' ||column_name|| E'" < \\'0001-01-01 00:00:00.000\\' ' 'OR "' ||column_name|| E'" > \\'9999-12-31 23:59:59.999\\' THEN \\'9999-12-31 23:59:59.999\\' ' 'ELSE "' ||column_name|| '" END AS "' ||column_name|| '"' + WHEN data_type IN ('double precision', 'numeric', 'decimal', 'real') THEN {decimal_format} || ' AS ' || column_name + WHEN data_type IN ('smallint', 'integer', 'bigint', 'serial', 'bigserial') THEN {integer_format} || ' AS ' || column_name ELSE '"'||column_name||'"' - END AS safe_sql_value + END AS safe_sql_value, + character_maximum_length FROM information_schema.columns - WHERE table_schema = '{}' - AND table_name = '{}' + WHERE table_schema = '{schema_name}' + AND table_name = '{table_name}' ORDER BY ordinal_position ) AS x - """.format(table_dict.get('schema_name'), table_dict.get('table_name')) + """ return self.query(sql) def map_column_types_to_target(self, table_name): @@ -381,20 +405,32 @@ def map_column_types_to_target(self, table_name): Map PG column types to equivalent types in target """ postgres_columns = self.get_table_columns(table_name) - mapped_columns = ['{} {}'.format(safe_column_name(pc[0]), - self.tap_type_to_target_type(pc[1])) for pc in postgres_columns] + mapped_columns = [] + for pc in postgres_columns: + column_type = self.tap_type_to_target_type(pc[1]) + # postgres bit type can have length greater than 1 + # most targets would want to map length 1 to boolean and the rest to number + if isinstance(column_type, list): + column_type = column_type[1 if pc[3] > 1 else 0] + mapping = '{} {}'.format(safe_column_name(pc[0], self.target_quote), column_type) + mapped_columns.append(mapping) return { 'columns': mapped_columns, 'primary_key': self.get_primary_keys(table_name) } + # pylint: disable=too-many-arguments def copy_table(self, table_name, path, + max_num=None, + date_type='date', split_large_files=False, split_file_chunk_size_mb=1000, - split_file_max_chunks=20): + split_file_max_chunks=20, + compress=True + ): """ Export data from table to a zipped csv Args: @@ -405,7 +441,7 @@ def copy_table(self, split_file_chunk_size_mb: File chunk sizes if `split_large_files` enabled. (Default: 1000) split_file_max_chunks: Max number of chunks if `split_large_files` enabled. 
(Default: 20) """ - table_columns = self.get_table_columns(table_name) + table_columns = self.get_table_columns(table_name, max_num, date_type) column_safe_sql_values = [c.get('safe_sql_value') for c in table_columns] # If self.get_table_columns returns zero row then table not exist @@ -425,7 +461,8 @@ def copy_table(self, gzip_splitter = split_gzip.open(path, mode='wb', chunk_size_mb=split_file_chunk_size_mb, - max_chunks=split_file_max_chunks if split_large_files else 0) + max_chunks=split_file_max_chunks if split_large_files else 0, + compress=compress) with gzip_splitter as split_gzip_files: self.curr.copy_expert(sql, split_gzip_files, size=131072) diff --git a/pipelinewise/fastsync/commons/tap_s3_csv.py b/pipelinewise/fastsync/commons/tap_s3_csv.py index 9abee9947..e23f60e42 100644 --- a/pipelinewise/fastsync/commons/tap_s3_csv.py +++ b/pipelinewise/fastsync/commons/tap_s3_csv.py @@ -26,7 +26,7 @@ class FastSyncTapS3Csv: """ # pylint: disable=bare-except - def __init__(self, connection_config: Dict, tap_type_to_target_type: Callable): + def __init__(self, connection_config: Dict, tap_type_to_target_type: Callable, target_quote=None): """ Constructor :param connection_config: tap connection config @@ -43,6 +43,7 @@ def __init__(self, connection_config: Dict, tap_type_to_target_type: Callable): self.connection_config = connection_config self.tap_type_to_target_type = tap_type_to_target_type + self.target_quote = target_quote self.tables_last_modified = {} def _find_table_spec_by_name(self, table_name: str) -> Dict: @@ -148,7 +149,7 @@ def _get_file_records(self, s3_path: str, table_spec: Dict, records: List[Dict], # make all columns safe # pylint: disable=invalid-name for k, v in row.items(): - new_row[safe_column_name(k)] = v + new_row[safe_column_name(k, self.target_quote)] = v record = {**new_row, **custom_columns} @@ -171,7 +172,7 @@ def map_column_types_to_target(self, filepath: str, table: str): # use timestamp as a type instead if column is set in date_overrides configuration mapped_columns = [] date_overrides = None if 'date_overrides' not in specs \ - else {safe_column_name(c) for c in specs['date_overrides']} + else {safe_column_name(c, self.target_quote) for c in specs['date_overrides']} for column_name, column_type in csv_columns: @@ -228,7 +229,7 @@ def _get_primary_keys(self, table_specs: Dict) -> Optional[List]: :return: the keys concatenated and separated by comma if keys are given, otherwise None """ if table_specs.get('key_properties', False): - return [safe_column_name(k) for k in table_specs['key_properties']] + return [safe_column_name(k, self.target_quote) for k in table_specs['key_properties']] return None diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py new file mode 100644 index 000000000..56927e11a --- /dev/null +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -0,0 +1,284 @@ +import logging +import json +import re +from typing import List, Dict + +from google.cloud import bigquery +from google.api_core import exceptions + +from .transform_utils import TransformationHelper, SQLFlavor +from . import utils + +LOGGER = logging.getLogger(__name__) + +# tone down snowflake connector logging level. 
+logging.getLogger('bigquery.connector').setLevel(logging.WARNING) + + +def safe_name(name, quotes=True): + """ + Get a table name that BigQuery would accept by removing all bad chars + and making it lowercase + """ + name = name.replace('`', '') + pattern = '[^a-zA-Z0-9]' + removed_bad_chars = re.sub(pattern, '_', name).lower() + if quotes: + return '`{}`'.format(removed_bad_chars) + return removed_bad_chars + + +# pylint: disable=missing-function-docstring,no-self-use,too-many-arguments +class FastSyncTargetBigquery: + """ + Common functions for fastsync to BigQuery + """ + def __init__(self, connection_config, transformation_config=None): + self.connection_config = connection_config + self.transformation_config = transformation_config + + def open_connection(self): + project_id = self.connection_config['project_id'] + return bigquery.Client(project=project_id) + + def query(self, query, params=None): + def to_query_parameter(value): + if isinstance(value, int): + value_type = 'INT64' + elif isinstance(value, float): + value_type = 'NUMERIC' + #TODO: repeated float here and in target + elif isinstance(value, float): + value_type = 'FLOAT64' + elif isinstance(value, bool): + value_type = 'BOOL' + else: + value_type = 'STRING' + return bigquery.ScalarQueryParameter(None, value_type, value) + + if params is None: + params = [] + + job_config = bigquery.QueryJobConfig() + query_params = [to_query_parameter(p) for p in params] + job_config.query_parameters = query_params + + queries = [] + if isinstance(query, list): + queries.extend(query) + else: + queries = [query] + + client = self.open_connection() + LOGGER.info('TARGET_BIGQUERY - Running query: %s', query) + query_job = client.query(';\n'.join(queries), job_config=job_config) + query_job.result() + + return query_job + + def create_schema(self, schema_name): + temp_schema = self.connection_config.get('temp_schema', schema_name) + + client = self.open_connection() + for schema in set([schema_name, temp_schema]): + datasets = client.list_datasets() + dataset_ids = [d.dataset_id.lower() for d in datasets] + + if schema.lower() not in dataset_ids: + LOGGER.info("Schema '%s' does not exist. 
Creating...", schema) + client.create_dataset(schema, exists_ok=True) + + def drop_table(self, target_schema, table_name, is_temporary=False): + table_dict = utils.tablename_to_dict(table_name) + target_table = safe_name(table_dict.get('table_name' if not is_temporary else 'temp_table_name')) + + sql = 'DROP TABLE IF EXISTS {}.{}'.format(target_schema, target_table.lower()) + self.query(sql) + + def create_table(self, target_schema: str, table_name: str, columns: List[str], + is_temporary: bool = False, sort_columns=False): + + table_dict = utils.tablename_to_dict(table_name) + target_table = safe_name(table_dict.get('table_name' if not is_temporary else 'temp_table_name').lower()) + + # skip the EXTRACTED, BATCHED and DELETED columns in case they exist because they are going to be added later + columns = [c for c in columns if not ( + c.upper().startswith(utils.SDC_EXTRACTED_AT.upper()) or + c.upper().startswith(utils.SDC_BATCHED_AT.upper()) or + c.upper().startswith(utils.SDC_DELETED_AT.upper()))] + + columns += [f'{utils.SDC_EXTRACTED_AT} TIMESTAMP', + f'{utils.SDC_BATCHED_AT} TIMESTAMP', + f'{utils.SDC_DELETED_AT} TIMESTAMP' + ] + + # We need to sort the columns for some taps (for now tap-s3-csv) + # because later on, when copying a csv file into BigQuery, + # the csv file columns need to be in the same order as the target table that will be created below + if sort_columns: + columns.sort() + + columns = [c.lower() for c in columns] + + sql = f'CREATE OR REPLACE TABLE {target_schema}.{target_table} (' \ + f'{",".join(columns)})' + + self.query(sql) + + # pylint: disable=R0913,R0914 + def copy_to_table(self, filepath, target_schema, table_name, size_bytes, is_temporary, + skip_csv_header=False, allow_quoted_newlines=True, write_truncate=True): + LOGGER.info('BIGQUERY - Loading %s into Bigquery...', filepath) + table_dict = utils.tablename_to_dict(table_name) + target_table = safe_name(table_dict.get('table_name' if not is_temporary else 'temp_table_name').lower(), + quotes=False) + + client = self.open_connection() + dataset_ref = client.dataset(target_schema) + table_ref = dataset_ref.table(target_table) + table_schema = client.get_table(table_ref).schema + job_config = bigquery.LoadJobConfig() + job_config.source_format = bigquery.SourceFormat.CSV + job_config.schema = table_schema + job_config.write_disposition = 'WRITE_TRUNCATE' if write_truncate else 'WRITE_APPEND' + job_config.allow_quoted_newlines = allow_quoted_newlines + job_config.skip_leading_rows = 1 if skip_csv_header else 0 + with open(filepath, 'rb') as exported_data: + job = client.load_table_from_file(exported_data, table_ref, job_config=job_config) + try: + job.result() + except exceptions.BadRequest as exc: + for error_row in job.errors: + LOGGER.critical('ERROR: %s', error_row['message']) + raise exc + + LOGGER.info('Job %s', job) + LOGGER.info('Job.output_rows %s', job.output_rows) + inserts = job.output_rows + LOGGER.info('Loading into %s."%s": %s', + target_schema, + target_table, + json.dumps({'inserts': inserts, 'updates': 0, 'size_bytes': size_bytes})) + + LOGGER.info(job.errors) + + # grant_...
functions are common functions called by utils.py: grant_privilege function + # "to_group" is not used here but exists for compatibility reasons with other database types + # "to_group" is for databases that can grant to users and groups separately like Amazon Redshift + # pylint: disable=unused-argument + def grant_select_on_table(self, target_schema, table_name, role, is_temporary, to_group=False): + # Grant role is not mandatory parameter, do nothing if not specified + if role: + table_dict = utils.tablename_to_dict(table_name) + target_table = safe_name(table_dict.get('table_name' if not is_temporary else 'temp_table_name')) + sql = 'GRANT SELECT ON {}.{} TO ROLE {}'.format(target_schema, target_table, role) + self.query(sql) + + # pylint: disable=unused-argument + def grant_usage_on_schema(self, target_schema, role, to_group=False): + # Grant role is not mandatory parameter, do nothing if not specified + if role: + sql = 'GRANT USAGE ON SCHEMA {} TO ROLE {}'.format(target_schema, role) + self.query(sql) + + # pylint: disable=unused-argument + def grant_select_on_schema(self, target_schema, role, to_group=False): + # Grant role is not mandatory parameter, do nothing if not specified + if role: + sql = 'GRANT SELECT ON ALL TABLES IN SCHEMA {} TO ROLE {}'.format(target_schema, role) + self.query(sql) + + def obfuscate_columns(self, target_schema: str, table_name: str): + """ + Apply any configured transformations to the given table + Args: + target_schema: target schema name + table_name: table name + """ + LOGGER.info('Starting obfuscation rules...') + + table_dict = utils.tablename_to_dict(table_name) + temp_table = table_dict.get('temp_table_name') + transformations = self.transformation_config.get('transformations', []) + + # Input table_name is formatted as {{schema}}.{{table}} + # Stream name in taps transformation.json is formatted as {{schema}}-{{table}} + # + # We need to convert to the same format to find the transformation + # has that has to be applied + tap_stream_name_by_table_name = '{}-{}'.format(table_dict['schema_name'], table_dict['table_name']) \ + if table_dict['schema_name'] is not None else table_dict['table_name'] + + # Find obfuscation rules for the current table + # trans_map = self.__get_stream_transformation_map(tap_stream_name_by_table_name, transformations) + trans_map = TransformationHelper.get_trans_in_sql_flavor( + tap_stream_name_by_table_name, + transformations, + SQLFlavor('bigquery')) + + self.__apply_transformations(trans_map, target_schema, temp_table) + + LOGGER.info('Obfuscation rules applied.') + + + def swap_tables(self, schema, table_name): + project_id = self.connection_config['project_id'] + table_dict = utils.tablename_to_dict(table_name) + target_table = safe_name(table_dict.get('table_name').lower(), quotes=False) + temp_table = safe_name(table_dict.get('temp_table_name').lower(), quotes=False) + + # Swap tables and drop the temp table + table_id = '{}.{}.{}'.format(project_id, schema, target_table) + temp_table_id = '{}.{}.{}'.format(project_id, schema, temp_table) + + # we cant swap tables in bigquery, so we copy the temp into the table + # then delete the temp table + job_config = bigquery.CopyJobConfig() + job_config.write_disposition = 'WRITE_TRUNCATE' + client = self.open_connection() + replace_job = client.copy_table(temp_table_id, table_id, job_config=job_config) + replace_job.result() + + # delete the temp table + client.delete_table(temp_table_id) + + def __apply_transformations(self, transformations: List[Dict], target_schema: 
str, table_name: str) -> None: + """ + Generate and execute the SQL queries based on the given transformations. + Args: + transformations: List of dictionaries in the form {"trans": "", conditions: "... AND ..."} + target_schema: name of the target schema where the table lives + table_name: the table name on which we want to apply the transformations + """ + full_qual_table_name = '{}.{}'.format(safe_name(target_schema), safe_name(table_name)) + + if transformations: + all_cols_update_sql = '' + + # Conditional transformations will have to be executed one at time separately + + for trans_item in transformations: + + # If we have conditions, then we need to construct the query and execute it to transform the + # single column conditionally + if trans_item['conditions']: + sql = f'UPDATE {full_qual_table_name} ' \ + f'SET {trans_item["trans"]} WHERE {trans_item["conditions"]};' + + self.query(sql) + + # Otherwise, we can add this column to a general UPDATE query with no predicates + else: + + # if the variable is empty, then initialize it otherwise append the + # current transformation to it + if not all_cols_update_sql: + all_cols_update_sql = trans_item['trans'] + else: + all_cols_update_sql = f'{all_cols_update_sql}, {trans_item["trans"]}' + + # If we have some non-conditional transformations then construct and execute a query + if all_cols_update_sql: + all_cols_update_sql = f'UPDATE {full_qual_table_name} SET {all_cols_update_sql} WHERE true;' + + self.query(all_cols_update_sql) diff --git a/pipelinewise/fastsync/commons/transform_utils.py b/pipelinewise/fastsync/commons/transform_utils.py index c9f540fa8..164af4b35 100644 --- a/pipelinewise/fastsync/commons/transform_utils.py +++ b/pipelinewise/fastsync/commons/transform_utils.py @@ -30,6 +30,7 @@ class SQLFlavor(Enum): """ SNOWFLAKE = 'snowflake' POSTGRES = 'postgres' + BIGQUERY = 'bigquery' # pylint: disable=too-few-public-methods @@ -162,18 +163,22 @@ def __conditions_to_sql( elif 'regex_match' in condition: + value = f"'{condition['regex_match']}'" + if sql_flavor == SQLFlavor.SNOWFLAKE: operator = 'REGEXP' elif sql_flavor == SQLFlavor.POSTGRES: operator = '~' + elif sql_flavor == SQLFlavor.BIGQUERY: + conditions.append(f"REGEXP_CONTAINS({cls.__safe_column(condition['column'], sql_flavor)}, {value})") + continue + else: raise NotImplementedError(f'regex_match conditional transformation in {sql_flavor.value} SQL ' f'flavor not implemented!') - value = f"'{condition['regex_match']}'" - else: continue @@ -190,6 +195,9 @@ def __safe_column(cls, col: str, sql_flavor: SQLFlavor): elif sql_flavor == SQLFlavor.POSTGRES: column = f'"{col.lower()}"' + elif sql_flavor == SQLFlavor.BIGQUERY: + column = f'`{col.lower()}`' + else: column = col @@ -213,6 +221,9 @@ def __hash_to_sql(cls, column: str, sql_flavor: SQLFlavor) -> str: elif sql_flavor == SQLFlavor.POSTGRES: trans = f'{column} = ENCODE(DIGEST({column}, \'sha256\'), \'hex\')' + elif sql_flavor == SQLFlavor.BIGQUERY: + trans = f'{column} = TO_BASE64(SHA256({column}))' + else: raise NotImplementedError( f'HASH transformation in {sql_flavor.value} SQL flavor not implemented!') @@ -240,6 +251,9 @@ def __hash_skip_first_to_sql(cls, transform_type: TransformationType, column: st elif sql_flavor == SQLFlavor.POSTGRES: trans = '{0} = CONCAT(SUBSTRING({0}, 1, {1}), ENCODE(DIGEST(SUBSTRING({0}, {1} + 1), ' \ '\'sha256\'), \'hex\'))'.format(column, skip_first_n) + elif sql_flavor == SQLFlavor.BIGQUERY: + trans = '{0} = CONCAT(SUBSTRING({0}, 1, {1}), TO_BASE64(SHA256(SUBSTRING({0}, {1} + 
1))))'.format( + column, skip_first_n) else: raise NotImplementedError(f'HASH-SKIP-FIRST-{skip_first_n} transformation in {sql_flavor.value} SQL flavor ' f'not implemented!') @@ -271,6 +285,10 @@ def __mask_date_to_sql(cls, column: str, sql_flavor: SQLFlavor) -> str: 'DATE_PART(\'hour\', {0})::int, ' \ 'DATE_PART(\'minute\', {0})::int, ' \ 'DATE_PART(\'second\', {0})::double precision)'.format(column) + elif sql_flavor == SQLFlavor.BIGQUERY: + trans = f'{column} = TIMESTAMP(DATETIME(' \ + f'DATE(EXTRACT(YEAR FROM {column}), 1, 1),' \ + f'TIME({column})))' else: raise NotImplementedError(f'MASK-DATE transformation in {sql_flavor.value} SQL flavor ' f'not implemented!') diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py new file mode 100644 index 000000000..9fc88b343 --- /dev/null +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +import logging +import os +import sys +import time +import multiprocessing + +from typing import Union +from argparse import Namespace +from functools import partial +from datetime import datetime + +from .commons import utils +from .commons.tap_mongodb import FastSyncTapMongoDB +from .commons.target_bigquery import FastSyncTargetBigquery + +LOGGER = logging.getLogger(__name__) + +REQUIRED_CONFIG_KEYS = { + 'tap': [ + 'host', + 'port', + 'user', + 'password', + 'auth_database', + 'dbname', + ], + 'target': [ + 'project_id' + ] +} + +LOCK = multiprocessing.Lock() + + +def tap_type_to_target_type(mongo_type): + """Data type mapping from MongoDB to Bigquery""" + return { + 'string': 'STRING', + 'object': 'STRING', + 'array': 'STRING', + 'date': 'TIMESTAMP', + 'datetime': 'TIMESTAMP', + 'timestamp': 'TIMESTAMP', + }.get(mongo_type, 'STRING') + + +# pylint: disable=too-many-locals +def sync_table(table: str, args: Namespace) -> Union[bool, str]: + """Sync one table""" + mongodb = FastSyncTapMongoDB(args.tap, tap_type_to_target_type) + bigquery = FastSyncTargetBigquery(args.target, args.transform) + + try: + dbname = args.tap.get('dbname') + filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(dbname, table, time.strftime('%Y%m%d-%H%M%S')) + filepath = os.path.join(args.temp_dir, filename) + target_schema = utils.get_target_schema(args.target, table) + + # Open connection + mongodb.open_connection() + + # Get bookmark - LSN position or Incremental Key value + bookmark = utils.get_bookmark_for_table(table, args.properties, mongodb, dbname=dbname) + + # Exporting table data, get table definitions and close connection to avoid timeouts + mongodb.copy_table(table, filepath, args.temp_dir) + size_bytes = os.path.getsize(filepath) + bigquery_types = mongodb.map_column_types_to_target() + bigquery_columns = bigquery_types.get('columns', []) + mongodb.close_connection() + + # Creating temp table in Bigquery + bigquery.create_schema(target_schema) + bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) + + # Load into Bigquery table + bigquery.copy_to_table(filepath, target_schema, table, size_bytes, is_temporary=True, skip_csv_header=True) + os.remove(filepath) + + # Obfuscate columns + bigquery.obfuscate_columns(target_schema, table) + + # Create target table and swap with the temp table in Bigquery + bigquery.create_table(target_schema, table, bigquery_columns) + bigquery.swap_tables(target_schema, table) + + # Save bookmark to singer state file + # Lock to ensure that only one process writes the same state file at a time + LOCK.acquire() + try: + 
utils.save_state_file(args.state, table, bookmark) + finally: + LOCK.release() + + # Table loaded, grant select on all tables in target schema + grantees = utils.get_grantees(args.target, table) + utils.grant_privilege(target_schema, grantees, bigquery.grant_usage_on_schema) + utils.grant_privilege(target_schema, grantees, bigquery.grant_select_on_schema) + + return True + + except Exception as exc: + LOGGER.exception(exc) + return '{}: {}'.format(table, exc) + + +def main_impl(): + """Main sync logic""" + args = utils.parse_args(REQUIRED_CONFIG_KEYS) + pool_size = utils.get_pool_size(args.tap) + start_time = datetime.now() + table_sync_excs = [] + + # Log start info + LOGGER.info(""" + ------------------------------------------------------- + STARTING SYNC + ------------------------------------------------------- + Tables selected to sync : %s + Total tables selected to sync : %s + Pool size : %s + ------------------------------------------------------- + """, args.tables, len(args.tables), pool_size) + + # Start loading tables in parallel in spawning processes + with multiprocessing.Pool(pool_size) as proc: + table_sync_excs = list( + filter(lambda x: not isinstance(x, bool), proc.map(partial(sync_table, args=args), args.tables))) + + # Log summary + end_time = datetime.now() + LOGGER.info(""" + ------------------------------------------------------- + SYNC FINISHED - SUMMARY + ------------------------------------------------------- + Total tables selected to sync : %s + Tables loaded successfully : %s + Exceptions during table sync : %s + + Pool size : %s + Runtime : %s + ------------------------------------------------------- + """, len(args.tables), len(args.tables) - len(table_sync_excs), str(table_sync_excs), + pool_size, end_time - start_time) + + if len(table_sync_excs) > 0: + sys.exit(1) + + +def main(): + """Main entry point""" + try: + main_impl() + except Exception as exc: + LOGGER.critical(exc) + raise exc diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py new file mode 100644 index 000000000..4fa2e92ad --- /dev/null +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +import logging +import os +import sys +import glob +import time +from functools import partial +from argparse import Namespace +import multiprocessing +from typing import Union + +from datetime import datetime +from .commons import utils +from .commons.tap_mysql import FastSyncTapMySql +from .commons.target_bigquery import FastSyncTargetBigquery + +MAX_NUM='99999999999999999999999999999.999999999' + +LOGGER = logging.getLogger(__name__) + +REQUIRED_CONFIG_KEYS = { + 'tap': [ + 'host', + 'port', + 'user', + 'password' + ], + 'target': [ + 'project_id', + ] +} + +LOCK = multiprocessing.Lock() + + +def tap_type_to_target_type(mysql_type, mysql_column_type): + """Data type mapping from MySQL to Bigquery""" + return { + 'char':'STRING', + 'varchar':'STRING', + 'binary':'STRING', + 'varbinary':'STRING', + 'blob':'STRING', + 'tinyblob':'STRING', + 'mediumblob':'STRING', + 'longblob':'STRING', + 'geometry':'STRING', + 'text':'STRING', + 'tinytext':'STRING', + 'mediumtext':'STRING', + 'longtext':'STRING', + 'enum':'STRING', + 'int':'INT64', + 'tinyint':'BOOL' if mysql_column_type == 'tinyint(1)' else 'INT64', + 'smallint':'INT64', + 'mediumint':'INT64', + 'bigint':'INT64', + 'bit':'BOOL', + 'decimal':'NUMERIC', + 'double':'NUMERIC', + 'float':'NUMERIC', + 'bool':'BOOL', + 'boolean':'BOOL', + 'date':'TIMESTAMP', + 
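+        # Note: MySQL DATE columns are mapped to TIMESTAMP here; sync_table() below calls
+        # copy_table() with date_type='datetime', so DATE values are cast to DATETIME during export.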
'datetime':'TIMESTAMP', + 'timestamp':'TIMESTAMP', + 'time':'TIME' + }.get(mysql_type, 'STRING') + + +# pylint: disable=too-many-locals +def sync_table(table: str, args: Namespace) -> Union[bool, str]: + """Sync one table""" + mysql = FastSyncTapMySql(args.tap, tap_type_to_target_type, target_quote='`') + bigquery = FastSyncTargetBigquery(args.target, args.transform) + + try: + filename = 'pipelinewise_fastsync_{}_{}.csv'.format(table, time.strftime('%Y%m%d-%H%M%S')) + filepath = os.path.join(args.temp_dir, filename) + target_schema = utils.get_target_schema(args.target, table) + + # Open connection and get binlog file position + mysql.open_connections() + + # Get bookmark - Binlog position or Incremental Key value + bookmark = utils.get_bookmark_for_table(table, args.properties, mysql) + + # Exporting table data, get table definitions and close connection to avoid timeouts + mysql.copy_table(table, + filepath, + compress=False, + max_num=MAX_NUM, + date_type='datetime') + file_parts = glob.glob(f'{filepath}*') + size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) + bigquery_types = mysql.map_column_types_to_target(table) + bigquery_columns = bigquery_types.get('columns', []) + mysql.close_connections() + + # Creating temp table in Bigquery + bigquery.create_schema(target_schema) + bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) + + # Load into Bigquery table + for num, file_part in enumerate(file_parts): + write_truncate = num == 0 + bigquery.copy_to_table( + filepath, + target_schema, + table, + size_bytes, + is_temporary=True, + write_truncate=write_truncate) + os.remove(file_part) + + # Obfuscate columns + bigquery.obfuscate_columns(target_schema, table) + + # Create target table and swap with the temp table in Bigquery + bigquery.create_table(target_schema, table, bigquery_columns) + bigquery.swap_tables(target_schema, table) + + # Save bookmark to singer state file + # Lock to ensure that only one process writes the same state file at a time + LOCK.acquire() + try: + utils.save_state_file(args.state, table, bookmark) + finally: + LOCK.release() + + # Table loaded, grant select on all tables in target schema + grantees = utils.get_grantees(args.target, table) + utils.grant_privilege(target_schema, grantees, bigquery.grant_usage_on_schema) + utils.grant_privilege(target_schema, grantees, bigquery.grant_select_on_schema) + + return True + + except Exception as exc: + LOGGER.exception(exc) + return '{}: {}'.format(table, exc) + + +def main_impl(): + """Main sync logic""" + args = utils.parse_args(REQUIRED_CONFIG_KEYS) + pool_size = utils.get_pool_size(args.tap) + start_time = datetime.now() + table_sync_excs = [] + + # Log start info + LOGGER.info(""" + ------------------------------------------------------- + STARTING SYNC + ------------------------------------------------------- + Tables selected to sync : %s + Total tables selected to sync : %s + Pool size : %s + ------------------------------------------------------- + """, args.tables, len(args.tables), pool_size) + + # Start loading tables in parallel in spawning processes + with multiprocessing.Pool(pool_size) as proc: + table_sync_excs = list( + filter(lambda x: not isinstance(x, bool), proc.map(partial(sync_table, args=args), args.tables))) + + # Log summary + end_time = datetime.now() + LOGGER.info(""" + ------------------------------------------------------- + SYNC FINISHED - SUMMARY + ------------------------------------------------------- + Total tables selected to sync 
: %s + Tables loaded successfully : %s + Exceptions during table sync : %s + + Pool size : %s + Runtime : %s + ------------------------------------------------------- + """, len(args.tables), len(args.tables) - len(table_sync_excs), str(table_sync_excs), + pool_size, end_time - start_time) + + if len(table_sync_excs) > 0: + sys.exit(1) + + +def main(): + """Main entry point""" + try: + main_impl() + except Exception as exc: + LOGGER.critical(exc) + raise exc diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py new file mode 100644 index 000000000..3e75ea5e5 --- /dev/null +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +import logging +import os +import sys +import glob +import time +from functools import partial +from argparse import Namespace +import multiprocessing +from typing import Union + +from datetime import datetime +from .commons import utils +from .commons.tap_postgres import FastSyncTapPostgres +from .commons.target_bigquery import FastSyncTargetBigquery + +MAX_NUM='99999999999999999999999999999.999999999' + +LOGGER = logging.getLogger(__name__) + +REQUIRED_CONFIG_KEYS = { + 'tap': [ + 'host', + 'port', + 'user', + 'password' + ], + 'target': [ + 'project_id' + ] +} + +LOCK = multiprocessing.Lock() + +def tap_type_to_target_type(pg_type): + """Data type mapping from Postgres to Bigquery""" + return { + 'char':'STRING', + 'character':'STRING', + 'varchar':'STRING', + 'character varying':'STRING', + 'text':'STRING', + 'bit': ['BOOL', 'NUMERIC'], + 'varbit':'NUMERIC', + 'bit varying':'NUMERIC', + 'smallint':'INT64', + 'int':'INT64', + 'integer':'INT64', + 'bigint':'INT64', + 'smallserial':'INT64', + 'serial':'INT64', + 'bigserial':'INT64', + 'numeric':'NUMERIC', + 'double precision':'NUMERIC', + 'real':'NUMERIC', + 'bool':'BOOL', + 'boolean':'BOOL', + 'date':'TIMESTAMP', + 'timestamp':'TIMESTAMP', + 'timestamp without time zone':'TIMESTAMP', + 'timestamp with time zone':'TIMESTAMP', + 'time':'TIME', + 'time without time zone':'TIME', + 'time with time zone':'TIME', + # This is all uppercase, because postgres stores it in this format in information_schema.columns.data_type + 'ARRAY':'STRING', + 'json':'STRING', + 'jsonb':'STRING' + }.get(pg_type, 'STRING') + + +# pylint: disable=too-many-locals +def sync_table(table: str, args: Namespace) -> Union[bool, str]: + """Sync one table""" + postgres = FastSyncTapPostgres(args.tap, tap_type_to_target_type, target_quote='`') + bigquery = FastSyncTargetBigquery(args.target, args.transform) + + try: + dbname = args.tap.get('dbname') + filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(dbname, table, time.strftime('%Y%m%d-%H%M%S')) + filepath = os.path.join(args.temp_dir, filename) + target_schema = utils.get_target_schema(args.target, table) + + # Open connection + postgres.open_connection() + + # Get bookmark - LSN position or Incremental Key value + bookmark = utils.get_bookmark_for_table(table, args.properties, postgres, dbname=dbname) + + # Exporting table data, get table definitions and close connection to avoid timeouts + postgres.copy_table(table, + filepath, + compress=False, + max_num=MAX_NUM, + date_type='timestamp') + file_parts = glob.glob(f'{filepath}*') + size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) + + bigquery_types = postgres.map_column_types_to_target(table) + bigquery_columns = bigquery_types.get('columns', []) + postgres.close_connection() + + # Creating temp table in Bigquery + 
bigquery.create_schema(target_schema) + bigquery.create_table(target_schema, table, bigquery_columns, is_temporary=True) + + # Load into Bigquery table + for num, file_part in enumerate(file_parts): + write_truncate = num == 0 + bigquery.copy_to_table( + filepath, + target_schema, + table, + size_bytes, + is_temporary=True, + write_truncate=write_truncate) + os.remove(file_part) + + # Obfuscate columns + bigquery.obfuscate_columns(target_schema, table) + + # Create target table and swap with the temp table in Bigquery + bigquery.create_table(target_schema, table, bigquery_columns) + bigquery.swap_tables(target_schema, table) + + # Save bookmark to singer state file + # Lock to ensure that only one process writes the same state file at a time + LOCK.acquire() + try: + utils.save_state_file(args.state, table, bookmark) + finally: + LOCK.release() + + # Table loaded, grant select on all tables in target schema + grantees = utils.get_grantees(args.target, table) + utils.grant_privilege(target_schema, grantees, bigquery.grant_usage_on_schema) + utils.grant_privilege(target_schema, grantees, bigquery.grant_select_on_schema) + + return True + + except Exception as exc: + LOGGER.exception(exc) + return '{}: {}'.format(table, exc) + + +def main_impl(): + """Main sync logic""" + args = utils.parse_args(REQUIRED_CONFIG_KEYS) + pool_size = utils.get_pool_size(args.tap) + start_time = datetime.now() + table_sync_excs = [] + + # Log start info + LOGGER.info(""" + ------------------------------------------------------- + STARTING SYNC + ------------------------------------------------------- + Tables selected to sync : %s + Total tables selected to sync : %s + Pool size : %s + ------------------------------------------------------- + """, args.tables, len(args.tables), pool_size) + + # if internal arg drop_pg_slot is set to True, then we drop the slot before starting resync + if args.drop_pg_slot: + FastSyncTapPostgres.drop_slot(args.tap) + + # Start loading tables in parallel in spawning processes + with multiprocessing.Pool(pool_size) as proc: + table_sync_excs = list( + filter(lambda x: not isinstance(x, bool), proc.map(partial(sync_table, args=args), args.tables))) + + # Log summary + end_time = datetime.now() + LOGGER.info(""" + ------------------------------------------------------- + SYNC FINISHED - SUMMARY + ------------------------------------------------------- + Total tables selected to sync : %s + Tables loaded successfully : %s + Exceptions during table sync : %s + + Pool size : %s + Runtime : %s + ------------------------------------------------------- + """, len(args.tables), len(args.tables) - len(table_sync_excs), str(table_sync_excs), + pool_size, end_time - start_time) + + if len(table_sync_excs) > 0: + sys.exit(1) + + +def main(): + """Main entry point""" + try: + main_impl() + except Exception as exc: + LOGGER.critical(exc) + raise exc diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py new file mode 100644 index 000000000..5ee3b10df --- /dev/null +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +import multiprocessing +import os +import sys +from argparse import Namespace +from datetime import datetime +from functools import partial +from typing import Union + +from ..logger import Logger +from .commons import utils +from .commons.tap_s3_csv import FastSyncTapS3Csv +from .commons.target_bigquery import FastSyncTargetBigquery + +LOGGER = Logger().get_logger(__name__) + + 
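# A minimal, illustrative sketch (helper name is hypothetical): each *_to_bigquery fastsync
# module defines a tap_type_to_target_type() helper of the same shape, a plain dict lookup
# whose fallback is STRING, so any source type without an explicit mapping is loaded into
# BigQuery as a STRING column.
def _example_type_mapping(source_type: str) -> str:
    return {
        'integer': 'INT64',
        'number': 'NUMERIC',
        'date': 'TIMESTAMP',
    }.get(source_type, 'STRING')

assert _example_type_mapping('integer') == 'INT64'
assert _example_type_mapping('uuid') == 'STRING'  # unmapped types fall back to STRING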
+REQUIRED_CONFIG_KEYS = { + 'tap': [ + 'bucket', + 'start_date' + ], + 'target': [ + + 'project_id', + ] +} + +LOCK = multiprocessing.Lock() + + +def tap_type_to_target_type(csv_type): + """Data type mapping from S3 csv to Bigquery""" + + return { + 'integer': 'INT64', + 'number': 'NUMERIC', + 'string': 'STRING', + 'boolean': 'STRING', # The guess sometimes can be wrong, we'll use string for now. + 'date': 'TIMESTAMP', + + 'date_override': 'TIMESTAMP' # Column type to use when date_override defined in YAML + }.get(csv_type, 'STRING') + + +def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: + """Sync one table""" + s3_csv = FastSyncTapS3Csv(args.tap, tap_type_to_target_type, target_quote='`') + bigquery = FastSyncTargetBigquery(args.target, args.transform) + + try: + filename = utils.gen_export_filename(tap_id=args.target.get('tap_id'), table=table_name) + filepath = os.path.join(args.temp_dir, filename) + + target_schema = utils.get_target_schema(args.target, table_name) + + s3_csv.copy_table(table_name, filepath) + size_bytes = os.path.getsize(filepath) + + bigquery_types = s3_csv.map_column_types_to_target(filepath, table_name) + bigquery_columns = bigquery_types.get('columns', []) + + # Creating temp table in Bigquery + bigquery.create_schema(target_schema) + bigquery.create_table(target_schema, + table_name, + bigquery_columns, + is_temporary=True, + sort_columns=True) + + # Load into Bigquery table + bigquery.copy_to_table(filepath, target_schema, table_name, size_bytes, is_temporary=True, skip_csv_header=True) + os.remove(filepath) + + # Obfuscate columns + bigquery.obfuscate_columns(target_schema, table_name) + + # Create target table and swap with the temp table in Bigquery + bigquery.create_table(target_schema, table_name, bigquery_columns) + bigquery.swap_tables(target_schema, table_name) + + # Get bookmark + bookmark = utils.get_bookmark_for_table(table_name, args.properties, s3_csv) + + # Save bookmark to singer state file + # Lock to ensure that only one process writes the same state file at a time + LOCK.acquire() + try: + utils.save_state_file(args.state, table_name, bookmark) + finally: + LOCK.release() + + # Table loaded, grant select on all tables in target schema + grantees = utils.get_grantees(args.target, table_name) + utils.grant_privilege(target_schema, grantees, bigquery.grant_usage_on_schema) + utils.grant_privilege(target_schema, grantees, bigquery.grant_select_on_schema) + + return True + + except Exception as exc: + LOGGER.exception(exc) + return f'{table_name}: {exc}' + + +def main_impl(): + """Main sync logic""" + args = utils.parse_args(REQUIRED_CONFIG_KEYS) + pool_size = utils.get_pool_size(args.tap) + start_time = datetime.now() + + # Log start info + LOGGER.info(""" + ------------------------------------------------------- + STARTING SYNC + ------------------------------------------------------- + Tables selected to sync : %s + Total tables selected to sync : %s + Pool size : %s + ------------------------------------------------------- + """, args.tables, len(args.tables), pool_size) + + # Start loading tables in parallel in spawning processes by + # utilising all available Pool size + with multiprocessing.Pool(pool_size) as proc: + table_sync_excs = list( + filter(lambda x: not isinstance(x, bool), proc.map(partial(sync_table, args=args), args.tables))) + + # Log summary + end_time = datetime.now() + LOGGER.info(""" + ------------------------------------------------------- + SYNC FINISHED - SUMMARY + 
------------------------------------------------------- + Total tables selected to sync : %s + Tables loaded successfully : %s + Exceptions during table sync : %s + + Pool size : %s + Runtime : %s + ------------------------------------------------------- + """, len(args.tables), len(args.tables) - len(table_sync_excs), + str(table_sync_excs), pool_size, end_time - start_time) + + if len(table_sync_excs) > 0: + sys.exit(1) + + +def main(): + """Main entry point""" + try: + main_impl() + except Exception as exc: + LOGGER.critical(exc) + raise exc diff --git a/pipelinewise/utils.py b/pipelinewise/utils.py index 5e5bdcf7b..3746d1d09 100644 --- a/pipelinewise/utils.py +++ b/pipelinewise/utils.py @@ -4,16 +4,18 @@ from typing import Optional -def safe_column_name(name: Optional[str]) -> Optional[str]: +def safe_column_name(name: Optional[str], quote_character: Optional[str]=None) -> Optional[str]: """ Makes column name safe by capitalizing and wrapping it in double quotes Args: name: column name + quote_character: character the database uses for quoting identifiers Returns: str: safe column name """ + if quote_character is None: + quote_character = '"' if name: - return f'"{name.upper()}"' - + return f'{quote_character}{name.upper()}{quote_character}' return name diff --git a/setup.py b/setup.py index b16d0fef7..e0b94d2b4 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ 'PyMySQL==0.7.11', 'psycopg2-binary==2.8.6', 'snowflake-connector-python[pandas]==2.4.6', + 'google-cloud-bigquery==1.24.0', 'pipelinewise-singer-python==1.*', 'singer-encodings==0.0.*', 'messytables==0.15.*', @@ -56,11 +57,15 @@ 'postgres-to-redshift=pipelinewise.fastsync.postgres_to_redshift:main', 'mysql-to-postgres=pipelinewise.fastsync.mysql_to_postgres:main', 'postgres-to-postgres=pipelinewise.fastsync.postgres_to_postgres:main', + 'mysql-to-bigquery=pipelinewise.fastsync.mysql_to_bigquery:main', + 'postgres-to-bigquery=pipelinewise.fastsync.postgres_to_bigquery:main', 's3-csv-to-snowflake=pipelinewise.fastsync.s3_csv_to_snowflake:main', 's3-csv-to-postgres=pipelinewise.fastsync.s3_csv_to_postgres:main', 's3-csv-to-redshift=pipelinewise.fastsync.s3_csv_to_redshift:main', + 's3-csv-to-bigquery=pipelinewise.fastsync.s3_csv_to_bigquery:main', 'mongodb-to-snowflake=pipelinewise.fastsync.mongodb_to_snowflake:main', 'mongodb-to-postgres=pipelinewise.fastsync.mongodb_to_postgres:main', + 'mongodb-to-bigquery=pipelinewise.fastsync.mongodb_to_bigquery:main', ] }, packages=find_packages(exclude=['tests*']), diff --git a/singer-connectors/target-bigquery/requirements.txt b/singer-connectors/target-bigquery/requirements.txt new file mode 100644 index 000000000..24832ec8b --- /dev/null +++ b/singer-connectors/target-bigquery/requirements.txt @@ -0,0 +1 @@ +pipelinewise-target-bigquery==1.1.1 diff --git a/tests/end_to_end/helpers/assertions.py b/tests/end_to_end/helpers/assertions.py index fe4ad6619..8cf316b8b 100644 --- a/tests/end_to_end/helpers/assertions.py +++ b/tests/end_to_end/helpers/assertions.py @@ -101,7 +101,9 @@ def assert_cols_in_table(query_runner_fn: callable, table_schema: str, table_nam :param table_name: table with the columns :param columns: list of columns to check if there are in the table's columns """ - sql = db.sql_get_columns_for_table(table_schema, table_name) + funcs = _map_tap_to_target_functions(None, query_runner_fn) + sql_get_columns_for_table_fn = funcs.get('target_sql_get_table_cols_fn', db.sql_get_columns_for_table) + sql = sql_get_columns_for_table_fn(table_schema, table_name) result = 
query_runner_fn(sql) cols = [res[0] for res in result] try: @@ -149,6 +151,13 @@ def _map_tap_to_target_functions(tap_query_runner_fn: callable, target_query_run 'target_sql_get_cols_fn': db.sql_get_columns_snowflake, 'target_sql_dynamic_row_count_fn': db.sql_dynamic_row_count_snowflake, }, + # target-bigquery specific attributes and functions + 'run_query_target_bigquery': { + 'target_sql_get_cols_fn': db.sql_get_columns_bigquery, + 'target_sql_get_table_cols_fn': db.sql_get_columns_for_table_bigquery, + 'target_sql_dynamic_row_count_fn': db.sql_dynamic_row_count_bigquery, + 'target_sql_safe_name_fn': db.safe_name_bigquery, + }, # target-redshift specific attributes and functions 'run_query_target_redshift': { 'target_sql_get_cols_fn': db.sql_get_columns_redshift, @@ -157,7 +166,9 @@ def _map_tap_to_target_functions(tap_query_runner_fn: callable, target_query_run } # Merge the keys into one dict by tap and target query runner names - return {**f_map[tap_query_runner_fn.__name__], **f_map[target_query_runner_fn.__name__]} + if tap_query_runner_fn: + return {**f_map[tap_query_runner_fn.__name__], **f_map[target_query_runner_fn.__name__]} + return {**f_map[target_query_runner_fn.__name__]} def assert_row_counts_equal(tap_query_runner_fn: callable, target_query_runner_fn: callable) -> None: @@ -185,6 +196,17 @@ def assert_row_counts_equal(tap_query_runner_fn: callable, target_query_runner_f row_counts_in_source = _run_sql(tap_query_runner_fn, source_sql_row_count) row_counts_in_target = _run_sql(target_query_runner_fn, target_sql_row_count) + # Some sources and targets can't be compared directly (e.g. BigQuery doesn't accept spaces in table names) + # we fix that by renaming the source tables to names that the target would accept + if 'target_sql_safe_name_fn' in funcs: + row_counts_in_source = [ + ( + funcs['target_sql_safe_name_fn'](table), + row_count + ) + for (table,row_count) in row_counts_in_source + ] + # Compare the two dataset assert row_counts_in_target == row_counts_in_source @@ -239,6 +261,12 @@ def _cols_list_to_dict(cols: List) -> dict: # Compare the two dataset for table_cols in source_table_cols: table_to_check = table_cols[0].lower() + + # Some sources and targets can't be compared directly (e.g. 
BigQuery doesn't accept spaces in table names)
+        # we fix that by renaming the source tables to names that the target would accept
+        if 'target_sql_safe_name_fn' in funcs:
+            table_to_check = funcs['target_sql_safe_name_fn'](table_to_check)
+
         source_cols = table_cols[1].lower().split(';')

         try:
diff --git a/tests/end_to_end/helpers/db.py b/tests/end_to_end/helpers/db.py
index 39aaf5edc..be363efe1 100644
--- a/tests/end_to_end/helpers/db.py
+++ b/tests/end_to_end/helpers/db.py
@@ -5,9 +5,12 @@
 import pymongo
 import pymysql
 import snowflake.connector
+from google.cloud import bigquery
 from pymongo.database import Database

+from pipelinewise.fastsync.commons.target_bigquery import safe_name
+

 # pylint: disable=too-many-arguments
 def run_query_postgres(query, host, port, user, password, database):
     """Run and SQL query in a postgres database"""
@@ -57,6 +60,23 @@ def run_query_snowflake(query, account, database, warehouse, user, password):
     return result_rows


+def safe_name_bigquery(name):
+    """Return the safe_name of a column in BigQuery"""
+    return safe_name(name, quotes=False)
+
+
+def delete_dataset_bigquery(dataset, project):
+    """Delete a dataset and all of its contents in a BigQuery project"""
+    client = bigquery.Client(project=project)
+    client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)
+
+def run_query_bigquery(query, project):
+    """Run an SQL query in a BigQuery database"""
+    client = bigquery.Client(project=project)
+    query_job = client.query(query)
+    query_job.result()
+    return [r.values() for r in query_job]
+
 def run_query_redshift(query, host, port, user, password, database):
     """Redshift is compatible with postgres"""
     return run_query_postgres(query, host, port, user, password, database)
@@ -76,6 +96,19 @@ def sql_get_columns_for_table(table_schema: str, table_name: str) -> list:
                 AND table_name IN ('{table_name.upper()}', '{table_name.lower()}')"""


+def sql_get_columns_for_table_bigquery(table_schema: str, table_name: str) -> list:
+    """Generate an SQL command that returns the list of columns of a specific
+    table in BigQuery.
+
+    table_schema and table_name can be lowercase and uppercase strings.
+    It's using the IN clause to avoid transforming the entire
+    information_schema.columns table"""
+    return f"""
+            SELECT column_name
+            FROM {table_schema}.INFORMATION_SCHEMA.COLUMNS
+            WHERE table_name IN ('{table_name.upper()}', '{table_name.lower()}')"""
+
+
 def sql_get_columns_mysql(schemas: list) -> str:
     """Generates an SQL command that gives the list of columns of every table
     in a specific schema from a mysql database"""
@@ -116,6 +149,20 @@ def sql_get_columns_snowflake(schemas: list) -> str:
         ORDER BY table_name"""


+def sql_get_columns_bigquery(schemas: list) -> str:
+    """Generates an SQL command that gives the list of columns of every table
+    in a specific schema from a BigQuery dataset"""
+    table_queries = ' UNION ALL '.join(f"""
+        SELECT table_name, column_name, data_type
+        FROM `{schema}`.INFORMATION_SCHEMA.COLUMNS""" for schema in schemas)
+
+    return f"""
+        SELECT table_name, STRING_AGG(CONCAT(column_name, ':', data_type, ':'), ';' ORDER BY column_name)
+        FROM ({table_queries})
+        GROUP BY table_name
+        ORDER BY table_name"""
+
+
 def sql_get_columns_redshift(schemas: list) -> str:
     """Generates an SQL command that gives the list of columns of every table
     in a specific schema from a Redshift database"""
@@ -204,6 +251,25 @@ def sql_dynamic_row_count_snowflake(schemas: list) -> str:
     """


+def sql_dynamic_row_count_bigquery(schemas: list) -> str:
+    """Generates an SQL statement that counts the number of rows in
+    every table in a specific schema(s) in a BigQuery dataset"""
+    table_queries = ' UNION DISTINCT '.join(f"""
+        SELECT table_schema, table_name
+        FROM `{schema}`.INFORMATION_SCHEMA.TABLES
+        WHERE table_type = 'BASE TABLE'""" for schema in schemas)
+
+    return f"""
+        WITH table_list AS ({table_queries})
+        SELECT CONCAT(
+            STRING_AGG(CONCAT('SELECT \\'', LOWER(table_name), '\\' tbl, COUNT(*) row_count FROM ',
+                       table_schema, '.`', table_name, '`'),
+                ' UNION DISTINCT '),
+            ' ORDER BY tbl')
+        FROM table_list
+    """
+
+
 def sql_dynamic_row_count_redshift(schemas: list) -> str:
     """Generates an SQL statement that counts the number of rows in
     every table in a specific schema(s) in a Redshift database"""
diff --git a/tests/end_to_end/helpers/env.py b/tests/end_to_end/helpers/env.py
index 1732b2e93..fddd9c26a 100644
--- a/tests/end_to_end/helpers/env.py
+++ b/tests/end_to_end/helpers/env.py
@@ -145,6 +145,18 @@ def _load_env(self):
                 }
             },
             # ------------------------------------------------------------------
+            # Target BigQuery is an OPTIONAL test connector because it's not open sourced and not part of
+            # the docker environment.
+            # To run the related test cases add real BigQuery credentials to ../../../dev-project/.env
+            # ------------------------------------------------------------------
+            'TARGET_BIGQUERY': {
+                'optional': True,
+                'template_patterns': ['target_bigquery', 'to_bq'],
+                'vars': {
+                    'PROJECT' : {'value': os.environ.get('TARGET_BIGQUERY_PROJECT')},
+                }
+            },
+            # ------------------------------------------------------------------
             # Target Redshift is an OPTIONAL test connector because it's not open sourced and not part of
             # the docker environment.
             # To run the related test cases add real Amazon Redshift credentials to ../../../dev-project/.env
@@ -182,6 +194,7 @@ def _load_env(self):
         self.env['TARGET_POSTGRES']['is_configured'] = self._is_env_connector_configured('TARGET_POSTGRES')
         self.env['TARGET_REDSHIFT']['is_configured'] = self._is_env_connector_configured('TARGET_REDSHIFT')
         self.env['TARGET_SNOWFLAKE']['is_configured'] = self._is_env_connector_configured('TARGET_SNOWFLAKE')
+        self.env['TARGET_BIGQUERY']['is_configured'] = self._is_env_connector_configured('TARGET_BIGQUERY')

     def _get_conn_env_var(self, connector, key):
         """Get the value of a specific variable in the self.env dict"""
@@ -360,6 +373,16 @@ def run_query_target_snowflake(self, query):
                                      user=self._get_conn_env_var('TARGET_SNOWFLAKE', 'USER'),
                                      password=self._get_conn_env_var('TARGET_SNOWFLAKE', 'PASSWORD'))

+    def delete_dataset_target_bigquery(self, dataset):
+        """Delete a dataset in the target BigQuery project"""
+        return db.delete_dataset_bigquery(dataset,
+                                          project=self._get_conn_env_var('TARGET_BIGQUERY', 'PROJECT'))
+
+    def run_query_target_bigquery(self, query):
+        """Run an SQL query in the target BigQuery database"""
+        return db.run_query_bigquery(query,
+                                     project=self._get_conn_env_var('TARGET_BIGQUERY', 'PROJECT'))
+
     # -------------------------------------------------------------------------
     # Setup methods to initialise source and target databases and to make them
     # ready running the tests
@@ -440,3 +463,13 @@ def setup_target_snowflake(self):

         # Clean config directory
         shutil.rmtree(os.path.join(CONFIG_DIR, 'snowflake'), ignore_errors=True)
+
+    def setup_target_bigquery(self):
+        """Clean bigquery target database and prepare for test run"""
+        self.delete_dataset_target_bigquery('ppw_e2e_tap_postgres')
+        self.delete_dataset_target_bigquery('ppw_e2e_tap_postgres_public2')
+        self.delete_dataset_target_bigquery('ppw_e2e_tap_postgres_logical1')
+        self.delete_dataset_target_bigquery('ppw_e2e_tap_postgres_logical2')
+        self.delete_dataset_target_bigquery('ppw_e2e_tap_mysql')
+        self.delete_dataset_target_bigquery('ppw_e2e_tap_s3_csv')
+        self.delete_dataset_target_bigquery('ppw_e2e_tap_mongodb')
diff --git a/tests/end_to_end/test-project/tap_mongodb_to_bq.yml.template b/tests/end_to_end/test-project/tap_mongodb_to_bq.yml.template
new file mode 100644
index 000000000..f449df856
--- /dev/null
+++ b/tests/end_to_end/test-project/tap_mongodb_to_bq.yml.template
@@ -0,0 +1,46 @@
+---
+
+# ------------------------------------------------------------------------------
+# General Properties
+# ------------------------------------------------------------------------------
+id: "mongo_to_bq"
+name: "MongoDB source test database"
+type: "tap-mongodb"
+owner: "test-runner"
+
+
+# ------------------------------------------------------------------------------
+# Source (Tap) - Mongo connection details
+# ------------------------------------------------------------------------------
+db_conn:
+  host: "${TAP_MONGODB_HOST}"          # Mongodb host
+  port: ${TAP_MONGODB_PORT}            # Mongodb port
+  user: "${TAP_MONGODB_USER}"          # Mongodb user
+  password: "${TAP_MONGODB_PASSWORD}"  # Mongodb password, plain string or vault encrypted
+  auth_database: "admin"               # Mongodb database to authenticate on
+  dbname: "${TAP_MONGODB_DB}"          # Mongodb database name to sync from
+  replica_set: rs0                     # Mongodb replica set name, default null
+  write_batch_rows: 500
+  update_buffer_size: 30
+#  await_time_ms: 5000 # 5 sec
+
+# ------------------------------------------------------------------------------
+# Destination (Target) - Target properties
+# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 1000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "${TAP_MONGODB_DB}" + target_schema: "ppw_e2e_tap_mongodb" + + tables: + - table_name: "listings" + + - table_name: "my_collection" + replication_method: "LOG_BASED" diff --git a/tests/end_to_end/test-project/tap_mysql_to_bq.yml.template b/tests/end_to_end/test-project/tap_mysql_to_bq.yml.template new file mode 100644 index 000000000..c7a8046ff --- /dev/null +++ b/tests/end_to_end/test-project/tap_mysql_to_bq.yml.template @@ -0,0 +1,103 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "mariadb_to_bq" +name: "MariaDB source test database" +type: "tap-mysql" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - MySQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_MYSQL_HOST}" # MySQL host + port: ${TAP_MYSQL_PORT} # MySQL port + user: "${TAP_MYSQL_USER}" # MySQL user + password: "${TAP_MYSQL_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_MYSQL_DB}" # MySQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 20000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "mysql_source_db" + target_schema: "ppw_e2e_tap_mysql" + + tables: + ### Table with LOG_BASED replication + - table_name: "weight_unit" + replication_method: "LOG_BASED" + transformations: + - column: "weight_unit_name" + type: "HASH-SKIP-FIRST-2" + + ### Table with INCREMENTAL replication + - table_name: "address" + replication_method: "INCREMENTAL" + replication_key: "date_updated" + transformations: + - column: "zip_code_zip_code_id" + type: "MASK-NUMBER" + when: + - column: 'street_number' + regex_match: '[801]' + + - column: "date_created" + type: "MASK-DATE" + + ### Table with FULL_TABLE replication + - table_name: "order" + replication_method: "FULL_TABLE" + + ### Table with no primary key + - table_name: "no_pk_table" + replication_method: "FULL_TABLE" + + ### Table with binary and varbinary columns + - table_name: "table_with_binary" + replication_method: "LOG_BASED" + + ### Table with reserved words and columns and 
special characters + - table_name: "edgydata" + replication_method: "LOG_BASED" + transformations: + - column: "case" + type: "HASH" + + - column: "group" + type: "MASK-NUMBER" + when: + - column: 'case' + equals: 'A' + - column: "group" + type: "SET-NULL" + when: + - column: 'case' + equals: 'B' + + ### Table with reserved word + - table_name: "full" + replication_method: "INCREMENTAL" + replication_key: "begin" + + ### Table with space and mixed upper and lowercase characters + - table_name: "table_with_space and UPPERCase" + replication_method: "LOG_BASED" + + ### Table with all possible data types + - table_name: "all_datatypes" + replication_method: "LOG_BASED" + diff --git a/tests/end_to_end/test-project/tap_mysql_to_bq_buffered_stream.yml.template b/tests/end_to_end/test-project/tap_mysql_to_bq_buffered_stream.yml.template new file mode 100644 index 000000000..d1c5af6db --- /dev/null +++ b/tests/end_to_end/test-project/tap_mysql_to_bq_buffered_stream.yml.template @@ -0,0 +1,103 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "mariadb_to_bq_buffered_stream" +name: "MariaDB source test database" +type: "tap-mysql" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - MySQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_MYSQL_HOST}" # MySQL host + port: ${TAP_MYSQL_PORT} # MySQL port + user: "${TAP_MYSQL_USER}" # MySQL user + password: "${TAP_MYSQL_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_MYSQL_DB}" # MySQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 20000 # Batch size for the stream to optimise load performance +stream_buffer_size: 20 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "mysql_source_db" + target_schema: "ppw_e2e_tap_mysql" + + tables: + ### Table with LOG_BASED replication + - table_name: "weight_unit" + replication_method: "LOG_BASED" + transformations: + - column: "weight_unit_name" + type: "HASH-SKIP-FIRST-2" + + ### Table with INCREMENTAL replication + - table_name: "address" + replication_method: "INCREMENTAL" + replication_key: "date_updated" + transformations: + - column: "zip_code_zip_code_id" + type: "MASK-NUMBER" + when: + - column: 'street_number' + regex_match: '[801]' + + - column: "date_created" + type: "MASK-DATE" + + ### Table with FULL_TABLE replication + - table_name: "order" + replication_method: "FULL_TABLE" + + ### Table with no primary key + - table_name: "no_pk_table" + replication_method: "FULL_TABLE" + + ### Table with binary and varbinary columns + - table_name: "table_with_binary" + replication_method: "LOG_BASED" + + ### Table with reserved words and columns and special characters + - table_name: "edgydata" + 
replication_method: "LOG_BASED" + transformations: + - column: "case" + type: "HASH" + + - column: "group" + type: "MASK-NUMBER" + when: + - column: 'case' + equals: 'A' + - column: "group" + type: "SET-NULL" + when: + - column: 'case' + equals: 'B' + + ### Table with reserved word + - table_name: "full" + replication_method: "INCREMENTAL" + replication_key: "begin" + + ### Table with space and mixed upper and lowercase characters + - table_name: "table_with_space and UPPERCase" + replication_method: "LOG_BASED" + + ### Table with all possible data types + - table_name: "all_datatypes" + replication_method: "LOG_BASED" + diff --git a/tests/end_to_end/test-project/tap_mysql_to_bq_split_large_files.yml.template b/tests/end_to_end/test-project/tap_mysql_to_bq_split_large_files.yml.template new file mode 100644 index 000000000..12a69bf01 --- /dev/null +++ b/tests/end_to_end/test-project/tap_mysql_to_bq_split_large_files.yml.template @@ -0,0 +1,45 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "mariadb_to_bq_split_large_files" +name: "MariaDB source test database" +type: "tap-mysql" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - MySQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_MYSQL_HOST}" # MySQL host + port: ${TAP_MYSQL_PORT} # MySQL port + user: "${TAP_MYSQL_USER}" # MySQL user + password: "${TAP_MYSQL_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_MYSQL_DB}" # MySQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 20000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + +# Enable splitting large files to smaller pieces +split_large_files: True +split_file_chunk_size_mb: 1 +split_file_max_chunks: 5 + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "mysql_source_db" + target_schema: "ppw_e2e_tap_mysql" + + tables: + - table_name: "address" diff --git a/tests/end_to_end/test-project/tap_postgres_to_bq.yml.template b/tests/end_to_end/test-project/tap_postgres_to_bq.yml.template new file mode 100644 index 000000000..7d8697c4d --- /dev/null +++ b/tests/end_to_end/test-project/tap_postgres_to_bq.yml.template @@ -0,0 +1,107 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "postgres_to_bq" +name: "PostgreSQL source test database" +type: "tap-postgres" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - PostgreSQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: 
"${TAP_POSTGRES_HOST}" # PostgreSQL host + logical_poll_total_seconds: 3 # Time out if no LOG_BASED changes received for 3 seconds + port: ${TAP_POSTGRES_PORT} # PostgreSQL port + user: "${TAP_POSTGRES_USER}" # PostgreSQL user + password: "${TAP_POSTGRES_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_POSTGRES_DB}" # PostgreSQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 1000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + + ### SOURCE SCHEMA 1: public + - source_schema: "public" + target_schema: "ppw_e2e_tap_postgres" + + tables: + + ### Table with INCREMENTAL replication + - table_name: "city" + replication_method: "INCREMENTAL" + replication_key: "id" + + ### Table with FULL_TABLE replication + - table_name: "country" + replication_method: "FULL_TABLE" + + ### Table with no primary key + - table_name: "no_pk_table" + replication_method: "FULL_TABLE" + + ### Table with reserved words and columns and special characters + - table_name: "edgydata" + replication_method: "INCREMENTAL" + replication_key: "cid" + transformations: + - column: "cvarchar" + type: "HASH-SKIP-FIRST-3" + + ### Table with reserved word + - table_name: "order" + replication_method: "INCREMENTAL" + replication_key: "id" + + ### Table with space and mixed upper and lowercase characters + - table_name: "table_with_space and UPPERCase" + replication_method: "LOG_BASED" + + ### Table with space and mixed upper and lowercase characters + - table_name: "table_with_reserved_words" + replication_method: "FULL_TABLE" + + + ### SOURCE SCHEMA 2: public2 + - source_schema: "public2" + target_schema: "ppw_e2e_tap_postgres_public2" + + tables: + ### Table with FULL_TABLE replication + - table_name: "wearehere" + replication_method: "FULL_TABLE" + + ### Table with reserved words and columns and special characters + - table_name: "public2_edgydata" + replication_method: "INCREMENTAL" + replication_key: "cid" + + ### SOURCE SCHEMA 3: logical 1 + #- source_schema: "logical1" + # target_schema: "ppw_e2e_tap_postgres_logical1" + # + # tables: + # - table_name: "logical1_table1" + # replication_method: "LOG_BASED" + # - table_name: "logical1_table2" + # - table_name: "logical1_edgydata" + + ### SOURCE SCHEMA 4: logical2 + #- source_schema: "logical2" + # target_schema: "ppw_e2e_tap_postgres_logical2" + # tables: + # - table_name: "logical2_table1" diff --git a/tests/end_to_end/test-project/tap_postgres_to_bq_split_large_files.yml.template b/tests/end_to_end/test-project/tap_postgres_to_bq_split_large_files.yml.template new file mode 100644 index 000000000..4e5e81f86 --- /dev/null +++ b/tests/end_to_end/test-project/tap_postgres_to_bq_split_large_files.yml.template @@ -0,0 +1,46 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# 
------------------------------------------------------------------------------ +id: "postgres_to_bq_split_large_files" +name: "PostgreSQL source test database" +type: "tap-postgres" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - PostgreSQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_POSTGRES_HOST}" # PostgreSQL host + logical_poll_total_seconds: 3 # Time out if no LOG_BASED changes received for 3 seconds + port: ${TAP_POSTGRES_PORT} # PostgreSQL port + user: "${TAP_POSTGRES_USER}" # PostgreSQL user + password: "${TAP_POSTGRES_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_POSTGRES_DB}" # PostgreSQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 1000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + +# Enable splitting large files to smaller pieces +split_large_files: True +split_file_chunk_size_mb: 1 +split_file_max_chunks: 5 + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "public" + target_schema: "ppw_e2e_tap_postgres" + + tables: + - table_name: "city" diff --git a/tests/end_to_end/test-project/tap_s3_csv_to_bq.yml.template b/tests/end_to_end/test-project/tap_s3_csv_to_bq.yml.template new file mode 100644 index 000000000..bdcb85131 --- /dev/null +++ b/tests/end_to_end/test-project/tap_s3_csv_to_bq.yml.template @@ -0,0 +1,67 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "s3_csv_to_bq" # Unique identifier of the tap +name: "Sample CSV files on S3" # Name of the tap +type: "tap-s3-csv" # !! THIS SHOULD NOT CHANGE !! 
+owner: "somebody@foo.com" # Data owner to contact + + +# ------------------------------------------------------------------------------ +# Source (Tap) - S3 connection details +# ------------------------------------------------------------------------------ +db_conn: + aws_access_key_id: "${TAP_S3_CSV_AWS_KEY}" # Plain string or vault encrypted + aws_secret_access_key: "${TAP_S3_CSV_AWS_SECRET_ACCESS_KEY}" # Plain string or vault encrypted + bucket: "${TAP_S3_CSV_BUCKET}" # S3 Bucket name + start_date: "2000-01-01" # File before this data will be excluded + fastsync_parallelism: 10 + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "bigquery" # ID of the target connector where the data will be loaded +batch_size_rows: 20000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes +primary_key_required: False # Optional: in case you want to load tables without key + # properties, uncomment this. Please note + # that files without primary keys will not + # be de-duplicated and could cause + # duplicates. Aloways try selecting + # a reasonable key from the CSV file +default_target_schema: "ppw_e2e_tap_s3_csv" + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "s3_feeds" # This is mandatory, but can be anything in this tap type + target_schema: "ppw_e2e_tap_s3_csv" # Target schema in the destination Data Warehouse + + # List of CSV files to destination tables + tables: + + # Every file in S3 bucket that matches the search pattern will be loaded into this table + - table_name: "people" + replication_method: "FULL_TABLE" + s3_csv_mapping: + search_pattern: "^ppw_e2e_tap_s3_csv/mock_data_1.csv$" + date_overrides: + - birth_date + transformations: + - column: "email" + type: "HASH" + - column: "group" + type: "MASK-HIDDEN" + - column: "ip_address" + type: "SET-NULL" + + - table_name: "countries" + s3_csv_mapping: + search_pattern: "^ppw_e2e_tap_s3_csv/mock_data_2.csv$" + key_properties: ["id"] diff --git a/tests/end_to_end/test-project/target_bigquery.yml.template b/tests/end_to_end/test-project/target_bigquery.yml.template new file mode 100644 index 000000000..d3167d734 --- /dev/null +++ b/tests/end_to_end/test-project/target_bigquery.yml.template @@ -0,0 +1,15 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "bigquery" # Unique identifier of the target +name: "Bigquery" # Name of the target +type: "target-bigquery" # !! THIS SHOULD NOT CHANGE !! 
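The project_id below, together with a service account key picked up by the google-cloud client from the environment, is all the end-to-end suite needs to reach BigQuery; the helper functions added in tests/end_to_end/helpers/db.py wrap that client for the assertions. A condensed sketch of that access pattern follows; the project id, dataset name and query are placeholders, not values used by the tests.

    from google.cloud import bigquery


    def run_query(query: str, project: str):
        """Run a query and return plain tuples, like run_query_bigquery in the e2e helpers"""
        client = bigquery.Client(project=project)  # credentials come from the environment
        query_job = client.query(query)
        query_job.result()                         # wait for the job to finish
        return [row.values() for row in query_job]


    def drop_dataset(dataset: str, project: str):
        """Drop a dataset with all of its tables, ignoring missing datasets"""
        client = bigquery.Client(project=project)
        client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)


    # Hypothetical usage:
    # rows = run_query('SELECT 1 AS x', project='my-gcp-project')
    # drop_dataset('ppw_e2e_tap_mysql', project='my-gcp-project')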
+ + +# ------------------------------------------------------------------------------ +# Target - Data Warehouse connection details +# ------------------------------------------------------------------------------ +db_conn: + project_id: "${TARGET_BIGQUERY_PROJECT}" # Bigquery account diff --git a/tests/end_to_end/test_target_bigquery.py b/tests/end_to_end/test_target_bigquery.py new file mode 100644 index 000000000..2081afd44 --- /dev/null +++ b/tests/end_to_end/test_target_bigquery.py @@ -0,0 +1,309 @@ +import os +import uuid +from datetime import datetime, timezone +from random import randint + +import bson +import pytest +from bson import Timestamp +from pipelinewise.fastsync import mysql_to_bigquery + +from .helpers import tasks +from .helpers import assertions +from .helpers.env import E2EEnv + +DIR = os.path.dirname(__file__) +TAP_MARIADB_ID = 'mariadb_to_bq' +TAP_MARIADB_SPLIT_LARGE_FILES_ID = 'mariadb_to_bq_split_large_files' +TAP_MARIADB_BUFFERED_STREAM_ID = 'mariadb_to_bq_buffered_stream' +TAP_POSTGRES_ID = 'postgres_to_bq' +TAP_POSTGRES_SPLIT_LARGE_FILES_ID = 'postgres_to_bq_split_large_files' +TAP_MONGODB_ID = 'mongo_to_bq' +TAP_S3_CSV_ID = 's3_csv_to_bq' +TARGET_ID = 'bigquery' + + +# pylint: disable=attribute-defined-outside-init +class TestTargetBigquery: + """ + End to end tests for Target Bigquery + """ + + def setup_method(self): + """Initialise test project by generating YAML files from + templates for all the configured connectors""" + self.project_dir = os.path.join(DIR, 'test-project') + + # Init query runner methods + self.e2e = E2EEnv(self.project_dir) + self.run_query_tap_mysql = self.e2e.run_query_tap_mysql + self.run_query_tap_postgres = self.e2e.run_query_tap_postgres + self.run_query_target_bigquery = self.e2e.run_query_target_bigquery + self.mongodb_con = self.e2e.get_tap_mongodb_connection() + + def teardown_method(self): + """Delete test directories and database objects""" + + @pytest.mark.dependency(name='import_config') + def test_import_project(self): + """Import the YAML project with taps and target and do discovery mode + to write the JSON files for singer connectors """ + + # Skip every target_postgres related test if required env vars not provided + if not self.e2e.env['TARGET_BIGQUERY']['is_configured']: + pytest.skip('Target Bigquery environment variables are not provided') + + # Setup and clean source and target databases + self.e2e.setup_tap_mysql() + self.e2e.setup_tap_postgres() + if self.e2e.env['TAP_S3_CSV']['is_configured']: + self.e2e.setup_tap_s3_csv() + self.e2e.setup_tap_mongodb() + self.e2e.setup_target_bigquery() + + # Import project + [return_code, stdout, stderr] = tasks.run_command(f'pipelinewise import_config --dir {self.project_dir}') + assertions.assert_command_success(return_code, stdout, stderr) + + @pytest.mark.dependency(depends=['import_config']) + def test_replicate_mariadb_to_bq(self, tap_mariadb_id=TAP_MARIADB_ID): + """Replicate data from MariaDB to Bigquery""" + # 1. Run tap first time - both fastsync and a singer should be triggered + assertions.assert_run_tap_success(tap_mariadb_id, TARGET_ID, ['fastsync', 'singer']) + assertions.assert_row_counts_equal(self.run_query_tap_mysql, self.run_query_target_bigquery) + assertions.assert_all_columns_exist(self.run_query_tap_mysql, self.e2e.run_query_target_bigquery, + mysql_to_bigquery.tap_type_to_target_type) + + # 2. 
Make changes in MariaDB source database + # LOG_BASED + self.run_query_tap_mysql('UPDATE weight_unit SET isactive = 0 WHERE weight_unit_id IN (2, 3, 4)') + self.run_query_tap_mysql('INSERT INTO edgydata (c_varchar, `group`, `case`, cjson, c_time) VALUES' + '(\'Lorem ipsum dolor sit amet\', 10, \'A\', \'[]\', \'00:00:00\'),' + '(\'Thai: แผ่นดินฮั่นเสื่อมโทรมแสนสังเวช\', 20, \'A\', \'{}\', \'12:00:59\'),' + '(\'Chinese: 和毛泽东 <<重上井冈山>>. 严永欣, 一九八八年.\', null,\'B\', ' + '\'[{"key": "ValueOne", "actions": []}, {"key": "ValueTwo", "actions": []}]\',' + ' \'9:1:00\'),' + '(\'Special Characters: [\"\\,''!@£$%^&*()]\\\\\', null, \'B\', ' + 'null, \'12:00:00\'),' + '(\' \', 20, \'B\', null, \'15:36:10\'),' + '(CONCAT(CHAR(0x0000 using utf16), \'<- null char\'), 20, \'B\', null, \'15:36:10\')') + + self.run_query_tap_mysql('UPDATE all_datatypes SET c_point = NULL') + + # INCREMENTAL + self.run_query_tap_mysql('INSERT INTO address(isactive, street_number, date_created, date_updated,' + ' supplier_supplier_id, zip_code_zip_code_id)' + 'VALUES (1, 1234, NOW(), NOW(), 0, 1234)') + self.run_query_tap_mysql('UPDATE address SET street_number = 9999, date_updated = NOW()' + ' WHERE address_id = 1') + # FULL_TABLE + self.run_query_tap_mysql('DELETE FROM no_pk_table WHERE id > 10') + + # 3. Run tap second time - both fastsync and a singer should be triggered, there are some FULL_TABLE + assertions.assert_run_tap_success(tap_mariadb_id, TARGET_ID, ['fastsync', 'singer']) + assertions.assert_row_counts_equal(self.run_query_tap_mysql, self.run_query_target_bigquery) + assertions.assert_all_columns_exist(self.run_query_tap_mysql, self.e2e.run_query_target_bigquery, + mysql_to_bigquery.tap_type_to_target_type, {'blob_col'}) + + # Checking if mask-date transformation is working + result = self.run_query_target_bigquery( + 'SELECT count(1) FROM ppw_e2e_tap_mysql.address ' + 'where EXTRACT(MONTH FROM date_created) != 1 or EXTRACT(DAY FROM date_created) != 1;')[0][0] + + assert result == 0 + + # Checking if conditional MASK-NUMBER transformation is working + result = self.run_query_target_bigquery( + 'SELECT count(1) FROM ppw_e2e_tap_mysql.address ' + 'where zip_code_zip_code_id != 0 and REGEXP_CONTAINS(street_number, \'[801]\');')[0][0] + + assert result == 0 + + # Checking if conditional SET-NULL transformation is working + result = self.run_query_target_bigquery( + 'SELECT count(1) FROM ppw_e2e_tap_mysql.edgydata ' + 'where "GROUP" is not null and "CASE" = \'B\';')[0][0] + + assert result == 0 + + @pytest.mark.dependency(depends=['import_config']) + def test_resync_mariadb_to_bq(self, tap_mariadb_id=TAP_MARIADB_ID): + """Resync tables from MariaDB to Bigquery""" + assertions.assert_resync_tables_success(tap_mariadb_id, TARGET_ID, profiling=True) + assertions.assert_row_counts_equal(self.run_query_tap_mysql, self.run_query_target_bigquery) + assertions.assert_all_columns_exist(self.run_query_tap_mysql, self.run_query_target_bigquery, + mysql_to_bigquery.tap_type_to_target_type) + + # pylint: disable=invalid-name + @pytest.mark.dependency(depends=['import_config']) + def test_resync_mariadb_to_bq_with_split_large_files(self, tap_mariadb_id=TAP_MARIADB_SPLIT_LARGE_FILES_ID): + """Resync tables from MariaDB to Bigquery using splitting large files option""" + assertions.assert_resync_tables_success(tap_mariadb_id, TARGET_ID, profiling=True) + assertions.assert_row_counts_equal(self.run_query_tap_mysql, self.run_query_target_bigquery) + assertions.assert_all_columns_exist(self.run_query_tap_mysql, 
self.run_query_target_bigquery, + mysql_to_bigquery.tap_type_to_target_type) + + # pylint: disable=invalid-name + @pytest.mark.dependency(depends=['import_config']) + def test_resync_pg_to_bq_with_split_large_files(self, tap_postgres_id=TAP_POSTGRES_SPLIT_LARGE_FILES_ID): + """Resync tables from Postgres to Bigquery using splitting large files option""" + assertions.assert_resync_tables_success(tap_postgres_id, TARGET_ID, profiling=True) + assertions.assert_row_counts_equal(self.run_query_tap_mysql, self.run_query_target_bigquery) + assertions.assert_all_columns_exist(self.run_query_tap_mysql, self.run_query_target_bigquery, + mysql_to_bigquery.tap_type_to_target_type) + + # pylint: disable=invalid-name + @pytest.mark.dependency(depends=['import_config']) + def test_replicate_mariadb_to_bq_with_custom_buffer_size(self): + """Replicate data from MariaDB to Bigquery with custom buffer size + Same tests cases as test_replicate_mariadb_to_bq but using another tap with custom stream buffer size""" + self.test_replicate_mariadb_to_bq(tap_mariadb_id=TAP_MARIADB_BUFFERED_STREAM_ID) + + @pytest.mark.dependency(depends=['import_config']) + def test_replicate_pg_to_bq(self): + """Replicate data from Postgres to Bigquery""" + # Run tap first time - both fastsync and a singer should be triggered + assertions.assert_run_tap_success(TAP_POSTGRES_ID, TARGET_ID, ['fastsync', 'singer']) + assertions.assert_row_counts_equal(self.run_query_tap_postgres, self.run_query_target_bigquery) + assertions.assert_all_columns_exist(self.run_query_tap_postgres, self.run_query_target_bigquery) + + result = self.run_query_target_bigquery( + 'SELECT updated_at FROM ' + 'ppw_e2e_tap_postgres.`table_with_space_and_uppercase` ' + 'where cvarchar=\'H\';')[0][0] + + assert result == datetime(9999, 12, 31, 23, 59, 59, 999008, tzinfo=timezone.utc) + + result = self.run_query_target_bigquery( + 'SELECT updated_at FROM ' + 'ppw_e2e_tap_postgres.`table_with_space_and_uppercase` ' + 'where cvarchar=\'I\';')[0][0] + + assert result == datetime(9999, 12, 31, 23, 59, 59, 999008, tzinfo=timezone.utc) + + # 2. Make changes in PG source database + # LOG_BASED + self.run_query_tap_postgres('insert into public."table_with_space and UPPERCase" (cvarchar, updated_at) values ' + "('X', '2020-01-01 08:53:56.8+10')," + "('Y', '2020-12-31 12:59:00.148+00')," + "('faaaar future', '15000-05-23 12:40:00.148')," + "('BC', '2020-01-23 01:40:00 BC')," + "('Z', null)," + "('W', '2020-03-03 12:30:00');") + # INCREMENTAL + self.run_query_tap_postgres('INSERT INTO public.city (id, name, countrycode, district, population) ' + "VALUES (4080, 'Bath', 'GBR', 'England', 88859)") + self.run_query_tap_postgres('UPDATE public.edgydata SET ' + "cjson = json '{\"data\": 1234}', " + "cjsonb = jsonb '{\"data\": 2345}', " + "cvarchar = 'Liewe Maatjies UPDATED' WHERE cid = 23") + # FULL_TABLE + self.run_query_tap_postgres("DELETE FROM public.country WHERE code = 'UMI'") + + # 3. 
Run tap second time - both fastsync and a singer should be triggered, there are some FULL_TABLE + assertions.assert_run_tap_success(TAP_POSTGRES_ID, TARGET_ID, ['fastsync', 'singer'], profiling=True) + assertions.assert_row_counts_equal(self.run_query_tap_postgres, self.run_query_target_bigquery) + assertions.assert_all_columns_exist(self.run_query_tap_postgres, self.run_query_target_bigquery) + + result = self.run_query_target_bigquery( + 'SELECT updated_at FROM ppw_e2e_tap_postgres.`table_with_space_and_uppercase` where cvarchar=\'X\';')[0][0] + + assert result == datetime(2019, 12, 31, 22, 53, 56, 800000, tzinfo=timezone.utc) + + result = self.run_query_target_bigquery( + 'SELECT updated_at FROM ' + 'ppw_e2e_tap_postgres.`table_with_space_and_uppercase` ' + 'where cvarchar=\'faaaar future\';')[0][0] + + assert result == datetime(9999, 12, 31, 23, 59, 59, 999008, tzinfo=timezone.utc) + + result = self.run_query_target_bigquery( + 'SELECT updated_at FROM ' + 'ppw_e2e_tap_postgres.`table_with_space_and_uppercase` ' + 'where cvarchar=\'BC\';')[0][0] + + assert result == datetime(9999, 12, 31, 23, 59, 59, 999008, tzinfo=timezone.utc) + + @pytest.mark.dependency(depends=['import_config']) + def test_replicate_s3_to_bq(self): + """Replicate csv files from s3 to Bigquery, check if return code is zero and success log file created""" + # Skip tap_s3_csv related test if required env vars not provided + if not self.e2e.env['TAP_S3_CSV']['is_configured']: + pytest.skip('Tap S3 CSV environment variables are not provided') + + def assert_columns_exist(): + """Helper inner function to test if every table and column exists in target bigquery""" + assertions.assert_cols_in_table(self.run_query_target_bigquery, 'ppw_e2e_tap_s3_csv', 'countries', + ['city', 'country', 'currency', 'id', 'language']) + assertions.assert_cols_in_table(self.run_query_target_bigquery, 'ppw_e2e_tap_s3_csv', 'people', + ['birth_date', 'email', 'first_name', 'gender', 'group', 'id', + 'ip_address', 'is_pensioneer', 'last_name']) + + # 1. Run tap first time - both fastsync and a singer should be triggered + assertions.assert_run_tap_success(TAP_S3_CSV_ID, TARGET_ID, ['fastsync', 'singer']) + assert_columns_exist() + + # 2. 
Run tap second time - both fastsync and a singer should be triggered + assertions.assert_run_tap_success(TAP_S3_CSV_ID, TARGET_ID, ['fastsync', 'singer']) + assert_columns_exist() + + @pytest.mark.dependency(depends=['import_config']) + def test_replicate_mongodb_to_bq(self): + """Replicate mongodb to Bigquery""" + + def assert_columns_exist(table): + """Helper inner function to test if every table and column exists in the target""" + assertions.assert_cols_in_table(self.run_query_target_bigquery, 'ppw_e2e_tap_mongodb', table, + ['_id', 'document', '_sdc_extracted_at', + '_sdc_batched_at', '_sdc_deleted_at']) + + def assert_row_counts_equal(target_schema, table, count_in_source): + assert count_in_source == \ + self.run_query_target_bigquery(f'select count(_id) from {target_schema}.{table}')[0][0] + + # Run tap first time - fastsync and singer should be triggered + assertions.assert_run_tap_success(TAP_MONGODB_ID, TARGET_ID, ['fastsync', 'singer']) + assert_columns_exist('listings') + assert_columns_exist('my_collection') + + listing_count = self.mongodb_con['listings'].count_documents({}) + my_coll_count = self.mongodb_con['my_collection'].count_documents({}) + + assert_row_counts_equal('ppw_e2e_tap_mongodb', 'listings', listing_count) + assert_row_counts_equal('ppw_e2e_tap_mongodb', 'my_collection', my_coll_count) + + result_insert = self.mongodb_con.my_collection.insert_many([ + { + 'age': randint(10, 30), + 'id': 1001, + 'uuid': uuid.uuid4(), + 'ts': Timestamp(12030, 500) + }, + { + 'date': datetime.utcnow(), + 'id': 1002, + 'uuid': uuid.uuid4(), + 'regex': bson.Regex(r'^[A-Z]\\w\\d{2,6}.*$') + }, + { + 'uuid': uuid.uuid4(), + 'id': 1003, + 'nested_json': {'a': 1, 'b': 3, 'c': {'key': bson.datetime.datetime(2020, 5, 3, 10, 0, 0)}} + } + ]) + my_coll_count += len(result_insert.inserted_ids) + + result_del = self.mongodb_con.my_collection.delete_one({'_id': result_insert.inserted_ids[0]}) + my_coll_count -= result_del.deleted_count + + result_update = self.mongodb_con.my_collection.update_many({}, {'$set': {'id': 0}}) + + assertions.assert_run_tap_success(TAP_MONGODB_ID, TARGET_ID, ['singer']) + + assert result_update.modified_count == self.run_query_target_bigquery( + """select count(_id) + from ppw_e2e_tap_mongodb.my_collection + where SAFE_CAST(JSON_EXTRACT_SCALAR(document, '$.id') AS INT64) = 0 + """)[0][0] + + assert_row_counts_equal('ppw_e2e_tap_mongodb', 'my_collection', my_coll_count) diff --git a/tests/units/fastsync/commons/test_fastsync_target_bigquery.py b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py new file mode 100644 index 000000000..43902c6bb --- /dev/null +++ b/tests/units/fastsync/commons/test_fastsync_target_bigquery.py @@ -0,0 +1,284 @@ +import pytest +from unittest.mock import Mock, patch, ANY, mock_open +from google.cloud import bigquery +from pipelinewise.fastsync.commons.target_bigquery import FastSyncTargetBigquery + +@pytest.fixture(name='query_result') +def fixture_query_result(): + """ + Mocked Bigquery Run Query Results + """ + mocked_qr = Mock() + mocked_qr.return_value = [] + return mocked_qr + +# pylint: disable=W0613 +@pytest.fixture(name='bigquery_job') +def fixture_bigquery_job(query_result): + """ + Mocked Bigquery Job Query + """ + mocked_qj = Mock() + mocked_qj.output_rows = 0 + mocked_qj.job_id = 1 + mocked_qj.result().total_rows = 0 + return mocked_qj + +@pytest.fixture(name='bigquery_job_config') +def fixture_bigquery_job_config(): + """ + Mocked Bigquery Job Config + """ + mocked_qc = Mock() + return mocked_qc + +class 
FastSyncTargetBigqueryMock(FastSyncTargetBigquery): + """ + Mocked FastSyncTargetBigquery class + """ + def __init__(self, connection_config, transformation_config=None): + super().__init__(connection_config, transformation_config) + self.executed_queries = [] + + +# pylint: disable=attribute-defined-outside-init +class TestFastSyncTargetBigquery: + """ + Unit tests for fastsync target bigquery + """ + def setup_method(self): + """Initialise test FastSyncTargetPostgres object""" + self.bigquery = FastSyncTargetBigqueryMock(connection_config={'project_id': 'dummy-project'}, + transformation_config={}) + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_create_schema(self, client, bigquery_job): + """Validate if create schema queries generated correctly""" + client().query.return_value = bigquery_job + self.bigquery.create_schema('new_schema') + client().create_dataset.assert_called_with('new_schema', exists_ok=True) + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_drop_table(self, client, bigquery_job): + """Validate if drop table queries generated correctly""" + client().query.return_value = bigquery_job + + self.bigquery.drop_table('test_schema', 'test_table') + client().query.assert_called_with( + 'DROP TABLE IF EXISTS test_schema.`test_table`', job_config=ANY) + + self.bigquery.drop_table('test_schema', 'test_table', is_temporary=True) + client().query.assert_called_with( + 'DROP TABLE IF EXISTS test_schema.`test_table_temp`', job_config=ANY) + + self.bigquery.drop_table('test_schema', 'UPPERCASE_TABLE') + client().query.assert_called_with( + 'DROP TABLE IF EXISTS test_schema.`uppercase_table`', job_config=ANY) + + self.bigquery.drop_table('test_schema', 'UPPERCASE_TABLE', is_temporary=True) + client().query.assert_called_with( + 'DROP TABLE IF EXISTS test_schema.`uppercase_table_temp`', job_config=ANY) + + self.bigquery.drop_table('test_schema', 'test_table_with_space') + client().query.assert_called_with( + 'DROP TABLE IF EXISTS test_schema.`test_table_with_space`', job_config=ANY) + + self.bigquery.drop_table('test_schema', 'test table with space', is_temporary=True) + client().query.assert_called_with( + 'DROP TABLE IF EXISTS test_schema.`test_table_with_space_temp`', job_config=ANY) + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_create_table(self, client, bigquery_job): + """Validate if create table queries generated correctly""" + client().query.return_value = bigquery_job + + # Create table with standard table and column names + self.bigquery.create_table(target_schema='test_schema', + table_name='test_table', + columns=['`id` INTEGER', + '`txt` STRING']) + client().query.assert_called_with( + 'CREATE OR REPLACE TABLE test_schema.`test_table` (' + '`id` integer,`txt` string,' + '_sdc_extracted_at timestamp,' + '_sdc_batched_at timestamp,' + '_sdc_deleted_at timestamp)', + job_config=ANY) + + # Create table with reserved words in table and column names + self.bigquery.create_table(target_schema='test_schema', + table_name='order', + columns=['`id` INTEGER', + '`txt` STRING', + '`select` STRING']) + client().query.assert_called_with( + 'CREATE OR REPLACE TABLE test_schema.`order` (' + '`id` integer,`txt` string,`select` string,' + '_sdc_extracted_at timestamp,' + '_sdc_batched_at timestamp,' + '_sdc_deleted_at timestamp)', + job_config=ANY) + + # Create table with mixed lower and uppercase and space characters + self.bigquery.create_table(target_schema='test_schema', + 
table_name='TABLE with SPACE', + columns=['`ID` INTEGER', + '`COLUMN WITH SPACE` STRING']) + client().query.assert_called_with( + 'CREATE OR REPLACE TABLE test_schema.`table_with_space` (' + '`id` integer,`column with space` string,' + '_sdc_extracted_at timestamp,' + '_sdc_batched_at timestamp,' + '_sdc_deleted_at timestamp)', + job_config=ANY) + + # Create table with no primary key + self.bigquery.create_table(target_schema='test_schema', + table_name='test_table_no_pk', + columns=['`ID` INTEGER', + '`TXT` STRING']) + client().query.assert_called_with( + 'CREATE OR REPLACE TABLE test_schema.`test_table_no_pk` (' + '`id` integer,`txt` string,' + '_sdc_extracted_at timestamp,' + '_sdc_batched_at timestamp,' + '_sdc_deleted_at timestamp)', + job_config=ANY) + + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.LoadJobConfig') + @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client') + def test_copy_to_table(self, client, load_job_config, bigquery_job_config, bigquery_job): + """Validate if COPY command generated correctly""" + # COPY table with standard table and column names + client().load_table_from_file.return_value = bigquery_job + load_job_config.return_value = bigquery_job_config + mocked_open = mock_open() + with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): + self.bigquery.copy_to_table(filepath='/path/to/dummy-file.csv.gz', + target_schema='test_schema', + table_name='test_table', + size_bytes=1000, + is_temporary=False, + skip_csv_header=False) + mocked_open.assert_called_with('/path/to/dummy-file.csv.gz', 'rb') + assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV + assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' + assert bigquery_job_config.allow_quoted_newlines is True + assert bigquery_job_config.skip_leading_rows == 0 + client().dataset.assert_called_with('test_schema') + client().dataset().table.assert_called_with('test_table') + assert client().load_table_from_file.call_count == 1 + + # COPY table with reserved word in table and column names in temp table + with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): + self.bigquery.copy_to_table(filepath='/path/to/full-file.csv.gz', + target_schema='test_schema', + table_name='full', + size_bytes=1000, + is_temporary=True, + skip_csv_header=False) + mocked_open.assert_called_with('/path/to/full-file.csv.gz', 'rb') + assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV + assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' + assert bigquery_job_config.allow_quoted_newlines is True + assert bigquery_job_config.skip_leading_rows == 0 + client().dataset.assert_called_with('test_schema') + client().dataset().table.assert_called_with('full_temp') + assert client().load_table_from_file.call_count == 2 + + # COPY table with space and uppercase in table name and s3 key + with patch('pipelinewise.fastsync.commons.target_bigquery.open', mocked_open): + self.bigquery.copy_to_table(filepath='/path/to/file with space.csv.gz', + target_schema='test_schema', + table_name='table with SPACE and UPPERCASE', + size_bytes=1000, + is_temporary=True, + skip_csv_header=False) + mocked_open.assert_called_with('/path/to/file with space.csv.gz', 'rb') + assert bigquery_job_config.source_format == bigquery.SourceFormat.CSV + assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE' + assert bigquery_job_config.allow_quoted_newlines is True + assert bigquery_job_config.skip_leading_rows == 0 + 
+        client().dataset.assert_called_with('test_schema')
+        client().dataset().table.assert_called_with('table_with_space_and_uppercase_temp')
+        assert client().load_table_from_file.call_count == 3
+
+    @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
+    def test_grant_select_on_table(self, client, bigquery_job):
+        """Validate if GRANT command generated correctly"""
+        # GRANT table with standard table and column names
+        client().query.return_value = bigquery_job
+        self.bigquery.grant_select_on_table(target_schema='test_schema',
+                                            table_name='test_table',
+                                            role='test_role',
+                                            is_temporary=False)
+        client().query.assert_called_with(
+            'GRANT SELECT ON test_schema.`test_table` TO ROLE test_role', job_config=ANY)
+
+        # GRANT table with reserved word in table and column names
+        self.bigquery.grant_select_on_table(target_schema='test_schema',
+                                            table_name='full',
+                                            role='test_role',
+                                            is_temporary=False)
+        client().query.assert_called_with(
+            'GRANT SELECT ON test_schema.`full` TO ROLE test_role', job_config=ANY)
+
+        # GRANT table with space and uppercase in table name
+        self.bigquery.grant_select_on_table(target_schema='test_schema',
+                                            table_name='table with SPACE and UPPERCASE',
+                                            role='test_role',
+                                            is_temporary=False)
+        client().query.assert_called_with(
+            'GRANT SELECT ON test_schema.`table_with_space_and_uppercase` TO ROLE test_role', job_config=ANY)
+
+    @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
+    def test_grant_usage_on_schema(self, client, bigquery_job):
+        """Validate if GRANT command generated correctly"""
+        self.bigquery.grant_usage_on_schema(target_schema='test_schema',
+                                            role='test_role')
+        client().query.assert_called_with(
+            'GRANT USAGE ON SCHEMA test_schema TO ROLE test_role', job_config=ANY)
+
+    @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
+    def test_grant_select_on_schema(self, client, bigquery_job):
+        """Validate if GRANT command generated correctly"""
+        self.bigquery.grant_select_on_schema(target_schema='test_schema',
+                                             role='test_role')
+        client().query.assert_called_with(
+            'GRANT SELECT ON ALL TABLES IN SCHEMA test_schema TO ROLE test_role', job_config=ANY)
+
+    @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.CopyJobConfig')
+    @patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
+    def test_swap_tables(self, client, copy_job_config, bigquery_job_config, bigquery_job):
+        """Validate if swap table commands generated correctly"""
+        # Swap tables with standard table and column names
+        client().copy_table.return_value = bigquery_job
+        copy_job_config.return_value = bigquery_job_config
+        self.bigquery.swap_tables(schema='test_schema',
+                                  table_name='test_table')
+        assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE'
+        client().copy_table.assert_called_with(
+            'dummy-project.test_schema.test_table_temp',
+            'dummy-project.test_schema.test_table',
+            job_config=ANY)
+        client().delete_table.assert_called_with('dummy-project.test_schema.test_table_temp')
+
+        # Swap tables with reserved word in table and column names in temp table
+        self.bigquery.swap_tables(schema='test_schema',
+                                  table_name='full')
+        assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE'
+        client().copy_table.assert_called_with(
+            'dummy-project.test_schema.full_temp',
+            'dummy-project.test_schema.full',
+            job_config=ANY)
+        client().delete_table.assert_called_with('dummy-project.test_schema.full_temp')
+
+        # Swap tables with space and uppercase in table name
+        self.bigquery.swap_tables(schema='test_schema',
+                                  table_name='table with SPACE and UPPERCASE')
+        assert bigquery_job_config.write_disposition == 'WRITE_TRUNCATE'
+        client().copy_table.assert_called_with(
+            'dummy-project.test_schema.table_with_space_and_uppercase_temp',
+            'dummy-project.test_schema.table_with_space_and_uppercase',
+            job_config=ANY)
+        client().delete_table.assert_called_with('dummy-project.test_schema.table_with_space_and_uppercase_temp')
diff --git a/tests/units/fastsync/test_mongodb_to_bigquery.py b/tests/units/fastsync/test_mongodb_to_bigquery.py
new file mode 100644
index 000000000..88ed4a620
--- /dev/null
+++ b/tests/units/fastsync/test_mongodb_to_bigquery.py
@@ -0,0 +1,40 @@
+import unittest
+from . import assertions
+
+from pipelinewise.fastsync.mongodb_to_bigquery import tap_type_to_target_type, sync_table, main_impl
+
+PACKAGE_IN_SCOPE = 'pipelinewise.fastsync.mongodb_to_bigquery'
+TAP = 'FastSyncTapMongoDB'
+TARGET = 'FastSyncTargetBigquery'
+
+
+# pylint: disable=missing-function-docstring,invalid-name,no-self-use
+class MongoDBToBigquery(unittest.TestCase):
+    """
+    Unit tests for fastsync MongoDB to bigquery
+    """
+    def test_tap_type_to_target_type_with_defined_tap_type_returns_equivalent_target_type(self):
+        self.assertEqual('STRING', tap_type_to_target_type('object'))
+
+    def test_tap_type_to_target_type_with_undefined_tap_type_returns_STRING(self):
+        self.assertEqual('STRING', tap_type_to_target_type('random-type'))
+
+    @staticmethod
+    def test_sync_table_runs_successfully_returns_true():
+        assertions.assert_sync_table_returns_true_on_success(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_sync_table_exception_on_copy_table_returns_failed_table_name_and_exception():
+        assertions.assert_sync_table_exception_on_failed_copy(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_all_tables_synced_successfully_should_exit_normally():
+        assertions.assert_main_impl_exit_normally_on_success(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_one_table_fails_to_sync_should_exit_with_error():
+        assertions.assert_main_impl_should_exit_with_error_on_failure(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/units/fastsync/test_mysql_to_bigquery.py b/tests/units/fastsync/test_mysql_to_bigquery.py
new file mode 100644
index 000000000..b41a5e99b
--- /dev/null
+++ b/tests/units/fastsync/test_mysql_to_bigquery.py
@@ -0,0 +1,48 @@
+import unittest
+from . import assertions
+
+from pipelinewise.fastsync.mysql_to_bigquery import tap_type_to_target_type, sync_table, main_impl
+
+PACKAGE_IN_SCOPE = 'pipelinewise.fastsync.mysql_to_bigquery'
+TAP = 'FastSyncTapMySql'
+TARGET = 'FastSyncTargetBigquery'
+
+
+# pylint: disable=missing-function-docstring,invalid-name,no-self-use
+class MySQLToBigQuery(unittest.TestCase):
+    """
+    Unit tests for fastsync mysql to bigquery
+    """
+    def test_tap_type_to_target_type_with_defined_tap_type_returns_equivalent_target_type(self):
+        self.assertEqual('STRING', tap_type_to_target_type('binary', None))
+        self.assertEqual('STRING', tap_type_to_target_type('geometry', None))
+        self.assertEqual('STRING', tap_type_to_target_type('point', None))
+        self.assertEqual('STRING', tap_type_to_target_type('linestring', None))
+        self.assertEqual('STRING', tap_type_to_target_type('polygon', None))
+        self.assertEqual('STRING', tap_type_to_target_type('multipoint', None))
+        self.assertEqual('STRING', tap_type_to_target_type('multilinestring', None))
+        self.assertEqual('STRING', tap_type_to_target_type('multipolygon', None))
+        self.assertEqual('STRING', tap_type_to_target_type('geometrycollection', None))
+
+    def test_tap_type_to_target_type_with_undefined_tap_type_returns_STRING(self):
+        self.assertEqual('STRING', tap_type_to_target_type('random-type', 'random-type'))
+
+    @staticmethod
+    def test_sync_table_runs_successfully_returns_true():
+        assertions.assert_sync_table_returns_true_on_success(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_sync_table_exception_on_copy_table_returns_failed_table_name_and_exception():
+        assertions.assert_sync_table_exception_on_failed_copy(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_all_tables_synced_successfully_should_exit_normally():
+        assertions.assert_main_impl_exit_normally_on_success(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_one_table_fails_to_sync_should_exit_with_error():
+        assertions.assert_main_impl_should_exit_with_error_on_failure(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/units/fastsync/test_postgres_to_bigquery.py b/tests/units/fastsync/test_postgres_to_bigquery.py
new file mode 100644
index 000000000..a1b680414
--- /dev/null
+++ b/tests/units/fastsync/test_postgres_to_bigquery.py
@@ -0,0 +1,40 @@
+import unittest
+from . import assertions
+
+from pipelinewise.fastsync.postgres_to_bigquery import tap_type_to_target_type, sync_table, main_impl
+
+PACKAGE_IN_SCOPE = 'pipelinewise.fastsync.postgres_to_bigquery'
+TAP = 'FastSyncTapPostgres'
+TARGET = 'FastSyncTargetBigquery'
+
+
+# pylint: disable=missing-function-docstring,invalid-name,no-self-use
+class PostgresToBigquery(unittest.TestCase):
+    """
+    Unit tests for fastsync postgres to bigquery
+    """
+    def test_tap_type_to_target_type_with_defined_tap_type_returns_equivalent_target_type(self):
+        self.assertEqual('INT64', tap_type_to_target_type('serial'))
+
+    def test_tap_type_to_target_type_with_undefined_tap_type_returns_STRING(self):
+        self.assertEqual('STRING', tap_type_to_target_type('random-type'))
+
+    @staticmethod
+    def test_sync_table_runs_successfully_returns_true():
+        assertions.assert_sync_table_returns_true_on_success(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_sync_table_exception_on_copy_table_returns_failed_table_name_and_exception():
+        assertions.assert_sync_table_exception_on_failed_copy(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_all_tables_synced_successfully_should_exit_normally():
+        assertions.assert_main_impl_exit_normally_on_success(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_one_table_fails_to_sync_should_exit_with_error():
+        assertions.assert_main_impl_should_exit_with_error_on_failure(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/units/fastsync/test_s3_csv_to_bigquery.py b/tests/units/fastsync/test_s3_csv_to_bigquery.py
new file mode 100644
index 000000000..28a94d470
--- /dev/null
+++ b/tests/units/fastsync/test_s3_csv_to_bigquery.py
@@ -0,0 +1,40 @@
+import unittest
+from . import assertions
+
+from pipelinewise.fastsync.s3_csv_to_bigquery import tap_type_to_target_type, sync_table, main_impl
+
+PACKAGE_IN_SCOPE = 'pipelinewise.fastsync.s3_csv_to_bigquery'
+TAP = 'FastSyncTapS3Csv'
+TARGET = 'FastSyncTargetBigquery'
+
+
+# pylint: disable=missing-function-docstring,invalid-name,no-self-use
+class S3CsvToBigquery(unittest.TestCase):
+    """
+    Unit tests for fastsync s3 csv to bigquery
+    """
+    def test_tap_type_to_target_type_with_defined_tap_type_returns_equivalent_target_type(self):
+        self.assertEqual('INT64', tap_type_to_target_type('integer'))
+
+    def test_tap_type_to_target_type_with_undefined_tap_type_returns_STRING(self):
+        self.assertEqual('STRING', tap_type_to_target_type('random-type'))
+
+    @staticmethod
+    def test_sync_table_runs_successfully_returns_true():
+        assertions.assert_sync_table_returns_true_on_success(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_sync_table_exception_on_copy_table_returns_failed_table_name_and_exception():
+        assertions.assert_sync_table_exception_on_failed_copy(sync_table, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_all_tables_synced_successfully_should_exit_normally():
+        assertions.assert_main_impl_exit_normally_on_success(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+    @staticmethod
+    def test_main_impl_with_one_table_fails_to_sync_should_exit_with_error():
+        assertions.assert_main_impl_should_exit_with_error_on_failure(main_impl, PACKAGE_IN_SCOPE, TAP, TARGET)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/units/test_utils.py b/tests/units/test_utils.py
index 682383588..3d947b0d5 100644
--- a/tests/units/test_utils.py
+++ b/tests/units/test_utils.py
@@ -16,6 +16,14 @@ def test_safe_column_name_case_1(self):
         self.assertEqual('"GROUP"', utils.safe_column_name(input_name))
 
     def test_safe_column_name_case_2(self):
+        """
+        Given an all lower case word would be wrapped in backticks and capitalized
+        """
+        input_name = 'group'
+
+        self.assertEqual('`GROUP`', utils.safe_column_name(input_name, '`'))
+
+    def test_safe_column_name_case_3(self):
         """
         Given a mixed-case word would be wrapped in double quotes and capitalized
         """
@@ -23,6 +31,14 @@ def test_safe_column_name_case_2(self):
 
         self.assertEqual('"CA SE"', utils.safe_column_name(input_name))
 
+    def test_safe_column_name_case_4(self):
+        """
+        Given a mixed-case word would be wrapped in backticks and capitalized
+        """
+        input_name = 'CA se'
+
+        self.assertEqual('`CA SE`', utils.safe_column_name(input_name, '`'))
+
     def test_safe_column_name_is_null(self):
         """
         Given a null word, we should get null back