From 9d589a25f2b2bc4b56f192c2d8fc6ce9ba2c86c2 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 12:01:32 +0800 Subject: [PATCH 01/20] add test_view --- python/tests/test_view.py | 44 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 python/tests/test_view.py diff --git a/python/tests/test_view.py b/python/tests/test_view.py new file mode 100644 index 000000000..14306a37c --- /dev/null +++ b/python/tests/test_view.py @@ -0,0 +1,44 @@ +""" +This script demonstrates how to register a filtered DataFrame as a table +using DataFusion's `ctx.register_table` method and then query it. +""" + +from datafusion import SessionContext, col, literal +import pyarrow as pa +import pytest + +def test_register_filtered_dataframe(): + # Create a new session context + ctx = SessionContext() + + # Create sample data as a dictionary + data = { + "a": [1, 2, 3, 4, 5], + "b": [10, 20, 30, 40, 50] + } + + # Create a DataFrame from the dictionary + df = ctx.from_pydict(data, "my_table") + + # Filter the DataFrame (for example, keep rows where a > 2) + df_filtered = df.filter(col("a") > literal(2)) + + # Register the filtered DataFrame as a table called "view1" + ctx.register_table("view1", df_filtered) + + # Now run a SQL query against the registered table "view1" + df_view = ctx.sql("SELECT * FROM view1") + + # Collect the results (as a list of Arrow RecordBatches) + results = df_view.collect() + + # Convert results to a list of dictionaries for easier assertion + result_dicts = [batch.to_pydict() for batch in results] + + # Expected results + expected_results = [ + {"a": [3, 4, 5], "b": [30, 40, 50]} + ] + + # Assert the results match the expected results + assert result_dicts == expected_results From 648c1854901ff8e71d18cb8a2c6dedcc91ea3b54 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 14:48:58 +0800 Subject: [PATCH 02/20] feat: add into_view method to register DataFrame as a view --- 
python/datafusion/context.py | 2 +- python/datafusion/dataframe.py | 6 ++++++ python/tests/test_view.py | 1 + src/dataframe.rs | 1 + 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 864ef1c8b..29f9efdbe 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -636,7 +636,7 @@ def from_pylist( def from_pydict( self, data: dict[str, list[Any]], name: str | None = None - ) -> DataFrame: + ) -> DataFramee """Create a :py:class:`~datafusion.dataframe.DataFrame` from a dictionary. Args: diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 7413a5fa3..a9abfa3bb 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -121,6 +121,12 @@ def __init__(self, df: DataFrameInternal) -> None: """ self.df = df + def into_view(self) -> pa.Table: + """Register this DataFrame as a view in the current session. + + """ + return self.df.into_view() + def __getitem__(self, key: str | List[str]) -> DataFrame: """Return a new :py:class`DataFrame` with the specified column or columns. 
diff --git a/python/tests/test_view.py b/python/tests/test_view.py index 14306a37c..4937f3f3b 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -22,6 +22,7 @@ def test_register_filtered_dataframe(): # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) + df_filtered = df_filtered.into_view() # Register the filtered DataFrame as a table called "view1" ctx.register_table("view1", df_filtered) diff --git a/src/dataframe.rs b/src/dataframe.rs index 6fb08ba25..c7e756796 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -33,6 +33,7 @@ use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::execution::SendableRecordBatchStream; use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}; use datafusion::prelude::*; +use datafusion::sql::sqlparser::ast::Table; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; From e55ac9ff16c759e5f408c1ed4f7b984775d51ea9 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 15:28:41 +0800 Subject: [PATCH 03/20] add pytableprovider --- python/datafusion/context.py | 2 +- python/tests/test_view.py | 4 ++-- src/dataframe.rs | 26 +++++++++++++++++++++++++- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 29f9efdbe..864ef1c8b 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -636,7 +636,7 @@ def from_pylist( def from_pydict( self, data: dict[str, list[Any]], name: str | None = None - ) -> DataFramee + ) -> DataFrame: """Create a :py:class:`~datafusion.dataframe.DataFrame` from a dictionary. 
Args: diff --git a/python/tests/test_view.py b/python/tests/test_view.py index 4937f3f3b..09a1b12c2 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -22,10 +22,10 @@ def test_register_filtered_dataframe(): # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) - df_filtered = df_filtered.into_view() + view = df_filtered.into_view() # Register the filtered DataFrame as a table called "view1" - ctx.register_table("view1", df_filtered) + ctx.register_table("view1", view) # Now run a SQL query against the registered table "view1" df_view = ctx.sql("SELECT * FROM view1") diff --git a/src/dataframe.rs b/src/dataframe.rs index c7e756796..b083152ec 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -30,10 +30,10 @@ use datafusion::arrow::util::pretty; use datafusion::common::UnnestOptions; use datafusion::config::{CsvOptions, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; +use datafusion::datasource::TableProvider; use datafusion::execution::SendableRecordBatchStream; use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}; use datafusion::prelude::*; -use datafusion::sql::sqlparser::ast::Table; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; @@ -51,6 +51,21 @@ use crate::{ expr::{sort_expr::PySortExpr, PyExpr}, }; +#[pyclass(name = "TableProvider", module = "datafusion")] +pub struct PyTableProvider { + provider: Arc, +} + +impl PyTableProvider { + pub fn new(provider: Arc) -> Self { + Self { provider } + } + + pub fn get_provider(&self) -> Arc { + self.provider.clone() + } +} + /// A PyDataFrame is a representation of a logical plan and an API to compose statements. /// Use it to build a plan and `.collect()` to execute the plan and collect the result. /// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment. 
@@ -90,6 +105,15 @@ impl PyDataFrame { } } + /// Convert this DataFrame into a view (i.e. a TableProvider) that can be registered. + fn into_view(&self) -> PyDataFusionResult { + // Call the underlying Rust DataFrame::into_view method. + // Note that the Rust method consumes self; here we clone the inner Arc + // so that we don’t invalidate this PyDataFrame. + let table_provider = self.df.as_ref().clone().into_view(); + Ok(PyTableProvider::new(table_provider)) + } + fn __repr__(&self, py: Python) -> PyDataFusionResult { let df = self.df.as_ref().clone().limit(0, Some(10))?; let batches = wait_for_future(py, df.collect())?; From ca424495cc723040becf54d2294b60d3bdd34798 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 17:42:46 +0800 Subject: [PATCH 04/20] feat: add as_table method to PyTableProvider and update into_view to return PyTable --- python/tests/test_view.py | 10 ++++------ src/dataframe.rs | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/python/tests/test_view.py b/python/tests/test_view.py index 09a1b12c2..eccd492cf 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -1,8 +1,3 @@ -""" -This script demonstrates how to register a filtered DataFrame as a table -using DataFusion's `ctx.register_table` method and then query it. 
-""" - from datafusion import SessionContext, col, literal import pyarrow as pa import pytest @@ -23,7 +18,8 @@ def test_register_filtered_dataframe(): # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) view = df_filtered.into_view() - + + # Register the filtered DataFrame as a table called "view1" ctx.register_table("view1", view) @@ -43,3 +39,5 @@ def test_register_filtered_dataframe(): # Assert the results match the expected results assert result_dicts == expected_results + + assert view.kind == "view" diff --git a/src/dataframe.rs b/src/dataframe.rs index b083152ec..4a45a521f 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -40,6 +40,7 @@ use pyo3::pybacked::PyBackedStr; use pyo3::types::{PyCapsule, PyTuple, PyTupleMethods}; use tokio::task::JoinHandle; +use crate::catalog::PyTable; use crate::errors::{py_datafusion_err, PyDataFusionError}; use crate::expr::sort_expr::to_sort_expressions; use crate::physical_plan::PyExecutionPlan; @@ -64,6 +65,15 @@ impl PyTableProvider { pub fn get_provider(&self) -> Arc { self.provider.clone() } + + /// Convert this TableProvider into a concrete Table wrapper, if possible. + pub fn as_table(&self) -> PyDataFusionResult { + // Here, you’d write the logic to convert your inner Arc + // to a PyTable (which is a Python–exposed concrete table type). + let table: Arc = self.provider.clone(); + // Convert the trait object into your PyTable wrapper (if you have one) + Ok(PyTable::new(table)) + } } /// A PyDataFrame is a representation of a logical plan and an API to compose statements. @@ -106,12 +116,14 @@ impl PyDataFrame { } /// Convert this DataFrame into a view (i.e. a TableProvider) that can be registered. - fn into_view(&self) -> PyDataFusionResult { + fn into_view(&self) -> PyDataFusionResult { // Call the underlying Rust DataFrame::into_view method. 
// Note that the Rust method consumes self; here we clone the inner Arc // so that we don’t invalidate this PyDataFrame. let table_provider = self.df.as_ref().clone().into_view(); - Ok(PyTableProvider::new(table_provider)) + let table_provider = PyTableProvider::new(table_provider); + + Ok(table_provider.as_table()?) } fn __repr__(&self, py: Python) -> PyDataFusionResult { From d0c3163a6ed5473a0c6f30b4d4058518260737af Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 18:00:35 +0800 Subject: [PATCH 05/20] refactor: simplify as_table method and update documentation for into_view --- src/dataframe.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 4a45a521f..77695399b 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -62,17 +62,9 @@ impl PyTableProvider { Self { provider } } - pub fn get_provider(&self) -> Arc { - self.provider.clone() - } - - /// Convert this TableProvider into a concrete Table wrapper, if possible. pub fn as_table(&self) -> PyDataFusionResult { - // Here, you’d write the logic to convert your inner Arc - // to a PyTable (which is a Python–exposed concrete table type). - let table: Arc = self.provider.clone(); - // Convert the trait object into your PyTable wrapper (if you have one) - Ok(PyTable::new(table)) + let table_provider: Arc = self.provider.clone(); + Ok(PyTable::new(table_provider)) } } @@ -115,7 +107,7 @@ impl PyDataFrame { } } - /// Convert this DataFrame into a view (i.e. a TableProvider) that can be registered. + /// Convert this DataFrame into a Table that can be used in register_table fn into_view(&self) -> PyDataFusionResult { // Call the underlying Rust DataFrame::into_view method. 
// Note that the Rust method consumes self; here we clone the inner Arc From 8578713a58ebc191783fa5db87e2ec4d4a658369 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 18:04:29 +0800 Subject: [PATCH 06/20] test: improve test_register_filtered_dataframe by removing redundant comments and assertions --- python/tests/test_view.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/python/tests/test_view.py b/python/tests/test_view.py index eccd492cf..6263bcf52 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -3,41 +3,31 @@ import pytest def test_register_filtered_dataframe(): - # Create a new session context ctx = SessionContext() - # Create sample data as a dictionary data = { "a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50] } - # Create a DataFrame from the dictionary df = ctx.from_pydict(data, "my_table") - # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) view = df_filtered.into_view() + assert view.kind == "view" - # Register the filtered DataFrame as a table called "view1" ctx.register_table("view1", view) - # Now run a SQL query against the registered table "view1" df_view = ctx.sql("SELECT * FROM view1") - # Collect the results (as a list of Arrow RecordBatches) results = df_view.collect() - # Convert results to a list of dictionaries for easier assertion result_dicts = [batch.to_pydict() for batch in results] - # Expected results expected_results = [ {"a": [3, 4, 5], "b": [30, 40, 50]} ] - # Assert the results match the expected results assert result_dicts == expected_results - assert view.kind == "view" From 9cdd0dcb73b2354221b3cca7cbd5e14c9bd27f0c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 18:07:44 +0800 Subject: [PATCH 07/20] test: enhance test_register_filtered_dataframe with additional assertions for DataFrame results --- python/tests/test_view.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 
deletions(-) diff --git a/python/tests/test_view.py b/python/tests/test_view.py index 6263bcf52..7aa656fa2 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -21,9 +21,9 @@ def test_register_filtered_dataframe(): df_view = ctx.sql("SELECT * FROM view1") - results = df_view.collect() + filtered_results = df_view.collect() - result_dicts = [batch.to_pydict() for batch in results] + result_dicts = [batch.to_pydict() for batch in filtered_results] expected_results = [ {"a": [3, 4, 5], "b": [30, 40, 50]} @@ -31,3 +31,13 @@ def test_register_filtered_dataframe(): assert result_dicts == expected_results + df_results = df.collect() + + df_result_dicts = [batch.to_pydict() for batch in df_results] + + expected_df_results = [ + {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]} + ] + + assert df_result_dicts == expected_df_results + From c207b6cd19524f2570bab4ee7a1ef83cda0b5436 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 18:15:39 +0800 Subject: [PATCH 08/20] ruff formatted --- python/datafusion/dataframe.py | 6 ++---- python/tests/test_view.py | 21 +++++++-------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index a9abfa3bb..abe83e5c3 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -122,11 +122,9 @@ def __init__(self, df: DataFrameInternal) -> None: self.df = df def into_view(self) -> pa.Table: - """Register this DataFrame as a view in the current session. - - """ + """Register this DataFrame as a view in the current session.""" return self.df.into_view() - + def __getitem__(self, key: str | List[str]) -> DataFrame: """Return a new :py:class`DataFrame` with the specified column or columns. 
diff --git a/python/tests/test_view.py b/python/tests/test_view.py index 7aa656fa2..175bcecd0 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -2,21 +2,19 @@ import pyarrow as pa import pytest + def test_register_filtered_dataframe(): ctx = SessionContext() - data = { - "a": [1, 2, 3, 4, 5], - "b": [10, 20, 30, 40, 50] - } + data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]} df = ctx.from_pydict(data, "my_table") df_filtered = df.filter(col("a") > literal(2)) view = df_filtered.into_view() - + assert view.kind == "view" - + ctx.register_table("view1", view) df_view = ctx.sql("SELECT * FROM view1") @@ -25,19 +23,14 @@ def test_register_filtered_dataframe(): result_dicts = [batch.to_pydict() for batch in filtered_results] - expected_results = [ - {"a": [3, 4, 5], "b": [30, 40, 50]} - ] + expected_results = [{"a": [3, 4, 5], "b": [30, 40, 50]}] assert result_dicts == expected_results - + df_results = df.collect() df_result_dicts = [batch.to_pydict() for batch in df_results] - expected_df_results = [ - {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]} - ] + expected_df_results = [{"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}] assert df_result_dicts == expected_df_results - From 20dbfe880f0c2b0f47b6ced70941bea33a6e520c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 6 Feb 2025 18:18:11 +0800 Subject: [PATCH 09/20] cleanup: remove unused imports from test_view.py --- python/tests/test_view.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/tests/test_view.py b/python/tests/test_view.py index 175bcecd0..1b4847210 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -1,6 +1,4 @@ from datafusion import SessionContext, col, literal -import pyarrow as pa -import pytest def test_register_filtered_dataframe(): From 4b4c641c93893016089d87090c35d9747358e2aa Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Feb 2025 09:43:00 +0800 Subject: [PATCH 10/20] docs: add example for registering a DataFrame as 
a view in README.md --- README.md | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/README.md b/README.md index 5aaf7f5f3..0a6cc9c84 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,49 @@ This produces the following chart: ![Chart](examples/chart.png) +## Registering a DataFrame as a View + +You can use the `into_view` method to convert a DataFrame into a view and register it with the context. + +```python +from datafusion import SessionContext, col, literal + +# Create a DataFusion context +ctx = SessionContext() + +# Create sample data +data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]} + +# Create a DataFrame from the dictionary +df = ctx.from_pydict(data, "my_table") + +# Filter the DataFrame (for example, keep rows where a > 2) +df_filtered = df.filter(col("a") > literal(2)) + +# Convert the filtered DataFrame into a view +view = df_filtered.into_view() + +# Register the view with the context +ctx.register_table("view1", view) + +# Now run a SQL query against the registered view +df_view = ctx.sql("SELECT * FROM view1") + +# Collect the results +results = df_view.collect() + +# Convert results to a list of dictionaries for display +result_dicts = [batch.to_pydict() for batch in results] + +print(result_dicts) +``` + +This will output: + +```python +[{'a': [3, 4, 5], 'b': [30, 40, 50]}] +``` + ## Configuration It is possible to configure runtime (memory and disk settings) and configuration settings when creating a context. 
From 12c4fe3d042ef3fa31d29c930180d4e40f373aa3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Feb 2025 09:50:02 +0800 Subject: [PATCH 11/20] docs: update docstring for into_view method to clarify usage as ViewTable --- python/datafusion/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index abe83e5c3..fc7d220a5 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -122,7 +122,7 @@ def __init__(self, df: DataFrameInternal) -> None: self.df = df def into_view(self) -> pa.Table: - """Register this DataFrame as a view in the current session.""" + """Register this DataFrame as a ViewTable which can be used in register_table""" return self.df.into_view() def __getitem__(self, key: str | List[str]) -> DataFrame: From 15ead1f71509ed548821690af93a284e3bda51ba Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Feb 2025 09:53:14 +0800 Subject: [PATCH 12/20] chore: add license header to test_view.py --- python/tests/test_view.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/python/tests/test_view.py b/python/tests/test_view.py index 1b4847210..f471c1124 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -1,3 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + + from datafusion import SessionContext, col, literal From 48eb8db46f0b14cbae29f79cb625e33aab29c506 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Feb 2025 09:56:10 +0800 Subject: [PATCH 13/20] ruff correction --- python/datafusion/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index fc7d220a5..39f415f5c 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -122,7 +122,7 @@ def __init__(self, df: DataFrameInternal) -> None: self.df = df def into_view(self) -> pa.Table: - """Register this DataFrame as a ViewTable which can be used in register_table""" + """Convert DataFrame as a ViewTable which can be used in register_table.""" return self.df.into_view() def __getitem__(self, key: str | List[str]) -> DataFrame: From f73eebb5cb6d7a5ba210f31956ae5c653c4e2456 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Feb 2025 11:18:09 +0800 Subject: [PATCH 14/20] refactor: rename into_view method to _into_view --- python/datafusion/dataframe.py | 6 +++++- src/dataframe.rs | 29 ++++++++++++++++------------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 39f415f5c..87bf268f4 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -123,7 +123,11 @@ def __init__(self, df: DataFrameInternal) -> None: def into_view(self) -> pa.Table: """Convert DataFrame as a ViewTable which can be used in register_table.""" - return self.df.into_view() + return self._into_view() + + def _into_view(self) -> pa.Table: + """Convert DataFrame as a ViewTable which can be used in register_table.""" + return self.df._into_view() def __getitem__(self, key: str | List[str]) -> DataFrame: """Return a new :py:class`DataFrame` with the specified column 
or columns. diff --git a/src/dataframe.rs b/src/dataframe.rs index 77695399b..7e77ecd5a 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -62,9 +62,9 @@ impl PyTableProvider { Self { provider } } - pub fn as_table(&self) -> PyDataFusionResult { + pub fn as_table(&self) -> PyTable { let table_provider: Arc = self.provider.clone(); - Ok(PyTable::new(table_provider)) + PyTable::new(table_provider) } } @@ -107,17 +107,6 @@ impl PyDataFrame { } } - /// Convert this DataFrame into a Table that can be used in register_table - fn into_view(&self) -> PyDataFusionResult { - // Call the underlying Rust DataFrame::into_view method. - // Note that the Rust method consumes self; here we clone the inner Arc - // so that we don’t invalidate this PyDataFrame. - let table_provider = self.df.as_ref().clone().into_view(); - let table_provider = PyTableProvider::new(table_provider); - - Ok(table_provider.as_table()?) - } - fn __repr__(&self, py: Python) -> PyDataFusionResult { let df = self.df.as_ref().clone().limit(0, Some(10))?; let batches = wait_for_future(py, df.collect())?; @@ -185,6 +174,20 @@ impl PyDataFrame { PyArrowType(self.df.schema().into()) } + /// Convert this DataFrame into a Table that can be used in register_table + fn _into_view(&self) -> PyDataFusionResult { + // Call the underlying Rust DataFrame::into_view method. + // Note that the Rust method consumes self; here we clone the inner Arc + // so that we don’t invalidate this PyDataFrame. 
+ // _into_view because clippy says `into_*` usually take `self` by value + // but we cannot own self because Python objects are shared, + // so 'self' cannot be moved out of the Python interpreter + let table_provider = self.df.as_ref().clone().into_view(); + let table_provider = PyTableProvider::new(table_provider); + + Ok(table_provider.as_table()) + } + #[pyo3(signature = (*args))] fn select_columns(&self, args: Vec) -> PyDataFusionResult { let args = args.iter().map(|s| s.as_ref()).collect::>(); From 6bba2e2a498a7ed323fa15ac7e4b2d2a90dd9f63 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 12 Feb 2025 13:01:14 +0800 Subject: [PATCH 15/20] ruff lint --- python/datafusion/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 87bf268f4..151868607 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -124,7 +124,7 @@ def __init__(self, df: DataFrameInternal) -> None: def into_view(self) -> pa.Table: """Convert DataFrame as a ViewTable which can be used in register_table.""" return self._into_view() - + def _into_view(self) -> pa.Table: """Convert DataFrame as a ViewTable which can be used in register_table.""" return self.df._into_view() From 7b0cbf1cb9a336aaac9d52d73d124a652522d326 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 18 Feb 2025 12:41:27 +0800 Subject: [PATCH 16/20] refactor: simplify into_view method and update Rust binding convention --- python/datafusion/dataframe.py | 6 +----- src/dataframe.rs | 10 ++++++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 151868607..39f415f5c 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -123,11 +123,7 @@ def __init__(self, df: DataFrameInternal) -> None: def into_view(self) -> pa.Table: """Convert DataFrame as a ViewTable which can be used in 
register_table.""" - return self._into_view() - - def _into_view(self) -> pa.Table: - """Convert DataFrame as a ViewTable which can be used in register_table.""" - return self.df._into_view() + return self.df.into_view() def __getitem__(self, key: str | List[str]) -> DataFrame: """Return a new :py:class`DataFrame` with the specified column or columns. diff --git a/src/dataframe.rs b/src/dataframe.rs index 7e77ecd5a..faf97f089 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -175,13 +175,15 @@ impl PyDataFrame { } /// Convert this DataFrame into a Table that can be used in register_table - fn _into_view(&self) -> PyDataFusionResult { + /// By convention, into_... methods consume self and return the new object. + /// Disabling the clippy lint, so we can use &self + /// because we're working with Python bindings + /// where objects are shared + #[allow(clippy::wrong_self_convention)] + fn into_view(&self) -> PyDataFusionResult { // Call the underlying Rust DataFrame::into_view method. // Note that the Rust method consumes self; here we clone the inner Arc // so that we don’t invalidate this PyDataFrame. 
- // _into_view because clippy says `into_*` usually take `self` by value - // but we cannot own self because Python objects are shared, - // so 'self' cannot be moved out of the Python interpreter let table_provider = self.df.as_ref().clone().into_view(); let table_provider = PyTableProvider::new(table_provider); From f594b46ed6e2d88e97a7bb174f781c3700736186 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 18 Feb 2025 18:46:05 +0800 Subject: [PATCH 17/20] docs: add views section to user guide with example on registering views --- .../user-guide/common-operations/index.rst | 1 + .../user-guide/common-operations/views.rst | 61 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 docs/source/user-guide/common-operations/views.rst diff --git a/docs/source/user-guide/common-operations/index.rst b/docs/source/user-guide/common-operations/index.rst index d7c708c21..7abd1f138 100644 --- a/docs/source/user-guide/common-operations/index.rst +++ b/docs/source/user-guide/common-operations/index.rst @@ -23,6 +23,7 @@ The contents of this section are designed to guide a new user through how to use .. toctree:: :maxdepth: 2 + views basic-info select-and-filter expressions diff --git a/docs/source/user-guide/common-operations/views.rst b/docs/source/user-guide/common-operations/views.rst new file mode 100644 index 000000000..3c360c89d --- /dev/null +++ b/docs/source/user-guide/common-operations/views.rst @@ -0,0 +1,61 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. 
software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +====================== +Registering Views +====================== + +You can use the ``into_view`` method to convert a DataFrame into a view and register it with the context. + +.. code-block:: python + + from datafusion import SessionContext, col, literal + + # Create a DataFusion context + ctx = SessionContext() + + # Create sample data + data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]} + + # Create a DataFrame from the dictionary + df = ctx.from_pydict(data, "my_table") + + # Filter the DataFrame (for example, keep rows where a > 2) + df_filtered = df.filter(col("a") > literal(2)) + + # Convert the filtered DataFrame into a view + view = df_filtered.into_view() + + # Register the view with the context + ctx.register_table("view1", view) + + # Now run a SQL query against the registered view + df_view = ctx.sql("SELECT * FROM view1") + + # Collect the results + results = df_view.collect() + + # Convert results to a list of dictionaries for display + result_dicts = [batch.to_pydict() for batch in results] + + print(result_dicts) + +This will output: + +.. 
code-block:: python + + [{'a': [3, 4, 5], 'b': [30, 40, 50]}] From 90a6a8b50ec5936af65fb4df654e826ec2f3657d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Mar 2025 11:41:31 +0800 Subject: [PATCH 18/20] feat: add register_view method to SessionContext for DataFrame registration --- python/datafusion/context.py | 12 ++++++++++++ python/tests/test_view.py | 5 +---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 864ef1c8b..39bc1a7dd 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -704,6 +704,18 @@ def from_polars(self, data: polars.DataFrame, name: str | None = None) -> DataFr """ return DataFrame(self.ctx.from_polars(data, name)) + # https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116 + # is the discussion on how we arrived at adding register_view + def register_view(self, name: str, df: DataFrame): + """Register a :py:class: `~datafusion.detaframe.DataFrame` as a view. + + Args: + name (str): The name to register the view under. + df (DataFrame): The DataFrame to be converted into a view and registered. + """ + view = df.into_view() + self.ctx.register_table(name, view) + def register_table(self, name: str, table: Table) -> None: """Register a :py:class: `~datafusion.catalog.Table` as a table. 
diff --git a/python/tests/test_view.py b/python/tests/test_view.py index f471c1124..1d92cc0d4 100644 --- a/python/tests/test_view.py +++ b/python/tests/test_view.py @@ -27,11 +27,8 @@ def test_register_filtered_dataframe(): df = ctx.from_pydict(data, "my_table") df_filtered = df.filter(col("a") > literal(2)) - view = df_filtered.into_view() - assert view.kind == "view" - - ctx.register_table("view1", view) + ctx.register_view("view1", df_filtered) df_view = ctx.sql("SELECT * FROM view1") From f0837deceb4d76df878167406855f71e3d8578d4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Mar 2025 11:59:33 +0800 Subject: [PATCH 19/20] docs: update README and user guide to reflect register_view method for DataFrame registration --- README.md | 9 +-- .../user-guide/common-operations/views.rst | 9 +-- src/dataframe.rs | 56 +++++++++++++++++++ 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 48ea5e074..4f80dbe18 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ This produces the following chart: ## Registering a DataFrame as a View -You can use the `into_view` method to convert a DataFrame into a view and register it with the context. +You can use SessionContext's `register_view` method to convert a DataFrame into a view and register it with the context. 
```python from datafusion import SessionContext, col, literal @@ -98,11 +98,8 @@ df = ctx.from_pydict(data, "my_table") # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) -# Convert the filtered DataFrame into a view -view = df_filtered.into_view() - -# Register the view with the context -ctx.register_table("view1", view) +# Register the dataframe as a view with the context +ctx.register_view("view1", df_filtered) # Now run a SQL query against the registered view df_view = ctx.sql("SELECT * FROM view1") diff --git a/docs/source/user-guide/common-operations/views.rst b/docs/source/user-guide/common-operations/views.rst index 3c360c89d..df11e3abe 100644 --- a/docs/source/user-guide/common-operations/views.rst +++ b/docs/source/user-guide/common-operations/views.rst @@ -19,7 +19,7 @@ Registering Views ====================== -You can use the ``into_view`` method to convert a DataFrame into a view and register it with the context. +You can use the context's ``register_view`` method to register a DataFrame as a view. .. 
code-block:: python @@ -37,11 +37,8 @@ You can use the ``into_view`` method to convert a DataFrame into a view and regi # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) - # Convert the filtered DataFrame into a view - view = df_filtered.into_view() - - # Register the view with the context - ctx.register_table("view1", view) + # Register the dataframe as a view with the context + ctx.register_view("view1", df_filtered) # Now run a SQL query against the registered view df_view = ctx.sql("SELECT * FROM view1") diff --git a/src/dataframe.rs b/src/dataframe.rs index 6ca8f2511..37c4338ad 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -52,6 +52,9 @@ use crate::{ expr::{sort_expr::PySortExpr, PyExpr}, }; +// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116 +// - we have not decided on the table_provider approach yet +// this is an interim implementation #[pyclass(name = "TableProvider", module = "datafusion")] pub struct PyTableProvider { provider: Arc, @@ -71,6 +74,57 @@ impl PyTableProvider { /// A PyDataFrame is a representation of a logical plan and an API to compose statements. /// Use it to build a plan and `.collect()` to execute the plan and collect the result. /// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment. +/// +/// # Methods +/// +/// - `new`: Creates a new PyDataFrame. +/// - `__getitem__`: Enable selection for `df[col]`, `df[col1, col2, col3]`, and `df[[col1, col2, col3]]`. +/// - `__repr__`: Returns a string representation of the DataFrame. +/// - `_repr_html_`: Returns an HTML representation of the DataFrame. +/// - `describe`: Calculate summary statistics for a DataFrame. +/// - `schema`: Returns the schema from the logical plan. +/// - `into_view`: Convert this DataFrame into a Table that can be used in register_table. We have not finalized on PyTableProvider approach yet. 
+/// - `select_columns`: Select columns from the DataFrame. +/// - `select`: Select expressions from the DataFrame. +/// - `drop`: Drop columns from the DataFrame. +/// - `filter`: Filter the DataFrame based on a predicate. +/// - `with_column`: Add a new column to the DataFrame. +/// - `with_columns`: Add multiple new columns to the DataFrame. +/// - `with_column_renamed`: Rename a column in the DataFrame. +/// - `aggregate`: Aggregate the DataFrame based on group by and aggregation expressions. +/// - `sort`: Sort the DataFrame based on expressions. +/// - `limit`: Limit the number of rows in the DataFrame. +/// - `collect`: Executes the plan, returning a list of `RecordBatch`es. +/// - `cache`: Cache the DataFrame. +/// - `collect_partitioned`: Executes the DataFrame and collects all results into a vector of vector of RecordBatch maintaining the input partitioning. +/// - `show`: Print the result, 20 lines by default. +/// - `distinct`: Filter out duplicate rows. +/// - `join`: Join two DataFrames. +/// - `join_on`: Join two DataFrames based on expressions. +/// - `explain`: Print the query plan. +/// - `logical_plan`: Get the logical plan for this DataFrame. +/// - `optimized_logical_plan`: Get the optimized logical plan for this DataFrame. +/// - `execution_plan`: Get the execution plan for this DataFrame. +/// - `repartition`: Repartition the DataFrame based on a logical partitioning scheme. +/// - `repartition_by_hash`: Repartition the DataFrame based on a hash partitioning scheme. +/// - `union`: Calculate the union of two DataFrames, preserving duplicate rows. +/// - `union_distinct`: Calculate the distinct union of two DataFrames. +/// - `unnest_column`: Unnest a column in the DataFrame. +/// - `unnest_columns`: Unnest multiple columns in the DataFrame. +/// - `intersect`: Calculate the intersection of two DataFrames. +/// - `except_all`: Calculate the exception of two DataFrames. +/// - `write_csv`: Write the DataFrame to a CSV file. 
+/// - `write_parquet`: Write the DataFrame to a Parquet file. +/// - `write_json`: Write the DataFrame to a JSON file. +/// - `to_arrow_table`: Convert the DataFrame to an Arrow Table. +/// - `__arrow_c_stream__`: Convert the DataFrame to an Arrow C Stream. +/// - `execute_stream`: Execute the DataFrame and return a RecordBatchStream. +/// - `execute_stream_partitioned`: Execute the DataFrame and return partitioned RecordBatchStreams. +/// - `to_pandas`: Convert the DataFrame to a Pandas DataFrame. +/// - `to_pylist`: Convert the DataFrame to a Python list. +/// - `to_pydict`: Convert the DataFrame to a Python dictionary. +/// - `to_polars`: Convert the DataFrame to a Polars DataFrame. +/// - `count`: Execute the DataFrame to get the total number of rows. #[pyclass(name = "DataFrame", module = "datafusion", subclass)] #[derive(Clone)] pub struct PyDataFrame { @@ -179,6 +233,8 @@ impl PyDataFrame { /// Disabling the clippy lint, so we can use &self /// because we're working with Python bindings /// where objects are shared + /// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116 + /// - we have not decided on the table_provider approach yet #[allow(clippy::wrong_self_convention)] fn into_view(&self) -> PyDataFusionResult { // Call the underlying Rust DataFrame::into_view method. From 9d8cdb544fbfcc63271ce12ede419de50b61989c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 7 Mar 2025 23:05:09 +0800 Subject: [PATCH 20/20] docs: remove some documentation from PyDataFrame --- src/dataframe.rs | 51 ------------------------------------------------ 1 file changed, 51 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 37c4338ad..243e2e14f 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -74,57 +74,6 @@ impl PyTableProvider { /// A PyDataFrame is a representation of a logical plan and an API to compose statements. /// Use it to build a plan and `.collect()` to execute the plan and collect the result. 
/// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment. -/// -/// # Methods -/// -/// - `new`: Creates a new PyDataFrame. -/// - `__getitem__`: Enable selection for `df[col]`, `df[col1, col2, col3]`, and `df[[col1, col2, col3]]`. -/// - `__repr__`: Returns a string representation of the DataFrame. -/// - `_repr_html_`: Returns an HTML representation of the DataFrame. -/// - `describe`: Calculate summary statistics for a DataFrame. -/// - `schema`: Returns the schema from the logical plan. -/// - `into_view`: Convert this DataFrame into a Table that can be used in register_table. We have not finalized on PyTableProvider approach yet. -/// - `select_columns`: Select columns from the DataFrame. -/// - `select`: Select expressions from the DataFrame. -/// - `drop`: Drop columns from the DataFrame. -/// - `filter`: Filter the DataFrame based on a predicate. -/// - `with_column`: Add a new column to the DataFrame. -/// - `with_columns`: Add multiple new columns to the DataFrame. -/// - `with_column_renamed`: Rename a column in the DataFrame. -/// - `aggregate`: Aggregate the DataFrame based on group by and aggregation expressions. -/// - `sort`: Sort the DataFrame based on expressions. -/// - `limit`: Limit the number of rows in the DataFrame. -/// - `collect`: Executes the plan, returning a list of `RecordBatch`es. -/// - `cache`: Cache the DataFrame. -/// - `collect_partitioned`: Executes the DataFrame and collects all results into a vector of vector of RecordBatch maintaining the input partitioning. -/// - `show`: Print the result, 20 lines by default. -/// - `distinct`: Filter out duplicate rows. -/// - `join`: Join two DataFrames. -/// - `join_on`: Join two DataFrames based on expressions. -/// - `explain`: Print the query plan. -/// - `logical_plan`: Get the logical plan for this DataFrame. -/// - `optimized_logical_plan`: Get the optimized logical plan for this DataFrame. 
-/// - `execution_plan`: Get the execution plan for this DataFrame. -/// - `repartition`: Repartition the DataFrame based on a logical partitioning scheme. -/// - `repartition_by_hash`: Repartition the DataFrame based on a hash partitioning scheme. -/// - `union`: Calculate the union of two DataFrames, preserving duplicate rows. -/// - `union_distinct`: Calculate the distinct union of two DataFrames. -/// - `unnest_column`: Unnest a column in the DataFrame. -/// - `unnest_columns`: Unnest multiple columns in the DataFrame. -/// - `intersect`: Calculate the intersection of two DataFrames. -/// - `except_all`: Calculate the exception of two DataFrames. -/// - `write_csv`: Write the DataFrame to a CSV file. -/// - `write_parquet`: Write the DataFrame to a Parquet file. -/// - `write_json`: Write the DataFrame to a JSON file. -/// - `to_arrow_table`: Convert the DataFrame to an Arrow Table. -/// - `__arrow_c_stream__`: Convert the DataFrame to an Arrow C Stream. -/// - `execute_stream`: Execute the DataFrame and return a RecordBatchStream. -/// - `execute_stream_partitioned`: Execute the DataFrame and return partitioned RecordBatchStreams. -/// - `to_pandas`: Convert the DataFrame to a Pandas DataFrame. -/// - `to_pylist`: Convert the DataFrame to a Python list. -/// - `to_pydict`: Convert the DataFrame to a Python dictionary. -/// - `to_polars`: Convert the DataFrame to a Polars DataFrame. -/// - `count`: Execute the DataFrame to get the total number of rows. #[pyclass(name = "DataFrame", module = "datafusion", subclass)] #[derive(Clone)] pub struct PyDataFrame {