Skip to content

Commit

Permalink
chore: clean up code and bump deps, fix timeout propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Jul 1, 2024
1 parent f9384ca commit b6839a5
Show file tree
Hide file tree
Showing 13 changed files with 1,124 additions and 1,127 deletions.
1,794 changes: 847 additions & 947 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "z3-target-bigquery"
version = "0.7.1"
version = "0.7.2"
description = "z3-target-bigquery is a Singer target for BigQuery. It supports storage write, GCS, streaming, and batch load methods. Built with the Meltano SDK."
authors = ["Alex Butler <[email protected]>"]
keywords = ["ELT", "BigQuery"]
Expand Down
23 changes: 12 additions & 11 deletions target_bigquery/batch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
#
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
"""BigQuery Batch Job Sink.
Throughput test: 6m 25s @ 1M rows / 150 keys / 1.5GB
NOTE: This is naive and will vary drastically based on network speed, for example on a GCP VM.
"""
"""BigQuery Batch Job Sink."""

import os
from io import BytesIO
from mmap import mmap
from multiprocessing import Process
from multiprocessing.dummy import Process as _Thread
from queue import Empty
from typing import Any, Dict, NamedTuple, Optional, Type, Union
from typing import Any, Dict, Optional, Type, Union, cast

import orjson
from google.cloud import bigquery
Expand All @@ -35,13 +34,15 @@
class Job:
def __init__(
self,
data: Union[memoryview, bytes],
table: str,
data: Union[memoryview, bytes, mmap],
table: bigquery.TableReference,
config: Dict[str, Any],
timeout: Optional[float] = 600.0,
) -> None:
self.data = data
self.table = table
self.config = config
self.timeout = timeout
self.attempt = 1


Expand All @@ -63,7 +64,7 @@ def run(self) -> None:
BytesIO(job.data),
job.table,
num_retries=3,
timeout=self.config.get("timeout", 600),
timeout=job.timeout,
job_config=bigquery.LoadJobConfig(**job.config),
).result()
except Exception as exc:
Expand All @@ -81,7 +82,7 @@ def run(self) -> None:
f"[{self.ext_id}] Loaded {len(job.data)} bytes into {job.table}."
)
finally:
self.queue.task_done()
self.queue.task_done() # type: ignore


class BatchJobThreadWorker(BatchJobWorker, _Thread):
Expand All @@ -93,7 +94,7 @@ class BatchJobProcessWorker(BatchJobWorker, Process):


class BigQueryBatchJobSink(BaseBigQuerySink):
MAX_WORKERS = os.cpu_count() * 2
MAX_WORKERS = (os.cpu_count() or 1) * 2
WORKER_CAPACITY_FACTOR = 1
WORKER_CREATION_MIN_INTERVAL = 10.0

Expand All @@ -114,7 +115,7 @@ def worker_cls_factory(
worker_executor_cls: Type[Process], config: Dict[str, Any]
) -> Type[Union[BatchJobThreadWorker, BatchJobProcessWorker]]:
Worker = type("Worker", (BatchJobWorker, worker_executor_cls), {})
return Worker
return cast(Type[BatchJobThreadWorker], Worker)

def process_record(self, record: Dict[str, Any], context: Dict[str, Any]) -> None:
self.buffer.write(orjson.dumps(record, option=orjson.OPT_APPEND_NEWLINE))
Expand Down
4 changes: 1 addition & 3 deletions target_bigquery/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,4 @@
"_sdc_table_version",
]

DEFAULT_BUCKET_PATH = (
"gs://{bucket}/target_bigquery/{dataset}/{table}/extracted_date={date}/{batch_id}.jsonl.gz"
)
DEFAULT_BUCKET_PATH = "gs://{bucket}/target_bigquery/{dataset}/{table}/extracted_date={date}/{batch_id}.jsonl.gz"
110 changes: 62 additions & 48 deletions target_bigquery/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,21 @@
from subprocess import PIPE, Popen
from tempfile import TemporaryFile
from textwrap import dedent, indent
from typing import IO, TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type, Union

from google.api_core.exceptions import Conflict, Forbidden, NotFound
from typing import (
IO,
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
cast,
)

