[docs] [pipes] - Add Databricks integration guide (#17114)
## Summary & Motivation

This PR adds a guide for integrating Dagster Pipes with Databricks.

TODO/?s:

- [x] Finish descriptions of `SubmitTask` spec
- [x] Finish UI section
- [x] Check in code examples
- [x] Check on some of the `PyObjects` - may be out of sync due to changes in libraries
- [x] Add info about sending data back to Dagster (Step 2)

## How I Tested These Changes

eyes, bk

Co-authored-by: Sean Mackesey <[email protected]>
1 parent c72d9cb · commit b2ca074. Showing 18 changed files with 770 additions and 257 deletions.
Large diffs are not rendered by default.
6 changes: 0 additions & 6 deletions
docs/content/guides/dagster-pipes/integrating-databricks-with-dagster-pipes.mdx (this file was deleted)
(Two binary files could not be displayed.)
New file (6 additions): screenshot spec

```yaml
- id: run.png
  workspace: examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/workspace.yaml
  steps:
    - materialize the `databricks_asset`
    - go to the Run page for the launched run
```
Empty file.
Empty file.
70 changes: 70 additions & 0 deletions
...snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_client.py

```python
# start_databricks_asset
### dagster_databricks_pipes.py

import os
import sys

from dagster_databricks import PipesDatabricksClient

from dagster import AssetExecutionContext, Definitions, EnvVar, asset
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


@asset
def databricks_asset(
    context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient
):
    task = jobs.SubmitTask.from_dict(
        {
            # The cluster settings below are somewhat arbitrary. Dagster Pipes is
            # not dependent on a specific spark version, node type, or number of
            # workers.
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 0,
                "cluster_log_conf": {
                    "dbfs": {"destination": "dbfs:/cluster-logs-dir-noexist"},
                },
            },
            "libraries": [
                # Include the latest published version of dagster-pipes on PyPI
                # in the task environment
                {"pypi": {"package": "dagster-pipes"}},
            ],
            "task_key": "some-key",
            "spark_python_task": {
                "python_file": "dbfs:/my_python_script.py",  # location of target code file
                "source": jobs.Source.WORKSPACE,
            },
        }
    )

    print("This will be forwarded back to Dagster stdout")  # noqa: T201
    print("This will be forwarded back to Dagster stderr", file=sys.stderr)  # noqa: T201

    extras = {"some_parameter": 100}

    return pipes_databricks.run(
        task=task,
        context=context,
        extras=extras,
    ).get_materialize_result()


# end_databricks_asset

# start_definitions

pipes_databricks_resource = PipesDatabricksClient(
    client=WorkspaceClient(
        host=os.getenv("DATABRICKS_HOST"),
        token=os.getenv("DATABRICKS_TOKEN"),
    )
)

defs = Definitions(
    assets=[databricks_asset], resources={"pipes_databricks": pipes_databricks_resource}
)
# end_definitions
```
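As a quick way to exercise this asset outside of a deployed code location, Dagster's `materialize` helper can run it in process. The following is a minimal sketch, assuming valid `DATABRICKS_HOST`/`DATABRICKS_TOKEN` credentials and that `dbfs:/my_python_script.py` has been uploaded; the import path mirrors the snippet module used by the test file below.

```python
# Minimal local-run sketch (an assumption, not part of the committed snippets).
# Requires Databricks credentials in the environment and the target script on DBFS.
from dagster import materialize

from docs_snippets.guides.dagster.dagster_pipes.databricks.databricks_asset_client import (
    databricks_asset,
    pipes_databricks_resource,
)

if __name__ == "__main__":
    # Runs the asset in process; blocks until the Databricks job finishes.
    result = materialize(
        [databricks_asset],
        resources={"pipes_databricks": pipes_databricks_resource},
    )
    assert result.success
```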
72 changes: 72 additions & 0 deletions
...s_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_open_pipes_session.py

```python
import os
import sys

from dagster_databricks import PipesDbfsContextInjector, PipesDbfsMessageReader
from dagster_databricks.pipes import PipesDbfsLogReader

from dagster import AssetExecutionContext, asset, open_pipes_session
from databricks.sdk import WorkspaceClient


@asset
def databricks_asset(context: AssetExecutionContext):
    client = WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )

    # Arbitrary JSON-serializable data you want access to from the `PipesContext`
    # in the Databricks runtime. Assume `sample_rate` is a parameter used by
    # the target job's business logic.
    extras = {"sample_rate": 1.0}

    # Sets up Pipes communications channels.
    with open_pipes_session(
        context=context,
        extras=extras,
        context_injector=PipesDbfsContextInjector(client=client),
        message_reader=PipesDbfsMessageReader(
            client=client,
            # These log readers are optional. If you provide them, you must set the
            # `new_cluster.cluster_log_conf.dbfs.destination` field in the job you
            # submit to a valid DBFS path. This configures Databricks to write
            # stdout/stderr to the specified location every 5 minutes. Dagster polls
            # this location and forwards the stdout/stderr logs to the orchestration
            # process every time they are updated.
            log_readers=[
                PipesDbfsLogReader(
                    client=client, remote_log_name="stdout", target_stream=sys.stdout
                ),
                PipesDbfsLogReader(
                    client=client, remote_log_name="stderr", target_stream=sys.stderr
                ),
            ],
        ),
    ) as pipes_session:
        ##### Option (1)
        # NON-STREAMING. Just pass the necessary environment variables down.
        # During execution, all reported materializations are buffered on the
        # `pipes_session`. Yield them all after Databricks execution is finished.

        # Dict[str, str] with environment variables containing Pipes comms info.
        env_vars = pipes_session.get_bootstrap_env_vars()

        # Some function that handles launching/monitoring of the Databricks job.
        # It must ensure that `env_vars` are set on the executing cluster.
        custom_databricks_launch_code(env_vars)  # type: ignore  # noqa: F821

        ##### Option (2)
        # STREAMING. Pass `pipes_session` down. During execution, you can yield any
        # asset materializations that have been reported by calling
        # `pipes_session.get_results()` as often as you like. `get_results` returns
        # an iterator that your custom code can `yield from` to forward the results
        # back to the materialize function. Note that you will still need to extract
        # the env vars by calling `pipes_session.get_bootstrap_env_vars()` and launch
        # the Databricks job in the same way as in option (1).

        # The function should return an `Iterator[MaterializeResult]`.
        yield from custom_databricks_launch_code(pipes_session)  # type: ignore  # noqa: F821

    # With either option (1) or (2), this is required to yield any remaining
    # buffered results.
    yield from pipes_session.get_results()
```
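The snippet above deliberately leaves `custom_databricks_launch_code` undefined. As a rough illustration, a hypothetical option (1) implementation might look like the sketch below. The cluster settings mirror the `SubmitTask` example earlier in this commit and are assumptions; the one essential requirement is that the Pipes bootstrap `env_vars` reach the remote process, done here via `spark_env_vars`.

```python
# Hypothetical sketch of `custom_databricks_launch_code` for option (1).
# Cluster settings are illustrative assumptions; the key detail is propagating
# `env_vars` so the remote `dagster-pipes` session can bootstrap itself.
from typing import Mapping

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


def custom_databricks_launch_code(env_vars: Mapping[str, str]) -> None:
    client = WorkspaceClient()  # reads DATABRICKS_HOST / DATABRICKS_TOKEN

    task = jobs.SubmitTask.from_dict(
        {
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 0,
                # Make the Pipes bootstrap info visible to the remote process.
                "spark_env_vars": dict(env_vars),
            },
            "libraries": [{"pypi": {"package": "dagster-pipes"}}],
            "task_key": "some-key",
            "spark_python_task": {"python_file": "dbfs:/my_python_script.py"},
        }
    )
    # Submit a one-off run and block until it completes (raises on failure).
    client.jobs.submit(run_name="databricks_asset", tasks=[task]).result()
```

A streaming option (2) variant would instead take the `pipes_session`, start the run without blocking, and periodically `yield from pipes_session.get_results()` while polling the run state.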
35 changes: 35 additions & 0 deletions
.../docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script.py

```python
### dbfs:/my_python_script.py

# `dagster_pipes` must be available in the Databricks Python environment
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

# Sets up communication channels and downloads the context data sent from Dagster.
# Note that while other `context_loader` and `message_writer` settings are
# possible, it is recommended to use `PipesDbfsContextLoader` and
# `PipesDbfsMessageWriter` for Databricks.
with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    # Access the `extras` dict passed when launching the job from Dagster.
    some_parameter_value = pipes.get_extra("some_parameter")

    # Stream a log message back to Dagster.
    pipes.log.info(f"Using some_parameter value: {some_parameter_value}")

    # ... your code that computes and persists the asset

    # Stream asset materialization metadata and data version back to Dagster.
    # This should be called after you've computed and stored the asset value. We
    # omit the asset key here because there is only one asset in scope, but for
    # multi-assets you can pass an `asset_key` parameter.
    pipes.report_asset_materialization(
        metadata={
            "some_metric": {"raw_value": some_parameter_value + 1, "type": "int"}
        },
        data_version="alpha",
    )
```
19 changes: 19 additions & 0 deletions
...ppets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script_existing.py

```python
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

# ... existing code

if __name__ == "__main__":
    with open_dagster_pipes(
        context_loader=PipesDbfsContextLoader(),
        message_writer=PipesDbfsMessageWriter(),
    ) as pipes:
        # ... existing logic
        pipes.report_asset_materialization(
            asset_key="foo",
            metadata={"some_key": "some_value"},
            data_version="alpha",
        )
```
57 changes: 57 additions & 0 deletions
...les/docs_snippets/docs_snippets_tests/guides_tests/dagster_pipes_tests/test_databricks.py

```python
import importlib.util
import os
import re

import pytest
from dagster_databricks._test_utils import (
    databricks_client,
    temp_dbfs_script,
    upload_dagster_pipes_whl,
)

IS_BUILDKITE = os.getenv("BUILDKITE") is not None

# If we even try to import the sample code in an environment without Databricks
# credentials (BK), we'll get an error.
if not IS_BUILDKITE:
    from dagster._core.definitions.events import AssetKey
    from docs_snippets.guides.dagster.dagster_pipes.databricks.databricks_asset_client import (
        databricks_asset,
        defs as databricks_asset_defs,
    )


def _get_databricks_script_path():
    db_script_spec = importlib.util.find_spec(
        "docs_snippets.guides.dagster.dagster_pipes.databricks.databricks_script"
    )
    assert db_script_spec and db_script_spec.origin
    return db_script_spec.origin


def test_databricks_asset(databricks_client, capsys):
    script_file = _get_databricks_script_path()
    # with upload_dagster_pipes_whl(databricks_client) as dagster_pipes_whl_path:
    with temp_dbfs_script(
        databricks_client,
        script_file=script_file,
        dbfs_path="dbfs:/my_python_script.py",
    ) as script_file:
        job_def = databricks_asset_defs.get_implicit_job_def_for_assets(
            [AssetKey("databricks_asset")],
        )
        assert job_def
        result = job_def.execute_in_process()
        assert result.success

        mats = result.asset_materializations_for_node(databricks_asset.op.name)
        assert mats[0].metadata["some_metric"].value == 101
        captured = capsys.readouterr()
        assert re.search(
            r"This will be forwarded back to Dagster stdout\n",
            captured.out,
            re.MULTILINE,
        )
        assert re.search(
            r"This will be forwarded back to Dagster stderr\n",
            captured.err,
            re.MULTILINE,
        )
```
Deploy preview for dagster-docs ready!

✅ Preview: https://dagster-docs-g9ifpl2x0-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit b2ca074. This pull request is being automatically deployed with vercel-action.