Skip to content

ENH: Support third-party execution engines in Series.map #61467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 27, 2025
Merged
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Other enhancements
- :meth:`DataFrameGroupBy.transform`, :meth:`SeriesGroupBy.transform`, :meth:`DataFrameGroupBy.agg`, :meth:`SeriesGroupBy.agg`, :meth:`RollingGroupby.apply`, :meth:`ExpandingGroupby.apply`, :meth:`Rolling.apply`, :meth:`Expanding.apply`, :meth:`DataFrame.apply` with ``engine="numba"`` now supports positional arguments passed as kwargs (:issue:`58995`)
- :meth:`Rolling.agg`, :meth:`Expanding.agg` and :meth:`ExponentialMovingWindow.agg` now accept :class:`NamedAgg` aggregations through ``**kwargs`` (:issue:`28333`)
- :meth:`Series.map` can now accept kwargs to pass on to func (:issue:`59814`)
- :meth:`Series.map` now accepts an ``engine`` parameter to allow execution with a third-party execution engine (:issue:`61125`)
- :meth:`Series.str.get_dummies` now accepts a ``dtype`` parameter to specify the dtype of the resulting DataFrame (:issue:`47872`)
- :meth:`pandas.concat` will raise a ``ValueError`` when ``ignore_index=True`` and ``keys`` is not ``None`` (:issue:`59274`)
- :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
Expand Down
39 changes: 39 additions & 0 deletions pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -4326,6 +4326,7 @@ def map(
self,
func: Callable | Mapping | Series | None = None,
na_action: Literal["ignore"] | None = None,
engine: Callable | None = None,
**kwargs,
) -> Series:
"""
Expand All @@ -4342,6 +4343,25 @@ def map(
na_action : {None, 'ignore'}, default None
If 'ignore', propagate NaN values, without passing them to the
mapping correspondence.
engine : decorator, optional
Choose the execution engine to use to run the function. Only used for
functions. If ``engine`` is provided and ``map`` is called with a mapping
or ``Series``, a ``ValueError`` will be raised. If ``engine`` is not
provided the function will be executed by the regular Python interpreter.

Options include JIT compilers such as Numba, Bodo or Blosc2, which in some
cases can speed up the execution. To use an executor you can provide the
decorators ``numba.jit``, ``numba.njit``, ``bodo.jit`` or ``blosc2.jit``.
You can also provide the decorator with parameters, like
``numba.jit(nogil=True)``.

Not all functions can be executed with all execution engines. In general,
JIT compilers will require type stability in the function (no variable
should change data type during the execution). And not all pandas and
NumPy APIs are supported. Check the engine documentation for limitations.

.. versionadded:: 3.0.0

**kwargs
Additional keyword arguments to pass as keyword arguments to
`func`.
Expand Down Expand Up @@ -4421,6 +4441,25 @@ def map(
else:
raise ValueError("The `func` parameter is required")

if engine is not None:
if not callable(func):
raise ValueError(
"The engine argument can only be specified when func is a function"
)
if not hasattr(engine, "__pandas_udf__"):
raise ValueError(f"Not a valid engine: {engine!r}")
result = engine.__pandas_udf__.map( # type: ignore[attr-defined]
data=self,
func=func,
args=(),
kwargs=kwargs,
decorator=engine,
skip_na=na_action == "ignore",
)
if not isinstance(result, Series):
result = Series(result, index=self.index, name=self.name)
return result.__finalize__(self, method="map")

if callable(func):
func = functools.partial(func, **kwargs)
new_values = self._map_values(func, na_action=na_action)
Expand Down
57 changes: 57 additions & 0 deletions pandas/tests/apply/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,64 @@
import numpy as np

from pandas import (
DataFrame,
Series,
)
from pandas.api.executors import BaseExecutionEngine
from pandas.core.groupby.base import transformation_kernels

# There is no Series.cumcount or DataFrame.cumcount
series_transform_kernels = [
x for x in sorted(transformation_kernels) if x != "cumcount"
]
frame_transform_kernels = [x for x in sorted(transformation_kernels) if x != "cumcount"]


class MockExecutionEngine(BaseExecutionEngine):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move this to the conftest.py as well? In the past, I've moved away from defining objects in non test_ or conftest.py files

"""
Execution Engine to test if the execution engine interface receives and
uses all parameters provided by the user.

Making this engine work as the default Python engine by calling it, no extra
functionality is implemented here.

When testing, this will be called when this engine is provided, and then the
same pandas.map and pandas.apply function will be called, but without engine,
executing the default behavior from the python engine.
"""

def map(data, func, args, kwargs, decorator, skip_na):
kwargs_to_pass = kwargs if isinstance(data, DataFrame) else {}
return data.map(func, na_action="ignore" if skip_na else None, **kwargs_to_pass)

def apply(data, func, args, kwargs, decorator, axis):
if isinstance(data, Series):
return data.apply(func, convert_dtype=True, args=args, by_row=False)
elif isinstance(data, DataFrame):
return data.apply(
func,
axis=axis,
raw=False,
result_type=None,
args=args,
by_row="compat",
**kwargs,
)
else:
assert isinstance(data, np.ndarray)

def wrap_function(func):
# https://github.com/numpy/numpy/issues/8352
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if isinstance(result, str):
result = np.array(result, dtype=object)
return result

return wrapper

return np.apply_along_axis(wrap_function(func), axis, data, *args, **kwargs)


class MockEngineDecorator:
__pandas_udf__ = MockExecutionEngine
8 changes: 8 additions & 0 deletions pandas/tests/apply/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import pytest

from pandas.tests.apply.common import MockEngineDecorator


@pytest.fixture(params=[None, MockEngineDecorator])
def engine(request):
return request.param
54 changes: 1 addition & 53 deletions pandas/tests/apply/test_frame_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,11 @@
date_range,
)
import pandas._testing as tm
from pandas.api.executors import BaseExecutionEngine
from pandas.tests.apply.common import MockEngineDecorator
from pandas.tests.frame.common import zip_frames
from pandas.util.version import Version


class MockExecutionEngine(BaseExecutionEngine):
"""
Execution Engine to test if the execution engine interface receives and
uses all parameters provided by the user.

