diff --git a/docs/source/user-guide/common-operations/functions.rst b/docs/source/user-guide/common-operations/functions.rst
index 12097be8f..ce2344697 100644
--- a/docs/source/user-guide/common-operations/functions.rst
+++ b/docs/source/user-guide/common-operations/functions.rst
@@ -129,3 +129,70 @@ The function :py:func:`~datafusion.functions.in_list` allows to check a column f
     .limit(20)
     .to_pandas()
 )
+
+
+Handling Missing Values
+=======================
+
+DataFusion provides methods to handle missing values in DataFrames:
+
+fill_null
+---------
+
+The ``fill_null()`` method replaces NULL values in specified columns with a provided value:
+
+.. code-block:: python
+
+    # Fill all NULL values with 0 where possible
+    df = df.fill_null(0)
+
+    # Fill NULL values only in specific string columns
+    df = df.fill_null("missing", subset=["name", "category"])
+
+The fill value will be cast to match each column's type. If casting fails for a column, that column remains unchanged.
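+
+For example, on a DataFrame that mixes string and integer columns, a string fill
+value is applied only where the cast succeeds (a sketch; the column names here
+are hypothetical):
+
+.. code-block:: python
+
+    # "name" (string) becomes "unknown" where NULL; an int64 column such as
+    # "age" is left unchanged because "unknown" cannot be cast to int64
+    df = df.fill_null("unknown")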
+
+fill_nan
+--------
+
+The ``fill_nan()`` method replaces NaN values in floating-point columns with a provided numeric value:
+
+.. code-block:: python
+
+    # Fill all NaN values with 0 in numeric columns
+    df = df.fill_nan(0)
+
+    # Fill NaN values in specific numeric columns
+    df = df.fill_nan(99.9, subset=["price", "score"])
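+
+Note that NULL and NaN are distinct values: ``fill_nan()`` replaces only NaN and
+leaves NULLs in place, while ``fill_null()`` replaces only NULLs. To clean up
+both in a floating-point column, the two calls can be chained (a sketch,
+assuming a hypothetical ``score`` column):
+
+.. code-block:: python
+
+    # Replace NaNs first, then any remaining NULLs
+    df = df.fill_nan(0.0, subset=["score"]).fill_null(0.0, subset=["score"])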
+
+This only works on floating-point columns (float32, float64). The fill value must be numeric (int or float).
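+
+Internally, ``fill_nan()`` is built on the ``nanvl`` function and ``fill_null()`` on
+``coalesce``, so an equivalent expression can be written by hand when more control
+is needed (a sketch, assuming a hypothetical ``price`` column):
+
+.. code-block:: python
+
+    from datafusion import col, lit, functions as f
+
+    # Equivalent to df.fill_nan(0.0, subset=["price"]) for that one column
+    df = df.with_column("price", f.nanvl(col("price"), lit(0.0)))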
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 7413a5fa3..36d81d528 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -22,6 +22,7 @@
 from __future__ import annotations
 
 import warnings
+from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -33,8 +34,12 @@
     overload,
 )
 
+import pyarrow as pa
 from typing_extensions import deprecated
 
+from datafusion import functions as f
+from datafusion._internal import DataFrame as DataFrameInternal
+from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
 from datafusion.record_batch import RecordBatchStream
 
@@ -44,12 +49,6 @@
     import pandas as pd
     import polars as pl
 
-    import pyarrow as pa
-
-from enum import Enum
-
-from datafusion._internal import DataFrame as DataFrameInternal
-from datafusion.expr import Expr, SortExpr, sort_or_default
 
 
 # excerpt from deltalake
@@ -853,3 +852,113 @@ def within_limit(df: DataFrame, limit: int) -> DataFrame:
             DataFrame: After applying func to the original dataframe.
         """
         return func(self, *args)
+
+    def fill_null(self, value: Any, subset: list[str] | None = None) -> DataFrame:
+        """Fill null values in specified columns with a value.
+
+        Args:
+            value: Value to replace nulls with. Will be cast to match column type.
+            subset: Optional list of column names to fill. If None, fills all columns.
+
+        Returns:
+            DataFrame with null values replaced where type casting is possible.
+
+        Examples:
+            >>> df = df.fill_null(0)  # Fill all nulls with 0 where possible
+            >>> # Fill nulls in specific string columns
+            >>> df = df.fill_null("missing", subset=["name", "category"])
+
+        Notes:
+            - Only fills nulls in columns where the value can be cast to the column type
+            - For columns where casting fails, the original column is kept unchanged
+            - For columns not in subset, the original column is kept unchanged
+        """
+        # Get columns to process
+        if subset is None:
+            subset = self.schema().names
+        else:
+            schema_cols = self.schema().names
+            for col in subset:
+                if col not in schema_cols:
+                    raise ValueError(f"Column '{col}' not found in DataFrame")
+
+        # Build expressions for select
+        exprs = []
+        for col_name in self.schema().names:
+            if col_name in subset:
+                # Get column type
+                col_type = self.schema().field(col_name).type
+
+                try:
+                    # Try casting value to column type
+                    typed_value = pa.scalar(value, type=col_type)
+                    literal_expr = Expr.literal(typed_value)
+
+                    # Build coalesce expression
+                    expr = f.coalesce(f.col(col_name), literal_expr)
+                    exprs.append(expr.alias(col_name))
+
+                except (pa.ArrowTypeError, pa.ArrowInvalid):
+                    # If cast fails, keep original column
+                    exprs.append(f.col(col_name))
+            else:
+                # Keep columns not in subset unchanged
+                exprs.append(f.col(col_name))
+
+        return self.select(*exprs)
+
+    def fill_nan(
+        self, value: float | int, subset: list[str] | None = None
+    ) -> DataFrame:
+        """Fill NaN values in specified numeric columns with a value.
+
+        Args:
+            value: Numeric value to replace NaN values with.
+            subset: Optional list of column names to fill. If None, fills all numeric
+                columns.
+
+        Returns:
+            DataFrame with NaN values replaced in numeric columns.
+
+        Examples:
+            >>> df = df.fill_nan(0)  # Fill all NaNs with 0 in numeric columns
+            >>> # Fill NaNs in specific numeric columns
+            >>> df = df.fill_nan(99.9, subset=["price", "score"])
+
+        Notes:
+            - Only fills NaN values in numeric columns (float32, float64)
+            - Non-numeric columns are kept unchanged
+            - For columns not in subset, the original column is kept unchanged
+            - Value must be numeric (int or float)
+        """
+        if not isinstance(value, (int, float)):
+            raise ValueError("Value must be numeric (int or float)")
+
+        # Get columns to process
+        if subset is None:
+            # Only get numeric columns if no subset specified
+            subset = [
+                field.name
+                for field in self.schema()
+                if pa.types.is_floating(field.type)
+            ]
+        else:
+            schema_cols = self.schema().names
+            for col in subset:
+                if col not in schema_cols:
+                    raise ValueError(f"Column '{col}' not found in DataFrame")
+                if not pa.types.is_floating(self.schema().field(col).type):
+                    raise ValueError(f"Column '{col}' is not a numeric column")
+
+        # Build expressions for select
+        exprs = []
+        for col_name in self.schema().names:
+            if col_name in subset:
+                # Use nanvl function to replace NaN values
+                expr = f.nanvl(f.col(col_name), f.lit(value))
+                exprs.append(expr.alias(col_name))
+            else:
+                # Keep columns not in subset unchanged
+                exprs.append(f.col(col_name))
+
+        return self.select(*exprs)
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 5bc3fb094..4d1d348d0 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -1196,3 +1196,132 @@ def test_dataframe_repr_html(df) -> None:
 
     # Ignore whitespace just to make this test look cleaner
     assert output.replace(" ", "") == ref_html.replace(" ", "")
+
+
+def test_fill_null(df):
+    # Test filling nulls with integer value
+    df_with_nulls = df.with_column("d", literal(None).cast(pa.int64()))
+    df_filled = df_with_nulls.fill_null(0)
+    result = df_filled.to_pydict()
+    assert result["d"] == [0, 0, 0]
+
+    # Test filling nulls with string value
+    df_with_nulls = df.with_column("d", literal(None).cast(pa.string()))
+    df_filled = df_with_nulls.fill_null("missing")
+    result = df_filled.to_pydict()
+    assert result["d"] == ["missing", "missing", "missing"]
+
+    # Test filling nulls with subset of columns
+    df_with_nulls = df.with_columns(
+        literal(None).cast(pa.int64()).alias("d"),
+        literal(None).cast(pa.string()).alias("e"),
+    )
+    df_filled = df_with_nulls.fill_null("missing", subset=["e"])
+    result = df_filled.to_pydict()
+    assert result["d"] == [None, None, None]
+    assert result["e"] == ["missing", "missing", "missing"]
+
+    # Test filling nulls with value that cannot be cast to column type
+    df_with_nulls = df.with_column("d", literal(None))
+    df_filled = df_with_nulls.fill_null("invalid")
+    result = df_filled.to_pydict()
+    assert result["d"] == [None, None, None]
+
+    # Test filling nulls with value that can be cast to some columns but not others
+    df_with_nulls = df.with_columns(
+        literal(None).cast(pa.int64()).alias("d"),
+        literal(None).cast(pa.string()).alias("e"),
+    )
+    df_filled = df_with_nulls.fill_null(0)
+    result = df_filled.to_pydict()
+    assert result["d"] == [0, 0, 0]
+    assert result["e"] == [None, None, None]
+
+    # Test filling nulls with subset of columns where some casts fail
+    df_with_nulls = df.with_columns(
+        literal(None).cast(pa.int64()).alias("d"),
+        literal(None).cast(pa.string()).alias("e"),
+    )
+    df_filled = df_with_nulls.fill_null(0, subset=["d", "e"])
+    result = df_filled.to_pydict()
+    assert result["d"] == [0, 0, 0]
+    assert result["e"] == [None, None, None]
+
+    # Test filling nulls with subset of columns where all casts succeed
+    df_with_nulls = df.with_columns(
+        literal(None).cast(pa.int64()).alias("d"),
+        literal(None).cast(pa.string()).alias("e"),
+    )
+    df_filled = df_with_nulls.fill_null("missing", subset=["e"])
+    result = df_filled.to_pydict()
+    assert result["d"] == [None, None, None]
+    assert result["e"] == ["missing", "missing", "missing"]
+
+    # Test filling nulls with subset of columns where some columns do not exist
+    df_with_nulls = df.with_columns(
+        literal(None).cast(pa.int64()).alias("d"),
+        literal(None).cast(pa.string()).alias("e"),
+    )
+    with pytest.raises(ValueError, match="Column 'f' not found in DataFrame"):
+        df_with_nulls.fill_null("missing", subset=["e", "f"])
+
+
+def test_fill_nan(df):
+    import math
+
+    # Test filling NaNs with integer value
+    df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
+    df_filled = df_with_nans.fill_nan(0)
+    result = df_filled.to_pydict()
+    assert result["d"] == [0, 0, 0]
+
+    # Test filling NaNs with float value
+    df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
+    df_filled = df_with_nans.fill_nan(99.9)
+    result = df_filled.to_pydict()
+    assert result["d"] == [99.9, 99.9, 99.9]
+
+    # Test filling NaNs with subset of columns
+    df_with_nans = df.with_columns(
+        literal(float("nan")).cast(pa.float64()).alias("d"),
+        literal(float("nan")).cast(pa.float64()).alias("e"),
+    )
+    df_filled = df_with_nans.fill_nan(99.9, subset=["e"])
+    result = df_filled.to_pydict()
+    assert all(math.isnan(v) for v in result["d"])  # NaN != NaN, so use isnan
+    assert result["e"] == [99.9, 99.9, 99.9]
+
+    # Test that a non-numeric fill value is rejected
+    df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
+    with pytest.raises(ValueError, match="Value must be numeric"):
+        df_with_nans.fill_nan("invalid")
+
+    # Test that non-numeric columns outside the subset are left unchanged
+    df_with_nans = df.with_columns(
+        literal(float("nan")).cast(pa.float64()).alias("d"),
+        literal(float("nan")).cast(pa.float64()).alias("e"),
+        literal("abc").cast(pa.string()).alias("f"),  # non-numeric column
+    )
+    df_filled = df_with_nans.fill_nan(0, subset=["d", "e"])
+    result = df_filled.to_pydict()
+    assert result["d"] == [0, 0, 0]
+    assert result["e"] == [0, 0, 0]
+    assert result["f"] == ["abc", "abc", "abc"]  # untouched, not in subset
+
+    # Test filling NaNs fails on non-numeric columns
+    df_with_mixed = df.with_columns(
+        literal(float("nan")).cast(pa.float64()).alias("d"),
+        literal("abc").cast(pa.string()).alias("e"),
+    )
+    with pytest.raises(ValueError, match="Column 'e' is not a numeric column"):
+        df_with_mixed.fill_nan(0, subset=["d", "e"])
+
+    # Test filling NaNs with subset of columns where all casts succeed
+    df_with_nans = df.with_columns(
+        literal(float("nan")).cast(pa.float64()).alias("d"),
+        literal(float("nan")).cast(pa.float64()).alias("e"),
+    )
+    df_filled = df_with_nans.fill_nan(99.9, subset=["e"])
+    result = df_filled.to_pydict()
+    assert all(math.isnan(v) for v in result["d"])  # NaN != NaN, so use isnan
+    assert result["e"] == [99.9, 99.9, 99.9]
diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py
index 796b1f76e..fa87d51d4 100644
--- a/python/tests/test_functions.py
+++ b/python/tests/test_functions.py
@@ -1173,3 +1173,57 @@ def test_between_default(df):
 
     actual = df.collect()[0].to_pydict()
     assert actual == expected
+
+
+def test_coalesce(df):
+    # Create a DataFrame with null values
+    ctx = SessionContext()
+    batch = pa.RecordBatch.from_arrays(
+        [
+            pa.array(["Hello", None, "!"]),  # string column with null
+            pa.array([4, None, 6]),  # integer column with null
+            pa.array(["hello ", None, " !"]),  # string column with null
+            pa.array(
+                [datetime(2022, 12, 31), None, datetime(2020, 7, 2)]
+            ),  # datetime with null
+            pa.array([False, None, True]),  # boolean column with null
+        ],
+        names=["a", "b", "c", "d", "e"],
+    )
+    df_with_nulls = ctx.create_dataframe([[batch]])
+
+    # Test coalesce with different data types
+    result_df = df_with_nulls.select(
+        f.coalesce(column("a"), literal("default")).alias("a_coalesced"),
+        f.coalesce(column("b"), literal(0)).alias("b_coalesced"),
+        f.coalesce(column("c"), literal("default")).alias("c_coalesced"),
+        f.coalesce(column("d"), literal(datetime(2000, 1, 1))).alias("d_coalesced"),
+        f.coalesce(column("e"), literal(False)).alias("e_coalesced"),
+    )
+
+    result = result_df.collect()[0]
+
+    # Verify results
+    assert result.column(0) == pa.array(
+        ["Hello", "default", "!"], type=pa.string_view()
+    )
+    assert result.column(1) == pa.array([4, 0, 6], type=pa.int64())
+    assert result.column(2) == pa.array(
+        ["hello ", "default", " !"], type=pa.string_view()
+    )
+    assert result.column(3) == pa.array(
+        [datetime(2022, 12, 31), datetime(2000, 1, 1), datetime(2020, 7, 2)],
+        type=pa.timestamp("us"),
+    )
+    assert result.column(4) == pa.array([False, False, True], type=pa.bool_())
+
+    # Test multiple arguments
+    result_df = df_with_nulls.select(
+        f.coalesce(column("a"), literal(None), literal("fallback")).alias(
+            "multi_coalesce"
+        )
+    )
+    result = result_df.collect()[0]
+    assert result.column(0) == pa.array(
+        ["Hello", "fallback", "!"], type=pa.string_view()
+    )