Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Dataframe to be converted into views which can be used in register_table #1016

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ def __init__(self, df: DataFrameInternal) -> None:
"""
self.df = df

def into_view(self) -> pa.Table:
    """Convert this DataFrame into a view table.

    Nothing is registered by this call. Pass the returned object to
    ``SessionContext.register_table`` to make it queryable by name.

    Returns:
        The table object produced by the underlying ``into_view`` binding.
    """
    # NOTE(review): the Rust binding returns the datafusion table wrapper, not a
    # pyarrow table, so the ``pa.Table`` annotation looks wrong — confirm.
    return self.df.into_view()

def __getitem__(self, key: str | List[str]) -> DataFrame:
"""Return a new :py:class`DataFrame` with the specified column or columns.

Expand Down
34 changes: 34 additions & 0 deletions python/tests/test_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datafusion import SessionContext, col, literal


# Checks that a filtered DataFrame can be converted into a view and registered
# under a table name on the same SessionContext.
def test_register_filtered_dataframe():
ctx = SessionContext()

data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}

df = ctx.from_pydict(data, "my_table")

# Keep only the rows where column "a" is greater than 2, then turn that
# filtered DataFrame into a view object.
df_filtered = df.filter(col("a") > literal(2))
view = df_filtered.into_view()

assert view.kind == "view"

# Registration is a separate, explicit step performed on the context.
ctx.register_table("view1", view)
Comment on lines +30 to +34
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is modelled after how into_view is used in datafusion:

async fn with_column_renamed_ambiguous() -> Result<()> {
    let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
    let ctx = SessionContext::new();

    let table = df.into_view();
    ctx.register_table("t1", table.clone())?;
    ctx.register_table("t2", table)?;


# Query the registered view by name and collect its record batches.
df_view = ctx.sql("SELECT * FROM view1")

filtered_results = df_view.collect()

result_dicts = [batch.to_pydict() for batch in filtered_results]

# Only the rows that passed the a > 2 filter are expected from the view.
expected_results = [{"a": [3, 4, 5], "b": [30, 40, 50]}]

assert result_dicts == expected_results

# The source DataFrame is collected again to confirm it still yields every row
# after into_view() was called on its filtered derivative.
df_results = df.collect()

df_result_dicts = [batch.to_pydict() for batch in df_results]

expected_df_results = [{"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}]

assert df_result_dicts == expected_df_results
29 changes: 29 additions & 0 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::arrow::util::pretty;
use datafusion::common::UnnestOptions;
use datafusion::config::{CsvOptions, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::datasource::TableProvider;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::prelude::*;
Expand All @@ -39,6 +40,7 @@ use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyTuple, PyTupleMethods};
use tokio::task::JoinHandle;

use crate::catalog::PyTable;
use crate::errors::{py_datafusion_err, PyDataFusionError};
use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
Expand All @@ -50,6 +52,22 @@ use crate::{
expr::{sort_expr::PySortExpr, PyExpr},
};

/// Python-visible wrapper around a shared DataFusion [`TableProvider`].
///
/// Exposed to Python as `TableProvider` in the `datafusion` module.
#[pyclass(name = "TableProvider", module = "datafusion")]
pub struct PyTableProvider {
    /// The wrapped provider handle.
    provider: Arc<dyn TableProvider>,
}

impl PyTableProvider {
    /// Wraps an existing provider handle.
    pub fn new(provider: Arc<dyn TableProvider>) -> Self {
        PyTableProvider { provider }
    }

    /// Builds a [`PyTable`] backed by the same provider.
    ///
    /// Only the `Arc` handle is cloned, so the returned table shares the
    /// underlying provider with `self`. The body has no failing path; the
    /// `Result` return type is kept for callers that use `?` on it.
    pub fn as_table(&self) -> PyDataFusionResult<PyTable> {
        let shared = Arc::clone(&self.provider);
        Ok(PyTable::new(shared))
    }
}

/// A PyDataFrame is a representation of a logical plan and an API to compose statements.
/// Use it to build a plan and `.collect()` to execute the plan and collect the result.
/// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment.
Expand Down Expand Up @@ -89,6 +107,17 @@ impl PyDataFrame {
}
}

/// Convert this DataFrame into a Table that can be used in `register_table`.
///
/// The DataFrame held by this object is not consumed, so it stays usable
/// after the call.
///
/// # Errors
///
/// Propagates any error returned by `PyTableProvider::as_table`.
fn into_view(&self) -> PyDataFusionResult<PyTable> {
    // `DataFrame::into_view` consumes its receiver, so it is called on a clone
    // of the DataFrame behind `self.df` rather than on the shared value itself.
    let table_provider = self.df.as_ref().clone().into_view();

    // `as_table` already yields `PyDataFusionResult<PyTable>`; return it as-is
    // instead of unwrapping with `?` and re-wrapping in `Ok`.
    PyTableProvider::new(table_provider).as_table()
}

fn __repr__(&self, py: Python) -> PyDataFusionResult<String> {
let df = self.df.as_ref().clone().limit(0, Some(10))?;
let batches = wait_for_future(py, df.collect())?;
Expand Down
Loading