This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

feat: add lazyframe support for parquet and delta #52

Merged
19 changes: 9 additions & 10 deletions .github/workflows/ci.yml
@@ -7,6 +7,11 @@ concurrency:
on:
workflow_dispatch:
push:
branches:
- master
pull_request:
branches:
- master
release:
types:
- created
@@ -31,13 +36,10 @@ jobs:
- "3.9"
- "3.8"
dagster_version:
- "1.4.0"
- "1.5.0"
- "1.5.1"
- "1.6.0"
polars_version:
- "0.17.0"
- "0.18.0"
- "0.19.0"
- "0.20"
steps:
- name: Setup python for test ${{ matrix.py }}
uses: actions/setup-python@v2
@@ -76,13 +78,10 @@ jobs:
- "3.9"
- "3.8"
dagster_version:
- "1.4.0"
- "1.5.0"
- "1.5.1"
- "1.6.0"
polars_version:
- "0.17.0"
- "0.18.0"
- "0.19.0"
- "0.20"
steps:
- name: Setup python for test ${{ matrix.py }}
uses: actions/setup-python@v2
8 changes: 4 additions & 4 deletions README.md
@@ -52,13 +52,13 @@ Complete description of `dagster_polars` behavior for all supported type annotations
| Type annotation | Behavior |
| --- |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `DataFrame` | read/write DataFrame. Raise error if it's not found in storage. |
| `LazyFrame` | read LazyFrame. Raise error if it's not found in storage. |
| `LazyFrame` | read/write LazyFrame. Raise error if it's not found in storage. |
| `Optional[DataFrame]` | read/write DataFrame. Skip if it's not found in storage or the output is `None`. |
| `Optional[LazyFrame]` | read LazyFrame. Skip if it's not found in storage |
| `Optional[LazyFrame]` | read/write LazyFrame. Skip if it's not found in storage |
| `DataFrameWithMetadata` | read/write DataFrame and metadata. Raise error if it's not found in storage. |
| `LazyFrameWithMetadata` | read LazyFrame and metadata. Raise error if it's not found in storage. |
| `LazyFrameWithMetadata` | read/write LazyFrame and metadata. Raise error if it's not found in storage. |
| `Optional[DataFrameWithMetadata]` | read/write DataFrame and metadata. Skip if it's not found in storage or the output is `None`. |
| `Optional[LazyFrameWithMetadata]` | read LazyFrame and metadata. Skip if it's not found in storage. |
| `Optional[LazyFrameWithMetadata]` | read/write LazyFrame and metadata. Skip if it's not found in storage. |
| `DataFramePartitions` | read multiple DataFrames as `Dict[str, DataFrame]`. Raise an error if any of them is not found in storage, unless `"allow_missing_partitions"` input metadata is set to `True` |
| `LazyFramePartitions` | read multiple LazyFrames as `Dict[str, LazyFrame]`. Raise an error if any of them is not found in storage, unless `"allow_missing_partitions"` input metadata is set to `True` |
| `DataFramePartitionsWithMetadata` | read multiple DataFrames and metadata as `Dict[str, Tuple[DataFrame, StorageMetadata]]`. Raise an error if any of them is not found in storage, unless `"allow_missing_partitions"` input metadata is set to `True` |
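For example, the new `LazyFrame` read/write behavior can be exercised end to end like this (a minimal sketch; the `polars_parquet_io_manager` resource key and `base_dir` value are illustrative assumptions, not part of this PR):

```python
import polars as pl
from dagster import Definitions, asset

from dagster_polars import PolarsParquetIOManager


@asset(io_manager_key="polars_parquet_io_manager")
def events() -> pl.LazyFrame:
    # The IO manager sinks the LazyFrame to storage instead of collecting it.
    return pl.DataFrame({"user": ["a", "b", "c"], "value": [1, 2, 3]}).lazy()


@asset(io_manager_key="polars_parquet_io_manager")
def big_events(events: pl.LazyFrame) -> pl.DataFrame:
    # LazyFrame inputs arrive as lazy scans over the stored file; collect when needed.
    return events.filter(pl.col("value") > 1).collect()


defs = Definitions(
    assets=[events, big_events],
    resources={"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/tmp/dagster_storage")},
)
```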
102 changes: 72 additions & 30 deletions dagster_polars/io_managers/base.py
@@ -18,6 +18,7 @@
import polars as pl
from dagster import (
ConfigurableIOManager,
EnvVar,
InitResourceContext,
InputContext,
MetadataValue,
@@ -27,9 +28,9 @@
from dagster import (
_check as check,
)
from dagster._annotations import experimental
from dagster._core.storage.upath_io_manager import is_dict_type
from pydantic.fields import Field, PrivateAttr
from pydantic import PrivateAttr
from pydantic.fields import Field

from dagster_polars.io_managers.utils import get_polars_metadata
from dagster_polars.types import (
@@ -129,13 +130,22 @@ def annotation_for_storage_metadata(annotation) -> bool:
return annotation_is_tuple_with_metadata(annotation)


@experimental
def _process_env_vars(config: Mapping[str, Any]) -> Dict[str, Any]:
out = {}
for key, value in config.items():
if isinstance(value, dict) and len(value) == 1 and next(iter(value.keys())) == "env":
out[key] = EnvVar(next(iter(value.values()))).get_value()
else:
out[key] = value
return out
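# Illustration (hypothetical values): a config such as
#   {"token": {"env": "MY_TOKEN"}, "region": "eu-west-1"}
# resolves to
#   {"token": EnvVar("MY_TOKEN").get_value(), "region": "eu-west-1"},
# i.e. single-key {"env": ...} mappings are read from the environment while
# all other values pass through unchanged.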


class BasePolarsUPathIOManager(ConfigurableIOManager, UPathIOManager):
"""Base class for `dagster-polars` IOManagers.

Doesn't define a specific storage format.

To implement a specific storage format (parquet, csv, etc), inherit from this class and implement the `dump_df_to_path` and `scan_df_from_path` methods.
To implement a specific storage format (parquet, csv, etc), inherit from this class and implement the `write_df_to_path` and `scan_df_from_path` methods.

Features:
- All the features of :py:class:`~dagster.UPathIOManager` - works with local and remote filesystems (like S3), supports loading multiple partitions with respect to :py:class:`~dagster.PartitionMapping`, and more
@@ -146,20 +156,23 @@ class BasePolarsUPathIOManager(ConfigurableIOManager, UPathIOManager):
"""

base_dir: Optional[str] = Field(default=None, description="Base directory for storing files.")

_base_path: "UPath" = PrivateAttr()
cloud_storage_options: Optional[Mapping[str, Any]] = Field(
default=None, description="Storage authentication for cloud object store", alias="storage_options"
)
_base_path = PrivateAttr()

def setup_for_execution(self, context: InitResourceContext) -> None:
from upath import UPath

sp = _process_env_vars(self.cloud_storage_options) if self.cloud_storage_options is not None else {}
self._base_path = (
UPath(self.base_dir)
UPath(self.base_dir, **sp)
if self.base_dir is not None
else UPath(check.not_none(context.instance).storage_directory())
)
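# Illustration (hypothetical values): configuring the IO manager with
#   storage_options={"aws_access_key_id": {"env": "AWS_ACCESS_KEY_ID"}}
# resolves the credential via _process_env_vars and forwards it to UPath
# (and therefore to the underlying fsspec filesystem) through **sp above.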

@abstractmethod
def dump_df_to_path(
def write_df_to_path(
self,
context: OutputContext,
df: pl.DataFrame,
Expand All @@ -168,6 +181,16 @@ def dump_df_to_path(
):
...

@abstractmethod
def sink_df_to_path(
self,
context: OutputContext,
df: pl.LazyFrame,
path: "UPath",
metadata: Optional[StorageMetadata] = None,
):
...

@overload
@abstractmethod
def scan_df_from_path(
@@ -225,25 +248,53 @@ def load_input(self, context: InputContext) -> Union[Any, Dict[str, Any]]:
def dump_to_path(
self,
context: OutputContext,
obj: Union[pl.DataFrame, Optional[pl.DataFrame], Tuple[pl.DataFrame, Dict[str, Any]]],
obj: Union[
pl.DataFrame,
Optional[pl.DataFrame],
Tuple[pl.DataFrame, Dict[str, Any]],
pl.LazyFrame,
Optional[pl.LazyFrame],
Tuple[pl.LazyFrame, Dict[str, Any]],
],
path: "UPath",
partition_key: Optional[str] = None,
):
if annotation_is_typing_optional(context.dagster_type.typing_type) and (
obj is None or annotation_for_storage_metadata(context.dagster_type.typing_type) and obj[0] is None
typing_type = context.dagster_type.typing_type

if annotation_is_typing_optional(typing_type) and (
obj is None or annotation_for_storage_metadata(typing_type) and obj[0] is None
):
context.log.warning(self.get_optional_output_none_log_message(context, path))
return
else:
assert obj is not None, "output should not be None if its type is not Optional"
if not annotation_for_storage_metadata(context.dagster_type.typing_type):
obj = cast(pl.DataFrame, obj)
df = obj
self.dump_df_to_path(context=context, df=df, path=path)
if not annotation_for_storage_metadata(typing_type):
if typing_type == pl.DataFrame:
obj = cast(pl.DataFrame, obj)
df = obj
self.write_df_to_path(context=context, df=df, path=path)
elif typing_type == pl.LazyFrame:
obj = cast(pl.LazyFrame, obj)
df = obj
self.sink_df_to_path(context=context, df=df, path=path)
else:
raise NotImplementedError
else:
obj = cast(Tuple[pl.DataFrame, Dict[str, Any]], obj)
df, metadata = obj
self.dump_df_to_path(context=context, df=df, path=path, metadata=metadata)
if not annotation_is_typing_optional(typing_type):
frame_type = get_args(typing_type)[0]
else:
frame_type = get_args(get_args(typing_type)[0])[0]

if frame_type == pl.DataFrame:
obj = cast(Tuple[pl.DataFrame, Dict[str, Any]], obj)
df, metadata = obj
self.write_df_to_path(context=context, df=df, path=path, metadata=metadata)
elif frame_type == pl.LazyFrame:
obj = cast(Tuple[pl.LazyFrame, Dict[str, Any]], obj)
df, metadata = obj
self.sink_df_to_path(context=context, df=df, path=path, metadata=metadata)
else:
raise NotImplementedError
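# Dispatch illustration: an output annotated as Tuple[pl.LazyFrame, Dict[str, Any]]
# is not Optional, so frame_type = get_args(typing_type)[0] is pl.LazyFrame and
# the (df, metadata) pair is persisted with sink_df_to_path; a plain pl.DataFrame
# annotation takes the write_df_to_path branch instead.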

def load_from_path(
self, context: InputContext, path: "UPath", partition_key: Optional[str] = None
@@ -306,7 +357,9 @@ def load_from_path(
else:
raise NotImplementedError(f"Can't load object for type annotation {context.dagster_type.typing_type}")

def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> Dict[str, MetadataValue]:
def get_metadata(
self, context: OutputContext, obj: Union[pl.DataFrame, pl.LazyFrame, None]
) -> Dict[str, MetadataValue]:
if obj is None:
return {"missing": MetadataValue.bool(True)}
else:
@@ -316,17 +369,6 @@ def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> Dict[str, MetadataValue]:
df = obj
return get_polars_metadata(context, df) if df is not None else {"missing": MetadataValue.bool(True)}

@staticmethod
def get_storage_options(path: "UPath") -> dict:
storage_options = {}

try:
storage_options.update(path.storage_options.copy())
except AttributeError:
pass

return storage_options

def get_path_for_partition(
self, context: Union[InputContext, OutputContext], path: "UPath", partition: str
) -> "UPath":
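To make the new abstract interface concrete, here is a rough sketch of a subclass for a hypothetical CSV format (simplified: the real implementations also thread storage metadata and the `with_metadata` overloads through `scan_df_from_path`):

```python
import polars as pl
from dagster import InputContext, OutputContext
from upath import UPath

from dagster_polars.io_managers.base import BasePolarsUPathIOManager


class PolarsCsvIOManager(BasePolarsUPathIOManager):
    extension: str = ".csv"  # file suffix handled by UPathIOManager

    def write_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath, metadata=None):
        # Eager outputs: materialized DataFrames are written directly.
        with path.open("wb") as file:
            df.write_csv(file)

    def sink_df_to_path(self, context: OutputContext, df: pl.LazyFrame, path: UPath, metadata=None):
        # Lazy outputs: stream the LazyFrame to disk without materializing it.
        df.sink_csv(str(path))

    def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
        # Inputs are always loaded lazily; eager annotations collect downstream.
        return pl.scan_csv(str(path))
```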
5 changes: 4 additions & 1 deletion dagster_polars/io_managers/bigquery.py
@@ -2,16 +2,19 @@

import polars as pl
from dagster import InputContext, MetadataValue, OutputContext
from dagster._annotations import experimental
from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice

from dagster_polars.io_managers.utils import get_polars_metadata

try:
from dagster_gcp.bigquery.io_manager import BigQueryClient, BigQueryIOManager
from google.cloud import bigquery as bigquery
except ImportError as e:
raise ImportError("Install 'dagster-polars[gcp]' to use BigQuery functionality") from e
from dagster_polars.io_managers.utils import get_polars_metadata


@experimental
class PolarsBigQueryTypeHandler(DbTypeHandler[pl.DataFrame]):
"""Plugin for the BigQuery I/O Manager that can store and load Polars DataFrames as BigQuery tables.

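For reference, the handler is consumed by a `BigQueryIOManager` subclass in the style dagster-gcp prescribes, roughly like this (a sketch; the actual wrapper shipped by `dagster_polars` may differ):

```python
from typing import Optional, Sequence, Type

import polars as pl
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_gcp.bigquery.io_manager import BigQueryIOManager

from dagster_polars.io_managers.bigquery import PolarsBigQueryTypeHandler


class PolarsBigQueryIOManager(BigQueryIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        # Register the Polars handler so BigQuery tables round-trip as DataFrames.
        return [PolarsBigQueryTypeHandler()]

    @staticmethod
    def default_load_type() -> Optional[Type]:
        return pl.DataFrame
```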