This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit 0fef8a4: fix bigquery

danielgafni committed Oct 31, 2023 (1 parent: 02ab713)

Showing 1 changed file with 9 additions and 8 deletions.

dagster_polars/io_managers/bigquery.py (17 changes: 9 additions & 8 deletions)
@@ -7,13 +7,14 @@

 from dagster_polars.io_managers.utils import get_polars_metadata

 lazy_import.lazy_module("google.cloud")
 lazy_import.lazy_module("google.cloud.bigquery")
 lazy_import.lazy_module("dagster_gcp.bigquery.io_manager")


 if TYPE_CHECKING:
-    from dagster_gcp.bigquery.io_manager import BigQueryClient, BigQueryIOManager
-    from google.cloud import bigquery as bigquery
+    import dagster_gcp
+    import dagster_gcp.bigquery.io_manager
+    import google.cloud.bigquery
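
Why this change matters: imports under `if TYPE_CHECKING:` exist only for the type checker, so the old runtime references such as `bigquery.Client` would raise `NameError` once a handler actually ran. The new code keeps the type-checking imports for annotations and switches runtime references to fully-qualified names, intended to resolve via the `lazy_import.lazy_module` registrations above. A minimal sketch of the canonical lazy_import pattern follows; the assignment form and the `make_job_config` helper are illustrative, not taken from this commit.

    import lazy_import

    # Register a proxy in sys.modules; the real (and slow) import of
    # google.cloud.bigquery is deferred until the first attribute access.
    bigquery = lazy_import.lazy_module("google.cloud.bigquery")


    def make_job_config(write_disposition: str):
        # Touching an attribute here is what finally triggers the real import.
        return bigquery.LoadJobConfig(write_disposition=write_disposition)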


class BigQueryPolarsTypeHandler(DbTypeHandler[pl.DataFrame]):
@@ -48,9 +49,9 @@ def my_table() -> pd.DataFrame:  # the name of the asset will be the table name

     def handle_output(self, context: OutputContext, table_slice: TableSlice, obj: pl.DataFrame, connection):
         """Stores the polars DataFrame in BigQuery."""
-        assert isinstance(connection, bigquery.Client)
+        assert isinstance(connection, google.cloud.bigquery.Client)
         assert context.metadata is not None
-        job_config = bigquery.LoadJobConfig(write_disposition=context.metadata.get("write_disposition"))
+        job_config = google.cloud.bigquery.LoadJobConfig(write_disposition=context.metadata.get("write_disposition"))

         # FIXME: load_table_from_dataframe writes the dataframe to a temporary parquet file
         # and then calls load_table_from_file. This can cause problems in cloud environments
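
One way around the temp-file behavior flagged in the FIXME (a sketch under assumptions, not what this commit does): write the Polars DataFrame to an in-memory Parquet buffer and pass it to `load_table_from_file` directly, so nothing touches local disk. The `load_dataframe` helper and the WRITE_TRUNCATE disposition are illustrative.

    import io

    import polars as pl
    from google.cloud import bigquery


    def load_dataframe(client: bigquery.Client, df: pl.DataFrame, table_id: str) -> None:
        buffer = io.BytesIO()
        df.write_parquet(buffer)  # Polars can write Parquet to any file-like object
        buffer.seek(0)
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # illustrative choice
        )
        client.load_table_from_file(buffer, table_id, job_config=job_config).result()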
@@ -70,12 +71,12 @@ def handle_output(self, context: OutputContext, table_slice: TableSlice, obj: pl

     def load_input(self, context: InputContext, table_slice: TableSlice, connection) -> pl.DataFrame:
         """Loads the input as a Polars DataFrame."""
-        assert isinstance(connection, bigquery.Client)
+        assert isinstance(connection, google.cloud.bigquery.Client)

         if table_slice.partition_dimensions and len(context.asset_partition_keys) == 0:
             return pl.DataFrame()
         result = connection.query(
-            query=BigQueryClient.get_select_statement(table_slice),
+            query=dagster_gcp.bigquery.io_manager.BigQueryClient.get_select_statement(table_slice),
             project=table_slice.database,  # type: ignore
             location=context.resource_config.get("location") if context.resource_config else None,  # type: ignore
             timeout=context.resource_config.get("timeout") if context.resource_config else None,  # type: ignore
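
The rest of `load_input` is folded out of the diff. A plausible completion (an assumption, not shown in this commit) converts the BigQuery result to Polars via Arrow; `query_to_polars` is a hypothetical helper:

    import polars as pl
    from google.cloud import bigquery


    def query_to_polars(client: bigquery.Client, sql: str) -> pl.DataFrame:
        # QueryJob.to_arrow() waits for the job and fetches the result as a pyarrow.Table.
        arrow_table = client.query(sql).to_arrow()
        return pl.from_arrow(arrow_table)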
@@ -88,7 +89,7 @@ def supported_types(self):
         return [pl.DataFrame]


-class BigQueryPolarsIOManager(BigQueryIOManager):
+class BigQueryPolarsIOManager(dagster_gcp.bigquery.io_manager.BigQueryIOManager):
     """An I/O manager definition that reads inputs from and writes polars DataFrames to BigQuery.

     Returns:
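
The docstring is truncated by the fold. For context, a hedged usage sketch: the resource key and the `project`/`dataset` fields are assumptions mirroring the dagster-gcp `BigQueryIOManager` config, not text from this commit.

    import polars as pl
    from dagster import Definitions, asset

    from dagster_polars.io_managers.bigquery import BigQueryPolarsIOManager


    @asset(io_manager_key="bigquery_polars_io_manager")
    def my_table() -> pl.DataFrame:  # the asset name becomes the BigQuery table name
        return pl.DataFrame({"id": [1, 2, 3]})


    defs = Definitions(
        assets=[my_table],
        resources={
            "bigquery_polars_io_manager": BigQueryPolarsIOManager(
                project="my-gcp-project",  # assumed config fields
                dataset="my_dataset",
            )
        },
    )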
