From 0fef8a45f086e130c7dd210b92adce17d0800e02 Mon Sep 17 00:00:00 2001 From: Daniel Gafni Date: Tue, 31 Oct 2023 13:20:07 +0100 Subject: [PATCH] fix bigquery --- dagster_polars/io_managers/bigquery.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/dagster_polars/io_managers/bigquery.py b/dagster_polars/io_managers/bigquery.py index efa34ba..1ecd9d6 100644 --- a/dagster_polars/io_managers/bigquery.py +++ b/dagster_polars/io_managers/bigquery.py @@ -7,13 +7,14 @@ from dagster_polars.io_managers.utils import get_polars_metadata -lazy_import.lazy_module("google.cloud") +lazy_import.lazy_module("google.cloud.bigquery") lazy_import.lazy_module("dagster_gcp.bigquery.io_manager") if TYPE_CHECKING: - from dagster_gcp.bigquery.io_manager import BigQueryClient, BigQueryIOManager - from google.cloud import bigquery as bigquery + import dagster_gcp + import dagster_gcp.bigquery.io_manager + import google.cloud.bigquery class BigQueryPolarsTypeHandler(DbTypeHandler[pl.DataFrame]): @@ -48,9 +49,9 @@ def my_table() -> pd.DataFrame: # the name of the asset will be the table name def handle_output(self, context: OutputContext, table_slice: TableSlice, obj: pl.DataFrame, connection): """Stores the polars DataFrame in BigQuery.""" - assert isinstance(connection, bigquery.Client) + assert isinstance(connection, google.cloud.bigquery.Client) assert context.metadata is not None - job_config = bigquery.LoadJobConfig(write_disposition=context.metadata.get("write_disposition")) + job_config = google.cloud.bigquery.LoadJobConfig(write_disposition=context.metadata.get("write_disposition")) # FIXME: load_table_from_dataframe writes the dataframe to a temporary parquet file # and then calls load_table_from_file. This can cause problems in cloud environments @@ -70,12 +71,12 @@ def handle_output(self, context: OutputContext, table_slice: TableSlice, obj: pl def load_input(self, context: InputContext, table_slice: TableSlice, connection) -> pl.DataFrame: """Loads the input as a Polars DataFrame.""" - assert isinstance(connection, bigquery.Client) + assert isinstance(connection, google.cloud.bigquery.Client) if table_slice.partition_dimensions and len(context.asset_partition_keys) == 0: return pl.DataFrame() result = connection.query( - query=BigQueryClient.get_select_statement(table_slice), + query=dagster_gcp.bigquery.io_manager.BigQueryClient.get_select_statement(table_slice), project=table_slice.database, # type: ignore location=context.resource_config.get("location") if context.resource_config else None, # type: ignore timeout=context.resource_config.get("timeout") if context.resource_config else None, # type: ignore @@ -88,7 +89,7 @@ def supported_types(self): return [pl.DataFrame] -class BigQueryPolarsIOManager(BigQueryIOManager): +class BigQueryPolarsIOManager(dagster_gcp.bigquery.io_manager.BigQueryIOManager): """An I/O manager definition that reads inputs from and writes polars DataFrames to BigQuery. Returns: