diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 801aa50..6ff3073 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -32,7 +32,7 @@ jobs:
       - name: Unit Tests
        run: make unit_test
-
+
      - name: Approve
        run: echo For security reasons, all pull requests need to be approved first before running Integration tests.

@@ -71,3 +71,5 @@ jobs:
      GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
      TARGET_BIGQUERY_PROJECT: ${{ secrets.TARGET_BIGQUERY_PROJECT }}
      TARGET_BIGQUERY_SCHEMA: ${{ secrets.TARGET_BIGQUERY_SCHEMA }}_MR${{ github.event.number }}_${{ matrix.python-version }}
+      TARGET_BIGQUERY_GCS_BUCKET: ${{ secrets.TARGET_BIGQUERY_GCS_BUCKET }}
+      TARGET_BIGQUERY_GCS_KEY_PREFIX: ${{ secrets.TARGET_BIGQUERY_GCS_KEY_PREFIX }}
diff --git a/README.md b/README.md
index ca59c09..49f2b31 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,9 @@ Full list of options in `config.json`:
 | Property                                | Type      | Required?    | Description                                                       |
 | --------------------------------------- | --------- | ------------ | ----------------------------------------------------------------- |
-| project_id                              | String    | Yes          | BigQuery project                                                  |
+| project_id                              | String    | Yes          | BigQuery project                                                  |
+| gcs_bucket                              | String    | Yes          | Google Cloud Storage bucket used to stage files before loading    |
+| gcs_key_prefix                          | String    |              | Key prefix for staged files in Google Cloud Storage               |
 | location                                | String    |              | Region where BigQuery stores your dataset                         |
 | default_target_schema                   | String    |              | Name of the schema where the tables will be created. If `schema_mapping` is not defined then every stream sent by the tap is loaded into this schema. |
 | default_target_schema_select_permission | String    |              | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created |
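For reference, a minimal `config.json` using the new staging options above might look like the sketch below. The project, dataset, bucket, and prefix values are placeholders for illustration only, not values taken from this change:

    {
      "project_id": "my-gcp-project",
      "default_target_schema": "my_dataset",
      "gcs_bucket": "my-staging-bucket",
      "gcs_key_prefix": "pipelinewise-stage/"
    }
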
RECORD: {o['record']}") - primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record']) + if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete): + record = stream_utils.add_metadata_values_to_record(o) + else: + record = stream_utils.remove_metadata_values_from_record( + o, stream_to_sync[stream].stream_schema_message['schema'] + ) + + # Flatten record + record = flatten_record( + record, + stream_to_sync[stream].stream_schema_message['schema'], + max_level=stream_to_sync[stream].data_flattening_max_level + ) + + primary_key_string = stream_to_sync[stream].record_primary_key_string(record) if not primary_key_string: primary_key_string = 'RID-{}'.format(total_row_count[stream]) @@ -131,10 +142,7 @@ def persist_lines(config, lines) -> None: total_row_count[stream] += 1 # append record - if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete): - records_to_load[stream][primary_key_string] = stream_utils.add_metadata_values_to_record(o) - else: - records_to_load[stream][primary_key_string] = o['record'] + records_to_load[stream][primary_key_string] = record flush = False if row_count[stream] >= batch_size_rows: @@ -370,17 +378,8 @@ def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=F def flush_records(stream, records_to_load, row_count, db_sync): - parsed_schema = parse_schema(db_sync.avro_schema()) - csv_fd, csv_file = mkstemp() - with open(csv_file, 'wb') as out: - writer(out, parsed_schema, db_sync.records_to_avro(records_to_load.values())) - # Seek to the beginning of the file and load - with open(csv_file, 'r+b') as f: - db_sync.load_avro(f, row_count) - - # Delete temp file - os.remove(csv_file) + db_sync.load_records(records_to_load.values(), row_count) def main(): diff --git a/target_bigquery/db_sync.py b/target_bigquery/db_sync.py index 892f208..7ec7fd2 100644 --- a/target_bigquery/db_sync.py +++ b/target_bigquery/db_sync.py @@ -6,11 +6,12 @@ import time import datetime from decimal import Decimal, getcontext -from typing import MutableMapping +from typing import Mapping, MutableMapping from google.cloud import bigquery from google.cloud.bigquery import SchemaField +from google.cloud import storage from google.cloud.exceptions import Conflict @@ -23,6 +24,43 @@ ALLOWED_DECIMALS = Decimal(10) ** Decimal(-SCALE) MAX_NUM = (Decimal(10) ** Decimal(PRECISION-SCALE)) - ALLOWED_DECIMALS + +class BigQueryJSONEncoder(json.JSONEncoder): + + def __init__(self, schema: Mapping[str, bigquery.SchemaField], data_flattening_max_level: int, *args, **kwargs): + super().__init__(*args, **kwargs) + self.schema = schema + self.data_flattening_max_level = data_flattening_max_level + + def default(self, o): + if isinstance(o, (datetime.date, datetime.datetime, datetime.time)): + return o.isoformat() + if isinstance(o, Decimal): + return str(o) + return json.JSONEncoder.default(self, o) + + def _bq_format(self, o, field_type): + if field_type == 'string' and not isinstance(o, str): + return json.JSONEncoder.encode(self, o) + if field_type == 'numeric': + n = Decimal(o) + return MAX_NUM if n > MAX_NUM else -MAX_NUM if n < -MAX_NUM else n.quantize(ALLOWED_DECIMALS) + return o + + def encode(self, o): + if isinstance(o, Mapping): + # Preprocess record to make sure it is compatible with the BQ schema. 
diff --git a/target_bigquery/db_sync.py b/target_bigquery/db_sync.py
index 892f208..7ec7fd2 100644
--- a/target_bigquery/db_sync.py
+++ b/target_bigquery/db_sync.py
@@ -6,11 +6,12 @@
 import time
 import datetime
 from decimal import Decimal, getcontext
-from typing import MutableMapping
+from typing import Mapping, MutableMapping

 from google.cloud import bigquery
 from google.cloud.bigquery import SchemaField
+from google.cloud import storage
 from google.cloud.exceptions import Conflict

@@ -23,6 +24,43 @@
 ALLOWED_DECIMALS = Decimal(10) ** Decimal(-SCALE)
 MAX_NUM = (Decimal(10) ** Decimal(PRECISION-SCALE)) - ALLOWED_DECIMALS


+class BigQueryJSONEncoder(json.JSONEncoder):
+
+    def __init__(self, schema: Mapping[str, bigquery.SchemaField], data_flattening_max_level: int, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.schema = schema
+        self.data_flattening_max_level = data_flattening_max_level
+
+    def default(self, o):
+        if isinstance(o, (datetime.date, datetime.datetime, datetime.time)):
+            return o.isoformat()
+        if isinstance(o, Decimal):
+            return str(o)
+        return json.JSONEncoder.default(self, o)
+
+    def _bq_format(self, o, field_type):
+        if field_type == 'string' and not isinstance(o, str):
+            return json.JSONEncoder.encode(self, o)
+        if field_type == 'numeric':
+            n = Decimal(o)
+            return MAX_NUM if n > MAX_NUM else -MAX_NUM if n < -MAX_NUM else n.quantize(ALLOWED_DECIMALS)
+        return o
+
+    def encode(self, o):
+        if isinstance(o, Mapping):
+            # Preprocess record to make sure it is compatible with the BQ schema.
+            for k, v in o.items():
+                field_type = self.schema[k].field_type.lower()
+                field_mode = self.schema[k].mode.lower()
+                if field_mode == 'repeated':
+                    o[k] = [self._bq_format(each, field_type) for each in v]
+                else:
+                    o[k] = self._bq_format(v, field_type)
+
+        return json.JSONEncoder.encode(self, o)
+
+
 def validate_config(config):
     errors = []
     required_config_keys = [
@@ -101,59 +139,6 @@ def column_type(name, schema_property):
     return SchemaField(safe_name, result_type, 'NULLABLE')


-def column_type_avro(name, schema_property):
-    property_type = schema_property['type']
-    property_format = schema_property.get('format', None)
-    result = {"name": safe_column_name(name, quotes=False)}
-
-    if 'array' in property_type:
-        try:
-            items_type = column_type_avro(name, schema_property['items'])
-            result_type = {
-                'type': 'array',
-                'items': items_type['type']}
-        except KeyError:
-            result_type = 'string'
-    elif 'object' in property_type:
-        items_types = [
-            column_type_avro(col, schema_property)
-            for col, schema_property in schema_property.get('properties', {}).items()]
-
-        if items_types:
-            result_type = {
-                'type': 'record',
-                'name': name + '_properties',
-                'fields': items_types}
-        else:
-            result_type = 'string'
-
-    elif property_format == 'date-time':
-        result_type = {
-            'type': 'long',
-            'logicalType': 'timestamp-millis'}
-    elif property_format == 'time':
-        result_type = {
-            'type': 'int',
-            'logicalType': 'time-millis'}
-    elif 'number' in property_type:
-        result_type = {
-            'type': 'bytes',
-            'logicalType': 'decimal',
-            'scale': 9,
-            'precision': 38}
-    elif 'integer' in property_type and 'string' in property_type:
-        result_type = 'string'
-    elif 'integer' in property_type:
-        result_type = 'long'
-    elif 'boolean' in property_type:
-        result_type = 'boolean'
-    else:
-        result_type = 'string'
-
-    result['type'] = ['null', result_type]
-    return result
-
-
 def safe_column_name(name, quotes=False):
     name = name.replace('`', '')
     pattern = '[^a-zA-Z0-9_]'
@@ -221,13 +206,17 @@ def flatten_schema(d, parent_key=[], sep='__', level=0, max_level=0):
     return dict(sorted_items)


-def flatten_record(d, parent_key=[], sep='__', level=0, max_level=0):
+# pylint: disable=too-many-arguments
+def flatten_record(d, schema, parent_key=[], sep='__', level=0, max_level=0):
     items = []
     for k, v in d.items():
-        k = safe_column_name(k, quotes=False)
-        new_key = flatten_key(k, parent_key, sep)
-        if isinstance(v, MutableMapping) and level < max_level:
-            items.extend(flatten_record(v, parent_key + [k], sep=sep, level=level+1, max_level=max_level).items())
+        safe_key = safe_column_name(k, quotes=False)
+        new_key = flatten_key(safe_key, parent_key, sep)
+        key_schema = schema['properties'][k]
+        if isinstance(v, MutableMapping) and level < max_level and 'properties' in key_schema:
+            items.extend(flatten_record(
+                v, key_schema, parent_key + [safe_key], sep=sep, level=level+1, max_level=max_level).items()
+            )
         else:
             items.append((new_key, v if type(v) is list or type(v) is dict else v))
     return dict(items)
@@ -292,6 +281,7 @@ def __init__(self, connection_config, stream_schema_message=None):
         project_id = self.connection_config['project_id']
         location = self.connection_config.get('location', None)
         self.client = bigquery.Client(project=project_id, location=location)
+        self.gcs = storage.Client(project=project_id)

         self.schema_name = None
         self.grantees = None
@@ -394,89 +384,66 @@ def table_name(self, stream_name, is_temporary=False, without_schema=False):

     def record_primary_key_string(self, record):
         if len(self.stream_schema_message['key_properties']) == 0:
             return None
-        flatten = flatten_record(record, max_level=self.data_flattening_max_level)
         try:
-            key_props = [str(flatten[p.lower()]) for p in self.stream_schema_message['key_properties']]
+            key_props = [str(record[p.lower()]) for p in self.stream_schema_message['key_properties']]
         except Exception as exc:
-            logger.info("Cannot find {} primary key(s) in record: {}".format(self.stream_schema_message['key_properties'], flatten))
+            logger.info("Cannot find {} primary key(s) in record: {}".format(self.stream_schema_message['key_properties'], record))
             raise exc
         return ','.join(key_props)

-    def avro_schema(self):
-        project_id = self.connection_config['project_id']
-        pattern = r"[^A-Za-z0-9_]"
-        clean_project_id = re.sub(pattern, '', project_id)
-        schema = {
-            "type": "record",
-            "namespace": "{}.{}.pipelinewise.avro".format(
-                clean_project_id,
-                self.schema_name,
-            ),
-            "name": self.stream_schema_message['stream'],
-            "fields": [column_type_avro(name, c) for name, c in self.flatten_schema.items()]}
-
-        if re.search(pattern, schema['name']):
-            schema["alias"] = schema['name']
-            schema["name"] = re.sub(pattern, "_", schema['name'])
-
-        return schema
-
-    # TODO: write tests for the json.dumps lines below and verify nesting
-    # TODO: improve performance
-    def records_to_avro(self, records):
+    def records_to_json(self, records, schema):
+        field_map = {field.name: field for field in schema}
         for record in records:
-            flatten = flatten_record(record, max_level=self.data_flattening_max_level)
-            result = {}
-            for name, props in self.flatten_schema.items():
-                if name in flatten:
-                    if is_unstructured_object(props):
-                        result[name] = json.dumps(flatten[name])
-                    # dump to string if array without items or recursive
-                    elif ('array' in props['type'] and
-                          (not 'items' in props
-                           or '$ref' in props['items'])):
-                        result[name] = json.dumps(flatten[name])
-                    # dump array elements to strings
-                    elif (
-                        'array' in props['type'] and
-                        is_unstructured_object(props.get('items', {}))
-                    ):
-                        result[name] = [json.dumps(value) for value in flatten[name]]
-                    elif 'number' in props['type']:
-                        if flatten[name] is None:
-                            result[name] = None
-                        else:
-                            n = Decimal(flatten[name])
-                            # limit n to the range -MAX_NUM to MAX_NUM
-                            result[name] = MAX_NUM if n > MAX_NUM else -MAX_NUM if n < -MAX_NUM else n.quantize(ALLOWED_DECIMALS)
-                    else:
-                        result[name] = flatten[name] if name in flatten else ''
-                else:
-                    result[name] = None
-            yield result
-
-    def load_avro(self, f, count):
+            yield json.dumps(
+                record,
+                cls=BigQueryJSONEncoder,
+                schema=field_map,
+                data_flattening_max_level=self.data_flattening_max_level,
+            ) + '\n'
+
+    # pylint: disable=too-many-locals
+    def load_records(self, records, count):
         stream_schema_message = self.stream_schema_message
-        stream = stream_schema_message['stream']
-        logger.info("Loading {} rows into '{}'".format(count, self.table_name(stream, False)))
+        logger.info("Loading {} rows into '{}'".format(
+            count, self.table_name(stream_schema_message['stream'], False))
+        )

         project_id = self.connection_config['project_id']
         # TODO: make temp table creation and DML atomic with merge
         temp_table = self.table_name(stream_schema_message['stream'], is_temporary=True, without_schema=True)

-        logger.info("INSERTING INTO {} ({})".format(temp_table, ', '.join(self.column_names())))
+        schema = [column_type(name, schema) for name, schema in self.flatten_schema.items()]
+
         dataset_id = self.connection_config.get('temp_schema', self.schema_name).strip()
         table_ref = bigquery.DatasetReference(project_id, dataset_id).table(temp_table)
         job_config = bigquery.LoadJobConfig()
-        job_config.source_format = bigquery.SourceFormat.AVRO
-        job_config.use_avro_logical_types = True
+        job_config.schema = schema
+        job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
         job_config.write_disposition = 'WRITE_TRUNCATE'
-        job = self.client.load_table_from_file(f, table_ref, job_config=job_config)
-        job.result()
+
+        # Upload JSONL file.
+        bucket = self.connection_config['gcs_bucket']
+        prefix = self.connection_config.get('gcs_key_prefix', '')
+        blob_name = f'{prefix}{stream_schema_message["stream"]}-{datetime.datetime.now().isoformat()}.json'
+        blob = self.gcs.get_bucket(bucket).blob(blob_name)
+        with blob.open('w') as f:
+            f.writelines(self.records_to_json(records, schema))
+
+        # Ingest into temp table
+        job = self.client.load_table_from_uri(
+            f'gs://{blob.bucket.name}/{blob.name}',
+            table_ref,
+            job_config=job_config,
+        )
+
+        try:
+            job.result()
+        finally:
+            blob.delete()

         if len(self.stream_schema_message['key_properties']) > 0:
             query = self.update_from_temp_table(temp_table)
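As a rough, illustrative sketch of what the new `BigQueryJSONEncoder` does to a single record (the field names and values below are made up, not taken from this change, and assume this branch plus `google-cloud-bigquery` are installed): date/time values become ISO-8601 strings, NUMERIC values are clamped to the allowed range and quantized to 9 decimal places, and non-string values bound for STRING columns are dumped to JSON strings.

    import datetime
    import json
    from decimal import Decimal

    from google.cloud.bigquery import SchemaField
    from target_bigquery.db_sync import BigQueryJSONEncoder

    # Column name -> SchemaField map, the same shape as the field_map built in records_to_json()
    schema = {
        'id': SchemaField('id', 'INTEGER'),
        'amount': SchemaField('amount', 'NUMERIC'),
        'updated_at': SchemaField('updated_at', 'TIMESTAMP'),
        'payload': SchemaField('payload', 'STRING'),
    }

    record = {
        'id': 1,
        'amount': Decimal('12.3456789123'),                   # quantized to 9 decimal places
        'updated_at': datetime.datetime(2022, 1, 1, 12, 0),   # serialized as an ISO-8601 string
        'payload': {'nested': True},                          # dict under a STRING column -> JSON string
    }

    # One JSONL line, ready to be written to the staging blob.
    line = json.dumps(record, cls=BigQueryJSONEncoder, schema=schema,
                      data_flattening_max_level=0)
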
diff --git a/target_bigquery/stream_utils.py b/target_bigquery/stream_utils.py
index 14b7874..095d29d 100644
--- a/target_bigquery/stream_utils.py
+++ b/target_bigquery/stream_utils.py
@@ -111,6 +111,18 @@ def parse_datetime(dt):

     return extended_record

+def remove_metadata_values_from_record(record_message, schema):
+    """Remove metadata _sdc columns from incoming record message if not in SCHEMA
+    The location of the required attributes is fixed in the stream
+    """
+    expected_metadata = {k for k in schema['properties'].keys() if k.startswith('_sdc_')}
+
+    reduced_record = record_message['record']
+    reduced_record = {
+        k: v for k, v in reduced_record.items() if not k.startswith('_sdc_') or k in expected_metadata
+    }
+
+    return reduced_record

 def stream_name_to_dict(stream_name, separator='-'):
     """Transform stream name string to dictionary"""
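A quick usage sketch of the new helper (the record and schema below are made-up examples): `_sdc_` columns that the stream's SCHEMA does not declare are dropped, while declared ones are kept.

    from target_bigquery.stream_utils import remove_metadata_values_from_record

    record_message = {'record': {
        'id': 1,
        '_sdc_deleted_at': None,            # declared in the schema -> kept
        '_sdc_extracted_at': '2022-01-01',  # not declared -> dropped
    }}
    schema = {'properties': {
        'id': {'type': ['integer']},
        '_sdc_deleted_at': {'type': ['null', 'string'], 'format': 'date-time'},
    }}

    assert remove_metadata_values_from_record(record_message, schema) == {
        'id': 1,
        '_sdc_deleted_at': None,
    }
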
diff --git a/tests/integration/test_target_bigquery.py b/tests/integration/test_target_bigquery.py
index 6154233..3659e72 100644
--- a/tests/integration/test_target_bigquery.py
+++ b/tests/integration/test_target_bigquery.py
@@ -468,7 +468,7 @@ def test_nested_schema_flattening(self):
             [{
                 'c_pk': 1,
                 'c_array': '[1, 2, 3]',
-                'c_object': None,
+                'c_object': '{"key_1": "value_1"}',
                 'c_object_with_props__key_1': 'value_1',
                 'c_nested_object__nested_prop_1': 'nested_value_1',
                 'c_nested_object__nested_prop_2': 'nested_value_2',
@@ -617,7 +617,7 @@ def test_logical_streams_from_pg_with_hard_delete_mapping(self):
         # ----------------------------------------------------------------------
         # Check rows in table_two
         # ----------------------------------------------------------------------
-        delete_time = datetime.datetime(2019, 10, 13, 14, 6, 31, 838000, tzinfo=timezone.utc)
+        delete_time = datetime.datetime(2019, 10, 13, 14, 6, 31, 838328, tzinfo=timezone.utc)
         expected_table_two = [
             {'cid': 1, 'cvarchar': "updated row", "_sdc_deleted_at": None},
             {'cid': 2, 'cvarchar': 'updated row', "_sdc_deleted_at": None},
diff --git a/tests/integration/utils.py b/tests/integration/utils.py
index 43ad29f..daf9ae4 100644
--- a/tests/integration/utils.py
+++ b/tests/integration/utils.py
@@ -14,6 +14,9 @@ def get_db_config():
     # Snowflake instance
     config['project_id'] = os.environ.get('TARGET_BIGQUERY_PROJECT')
     config['default_target_schema'] = os.environ.get("TARGET_BIGQUERY_SCHEMA")
+    config['location'] = os.environ.get("TARGET_BIGQUERY_LOCATION")
+    config['gcs_bucket'] = os.environ.get("TARGET_BIGQUERY_GCS_BUCKET")
+    config['gcs_key_prefix'] = os.environ.get("TARGET_BIGQUERY_GCS_KEY_PREFIX")

     # --------------------------------------------------------------------------
     # The following variables needs to be empty.
diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py
index d554291..c7eead3 100644
--- a/tests/unit/test_db_sync.py
+++ b/tests/unit/test_db_sync.py
@@ -240,12 +240,20 @@ def test_flatten_record(self):
         flatten_record = db_sync.flatten_record

         empty_record = {}
+        empty_schema = {'properties': {}}
         # Empty record should be empty dict
-        self.assertEqual(flatten_record(empty_record), {})
+        self.assertEqual(flatten_record(empty_record, empty_schema), {})

         not_nested_record = {"c_pk": 1, "c_varchar": "1", "c_int": 1}
+        schema = {
+            "properties": {
+                "c_pk": {"type": ["null", "integer"]},
+                "c_varchar": {"type": ["null", "string"]},
+                "c_int": {"type": ["null", "integer"]},
+            },
+        }
         # NO FLATTENING - Record with simple properties should be a plain dictionary
-        self.assertEqual(flatten_record(not_nested_record), not_nested_record)
+        self.assertEqual(flatten_record(not_nested_record, schema), not_nested_record)

         nested_record = {
             "c_pk": 1,
@@ -259,9 +267,39 @@ def test_flatten_record(self):
                     "multi_nested_prop2": "multi_value_2",
                 }}}

+        schema = {
+            "properties": {
+                "c_pk": {"type": ["null", "integer"]},
+                "c_varchar": {"type": ["null", "string"]},
+                "c_int": {"type": ["null", "integer"]},
+                "c_obj": {
+                    "type": ["null", "object"],
+                    "properties": {
+                        "nested_prop1": {
+                            "type": ["null", "string"],
+                        },
+                        "nested_prop2": {
+                            "type": ["null", "string"],
+                        },
+                        "nested_prop3": {
+                            "type": ["null", "object"],
+                            "properties": {
+                                "multi_nested_prop1": {
+                                    "type": ["null", "string"]
+                                },
+                                "multi_nested_prop2": {
+                                    "type": ["null", "string"]
+                                },
+                            },
+                        },
+                    },
+                },
+            },
+        }
+
         # NO FLATTENING - No flattening (default)
         self.maxDiff = None
-        self.assertEqual(flatten_record(nested_record),
+        self.assertEqual(flatten_record(nested_record, schema),
                          {
                              "c_pk": 1,
                              "c_varchar": "1",
@@ -272,7 +310,7 @@

         # NO FLATTENING
         # max_level: 0 : No flattening (default)
-        self.assertEqual(flatten_record(nested_record, max_level=0),
+        self.assertEqual(flatten_record(nested_record, schema, max_level=0),
                          {
                              "c_pk": 1,
                              "c_varchar": "1",
@@ -283,19 +321,19 @@

         # SEMI FLATTENING
         # max_level: 1 : Semi-flattening (default)
-        self.assertEqual(flatten_record(nested_record, max_level=1),
+        self.assertEqual(flatten_record(nested_record, schema, max_level=1),
                          {
                              "c_pk": 1,
                              "c_varchar": "1",
                              "c_int": 1,
                              "c_obj__nested_prop1": "value_1",
                              "c_obj__nested_prop2": "value_2",
-                             "c_obj__nested_prop3": {"multi_nested_prop1": "multi_value_1", "multi_nested_prop2":
+                             "c_obj__nested_prop3": {"multi_nested_prop1": "multi_value_1", "multi_nested_prop2":
                                                          "multi_value_2"}
                          })

         # FLATTENING
-        self.assertEqual(flatten_record(nested_record, max_level=10),
+        self.assertEqual(flatten_record(nested_record, schema, max_level=10),
                          {
                              "c_pk": 1,
                              "c_varchar": "1",
@@ -305,3 +343,41 @@
                              "c_obj__nested_prop3__multi_nested_prop1": "multi_value_1",
                              "c_obj__nested_prop3__multi_nested_prop2": "multi_value_2"
                          })
+
+        # SEMI FLATTENING
+        # The `nested_prop3` object doesn't have `properties` defined, so it cannot be
+        # flattened because we don't know the set of possible keys.
+        schema = {
+            "properties": {
+                "c_pk": {"type": ["null", "integer"]},
+                "c_varchar": {"type": ["null", "string"]},
+                "c_int": {"type": ["null", "integer"]},
+                "c_obj": {
+                    "type": ["null", "object"],
+                    "properties": {
+                        "nested_prop1": {
+                            "type": ["null", "string"],
+                        },
+                        "nested_prop2": {
+                            "type": ["null", "string"],
+                        },
+                        "nested_prop3": {
+                            "type": ["null", "object"],
+                        },
+                    },
+                },
+            },
+        }
+
+        self.assertEqual(flatten_record(nested_record, schema, max_level=10),
+                         {
+                             "c_pk": 1,
+                             "c_varchar": "1",
+                             "c_int": 1,
+                             "c_obj__nested_prop1": "value_1",
+                             "c_obj__nested_prop2": "value_2",
+                             "c_obj__nested_prop3": {
+                                 "multi_nested_prop1": "multi_value_1",
+                                 "multi_nested_prop2": "multi_value_2",
+                             },
+                         })