From 325e35efcf6f679c8ab6de31d4145f19d2465b70 Mon Sep 17 00:00:00 2001 From: Daniel Gafni Date: Wed, 25 Oct 2023 13:22:42 +0200 Subject: [PATCH] fixes --- dagster_polars/io_managers/delta.py | 6 ++++-- tests/test_polars_delta.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dagster_polars/io_managers/delta.py b/dagster_polars/io_managers/delta.py index d37b39b..9e10d2f 100644 --- a/dagster_polars/io_managers/delta.py +++ b/dagster_polars/io_managers/delta.py @@ -65,17 +65,19 @@ def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath) def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame: assert context.metadata is not None + version_override = context.metadata.get("version") or self.version or None + version = DeltaTable( str(path), storage_options=self.get_storage_options(path), - version=context.metadata.get("version") or self.version or None, + version=int(version_override) if version_override else None, ).version() context.log.debug(f"Reading Delta table with version: {version}") return pl.scan_delta( str(path), - version=version, + version=int(version) if version else None, delta_table_options=context.metadata.get("delta_table_options"), pyarrow_options=context.metadata.get("pyarrow_options"), storage_options=self.get_storage_options(path), diff --git a/tests/test_polars_delta.py b/tests/test_polars_delta.py index e2aa019..eb64e9b 100644 --- a/tests/test_polars_delta.py +++ b/tests/test_polars_delta.py @@ -230,7 +230,7 @@ def upstream(context: OpExecutionContext, config: UpstreamConfig) -> pl.DataFram # get_saved_path(result, "upstream") - @asset(ins={"upstream": AssetIn(metadata={"version": "0"})}) + @asset(ins={"upstream": AssetIn(metadata={"version": 0})}) def downstream_0(upstream: pl.DataFrame) -> None: assert upstream["foo"].head(1).item() == "a"