
Polars Datatype Catalog Entry Cannot Partition on Saving Parquet #908

Open
alexdavis24 opened this issue Oct 18, 2024 · 3 comments
Labels
Community · support: needs more info

Comments

@alexdavis24
alexdavis24 commented Oct 18, 2024

Description

Context

  • Trying to partition a large Polars DataFrame on a single column when saving to Parquet

Steps to Reproduce

  1. Sample dataset that runs locally with no issues:
df = pl.DataFrame(
    {"A": [1, 2, 3], "B": [4, 5, 6]}
)
path = "tmp/test.parquet"
df.write_parquet(
    path,
    use_pyarrow=True,
    pyarrow_options={"partition_cols": ["B"]},
)
# this also runs with no issues
df.write_parquet(
    path,
    partition_by=["B"],
)
  2. Sample code following the same implementation:
# pipelines.py
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline
import polars as pl

def my_func():
    return pl.DataFrame(
        {"A": [1, 2, 3], "B": [4, 5, 6]}
    )

def create_pipeline() -> Pipeline:
    return pipeline(
        node(
            func=my_func,
            inputs=None,  # my_func takes no inputs
            outputs="my_entry",
            name="partition_polars",
        )
    )
# catalog.yml
# using Rust
my_entry:
  # also tried with polars.LazyPolarsDataset
  type: polars.EagerPolarsDataset
  filepath: /tmp/test.parquet
  file_format: parquet
  save_args:
    partition_by: 
      - B
# catalog.yml
# using pyarrow (C++)
my_entry:
  type: polars.EagerPolarsDataset
  filepath: /tmp/test.parquet
  file_format: parquet
  save_args:
    use_pyarrow: True
    pyarrow_options:
      partition_cols: 
      - B
  fs_args:
    filesystem: pyarrow._fs.FileSystem

Expected Result

  • New partitioned parquet file should be created locally or in S3

Actual Result

From the Rust implementation:

DatasetError: Failed while saving data to data set
EagerPolarsDataset(file_format=parquet, filepath=/tmp/test.parquet,
load_args={}, protocol=file, save_args={'partition_by': ['dt1y']}).
'BytesIO' object cannot be converted to 'PyString'

From the PyArrow implementation:

DatasetError: Failed while saving data to data set
 LazyPolarsDataset(filepath=/tmp/test.parquet, load_args={}, protocol=file, 
save_args={'pyarrow_options': {'compression': zstd, 'partition_cols': ['dt1y'],
'write_statistics': True}, 'use_pyarrow': True}).
Argument 'filesystem' has incorrect type (expected pyarrow._fs.FileSystem, got 
NoneType)

Your Environment

  • Kedro version used (pip show kedro or kedro -V): 0.19.3
  • Polars: 1.9.0 and 1.6.0
  • Python version used (python -V): 3.11
  • Operating system and version: macOS (Apple M1) using Docker Compose + Docker Desktop
@datajoely
Contributor

Just want to say thanks for such a clear write up and investigation 💪

@SajidAlamQB
Contributor

Thank you @alexdavis24 for reporting this, we'll have a look.

@astrojuanlu astrojuanlu transferred this issue from kedro-org/kedro Oct 25, 2024
@astrojuanlu astrojuanlu added the Community label Nov 18, 2024
@merelcht
Member

Hi @alexdavis24 , I've been able to replicate the issue. I'm not super familiar with Polars or PyArrow, but I think your analysis that the problem lies in saving via BytesIO is correct. Because the save method writes the data to a BytesIO buffer and then uses fsspec to write that buffer to the target path, it completely bypasses the PyArrow filesystem and shouldn't require you to pass a filesystem argument. However, if PyArrow is somehow being invoked with a None filesystem, the issue might be in how fsspec or the BytesIO buffer is handled.

I managed to get things working with the following catalog entry:

my_entry:
  type: polars.EagerPolarsDataset
  filepath: /tmp/test.parquet
  file_format: parquet
  save_args:
    pyarrow_options:
      partition_cols:
      - B

So removing the explicit filesystem argument and also removing use_pyarrow: True. I don't know if this produces the desired result though. Let me know what you think of this.
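One way to check whether the desired result was produced: a successful partitioned write creates a Hive-style key=value directory tree rather than a single file. A stdlib-only sketch (the `demo_partitioned.parquet` path and toy tree are hypothetical, mimicking what partitioning the sample frame on column B should create):

```python
from pathlib import Path

# Hypothetical layout: partitioned Parquet writes create one key=value
# subdirectory per distinct partition value. Build a toy tree mirroring
# what partitioning the sample frame on column B would produce.
root = Path("demo_partitioned.parquet")
for val in (4, 5, 6):  # distinct values of column B
    part = root / f"B={val}"
    part.mkdir(parents=True, exist_ok=True)
    (part / "data.parquet").touch()

# List the tree the same way you would inspect the real /tmp/test.parquet
print(sorted(p.relative_to(root).as_posix() for p in root.rglob("*.parquet")))
# → ['B=4/data.parquet', 'B=5/data.parquet', 'B=6/data.parquet']
```

If the catalog entry above writes a single flat file instead of a tree like this, the partitioning options are being silently ignored.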
