Skip to content

Commit

Permalink
new commit
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Oct 27, 2023
1 parent fb94fc5 commit 72e608e
Show file tree
Hide file tree
Showing 18 changed files with 743 additions and 557 deletions.
5 changes: 5 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,11 @@
{
"title": "Details and customization",
"path": "/guides/dagster-pipes/dagster-pipes-details-and-customization"
},
{
"title": "Integrating Databricks with Dagster Pipes",
"path": "/guides/dagster-pipes/databricks"

}
]
},
Expand Down
368 changes: 368 additions & 0 deletions docs/content/guides/dagster-pipes/databricks.mdx

Large diffs are not rendered by default.

This file was deleted.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions docs/screenshots/guides/dagster-pipes/databricks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- id: run.png
workspace: examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/workspace.yaml
steps:
- materialize the `databricks_asset`
- go to the Run page for the launched run

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# start_databricks_asset
### dagster_databricks_pipes.py

import os
import sys

from dagster_databricks import PipesDatabricksClient

from dagster import AssetExecutionContext, Definitions, EnvVar, asset
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


@asset
def databricks_asset(
context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient
):
task = jobs.SubmitTask.from_dict(
{
# The cluster settings below are somewhat arbitrary. Dagster Pipes is
# not dependent on a specific spark version, node type, or number of
# workers.
"new_cluster": {
"spark_version": "12.2.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 0,
"cluster_log_conf": {
"dbfs": {"destination": "dbfs:/cluster-logs-dir-noexist"},
},
},
"libraries": [
# Include the latest published version of dagster-pipes on PyPI
# in the task environment
{"pypi": {"package": "dagster-pipes"}},
],
"task_key": "some-key",
"spark_python_task": {
"python_file": "dbfs:/my_python_script.py", # location of target code file
"source": jobs.Source.WORKSPACE,
},
}
)

print("This will be forwarded back to Dagster stdout") # noqa: T201
print("This will be forwarded back to Dagster stderr", file=sys.stderr) # noqa: T201

extras = {"some_parameter": 100}

return pipes_databricks.run(
task=task,
context=context,
extras=extras,
).get_materialize_result()


# end_databricks_asset

# start_definitions

pipes_databricks_resource = PipesDatabricksClient(
client=WorkspaceClient(
host=os.getenv["DATABRICKS_HOST"], # type: ignore
token=os.getenv["DATABRICKS_TOKEN"], # type: ignore
)
)

defs = Definitions(
assets=[databricks_asset], resources={"pipes_databricks": pipes_databricks_resource}
)
# end_definitions
Loading

0 comments on commit 72e608e

Please sign in to comment.