from google.api_core.exceptions import NotFound
from google.cloud import bigquery, bigquery_storage_v1, storage
from google.cloud.bigquery import SchemaField
from google.cloud.bigquery.table import TimePartitioning, TimePartitioningType
Expand Down Expand Up @@ -123,6 +135,8 @@ def get_resolved_schema(self, apply_transforms: bool = False) -> List[bigquery.S
return DEFAULT_SCHEMA
elif self.ingestion_strategy is IngestionStrategy.DENORMALIZED:
return self.get_schema(apply_transforms)
else:
raise ValueError(f"Invalid ingestion strategy: {self.ingestion_strategy}")

def __str__(self) -> str:
return f"{self.project}.{self.dataset}.{self.name}"
Expand Down Expand Up @@ -166,30 +180,23 @@ def create_table(
This is a convenience method that wraps the creation of a dataset and
table in a single method call. It is idempotent and will not create
a new table if one already exists."""
try:
dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
except NotFound:
dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
except (Conflict, Forbidden):
if dataset.location != kwargs["dataset"]["location"]:
raise Exception(
f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
f"does not match specified location: {kwargs['dataset']['location']}"
)
finally:
self._dataset = dataset
try:
self._table = client.get_table(self.as_ref())
except NotFound:
self._table = client.create_table(
self.as_table(
apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
**kwargs["table"],
if not hasattr(self, "_dataset"):
try:
self._dataset = client.get_dataset(self.as_dataset_ref())
except NotFound:
self._dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
if not hasattr(self, "_table"):
try:
self._table = client.get_table(self.as_ref())
except NotFound:
self._table = client.create_table(
self.as_table(
apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
**kwargs["table"],
)
)
)
else:
# Wait for eventual consistency
time.sleep(5)
# Wait for eventual consistency (for the sake of GRPC's default stream)
time.sleep(5)
return self._dataset, self._table

def default_table_options(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -314,7 +321,9 @@ def __init__(
):
self.merge_target = copy(self.table)
self.table = BigQueryTable(
name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts)
name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}",
**opts,
)
self.table.create_table(
self.client,
self.apply_transforms,
Expand All @@ -324,7 +333,8 @@ def __init__(
},
"dataset": {
"location": self.config.get(
"location", BigQueryTable.default_dataset_options()["location"]
"location",
BigQueryTable.default_dataset_options()["location"],
)
},
},
Expand All @@ -333,7 +343,9 @@ def __init__(
elif self._is_overwrite_candidate():
self.overwrite_target = copy(self.table)
self.table = BigQueryTable(
name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts)
name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}",
**opts,
)
self.table.create_table(
self.client,
self.apply_transforms,
Expand All @@ -343,7 +355,8 @@ def __init__(
},
"dataset": {
"location": self.config.get(
"location", BigQueryTable.default_dataset_options()["location"]
"location",
BigQueryTable.default_dataset_options()["location"],
)
},
},
Expand Down Expand Up @@ -456,7 +469,7 @@ def create_target(self, key_properties: Optional[List[str]] = None) -> None:
# Table opts
if key_properties and self.config.get("cluster_on_key_properties", False):
kwargs["table"]["clustering_fields"] = tuple(key_properties[:4])
partition_grain: str = self.config.get("partition_granularity")
partition_grain: Optional[str] = self.config.get("partition_granularity")
if partition_grain:
kwargs["table"]["time_partitioning"] = TimePartitioning(
type_=PARTITION_STRATEGY[partition_grain.upper()],
Expand Down Expand Up @@ -545,7 +558,7 @@ def clean_up(self) -> None:
f" {self.table.get_escaped_name()}; DROP TABLE IF EXISTS"
f" {self.table.get_escaped_name()};"
).result()
self.table = self.merge_target
self.table = cast(BigQueryTable, self.merge_target)
self.merge_target = None


Expand All @@ -561,7 +574,7 @@ class Denormalized:
wait=wait_fixed(1),
reraise=True,
)
def update_schema(self: BaseBigQuerySink) -> None:
def update_schema(self: BaseBigQuerySink) -> None: # type: ignore
"""Update the target schema."""
table = self.table.as_table()
current_schema = table.schema[:]
Expand All @@ -578,7 +591,9 @@ def update_schema(self: BaseBigQuerySink) -> None:
)

def preprocess_record(
self: BaseBigQuerySink, record: Dict[str, Any], context: Dict[str, Any]
self: BaseBigQuerySink, # type: ignore
record: Dict[str, Any],
context: Dict[str, Any],
) -> Dict[str, Any]:
"""Preprocess a record before writing it to the sink."""
return self.table.schema_translator.translate_record(record)
Expand Down Expand Up @@ -626,7 +641,7 @@ def storage_client_factory(
) -> bigquery_storage_v1.BigQueryWriteClient:
"""Get a BigQuery Storage Write client."""
if creds.path:
return bigquery_storage_v1.BigQueryWriteClient.from_service_account_file(creds.path)
return bigquery_storage_v1.BigQueryWriteClient.from_service_account_file(str(creds.path))
elif creds.json:
return bigquery_storage_v1.BigQueryWriteClient.from_service_account_info(
json.loads(creds.json)
Expand Down Expand Up @@ -798,6 +813,8 @@ def _jsonschema_property_to_bigquery_column(
return SchemaField(name, result_type, "NULLABLE")
except Exception:
return SchemaField(name, "JSON", "NULLABLE")
else:
raise ValueError(f"Invalid resolver version: {self.resolver_version}")

def _translate_record_to_bigquery_schema(
self, name: str, schema_property: dict, mode: str = "NULLABLE"
Expand All @@ -809,10 +826,7 @@ def _translate_record_to_bigquery_schema(
if len(properties) == 0:
return SchemaField(name, "JSON", mode)

fields = [
self._jsonschema_property_to_bigquery_column(col, t)
for col, t in properties
]
fields = [self._jsonschema_property_to_bigquery_column(col, t) for col, t in properties]
return SchemaField(name, "RECORD", mode, fields=fields)

def _bigquery_field_to_projection(
Expand Down Expand Up @@ -950,7 +964,7 @@ def write(self, data: bytes) -> None:
def flush(self) -> None:
"""Flush the compressor buffer."""
self._gzip.flush()
self._buffer.flush()
self.buffer.flush()

def close(self) -> None:
"""Close the compressor and wait for the gzip process to finish."""
Expand All @@ -959,30 +973,30 @@ def close(self) -> None:
self._gzip.close()
if self._compressor is not None:
self._compressor.wait()
self._buffer.flush()
self._buffer.seek(0)
self.buffer.flush()
self.buffer.seek(0)
self._closed = True

def getvalue(self) -> bytes:
"""Return the compressed buffer as a bytes object."""
if not self._closed:
self.close()
if self._compressor is not None:
return self._buffer.read()
return self._buffer.getvalue()
return self.buffer.read()
return self.buffer.getvalue() # type: ignore

def getbuffer(self) -> Union[memoryview, mmap.mmap]:
"""Return the compressed buffer as a memoryview or mmap."""
if not self._closed:
self.close()
if self._compressor is not None:
return mmap.mmap(self._buffer.fileno(), 0, access=mmap.ACCESS_READ)
return self._buffer.getbuffer()
return mmap.mmap(self.buffer.fileno(), 0, access=mmap.ACCESS_READ)
return self.buffer.getbuffer() # type: ignore

@property
def buffer(self) -> IO[bytes]:
"""Return the compressed buffer as a file-like object."""
return self._buffer
return cast(IO[bytes], self._buffer)

def __del__(self) -> None:
"""Close the compressor and wait for the gzip process to finish. Dereference the buffer."""
Expand All @@ -991,7 +1005,7 @@ def __del__(self) -> None:
# close the buffer, ignore error if we have an incremented rc due to memoryview
# the gc will take care of the rest when the worker dereferences the buffer
try:
self._buffer.close()
self.buffer.close()
except BufferError:
pass
if self._compressor is not None and self._compressor.poll() is None:
Expand Down
Loading

0 comments on commit b6839a5

Please sign in to comment.