Updates code with locations that need to change to have a generic wit… #818

Draft
wants to merge 2 commits into base: main
11 changes: 8 additions & 3 deletions docs/reference/decorators/with_columns.rst
@@ -2,13 +2,18 @@
with_columns
=======================

-** Overview **

+--------
+Overview
+--------

This is part of the hamilton pyspark integration. To install, run:

-`pip install sf-hamilton[pyspark]`
+``pip install sf-hamilton[pyspark]``

-**Reference Documentation**
+-----------------------
+Reference Documentation
+-----------------------

.. autoclass:: hamilton.plugins.h_spark.with_columns
:special-members: __init__
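To make the documented decorator concrete, here is a minimal usage sketch. It is illustrative only: the function names, column names, and the keyword arguments shown (``columns_to_pass``, ``select``) are assumptions to be checked against the autoclass reference above rather than code taken from this PR.

.. code-block:: python

    import pandas as pd
    import pyspark.sql as ps

    from hamilton.plugins import h_spark


    def col_a_doubled(col_a: pd.Series) -> pd.Series:
        """Pandas UDF: doubles a column taken from the base dataframe."""
        return col_a * 2


    def col_sum(col_a_doubled: pd.Series, col_b: pd.Series) -> pd.Series:
        """Pandas UDF: depends on a generated column plus a base column."""
        return col_a_doubled + col_b


    @h_spark.with_columns(
        col_a_doubled,
        col_sum,  # functions (or modules) providing the group of map operations
        columns_to_pass=["col_a", "col_b"],  # columns read off the base dataframe (assumed parameter name)
        select=["col_a_doubled", "col_sum"],  # columns appended to the output (assumed parameter name)
    )
    def final_df(base_df: ps.DataFrame) -> ps.DataFrame:
        """Returns the base dataframe with the selected columns appended."""
        return base_df

At execution time the decorator builds a subdag out of the map functions and applies them as a chain of ``.withColumn`` calls on ``base_df``, which is the "linearization" described in the ``__init__`` docstring further down in this diff.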
21 changes: 16 additions & 5 deletions hamilton/plugins/h_spark.py
@@ -41,7 +41,7 @@ class SparkKoalasGraphAdapter(base.HamiltonGraphAdapter, base.ResultMixin):
using the \
`Pandas API on Spark <https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html>`__

-Use `pip install sf-hamilton[spark]` to get the dependencies required to run this.
+Use ``pip install sf-hamilton[spark]`` to get the dependencies required to run this.

Currently, this class assumes you're running SPARK 3.2+. You'd generally use this if you have an existing spark \
cluster running in your workplace, and you want to scale to very large data set sizes.
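For context on how this adapter is typically wired in, a rough sketch follows. The module name ``my_functions``, the spine column, and the requested output are placeholders, and the constructor arguments should be double-checked against the class reference:

.. code-block:: python

    import pyspark.sql as ps

    from hamilton import base, driver
    from hamilton.plugins import h_spark

    import my_functions  # hypothetical module of Hamilton transform functions

    spark = ps.SparkSession.builder.master("local[*]").getOrCreate()

    adapter = h_spark.SparkKoalasGraphAdapter(
        spark_session=spark,
        result_builder=base.PandasDataFrameResult(),  # how requested outputs get assembled
        spine_column="id",  # column the outputs are aligned/joined on
    )
    dr = driver.Driver({}, my_functions, adapter=adapter)
    result = dr.execute(["some_output_column"])  # placeholder output name
    spark.stop()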
@@ -371,6 +371,8 @@ def _fabricate_spark_function(
return FunctionType(func_code, {**globals(), **{"partial_fn": partial_fn}}, func_name)


+# TODO -- change this to have a different implementation based on the dataframe type. This will have
+# to likely be custom to each dataframe type
def _lambda_udf(df: DataFrame, node_: node.Node, actual_kwargs: Dict[str, Any]) -> DataFrame:
"""Function to create a lambda UDF for a function.

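The TODO above calls for a per-dataframe-type implementation. One possible shape for that is a small registry keyed by dataframe type; the names below (``register_udf_applicator``, ``apply_udf``) are hypothetical and not part of hamilton:

.. code-block:: python

    from typing import Any, Callable, Dict, Type

    # Hypothetical registry: dataframe type -> function that applies a Hamilton
    # node as a UDF/column expression on that kind of dataframe.
    _UDF_APPLICATORS: Dict[Type, Callable[[Any, Any, Dict[str, Any]], Any]] = {}


    def register_udf_applicator(df_type: Type, applicator: Callable) -> None:
        """Registers the _lambda_udf-equivalent for one dataframe library."""
        _UDF_APPLICATORS[df_type] = applicator


    def apply_udf(df: Any, node_: Any, actual_kwargs: Dict[str, Any]) -> Any:
        """Dispatches to whichever implementation is registered for type(df)."""
        for df_type, applicator in _UDF_APPLICATORS.items():
            if isinstance(df, df_type):
                return applicator(df, node_, actual_kwargs)
        raise TypeError(f"No UDF applicator registered for {type(df).__name__}")


    # The existing pyspark behaviour would then be registered roughly as:
    # register_udf_applicator(pyspark.sql.DataFrame, _lambda_udf)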
@@ -757,7 +759,7 @@ def transform_node(

Note that, at this point, we don't actually know which columns will come from the
base dataframe, and which will come from the upstream nodes. This is handled in the
-`with_columns` decorator, so for now, we need to give it enough information to topologically
+``with_columns`` decorator, so for now, we need to give it enough information to topologically
sort/assign dependencies.

:param node_: Node to transform
@@ -948,10 +950,10 @@ def __init__(
"""Initializes a with_columns decorator for spark. This allows you to efficiently run
groups of map operations on a dataframe, represented as pandas/primitives UDFs. This
effectively "linearizes" compute -- meaning that a DAG of map operations can be run
-as a set of `.withColumn` operations on a single dataframe -- ensuring that you don't have
-to do a complex `extract` then `join` process on spark, which can be inefficient.
+as a set of ``.withColumn`` operations on a single dataframe -- ensuring that you don't have
+to do a complex ``extract`` then ``join`` process on spark, which can be inefficient.

-Here's an example of calling it -- if you've seen `@subdag`, you should be familiar with
+Here's an example of calling it -- if you've seen :py:class:`@subdag <hamilton.function_modifiers.recursive>`, you should be familiar with
the concepts:

.. code-block:: python
@@ -1080,12 +1082,16 @@ def create_selector_node(
"""

def new_callable(**kwargs) -> DataFrame:
+# TODO -- change to have a `select` that's generic to the library
+# Use the registry
return kwargs[upstream_name].select(*columns)

return node.Node(
name=node_name,
+# TODO -- change to have the right dataframe type (from the registry)
typ=DataFrame,
callabl=new_callable,
+# TODO -- change to have the right dataframe type (from the registry)
input_types={upstream_name: DataFrame},
)

@@ -1107,8 +1113,10 @@ def new_callable(**kwargs) -> DataFrame:

return node.Node(
name=node_name,
+# TODO -- change to have the right dataframe type (from the registry)
typ=DataFrame,
callabl=new_callable,
+# TODO -- change to have the right dataframe type (from the registry)
input_types={upstream_name: DataFrame},
)

@@ -1195,7 +1203,9 @@ def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node
column for column in node_.input_types if column in columns_passed_in_from_dataframe
}
# In the case that we are using pyspark UDFs
+# TODO -- use the right dataframe type to do this correctly
if require_columns.is_decorated_pyspark_udf(node_):
+# TODO -- change to use the right "sparkification" function that is dataframe-type-agnostic
sparkified = require_columns.sparkify_node(
node_,
current_dataframe_node,
)
# otherwise we're using pandas/primitive UDFs
else:
+# TODO -- change to use the right "sparkification" function that is dataframe-type-agnostic
sparkified = sparkify_node_with_udf(
node_,
current_dataframe_node,
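Finally, for the two "sparkification" TODOs in ``generate_nodes``, one way to make the branch dataframe-type-agnostic is to look up the node-transformation strategies by dataframe type as well. The registry and helper names below are invented; ``require_columns.sparkify_node`` and ``sparkify_node_with_udf`` are the existing functions referenced in the diff above:

.. code-block:: python

    from typing import Callable, Dict, Type

    # Hypothetical registry: dataframe type -> the two node-transformation strategies
    # used in generate_nodes (one for natively decorated UDFs, one for pandas/primitive UDFs).
    NODE_TRANSFORMER_REGISTRY: Dict[Type, Dict[str, Callable]] = {}


    def register_node_transformers(df_type: Type, decorated: Callable, plain: Callable) -> None:
        """Registers how map nodes get rewritten against a given dataframe type."""
        NODE_TRANSFORMER_REGISTRY[df_type] = {"decorated": decorated, "plain": plain}


    def get_node_transformer(df_type: Type, is_decorated_udf: bool) -> Callable:
        """Returns the strategy generate_nodes should call for this dataframe type."""
        strategies = NODE_TRANSFORMER_REGISTRY[df_type]
        return strategies["decorated" if is_decorated_udf else "plain"]


    # The current pyspark behaviour would be registered roughly as:
    # register_node_transformers(
    #     pyspark.sql.DataFrame,
    #     require_columns.sparkify_node,
    #     sparkify_node_with_udf,
    # )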