Skip to content

Commit

Permalink
Refactor to enable RayGraphAdapter and HamiltonTracker to work well t…
Browse files Browse the repository at this point in the history
…ogether

This is a squash commit:
- issue=#1079
- PR=#1103

Describes what to do in `graph_functions.py`

Adds comments to lifecycle base

Update h_ray.py with comments for ray tracking compatibility

Replicate previous error

Inline function, unsure if catching errors and exceptions to be handadled differently

BaseDoRemoteExecute has the added Callable function that snadwisched lifecycle hooks

method fails, says AssertionError about ray.remote decorator

simple script for now to check telemetry, execution yield the ray.remote AssertionError

passing pointer through and arguments to lifecycle wrapper into ray.remote

post-execute hook for node not called

finally executed only when exception occurs, hamilton tracker not executed

atexit.register does not work, node keeps running inui

added stop() method, but doesn't get called

Ray telemtry works for single node, problem with connected nodes

Ray telemtry works for single node, problem with connected nodes

Ray telemtry works for single node, problem with connected nodes

Fixes ray object dereferencing

Ray does not resolve nested arguments:
https://docs.ray.io/en/latest/ray-core/objects.html#passing-object-arguments

So one option is to make them all top level:

- one way to do that is to make the other arguments not clash with any
possible user parameters -- hence the `__` prefix. This is what I did.
- another way would be in the ray adapter, wrap the incoming function,
and explicitly do a ray.get() on any ray object references in the
kwargs arguments. i.e. keep the nested structure, but when the ray
task starts way for all inputs... not sure which is best, but this
now works correctly.

ray works checkpoint, pre-commit fixed

fixed graph level telemtry proposal

pinned ruff

Correct output, added option to start ray cluster

Unit test mimicks the DoNodeExecute unit test

Refactored driver so all tests pass

Workaround to not break ray by calling init on an open cluster

raw_execute does not have post_graph_execute and is private now

Correct version for depraction warning

all tests work

this looks better

ruff version comment

Refactored pre- and post-graph-execute hooks outside of raw_execute which now has deprecation warning

added readme, notebook and made script cli interactive

made cluster init optional through inserting config dict

User has option to shutdown ray cluster

Co-authored-by: Stefan Krawczyk <[email protected]>
  • Loading branch information
2 people authored and elijahbenizzy committed Sep 9, 2024
1 parent 004ac5e commit a92cc0e
Show file tree
Hide file tree
Showing 17 changed files with 609 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repos:
args: [ --fix ]
# Run the formatter.
- id: ruff-format
# args: [ --diff ] # Use for previewing changes
# args: [ --diff ] # Use for previewing changes
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
Expand Down
29 changes: 29 additions & 0 deletions examples/ray/ray_Hamilton_UI_tracking/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Tracking telemetry in Hamilton UI for Ray clusters

