Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Oct 25, 2023
1 parent 5313502 commit 325e35e
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
6 changes: 4 additions & 2 deletions dagster_polars/io_managers/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,19 @@ def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath)
def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
assert context.metadata is not None

version_override = context.metadata.get("version") or self.version or None

version = DeltaTable(
str(path),
storage_options=self.get_storage_options(path),
version=context.metadata.get("version") or self.version or None,
version=int(version_override) if version_override else None,
).version()

context.log.debug(f"Reading Delta table with version: {version}")

return pl.scan_delta(
str(path),
version=version,
version=int(version) if version else None,
delta_table_options=context.metadata.get("delta_table_options"),
pyarrow_options=context.metadata.get("pyarrow_options"),
storage_options=self.get_storage_options(path),
Expand Down
2 changes: 1 addition & 1 deletion tests/test_polars_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def upstream(context: OpExecutionContext, config: UpstreamConfig) -> pl.DataFram

# get_saved_path(result, "upstream")

@asset(ins={"upstream": AssetIn(metadata={"version": "0"})})
@asset(ins={"upstream": AssetIn(metadata={"version": 0})})
def downstream_0(upstream: pl.DataFrame) -> None:
assert upstream["foo"].head(1).item() == "a"

Expand Down

0 comments on commit 325e35e

Please sign in to comment.