Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DataFrame fill_nan/fill_null #1019

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/source/user-guide/common-operations/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,39 @@ The function :py:func:`~datafusion.functions.in_list` allows to check a column f
.limit(20)
.to_pandas()
)


Handling Missing Values
=======================

DataFusion provides methods to handle missing values in DataFrames:

fill_null
---------

The ``fill_null()`` method replaces NULL values in specified columns with a provided value:

.. code-block:: python

# Fill all NULL values with 0 where possible
df = df.fill_null(0)

# Fill NULL values only in specific string columns
df = df.fill_null("missing", subset=["name", "category"])

The fill value will be cast to match each column's type. If casting fails for a column, that column remains unchanged.

fill_nan
--------

The ``fill_nan()`` method replaces NaN values in floating-point columns with a provided numeric value:

.. code-block:: python

# Fill all NaN values with 0 in numeric columns
df = df.fill_nan(0)

# Fill NaN values in specific numeric columns
df = df.fill_nan(99.9, subset=["price", "score"])

This only works on floating-point columns (float32, float64). The fill value must be numeric (int or float).
121 changes: 115 additions & 6 deletions python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import annotations

import warnings
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -33,8 +34,12 @@
overload,
)

import pyarrow as pa
from typing_extensions import deprecated

from datafusion import functions as f
from datafusion._internal import DataFrame as DataFrameInternal
from datafusion.expr import Expr, SortExpr, sort_or_default
from datafusion.plan import ExecutionPlan, LogicalPlan
from datafusion.record_batch import RecordBatchStream

Expand All @@ -44,12 +49,6 @@

import pandas as pd
import polars as pl
import pyarrow as pa

from enum import Enum

from datafusion._internal import DataFrame as DataFrameInternal
from datafusion.expr import Expr, SortExpr, sort_or_default


