
Commit cdbc302

[SPARK-28226][PYTHON] Document Pandas UDF mapInPandas
## What changes were proposed in this pull request?

This PR proposes to document `MAP_ITER` with `mapInPandas`.

## How was this patch tested?

Manually checked the documentation.

![Screen Shot 2019-07-05 at 1 52 30 PM](https://user-images.githubusercontent.com/6477701/60698812-26cf2d80-9f2c-11e9-8295-9c00c28f5569.png)

![Screen Shot 2019-07-05 at 1 48 53 PM](https://user-images.githubusercontent.com/6477701/60698710-ac061280-9f2b-11e9-8521-a4f361207e06.png)

Closes apache#25025 from HyukjinKwon/SPARK-28226.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent 4ad0c33 commit cdbc302

File tree: 4 files changed, +89 −11 lines

### docs/sql-pyspark-pandas-with-arrow.md (+23)
@@ -155,6 +155,29 @@ The following example shows how to use this type of UDF to compute mean with gro
 
 For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 
+
+### Map Iterator
+
+Map iterator Pandas UDFs are used to transform data with an iterator of batches. Map iterator
+Pandas UDFs can be used with
+[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+It defines a map function that transforms an iterator of `pandas.DataFrame`s into another.
+
+In contrast to the scalar Pandas UDF, it can return output of arbitrary length. It maps an iterator of `pandas.DataFrame`s
+that represents the current `DataFrame`, using the map iterator UDF, and returns the result as a `DataFrame`.
+
+The following example shows how to create map iterator Pandas UDFs:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example map_iter_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
+[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+
 ## Usage Notes
 
 ### Supported SQL Types
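The section above leans on the claim that a map iterator UDF may return output of a length different from its input. A minimal sketch of that point, assuming a running SparkSession named `spark` with PyArrow available and the `MAP_ITER` API added by this commit; the three-row input and the `adults_only` name are illustrative, not part of the patch:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30), (3, 40)], ("id", "age"))

@pandas_udf(df.schema, PandasUDFType.MAP_ITER)
def adults_only(batch_iter):
    # Each yielded frame may hold fewer rows than the batch it came from,
    # so the output length need not match the input length.
    for pdf in batch_iter:
        yield pdf[pdf.age >= 30]

# Only the rows with age >= 30 (ids 2 and 3) survive.
df.mapInPandas(adults_only).show()
```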

### examples/src/main/python/sql/arrow.py (+28)
@@ -236,6 +236,28 @@ def mean_udf(v):
     # $example off:grouped_agg_pandas_udf$
 
 
+def map_iter_pandas_udf_example(spark):
+    # $example on:map_iter_pandas_udf$
+    import pandas as pd
+
+    from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
+
+    @pandas_udf(df.schema, PandasUDFType.MAP_ITER)
+    def filter_func(batch_iter):
+        for pdf in batch_iter:
+            yield pdf[pdf.id == 1]
+
+    df.mapInPandas(filter_func).show()
+    # +---+---+
+    # | id|age|
+    # +---+---+
+    # |  1| 21|
+    # +---+---+
+    # $example off:map_iter_pandas_udf$
+
+
 if __name__ == "__main__":
     spark = SparkSession \
         .builder \
@@ -246,7 +268,13 @@ def mean_udf(v):
     dataframe_with_arrow_example(spark)
     print("Running pandas_udf scalar example")
     scalar_pandas_udf_example(spark)
+    print("Running pandas_udf scalar iterator example")
+    scalar_iter_pandas_udf_example(spark)
     print("Running pandas_udf grouped map example")
     grouped_map_pandas_udf_example(spark)
+    print("Running pandas_udf grouped agg example")
+    grouped_agg_pandas_udf_example(spark)
+    print("Running pandas_udf map iterator example")
+    map_iter_pandas_udf_example(spark)
 
     spark.stop()

### python/pyspark/sql/dataframe.py (+10 −10)
@@ -2195,25 +2195,25 @@ def toPandas(self):
 
     def mapInPandas(self, udf):
         """
-        Maps each partition of the current :class:`DataFrame` using a pandas udf and returns
-        the result as a `DataFrame`.
+        Maps an iterator of batches in the current :class:`DataFrame` using a Pandas user-defined
+        function and returns the result as a :class:`DataFrame`.
 
-        The user-defined function should take an iterator of `pandas.DataFrame`s and return another
-        iterator of `pandas.DataFrame`s. For each partition, all columns are passed together as an
-        iterator of `pandas.DataFrame`s to the user-function and the returned iterator of
-        `pandas.DataFrame`s are combined as a :class:`DataFrame`.
+        The user-defined function should take an iterator of `pandas.DataFrame`\\s and return
+        another iterator of `pandas.DataFrame`\\s. All columns are passed
+        together as an iterator of `pandas.DataFrame`\\s to the user-defined function and the
+        `pandas.DataFrame`\\s in the returned iterator are combined as a :class:`DataFrame`.
         Each `pandas.DataFrame` size can be controlled by
         `spark.sql.execution.arrow.maxRecordsPerBatch`.
-        Its schema must match the returnType of the pandas udf.
+        Its schema must match the returnType of the Pandas user-defined function.
 
         :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
 
         >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
         >>> df = spark.createDataFrame([(1, 21), (2, 30)],
         ...                            ("id", "age"))  # doctest: +SKIP
-        >>> @pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-        ... def filter_func(iterator):
-        ...     for pdf in iterator:
+        >>> @pandas_udf(df.schema, PandasUDFType.MAP_ITER)  # doctest: +SKIP
+        ... def filter_func(batch_iter):
+        ...     for pdf in batch_iter:
         ...         yield pdf[pdf.id == 1]
         >>> df.mapInPandas(filter_func).show()  # doctest: +SKIP
         +---+---+
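The rewritten docstring says each `pandas.DataFrame` handed to the UDF is sized by `spark.sql.execution.arrow.maxRecordsPerBatch`. A hedged sketch of that knob, assuming a running SparkSession named `spark`; the cap of 2 and the six-row input are illustrative:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Cap each Arrow batch, and therefore each pandas.DataFrame the UDF sees,
# at two rows per partition.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2")

df = spark.range(6)  # a single "id" column of longs

@pandas_udf("id long", PandasUDFType.MAP_ITER)
def check_batch_size(batch_iter):
    for pdf in batch_iter:
        assert len(pdf) <= 2  # holds because of the config set above
        yield pdf

df.mapInPandas(check_batch_size).show()
```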

### python/pyspark/sql/functions.py (+28 −1)
@@ -2915,7 +2915,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
 
        :class:`MapType`, nested :class:`StructType` are currently not supported as output types.
 
-       Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
+       Scalar UDFs can be used with :meth:`pyspark.sql.DataFrame.withColumn` and
        :meth:`pyspark.sql.DataFrame.select`.
 
        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -3191,6 +3191,33 @@ def pandas_udf(f=None, returnType=None, functionType=None):
 
        .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`
 
+    5. MAP_ITER
+
+       A map iterator Pandas UDF is used to transform data with an iterator of batches.
+       It can be used with :meth:`pyspark.sql.DataFrame.mapInPandas`.
+
+       In contrast to the scalar Pandas UDF, it can return output of arbitrary length.
+       It maps an iterator of batches in the current :class:`DataFrame` using a Pandas
+       user-defined function and returns the result as a :class:`DataFrame`.
+
+       The user-defined function should take an iterator of `pandas.DataFrame`\\s and return
+       another iterator of `pandas.DataFrame`\\s. All columns are passed together as an
+       iterator of `pandas.DataFrame`\\s to the user-defined function and the
+       `pandas.DataFrame`\\s in the returned iterator are combined as a :class:`DataFrame`.
+
+       >>> df = spark.createDataFrame([(1, 21), (2, 30)],
+       ...                            ("id", "age"))  # doctest: +SKIP
+       >>> @pandas_udf(df.schema, PandasUDFType.MAP_ITER)  # doctest: +SKIP
+       ... def filter_func(batch_iter):
+       ...     for pdf in batch_iter:
+       ...         yield pdf[pdf.id == 1]
+       >>> df.mapInPandas(filter_func).show()  # doctest: +SKIP
+       +---+---+
+       | id|age|
+       +---+---+
+       |  1| 21|
+       +---+---+
+
    .. note:: The user-defined functions are considered deterministic by default. Due to
        optimization, duplicate invocations may be eliminated or the function may even be invoked
        more times than it is present in the query. If your function is not deterministic, call
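The determinism note closing this hunk is cut off by the diff context; the full docstring goes on to recommend calling `asNondeterministic` on the user-defined function. A hedged sketch of that pattern, assuming a running SparkSession named `spark`; the `jitter` UDF is a made-up illustration:

```python
import random

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.SCALAR)
def jitter(v):
    # Adds fresh random noise on every invocation, so repeated calls
    # over the same input can disagree.
    return v + random.random()

# Tell the optimizer not to eliminate or duplicate invocations.
jitter = jitter.asNondeterministic()

df = spark.createDataFrame([(1.0,), (2.0,)], ("x",))
df.select(jitter("x")).show()
```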