We show the ability to combine the [RayGraphAdapter](https://hamilton.dagworks.io/en/latest/reference/graph-adapters/RayGraphAdapter/) and [HamiltonTracker](https://hamilton.dagworks.io/en/latest/concepts/ui/) to run a dummy DAG.

# ray_lineage.py
Has three dummy functions:
- waiting 5s
- waiting 1s
- raising an error

That represent a basic DAG.

# run_lineage.py
Is where the driver code lives to create the DAG and exercise it.

To exercise it:
> Have an open instance of Hamilton UI: https://hamilton.dagworks.io/en/latest/concepts/ui/

```bash
python -m run_lineage.py
Usage: python -m run_lineage.py [OPTIONS] COMMAND [ARGS]...

Options:
--help Show this message and exit.

Commands:
project_id This command will select the created project in Hamilton UI
username This command will input the correct username to access the selected project_id
```
107 changes: 107 additions & 0 deletions examples/ray/ray_Hamilton_UI_tracking/hamilton_notebook.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hamilton UI Adapter\n",
"\n",
"Needs a running instance of Hamilton UI: https://hamilton.dagworks.io/en/latest/concepts/ui/"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from hamilton_sdk.adapters import HamiltonTracker\n",
"\n",
"# Inputs required to track into correct project in the UI\n",
"project_id = 2\n",
"username = \"admin\"\n",
"\n",
"tracker_ray = HamiltonTracker(\n",
" project_id=project_id,\n",
" username=username,\n",
" dag_name=\"telemetry_with_ray\",)\n",
"\n",
"tracker_without_ray = HamiltonTracker(\n",
" project_id=project_id,\n",
" username=username,\n",
" dag_name=\"telemetry_without_ray\",\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Ray adapter\n",
"\n",
"https://hamilton.dagworks.io/en/latest/reference/graph-adapters/RayGraphAdapter/"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from hamilton import base\n",
"from hamilton.plugins.h_ray import RayGraphAdapter\n",
"\n",
"rga = RayGraphAdapter(result_builder=base.PandasDataFrameResult())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Importing Hamilton and the DAG modules"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from hamilton import driver\n",
"import ray_lineage"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" dr_ray = driver.Builder().with_modules(ray_lineage).with_adapters(rga, tracker_ray).build()\n",
" result_ray = dr_ray.execute(\n",
" final_vars=[\n",
" \"node_5s\",\n",
" \"node_1s_error\",\n",
" \"add_1_to_previous\",\n",
" ]\n",
" )\n",
" print(result_ray)\n",
"\n",
"except ValueError:\n",
" print(\"UI should display failure.\")\n",
"finally:\n",
" dr_without_ray = driver.Builder().with_modules(ray_lineage).with_adapters(tracker).build()\n",
" result_without_ray = dr_without_ray.execute(final_vars=[\"node_5s\", \"add_1_to_previous\"])\n",
" print(result_without_ray) \n"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
18 changes: 18 additions & 0 deletions examples/ray/ray_Hamilton_UI_tracking/ray_lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import time


def node_5s() -> float:
start = time.time()
time.sleep(5)
return time.time() - start


def add_1_to_previous(node_5s: float) -> float:
start = time.time()
time.sleep(1)
return node_5s + (time.time() - start)


def node_1s_error(node_5s: float) -> float:
time.sleep(1)
raise ValueError("Does not break telemetry if executed through ray")
1 change: 1 addition & 0 deletions examples/ray/ray_Hamilton_UI_tracking/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sf-hamilton[ray,sdk,ui]
45 changes: 45 additions & 0 deletions examples/ray/ray_Hamilton_UI_tracking/run_lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import click
import ray_lineage

from hamilton import base, driver
from hamilton.plugins.h_ray import RayGraphAdapter
from hamilton_sdk import adapters


@click.command()
@click.option("--username", required=True, type=str)
@click.option("--project_id", default=1, type=int)
def run(project_id, username):
try:
tracker_ray = adapters.HamiltonTracker(
project_id=project_id,
username=username,
dag_name="telemetry_with_ray",
)
rga = RayGraphAdapter(result_builder=base.PandasDataFrameResult())
dr_ray = driver.Builder().with_modules(ray_lineage).with_adapters(rga, tracker_ray).build()
result_ray = dr_ray.execute(
final_vars=[
"node_5s",
"node_1s_error",
"add_1_to_previous",
]
)
print(result_ray)

except ValueError:
print("UI should display failure.")
finally:
tracker = adapters.HamiltonTracker(
project_id=project_id, # modify this as needed
username=username,
dag_name="telemetry_without_ray",
)
dr_without_ray = driver.Builder().with_modules(ray_lineage).with_adapters(tracker).build()

result_without_ray = dr_without_ray.execute(final_vars=["node_5s", "add_1_to_previous"])
print(result_without_ray)


if __name__ == "__main__":
run()
6 changes: 3 additions & 3 deletions hamilton/dev_utils/deprecation.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class deprecated:
@deprecate(
warn_starting=(1,10,0)
fail_starting=(2,0,0),
use_instead=parameterize_values,
reason='We have redefined the parameterization decorators to consist of `parametrize`, `parametrize_inputs`, and `parametrize_values`
use_this=parameterize_values,
explanation='We have redefined the parameterization decorators to consist of `parametrize`, `parametrize_inputs`, and `parametrize_values`
migration_guide="https://github.com/dagworks-inc/hamilton/..."
)
class parameterized(...):
Expand All @@ -66,7 +66,7 @@ class parameterized(...):
explanation: str
migration_guide: Optional[
str
] # If this is None, this means that the use_instead is a drop in replacement
] # If this is None, this means that the use_this is a drop in replacement
current_version: Union[Tuple[int, int, int], Version] = dataclasses.field(
default_factory=lambda: CURRENT_VERSION
)
Expand Down
Loading

0 comments on commit a92cc0e

Please sign in to comment.