# excerpt from deltalake
Expand Down Expand Up @@ -853,3 +852,113 @@ def within_limit(df: DataFrame, limit: int) -> DataFrame:
DataFrame: After applying func to the original dataframe.
"""
return func(self, *args)

def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame":
    """Fill null values in specified columns with a value.

    Args:
        value: Value to replace nulls with. Will be cast to match column type.
        subset: Optional list of column names to fill. If None, fills all columns.

    Returns:
        DataFrame with null values replaced where type casting is possible.

    Raises:
        ValueError: If a column named in ``subset`` does not exist.

    Examples:
        >>> df = df.fill_null(0)  # Fill all nulls with 0 where possible
        >>> # Fill nulls in specific string columns
        >>> df = df.fill_null("missing", subset=["name", "category"])

    Notes:
        - Only fills nulls in columns where the value can be cast to the column type
        - For columns where casting fails, the original column is kept unchanged
        - For columns not in subset, the original column is kept unchanged
    """
    # Hoist the schema lookup: the original re-fetched it per column.
    schema = self.schema()

    if subset is None:
        subset = schema.names
    else:
        schema_cols = schema.names
        for col in subset:
            if col not in schema_cols:
                raise ValueError(f"Column '{col}' not found in DataFrame")

    # Build one expression per output column.
    exprs = []
    for col_name in schema.names:
        if col_name in subset:
            col_type = schema.field(col_name).type
            try:
                # Cast the fill value to the column's type eagerly so that an
                # incompatible value is detected here rather than at execution.
                typed_value = pa.scalar(value, type=col_type)
                # Fix: use the module-level Expr import directly instead of
                # reaching it through the functions module as `f.Expr`.
                literal_expr = Expr.literal(typed_value)

                # COALESCE(col, literal) keeps non-null values and replaces nulls.
                expr = f.coalesce(f.col(col_name), literal_expr)
                exprs.append(expr.alias(col_name))
            except (pa.ArrowTypeError, pa.ArrowInvalid):
                # Cast failed: deliberately leave this column untouched.
                exprs.append(f.col(col_name))
        else:
            # Column not selected for filling: pass through unchanged.
            exprs.append(f.col(col_name))

    return self.select(*exprs)

def fill_nan(
    self, value: float | int, subset: list[str] | None = None
) -> "DataFrame":
    """Replace NaN values in floating-point columns with ``value``.

    Args:
        value: Numeric replacement for NaN entries.
        subset: Optional list of column names to fill. When None, every
            floating-point column is filled.

    Returns:
        DataFrame with NaN values replaced in the selected numeric columns.

    Raises:
        ValueError: If ``value`` is not numeric, if a subset column does not
            exist, or if a subset column is not a floating-point column.

    Examples:
        >>> df = df.fill_nan(0)  # Fill all NaNs with 0 in numeric columns
        >>> # Fill NaNs in specific numeric columns
        >>> df = df.fill_nan(99.9, subset=["price", "score"])

    Notes:
        - Only floating-point columns (float32, float64) are eligible
        - Columns outside ``subset`` pass through unchanged
    """
    # Reject non-numeric fill values up front.
    if not isinstance(value, (int, float)):
        raise ValueError("Value must be numeric (int or float)")

    if subset is None:
        # Default to every floating-point column in the schema.
        subset = [
            field.name
            for field in self.schema()
            if pa.types.is_floating(field.type)
        ]
    else:
        # Validate each requested column: it must exist and be floating-point.
        known_names = self.schema().names
        for name in subset:
            if name not in known_names:
                raise ValueError(f"Column '{name}' not found in DataFrame")
            if not pa.types.is_floating(self.schema().field(name).type):
                raise ValueError(f"Column '{name}' is not a numeric column")

    # nanvl(col, value) yields col where it is not NaN, otherwise value;
    # untouched columns are projected as-is.
    projection = [
        f.nanvl(f.col(name), f.lit(value)).alias(name)
        if name in subset
        else f.col(name)
        for name in self.schema().names
    ]
    return self.select(*projection)
126 changes: 126 additions & 0 deletions python/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,3 +1196,129 @@ def test_dataframe_repr_html(df) -> None:

# Ignore whitespace just to make this test look cleaner
assert output.replace(" ", "") == ref_html.replace(" ", "")


def test_fill_null(df):
    """Exercise DataFrame.fill_null across types, subsets, and failure modes."""
    # Integer fill value applied to an all-null int64 column.
    source = df.with_column("d", literal(None).cast(pa.int64()))
    assert source.fill_null(0).to_pydict()["d"] == [0, 0, 0]

    # String fill value applied to an all-null string column.
    source = df.with_column("d", literal(None).cast(pa.string()))
    assert source.fill_null("missing").to_pydict()["d"] == [
        "missing",
        "missing",
        "missing",
    ]

    # Restricting the fill to a subset leaves other columns untouched.
    source = df.with_columns(
        literal(None).cast(pa.int64()).alias("d"),
        literal(None).cast(pa.string()).alias("e"),
    )
    data = source.fill_null("missing", subset=["e"]).to_pydict()
    assert data["d"] == [None, None, None]
    assert data["e"] == ["missing", "missing", "missing"]

    # A value that cannot be cast to the column type leaves nulls in place.
    source = df.with_column("d", literal(None))
    assert source.fill_null("invalid").to_pydict()["d"] == [None, None, None]

    # Value castable to some columns but not others: only compatible ones fill.
    source = df.with_columns(
        literal(None).alias("d").cast(pa.int64()),
        literal(None).alias("e").cast(pa.string()),
    )
    data = source.fill_null(0).to_pydict()
    assert data["d"] == [0, 0, 0]
    assert data["e"] == [None, None, None]

    # Same partial-success behavior when an explicit subset is given.
    source = df.with_columns(
        literal(None).alias("d").cast(pa.int64()),
        literal(None).alias("e").cast(pa.string()),
    )
    data = source.fill_null(0, subset=["d", "e"]).to_pydict()
    assert data["d"] == [0, 0, 0]
    assert data["e"] == [None, None, None]

    # Subset where the cast succeeds for every listed column.
    source = df.with_columns(
        literal(None).alias("d").cast(pa.int64()),
        literal(None).alias("e").cast(pa.string()),
    )
    data = source.fill_null("missing", subset=["e"]).to_pydict()
    assert data["d"] == [None, None, None]
    assert data["e"] == ["missing", "missing", "missing"]

    # Naming a nonexistent column in the subset raises.
    source = df.with_columns(
        literal(None).alias("d").cast(pa.int64()),
        literal(None).alias("e").cast(pa.string()),
    )
    with pytest.raises(ValueError, match="Column 'f' not found in DataFrame"):
        source.fill_null("missing", subset=["e", "f"])

def test_fill_nan(df):
    """Exercise DataFrame.fill_nan across values, subsets, and error cases."""
    import math

    # Test filling NaNs with integer value
    df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
    df_filled = df_with_nans.fill_nan(0)
    result = df_filled.to_pydict()
    assert result["d"] == [0, 0, 0]

    # Test filling NaNs with float value
    df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
    df_filled = df_with_nans.fill_nan(99.9)
    result = df_filled.to_pydict()
    assert result["d"] == [99.9, 99.9, 99.9]

    # Test filling NaNs with subset of columns
    df_with_nans = df.with_columns(
        literal(float("nan")).cast(pa.float64()).alias("d"),
        literal(float("nan")).cast(pa.float64()).alias("e"),
    )
    df_filled = df_with_nans.fill_nan(99.9, subset=["e"])
    result = df_filled.to_pydict()
    # BUG FIX: `result["d"] == [float("nan"), ...]` is always False because
    # NaN compares unequal to itself; check with math.isnan instead.
    assert all(math.isnan(v) for v in result["d"])
    assert result["e"] == [99.9, 99.9, 99.9]

    # Test that a non-numeric fill value is rejected
    df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
    with pytest.raises(ValueError, match="Value must be numeric"):
        df_with_nans.fill_nan("invalid")

    # Test filling a subset of float columns while a non-numeric column,
    # kept OUT of the subset, passes through untouched.
    # BUG FIX: the original put the string column "f" in the subset and
    # expected it to be skipped, but fill_nan raises ValueError for
    # non-float subset columns — the section could never pass.
    df_with_nans = df.with_columns(
        literal(float("nan")).alias("d").cast(pa.float64()),
        literal(float("nan")).alias("e").cast(pa.float64()),
        literal("abc").alias("f").cast(pa.string()),  # non-numeric column
    )
    df_filled = df_with_nans.fill_nan(0, subset=["d", "e"])
    result = df_filled.to_pydict()
    assert result["d"] == [0, 0, 0]
    assert result["e"] == [0, 0, 0]
    assert result["f"] == ["abc", "abc", "abc"]  # untouched: not in subset

    # Test filling NaNs fails on non-numeric columns
    df_with_mixed = df.with_columns(
        literal(float("nan")).alias("d").cast(pa.float64()),
        literal("abc").alias("e").cast(pa.string()),
    )
    with pytest.raises(ValueError, match="Column 'e' is not a numeric column"):
        df_with_mixed.fill_nan(0, subset=["d", "e"])

    # Test filling NaNs with subset of columns where all casts succeed
    df_with_nans = df.with_columns(
        literal(float("nan")).alias("d").cast(pa.float64()),
        literal(float("nan")).alias("e").cast(pa.float64()),
    )
    df_filled = df_with_nans.fill_nan(99.9, subset=["e"])
    result = df_filled.to_pydict()
    # BUG FIX: same NaN-equality issue as above.
    assert all(math.isnan(v) for v in result["d"])
    assert result["e"] == [99.9, 99.9, 99.9]
54 changes: 54 additions & 0 deletions python/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1173,3 +1173,57 @@ def test_between_default(df):

actual = df.collect()[0].to_pydict()
assert actual == expected


def test_coalesce(df):
    """Verify f.coalesce substitutes defaults for nulls across data types."""
    # FIX: removed GitHub review-comment UI text that was scraped into the
    # middle of this function and broke the syntax.
    # Create a DataFrame with null values
    ctx = SessionContext()
    batch = pa.RecordBatch.from_arrays(
        [
            pa.array(["Hello", None, "!"]),  # string column with null
            pa.array([4, None, 6]),  # integer column with null
            pa.array(["hello ", None, " !"]),  # string column with null
            pa.array(
                [datetime(2022, 12, 31), None, datetime(2020, 7, 2)]
            ),  # datetime with null
            pa.array([False, None, True]),  # boolean column with null
        ],
        names=["a", "b", "c", "d", "e"],
    )
    df_with_nulls = ctx.create_dataframe([[batch]])

    # Test coalesce with different data types
    result_df = df_with_nulls.select(
        f.coalesce(column("a"), literal("default")).alias("a_coalesced"),
        f.coalesce(column("b"), literal(0)).alias("b_coalesced"),
        f.coalesce(column("c"), literal("default")).alias("c_coalesced"),
        f.coalesce(column("d"), literal(datetime(2000, 1, 1))).alias("d_coalesced"),
        f.coalesce(column("e"), literal(False)).alias("e_coalesced"),
    )

    result = result_df.collect()[0]

    # Verify results
    assert result.column(0) == pa.array(
        ["Hello", "default", "!"], type=pa.string_view()
    )
    assert result.column(1) == pa.array([4, 0, 6], type=pa.int64())
    assert result.column(2) == pa.array(
        ["hello ", "default", " !"], type=pa.string_view()
    )
    assert result.column(3) == pa.array(
        [datetime(2022, 12, 31), datetime(2000, 1, 1), datetime(2020, 7, 2)],
        type=pa.timestamp("us"),
    )
    assert result.column(4) == pa.array([False, False, True], type=pa.bool_())

    # Test multiple arguments
    result_df = df_with_nulls.select(
        f.coalesce(column("a"), literal(None), literal("fallback")).alias(
            "multi_coalesce"
        )
    )
    result = result_df.collect()[0]
    assert result.column(0) == pa.array(
        ["Hello", "fallback", "!"], type=pa.string_view()
    )