Making this engine work as the default Python engine by calling it, no extra
functionality is implemented here.

When testing, this will be called when this engine is provided, and then the
same pandas.map and pandas.apply function will be called, but without engine,
executing the default behavior from the python engine.
"""

def map(data, func, args, kwargs, decorator, skip_na):
kwargs_to_pass = kwargs if isinstance(data, DataFrame) else {}
return data.map(
func, action_na="ignore" if skip_na else False, **kwargs_to_pass
)

def apply(data, func, args, kwargs, decorator, axis):
if isinstance(data, Series):
return data.apply(func, convert_dtype=True, args=args, by_row=False)
elif isinstance(data, DataFrame):
return data.apply(
func,
axis=axis,
raw=False,
result_type=None,
args=args,
by_row="compat",
**kwargs,
)
else:
assert isinstance(data, np.ndarray)

def wrap_function(func):
# https://github.com/numpy/numpy/issues/8352
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if isinstance(result, str):
result = np.array(result, dtype=object)
return result

return wrapper

return np.apply_along_axis(wrap_function(func), axis, data, *args, **kwargs)


class MockEngineDecorator:
__pandas_udf__ = MockExecutionEngine


@pytest.fixture
def int_frame_const_col():
"""
Expand Down
4 changes: 2 additions & 2 deletions pandas/tests/apply/test_series_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ def test_demo():


@pytest.mark.parametrize("func", [str, lambda x: str(x)])
def test_apply_map_evaluate_lambdas_the_same(string_series, func, by_row):
def test_apply_map_evaluate_lambdas_the_same(string_series, func, by_row, engine):
# test that we are evaluating row-by-row first if by_row="compat"
# else vectorized evaluation
result = string_series.apply(func, by_row=by_row)

if by_row:
expected = string_series.map(func)
expected = string_series.map(func, engine=engine)
tm.assert_series_equal(result, expected)
else:
assert result == str(string_series)
Expand Down
34 changes: 27 additions & 7 deletions pandas/tests/series/methods/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
timedelta_range,
)
import pandas._testing as tm
from pandas.tests.apply.conftest import engine # noqa: F401
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why this needs importing since it's already in the conftest.py

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I answered in a comment, so readers of that import don't need to ask themselves the same thing. Most tests related to apply/map are in tests/apply, so the fixture is defined there. But it's also useful here in tests/series/methods/. tests/apply/conftest.py is not in scope when running tests/series/methods, so I need to import it manually in order to use it. Another alternative would be to move the fixture to the global conftest.py, but I think this approach keeps things better organized and simple.

I moved the mock classes to conftest.py as suggested, thanks for the review.

Copy link
Member

@mroeschke mroeschke May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if your work in #61125 continues to expand, I would be OK moving this to the global conftest.py



def test_series_map_box_timedelta():
Expand All @@ -32,16 +33,20 @@ def f(x):
ser.map(f)


def test_map_callable(datetime_series):
def test_map_callable(datetime_series, engine): # noqa: F811
with np.errstate(all="ignore"):
tm.assert_series_equal(datetime_series.map(np.sqrt), np.sqrt(datetime_series))
tm.assert_series_equal(
datetime_series.map(np.sqrt, engine=engine), np.sqrt(datetime_series)
)

# map function element-wise
tm.assert_series_equal(datetime_series.map(math.exp), np.exp(datetime_series))
tm.assert_series_equal(
datetime_series.map(math.exp, engine=engine), np.exp(datetime_series)
)

# empty series
s = Series(dtype=object, name="foo", index=Index([], name="bar"))
rs = s.map(lambda x: x)
rs = s.map(lambda x: x, engine=engine)
tm.assert_series_equal(s, rs)

# check all metadata (GH 9322)
Expand All @@ -52,7 +57,7 @@ def test_map_callable(datetime_series):

# index but no data
s = Series(index=[1, 2, 3], dtype=np.float64)
rs = s.map(lambda x: x)
rs = s.map(lambda x: x, engine=engine)
tm.assert_series_equal(s, rs)


Expand Down Expand Up @@ -269,10 +274,10 @@ def test_map_decimal(string_series):
assert isinstance(result.iloc[0], Decimal)


def test_map_na_exclusion():
def test_map_na_exclusion(engine): # noqa: F811
s = Series([1.5, np.nan, 3, np.nan, 5])

result = s.map(lambda x: x * 2, na_action="ignore")
result = s.map(lambda x: x * 2, na_action="ignore", engine=engine)
exp = s * 2
tm.assert_series_equal(result, exp)

Expand Down Expand Up @@ -628,3 +633,18 @@ def test_map_no_func_or_arg():
def test_map_func_is_none():
with pytest.raises(ValueError, match="The `func` parameter is required"):
Series([1, 2]).map(func=None)


@pytest.mark.parametrize("func", [{}, {1: 2}, Series([3, 4])])
def test_map_engine_no_function(func):
s = Series([1, 2])

with pytest.raises(ValueError, match="engine argument can only be specified"):
s.map(func, engine="something")


def test_map_engine_not_executor():
s = Series([1, 2])

with pytest.raises(ValueError, match="Not a valid engine: 'something'"):
s.map(lambda x: x, engine="something")
Loading