Use JSONL to upload to BigQuery #86

Open · wants to merge 8 commits into base: jsonl
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -32,7 +32,7 @@ jobs:

- name: Unit Tests
run: make unit_test

- name: Approve
run: echo For security reasons, all pull requests need to be approved first before running Integration tests.

@@ -71,3 +71,5 @@ jobs:
GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
TARGET_BIGQUERY_PROJECT: ${{ secrets.TARGET_BIGQUERY_PROJECT }}
TARGET_BIGQUERY_SCHEMA: ${{ secrets.TARGET_BIGQUERY_SCHEMA }}_MR${{ github.event.number }}_${{ matrix.python-version }}
TARGET_BIGQUERY_GCS_BUCKET: ${{ secrets.TARGET_BIGQUERY_GCS_BUCKET }}
TARGET_BIGQUERY_GCS_KEY_PREFIX: ${{ secrets.TARGET_BIGQUERY_GCS_KEY_PREFIX }}
4 changes: 3 additions & 1 deletion README.md
@@ -50,7 +50,9 @@ Full list of options in `config.json`:

| Property | Type | Required? | Description |
| ------------------------------------- | --------- | ------------ | --------------------------------------------------------------- |
| project_id | String | Yes | BigQuery project |
| gcs_bucket | String | Yes | Google Cloud Storage Bucket to use to stage files |

Owner:
can we make this optional instead? I'd prefer if we don't force people to use GCS as I know not everyone is using it and it's a breaking change.
Should be OK to upload then read from GCS if this parameter is provided or use load_table_from_file if not
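
For illustration, a minimal sketch of the optional-GCS behaviour suggested above, using the public `google-cloud-bigquery` and `google-cloud-storage` APIs; the function name, blob key, and job settings here are assumptions, not code from this PR:

```python
from google.cloud import bigquery, storage


def load_jsonl_batch(client: bigquery.Client, table_id: str, jsonl_file, config: dict):
    """Load a newline-delimited JSON file into BigQuery, staging it in GCS
    only when a `gcs_bucket` is configured; otherwise load the file directly."""
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    bucket_name = config.get('gcs_bucket')
    if bucket_name:
        # Stage the batch in GCS, then point the load job at the gs:// URI.
        blob = storage.Client().bucket(bucket_name).blob(
            config.get('gcs_key_prefix', '') + 'batch.jsonl'  # hypothetical key
        )
        blob.upload_from_file(jsonl_file, rewind=True)
        job = client.load_table_from_uri(
            f'gs://{bucket_name}/{blob.name}', table_id, job_config=job_config
        )
    else:
        # No bucket configured: send the local file straight to BigQuery.
        job = client.load_table_from_file(
            jsonl_file, table_id, rewind=True, job_config=job_config
        )
    return job.result()  # block until the load job finishes
```

Both branches run the same kind of load job; the only difference is whether the JSONL batch transits a GCS bucket first.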

Contributor Author:
IMO there's no real reason not to ingest via GCS. It's certainly not a priority for us and I don't think I'd have the time to make the change + add additional testing in the near future. Though you're more than welcome to make the necessary changes. It will uglify the code a bit too.

Owner:
The thing is that we have been using it without uploading to GCS for years now, and this would make it more difficult for us to have PII compliance unless we deleted the files later. I'm going to merge this to a new jsonl branch and I'll do that change and some testing on it.

Contributor Author:
This implementation does delete the files:

try:
    job.result()
finally:
    blob.delete()

Owner:
Sounds good! Just for the sake of conversation: let's say we make GCS mandatory. What would be the benefits of doing that?
I can see that it would be more similar to the Snowflake target maintained by PipelineWise, which forces you to use a stage or S3. But I don't think you'd win any speed, debugging capability (if we delete the file in a finally), or anything else.
What do you think?

Contributor Author (@judahrand, Feb 23, 2022):
[image: excerpt from the O'Reilly chapter linked below comparing BigQuery load approaches]

from https://www.oreilly.com/library/view/google-bigquery-the/9781492044451/ch04.html

^ This suggests that when your network is fast, loading via GCS is better. At least the way we're running this (and I'd guess this is common), both our DB and (obviously) BQ are in GCP (in the same region, in fact), so the network connection is very fast (10 Gbps+). This makes GCS the obvious choice (and probably uncompressed better than compressed).

Do you disagree?

Contributor Author:
Also, given that we're already not using compression, that suggests that loading via GCS is still quicker outside GCP as well.

| gcs_key_prefix | String | | Prefix to use for staged files in Google Cloud Storage |
jmriego marked this conversation as resolved.
| location | String | | Region where BigQuery stores your dataset |
| default_target_schema | String | | Name of the schema where the tables will be created. If `schema_mapping` is not defined then every stream sent by the tap is loaded into this schema. |
| default_target_schema_select_permission | String | | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created |
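
Purely as an illustration of how the new staging options sit next to the existing ones, a hypothetical minimal `config.json` (every value here is a placeholder, not taken from this PR):

```json
{
  "project_id": "my-gcp-project",
  "location": "EU",
  "default_target_schema": "my_dataset",
  "gcs_bucket": "my-staging-bucket",
  "gcs_key_prefix": "pipelinewise/target-bigquery/"
}
```
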
2 changes: 1 addition & 1 deletion setup.py
@@ -20,7 +20,7 @@
install_requires=[
'pipelinewise-singer-python>=1,<3',
'google-cloud-bigquery>=2.20.0,<2.35.0',
'fastavro>=0.22.8,<=1.4.9'
'google-cloud-storage>=2.0.0,<3.0.0',
],
extras_require={
"test": [
37 changes: 18 additions & 19 deletions target_bigquery/__init__.py
@@ -6,17 +6,14 @@
import io
import json
import logging
import os
import sys
from multiprocessing.pool import ThreadPool as Pool

from tempfile import mkstemp
from fastavro import writer, parse_schema
from jsonschema import Draft7Validator, FormatChecker
from singer import get_logger

from target_bigquery import stream_utils
from target_bigquery.db_sync import DbSync
from target_bigquery.db_sync import DbSync, flatten_record
from target_bigquery.exceptions import (
RecordValidationException,
InvalidValidationOperationException
@@ -121,7 +118,21 @@ def persist_lines(config, lines) -> None:
"or more) Try removing 'multipleOf' methods from JSON schema.")
raise RecordValidationException(f"Record does not pass schema validation. RECORD: {o['record']}")

primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete):
record = stream_utils.add_metadata_values_to_record(o)
else:
record = stream_utils.remove_metadata_values_from_record(
o, stream_to_sync[stream].stream_schema_message['schema']
)

# Flatten record
record = flatten_record(
record,
stream_to_sync[stream].stream_schema_message['schema'],
max_level=stream_to_sync[stream].data_flattening_max_level
)

primary_key_string = stream_to_sync[stream].record_primary_key_string(record)
if not primary_key_string:
primary_key_string = 'RID-{}'.format(total_row_count[stream])

@@ -131,10 +142,7 @@ def persist_lines(config, lines) -> None:
total_row_count[stream] += 1

# append record
if config.get('add_metadata_columns') or hard_delete_mapping.get(stream, default_hard_delete):
records_to_load[stream][primary_key_string] = stream_utils.add_metadata_values_to_record(o)
else:
records_to_load[stream][primary_key_string] = o['record']
records_to_load[stream][primary_key_string] = record

flush = False
if row_count[stream] >= batch_size_rows:
@@ -370,17 +378,8 @@ def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=F


def flush_records(stream, records_to_load, row_count, db_sync):
parsed_schema = parse_schema(db_sync.avro_schema())
csv_fd, csv_file = mkstemp()
with open(csv_file, 'wb') as out:
writer(out, parsed_schema, db_sync.records_to_avro(records_to_load.values()))

# Seek to the beginning of the file and load
with open(csv_file, 'r+b') as f:
db_sync.load_avro(f, row_count)

# Delete temp file
os.remove(csv_file)
db_sync.load_records(records_to_load.values(), row_count)


def main():
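
For context on the `flush_records` change above: the fastavro temp-file path is removed and the batch is handed to `db_sync.load_records` instead. Below is a minimal sketch of how a batch of already-flattened records could be serialized to newline-delimited JSON for such a load; the helper name and the in-memory buffer are assumptions, not the PR's actual `load_records` implementation:

```python
import io
import json


def records_to_jsonl(records) -> io.BytesIO:
    """Serialize record dicts into an in-memory newline-delimited JSON file
    suitable for a BigQuery load job using the NEWLINE_DELIMITED_JSON format."""
    buf = io.BytesIO()
    for record in records:
        # default=str keeps values like dates and Decimals JSON-serializable
        buf.write(json.dumps(record, default=str).encode('utf-8'))
        buf.write(b'\n')
    buf.seek(0)  # rewind so the load call reads from the beginning
    return buf
```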