diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index 1b7df48002069..5e32de027d0d6 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -876,6 +876,11 @@
         }
       ]
     },
+    {
+      "title": "Dagster Pipes + Databricks",
+      "path": "/guides/dagster-pipes/databricks"
+    },
     {
       "title": "Details and customization",
       "path": "/guides/dagster-pipes/dagster-pipes-details-and-customization"
diff --git a/docs/content/guides/dagster-pipes/databricks.mdx b/docs/content/guides/dagster-pipes/databricks.mdx
new file mode 100644
index 0000000000000..9028cb3de394a
--- /dev/null
+++ b/docs/content/guides/dagster-pipes/databricks.mdx
@@ -0,0 +1,391 @@
---
title: "Integrating Databricks with Dagster Pipes | Dagster Docs"
description: "Learn to integrate Dagster Pipes with Databricks to launch external code from Dagster assets."
---

# Integrating Databricks with Dagster Pipes

In this guide, we’ll show you how to use [Dagster Pipes](/guides/dagster-pipes) with Dagster’s Databricks integration to launch Databricks jobs.

Pipes allows your Databricks jobs to stream logs (including `stdout` and `stderr` of the driver process) and events back to Dagster. This does not require a full Dagster environment on Databricks; instead:

1. The Databricks environment needs to include [`dagster-pipes`](https://pypi.org/project/dagster-pipes), a single-file Python package with no dependencies that can be installed from PyPI or easily vendored, and
2. Databricks jobs must be launched from Dagster.

---

## Prerequisites

To use Dagster Pipes with Databricks:

- **In the orchestration environment**, you'll need to install the following packages:

  ```shell
  pip install dagster dagster-webserver dagster-databricks
  ```

  Refer to the [Dagster installation guide](/getting-started/install) for more info.

- **In Databricks**, you'll need:

  - **A Databricks workspace**. If you don’t have one, follow the [Databricks quickstart](https://docs.databricks.com/workflows/jobs/jobs-quickstart.html) to set one up.
  - **The following information about your Databricks workspace**:

    - `host` - The host URL of your Databricks workspace, e.g. `https://dbc-xxxxxxx-yyyy.cloud.databricks.com/`
    - `token` - A personal access token for the Databricks workspace. Refer to the Databricks API authentication documentation for more info about retrieving these values.

  You should set and export the Databricks host and token environment variables in your shell session:

  ```shell
  export DATABRICKS_HOST=<your-workspace-host-url>
  export DATABRICKS_TOKEN=<your-personal-access-token>
  ```
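Before moving on, it can help to confirm that these credentials actually authenticate against your workspace. Below is a minimal, optional sketch using the Databricks SDK (installed as a dependency of `dagster-databricks`); the `current_user.me()` call is just one lightweight authenticated request you could make, and nothing in this guide requires it:

```python
# Optional sanity check, run from the orchestration environment.
import os

from databricks.sdk import WorkspaceClient

# Build a client from the same environment variables used throughout this guide.
client = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    token=os.environ["DATABRICKS_TOKEN"],
)

# Any lightweight authenticated call works; here we just print the current user.
print(client.current_user.me().user_name)
```

If this raises an authentication error, fix the environment variables before continuing.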
---

## Step 1: Create an asset computed in Databricks

In this step, you’ll create a Dagster asset that, when materialized, opens a Dagster Pipes session and launches a Databricks job.

### Step 1.1: Define the Dagster asset

In your Dagster project, create a file named `dagster_databricks_pipes.py` and paste in the following code:

```python file=/guides/dagster/dagster_pipes/databricks/databricks_asset_client.py startafter=start_databricks_asset endbefore=end_databricks_asset
### dagster_databricks_pipes.py

import os
import sys

from dagster_databricks import PipesDatabricksClient

from dagster import AssetExecutionContext, Definitions, EnvVar, asset
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


@asset
def databricks_asset(
    context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient
):
    task = jobs.SubmitTask.from_dict(
        {
            # The cluster settings below are somewhat arbitrary. Dagster Pipes is
            # not dependent on a specific spark version, node type, or number of
            # workers.
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 0,
                "cluster_log_conf": {
                    "dbfs": {"destination": "dbfs:/cluster-logs-dir-noexist"},
                },
            },
            "libraries": [
                # Include the latest published version of dagster-pipes on PyPI
                # in the task environment
                {"pypi": {"package": "dagster-pipes"}},
            ],
            "task_key": "some-key",
            "spark_python_task": {
                "python_file": "dbfs:/my_python_script.py",  # location of target code file
                "source": jobs.Source.WORKSPACE,
            },
        }
    )

    print("This will be forwarded back to Dagster stdout")  # noqa: T201
    print("This will be forwarded back to Dagster stderr", file=sys.stderr)  # noqa: T201

    extras = {"some_parameter": 100}

    return pipes_databricks.run(
        task=task,
        context=context,
        extras=extras,
    ).get_materialize_result()
```

Let's review what's happening in this code:

- **Includes a number of imports from Dagster and the Databricks SDK.** There are a few that aren't used in this code block, but will be later in this guide.

- **Creates an asset named `databricks_asset`.** We also:

  - Provided an `AssetExecutionContext` as the `context` argument to the asset. This object provides access to system APIs such as resources, config, and logging. We’ll come back to this a bit later in this section.
  - Specified a `PipesDatabricksClient` resource (`pipes_databricks`) for the asset to use. We’ll also come back to this later.

- **Defines a Databricks `SubmitTask` object in the asset body.** Coverage of all the fields on this object is beyond the scope of this guide, but you can find further information in the [Databricks SDK API docs](https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs.html) and [source code](https://github.com/databricks/databricks-sdk-py/blob/main/databricks/sdk/service/jobs.py) for the `SubmitTask` object.

  The submitted task must:

  - **Specify `dagster-pipes` as a PyPI dependency**. You can include a version pin (e.g. `dagster-pipes==1.5.4`) if desired, as shown in the sketch after this list.
  - **Use `new_cluster` as opposed to an existing cluster**. This is because environment variables are injected into the specification used to create the cluster.
  - Use a `spark_python_task`.
  - **Optionally include `new_cluster.cluster_log_conf.dbfs`**. If set, the `PipesDatabricksClient` will automatically set up `PipesDbfsLogReader` objects for `stdout` and `stderr` of the driver node. These will periodically forward the `stdout` and `stderr` logs written by Databricks back to Dagster. **Note**: Because Databricks only updates these log files every five minutes, that is the maximum frequency at which Dagster can forward the logs.

- **Defines an `extras` dictionary containing some arbitrary data (`some_parameter`).** This is where you can put various data, e.g. from the Dagster run config, that you want to be available in Databricks. Anything added here must be JSON-serializable.

- **Passes the `SubmitTask` object, `AssetExecutionContext`, and `extras` dictionary to the `run` method of the `PipesDatabricksClient` resource.** This method synchronously executes the Databricks job specified by the `SubmitTask` object. It slightly modifies the object by injecting some environment variables under `new_cluster.spark_env_vars` before submitting the object to the Databricks API.

- **Returns a `MaterializeResult` object representing the result of execution.** This is obtained by calling `get_materialize_result` on the `PipesClientCompletedInvocation` object returned by `run` after the Databricks job has finished. **Note**: Execution can take several minutes even for trivial scripts due to Databricks cluster provisioning times.
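To make the task requirements listed above concrete, here's a sketch of the same task dictionary with a pinned `dagster-pipes` version and a cluster log destination that exists in your workspace. The version number and the `dbfs:/cluster-logs` path are placeholders; substitute values appropriate for your environment:

```python
from databricks.sdk.service import jobs

task = jobs.SubmitTask.from_dict(
    {
        "new_cluster": {
            "spark_version": "12.2.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 0,
            # Point this at a DBFS directory that exists in your workspace if you
            # want driver stdout/stderr forwarded back to Dagster.
            "cluster_log_conf": {
                "dbfs": {"destination": "dbfs:/cluster-logs"},
            },
        },
        "libraries": [
            # Pinning the version is optional but makes the task environment reproducible.
            {"pypi": {"package": "dagster-pipes==1.5.4"}},
        ],
        "task_key": "some-key",
        "spark_python_task": {
            "python_file": "dbfs:/my_python_script.py",
            "source": jobs.Source.WORKSPACE,
        },
    }
)
```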
### Step 1.2: Define the Databricks Pipes client and definitions

The [`dagster-databricks`](/\_apidocs/libraries/dagster-databricks) library provides a `PipesDatabricksClient`, which is a pre-built Dagster resource that allows you to quickly get Pipes working with your Databricks workspace.

Add the following to the bottom of `dagster_databricks_pipes.py` to define the resource and a `Definitions` object that binds it to the `databricks_asset`:

```python file=/guides/dagster/dagster_pipes/databricks/databricks_asset_client.py startafter=start_definitions endbefore=end_definitions
pipes_databricks_resource = PipesDatabricksClient(
    client=WorkspaceClient(
        host=os.getenv("DATABRICKS_HOST"),
        token=os.getenv("DATABRICKS_TOKEN"),
    )
)

defs = Definitions(
    assets=[databricks_asset], resources={"pipes_databricks": pipes_databricks_resource}
)
```
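The resource above reads credentials with `os.getenv`. If you prefer Dagster's `EnvVar` wrapper, which is already imported in the asset snippet, a variant of the same resource could look like the following sketch; it assumes the same two environment variables and is not required by this guide:

```python
from dagster import EnvVar
from dagster_databricks import PipesDatabricksClient
from databricks.sdk import WorkspaceClient

# EnvVar(...).get_value() reads the variable at definition time, similar to os.getenv.
pipes_databricks_resource = PipesDatabricksClient(
    client=WorkspaceClient(
        host=EnvVar("DATABRICKS_HOST").get_value(),
        token=EnvVar("DATABRICKS_TOKEN").get_value(),
    )
)
```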
---

## Step 2: Write a script for execution on Databricks

The next step is to write the code that will be executed on Databricks. In the Databricks task specification in [Step 1.1](#step-11-define-the-dagster-asset), we referenced a file `dbfs:/my_python_script.py` in the `spark_python_task`:

```python
"spark_python_task": {
    "python_file": "dbfs:/my_python_script.py",  # location of target code file
    "source": jobs.Source.WORKSPACE,
}
```

We'll create this script from scratch and upload it to DBFS. You can use the Databricks UI or run a command from a shell to do this. To use the shell method, run:

```shell
dbfs cp my_python_script.py dbfs:/my_python_script.py
```

Let's look at the script itself:

```python file=/guides/dagster/dagster_pipes/databricks/databricks_script.py
### dbfs:/my_python_script.py

# `dagster_pipes` must be available in the databricks python environment
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

# Sets up communication channels and downloads the context data sent from Dagster.
# Note that while other `context_loader` and `message_writer` settings are
# possible, it is recommended to use `PipesDbfsContextLoader` and
# `PipesDbfsMessageWriter` for Databricks.
with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    # Access the `extras` dict passed when launching the job from Dagster.
    some_parameter_value = pipes.get_extra("some_parameter")

    # Stream log message back to Dagster
    pipes.log.info(f"Using some_parameter value: {some_parameter_value}")

    # ... your code that computes and persists the asset

    # Stream asset materialization metadata and data version back to Dagster.
    # This should be called after you've computed and stored the asset value. We
    # omit the asset key here because there is only one asset in scope, but for
    # multi-assets you can pass an `asset_key` parameter.
    pipes.report_asset_materialization(
        metadata={
            "some_metric": {"raw_value": some_parameter_value + 1, "type": "int"}
        },
        data_version="alpha",
    )
```

Before we go any further, let's review what this script does:

- **Imports `PipesDbfsContextLoader`, `PipesDbfsMessageWriter`, and `open_dagster_pipes` from `dagster_pipes`.** The `PipesDbfsContextLoader` and `PipesDbfsMessageWriter` are DBFS-specific implementations of the `PipesContextLoader` and `PipesMessageWriter`. Refer to the [Dagster Pipes details and customization Guide](/guides/dagster-pipes/dagster-pipes-details-and-customization) for protocol details.

  Both objects write temporary files on DBFS for communication between the orchestration and external process. The `PipesDbfsContextLoader` and `PipesDbfsMessageWriter` match a corresponding `PipesDbfsContextInjector` and `PipesDbfsMessageReader` on the orchestration end, which are instantiated inside the `PipesDatabricksClient`.

- **Passes the context loader and message writer to the `open_dagster_pipes` context manager**, which yields an instance of `PipesContext` called `pipes`.

  Inside the body of the context manager are various calls against `pipes` to retrieve an extra, log, and report an asset materialization. All of these calls will use the DBFS temporary file-based communications channels established by `PipesDbfsContextLoader` and `PipesDbfsMessageWriter`. To see the full range of what you can do with the `PipesContext`, see the API docs or the general [Pipes guide](/guides/dagster-pipes).

At this point you can execute the rest of your Databricks code as normal, invoking various APIs as needed.

#### Existing codebases

For illustrative purposes, we've created a Python script from scratch. However, you may want to apply Pipes to an existing codebase.

One approach that can be useful is to wrap the context manager around an existing `main` function or entry point. You can either pass the `PipesContext` object down through your business logic, or simply report an asset materialization after your business logic is done:

```python file=/guides/dagster/dagster_pipes/databricks/databricks_script_existing.py
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

# ... existing code

if __name__ == "__main__":
    with open_dagster_pipes(
        context_loader=PipesDbfsContextLoader(),
        message_writer=PipesDbfsMessageWriter(),
    ) as pipes:
        # ... existing logic
        pipes.report_asset_materialization(
            asset_key="foo",
            metadata={"some_key": "some_value"},
            data_version="alpha",
        )
```

---

## Step 3: Run the Databricks job from the Dagster UI

In this step, you’ll run the Databricks job you created in [Step 1.2](#step-12-define-the-databricks-pipes-client-and-definitions) from the Dagster UI.

1. In a new command line session, run the following to start the UI:

   ```shell
   dagster dev -f dagster_databricks_pipes.py
   ```

2. Navigate to [localhost:3000](http://localhost:3000/), where you should see the UI:

   Databricks asset

3. Click **Materialize** near the top right corner of the page, then click **View** on the **Launched Run** popup. Wait for the run to complete, and the event log should look like this:

   Event log for Databricks run
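If you'd rather trigger the same materialization from Python (for example, in a test) than from the UI, `dagster.materialize` can be pointed at the asset and resource defined earlier. This is only a sketch and assumes `dagster_databricks_pipes.py` is importable from where you run it:

```python
from dagster import materialize

# Assumes dagster_databricks_pipes.py from Step 1 is on your PYTHONPATH.
from dagster_databricks_pipes import databricks_asset, pipes_databricks_resource

# Programmatic alternative to clicking "Materialize" in the UI.
result = materialize(
    [databricks_asset],
    resources={"pipes_databricks": pipes_databricks_resource},
)
assert result.success
```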
---

## Advanced: Customization using open_pipes_session

The `PipesDatabricksClient` is a high-level API that doesn't cover all use cases. If you have existing code to launch/poll the job that you do not want to change, you want to stream back materializations as they occur, or you just want more control than is permitted by `PipesDatabricksClient`, you can use `open_pipes_session` instead.

To use `open_pipes_session`:

1. Your Databricks job must be launched within the scope of the `open_pipes_session` context manager; and
2. Your job must be launched on a cluster containing the environment variables available on the yielded `pipes_session`.

While your Databricks code is running, any calls to `report_asset_materialization` in the external script are streamed back to Dagster, causing a `MaterializeResult` object to be buffered on the `pipes_session`. You can either:

- Leave these objects buffered until execution is complete (**Option 1** in the example code below), or
- Stream them to Dagster machinery during execution by calling `yield pipes_session.get_results()` (**Option 2**)

With either option, once the `open_pipes_session` block closes, you must call `yield pipes_session.get_results()` to yield any remaining buffered results, since we cannot guarantee that all communications from Databricks have been processed until the `open_pipes_session` block closes.

```python file=/guides/dagster/dagster_pipes/databricks/databricks_asset_open_pipes_session.py
import os
import sys

from dagster_databricks import PipesDbfsContextInjector, PipesDbfsMessageReader
from dagster_databricks.pipes import PipesDbfsLogReader

from dagster import AssetExecutionContext, asset, open_pipes_session
from databricks.sdk import WorkspaceClient


@asset
def databricks_asset(context: AssetExecutionContext):
    client = WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )

    # Arbitrary json-serializable data you want access to from the `PipesContext`
    # in the Databricks runtime. Assume `sample_rate` is a parameter used by
    # the target job's business logic.
    extras = {"sample_rate": 1.0}

    # Sets up Pipes communications channels
    with open_pipes_session(
        context=context,
        extras=extras,
        context_injector=PipesDbfsContextInjector(client=client),
        message_reader=PipesDbfsMessageReader(
            client=client,
            # These log readers are optional. If you provide them, then you must set the
            # `new_cluster.cluster_log_conf.dbfs.destination` field in the job you submit to a valid
            # DBFS path. This will configure Databricks to write stdout/stderr to the specified
            # location every 5 minutes. Dagster will poll this location and forward the
            # stdout/stderr logs every time they are updated to the orchestration process
            # stdout/stderr.
            log_readers=[
                PipesDbfsLogReader(
                    client=client, remote_log_name="stdout", target_stream=sys.stdout
                ),
                PipesDbfsLogReader(
                    client=client, remote_log_name="stderr", target_stream=sys.stderr
                ),
            ],
        ),
    ) as pipes_session:
        ##### Option (1)
        # NON-STREAMING. Just pass the necessary environment variables down.
        # During execution, all reported materializations are buffered on the
        # `pipes_session`. Yield them all after Databricks execution is finished.

        # Dict[str, str] with environment variables containing Pipes comms info.
        env_vars = pipes_session.get_bootstrap_env_vars()

        # Some function that handles launching/monitoring of the Databricks job.
        # It must ensure that the `env_vars` are set on the executing cluster.
        custom_databricks_launch_code(env_vars)

        ##### Option (2)
        # STREAMING. Pass `pipes_session` down.
During execution, you can yield any + # asset materializations that have been reported by calling ` + # pipes_session.get_results()` as often as you like. `get_results` returns + # an iterator that your custom code can `yield from` to forward the + # results back to the materialize function. Note you will need to extract + # the env vars by calling `pipes_session.get_pipes_bootstrap_env_vars()`, + # and launch the Databricks job in the same way as with (1). + + # The function should return an `Iterator[MaterializeResult]`. + yield from custom_databricks_launch_code(pipes_session) + + # With either option (1) or (2), this is required to yield any remaining + # buffered results. + yield from pipes_session.get_results() +``` + +--- + +## Related + + + + + + diff --git a/docs/content/guides/dagster-pipes/integrating-databricks-with-dagster-pipes.mdx b/docs/content/guides/dagster-pipes/integrating-databricks-with-dagster-pipes.mdx deleted file mode 100644 index 2128f8baaf8a1..0000000000000 --- a/docs/content/guides/dagster-pipes/integrating-databricks-with-dagster-pipes.mdx +++ /dev/null @@ -1,6 +0,0 @@ ---- -title: "Integrating Databricks with Dagster Pipes | Dagster Docs" -description: "" ---- - -# Integrating Databricks with Dagster Pipes diff --git a/docs/next/public/images/guides/dagster-pipes/databricks/asset.png b/docs/next/public/images/guides/dagster-pipes/databricks/asset.png new file mode 100644 index 0000000000000..189780996948e Binary files /dev/null and b/docs/next/public/images/guides/dagster-pipes/databricks/asset.png differ diff --git a/docs/next/public/images/guides/dagster-pipes/databricks/run.png b/docs/next/public/images/guides/dagster-pipes/databricks/run.png new file mode 100644 index 0000000000000..0e3bfe8e53f51 Binary files /dev/null and b/docs/next/public/images/guides/dagster-pipes/databricks/run.png differ diff --git a/docs/screenshots/guides/dagster-pipes/databricks.yaml b/docs/screenshots/guides/dagster-pipes/databricks.yaml new file mode 100644 index 0000000000000..82914a17dbed1 --- /dev/null +++ b/docs/screenshots/guides/dagster-pipes/databricks.yaml @@ -0,0 +1,6 @@ +- id: run.png + workspace: examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/workspace.yaml + steps: + - materialize the `databricks_asset` + - go to the Run page for the launched run + diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/dagster_pipes_details_and_customization/__init__.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/dagster_pipes_details_and_customization/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/__init__.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_client.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_client.py new file mode 100644 index 0000000000000..c844cc17fbc5e --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_client.py @@ -0,0 +1,70 @@ +# start_databricks_asset +### dagster_databricks_pipes.py + +import os +import sys + +from dagster_databricks import PipesDatabricksClient + +from dagster import AssetExecutionContext, Definitions, EnvVar, asset +from databricks.sdk import 
WorkspaceClient +from databricks.sdk.service import jobs + + +@asset +def databricks_asset( + context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient +): + task = jobs.SubmitTask.from_dict( + { + # The cluster settings below are somewhat arbitrary. Dagster Pipes is + # not dependent on a specific spark version, node type, or number of + # workers. + "new_cluster": { + "spark_version": "12.2.x-scala2.12", + "node_type_id": "i3.xlarge", + "num_workers": 0, + "cluster_log_conf": { + "dbfs": {"destination": "dbfs:/cluster-logs-dir-noexist"}, + }, + }, + "libraries": [ + # Include the latest published version of dagster-pipes on PyPI + # in the task environment + {"pypi": {"package": "dagster-pipes"}}, + ], + "task_key": "some-key", + "spark_python_task": { + "python_file": "dbfs:/my_python_script.py", # location of target code file + "source": jobs.Source.WORKSPACE, + }, + } + ) + + print("This will be forwarded back to Dagster stdout") # noqa: T201 + print("This will be forwarded back to Dagster stderr", file=sys.stderr) # noqa: T201 + + extras = {"some_parameter": 100} + + return pipes_databricks.run( + task=task, + context=context, + extras=extras, + ).get_materialize_result() + + +# end_databricks_asset + +# start_definitions + +pipes_databricks_resource = PipesDatabricksClient( + client=WorkspaceClient( + host=os.getenv("DATABRICKS_HOST"), + token=os.getenv("DATABRICKS_TOKEN"), + ) +) + +defs = Definitions( + assets=[databricks_asset], resources={"pipes_databricks": pipes_databricks_resource} +) +# end_definitions diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_open_pipes_session.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_open_pipes_session.py new file mode 100644 index 0000000000000..5973f908d9b8e --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_asset_open_pipes_session.py @@ -0,0 +1,72 @@ +import os +import sys + +from dagster_databricks import PipesDbfsContextInjector, PipesDbfsMessageReader +from dagster_databricks.pipes import PipesDbfsLogReader + +from dagster import AssetExecutionContext, asset, open_pipes_session +from databricks.sdk import WorkspaceClient + + +@asset +def databricks_asset(context: AssetExecutionContext): + client = WorkspaceClient( + host=os.environ["DATABRICKS_HOST"], + token=os.environ["DATABRICKS_TOKEN"], + ) + + # Arbitrary json-serializable data you want access to from the `PipesContext` + # in the Databricks runtime. Assume `sample_rate` is a parameter used by + # the target job's business logic. + extras = {"sample_rate": 1.0} + + # Sets up Pipes communications channels + with open_pipes_session( + context=context, + extras=extras, + context_injector=PipesDbfsContextInjector(client=client), + message_reader=PipesDbfsMessageReader( + client=client, + # These log readers are optional. If you provide them, then you must set the + # `new_cluster.cluster_log_conf.dbfs.destination` field in the job you submit to a valid + # DBFS path. This will configure Databricks to write stdout/stderr to the specified + # location every 5 minutes. Dagster will poll this location and forward the + # stdout/stderr logs every time they are updated to the orchestration process + # stdout/stderr. 
+ log_readers=[ + PipesDbfsLogReader( + client=client, remote_log_name="stdout", target_stream=sys.stdout + ), + PipesDbfsLogReader( + client=client, remote_log_name="stderr", target_stream=sys.stderr + ), + ], + ), + ) as pipes_session: + ##### Option (1) + # NON-STREAMING. Just pass the necessary environment variables down. + # During execution, all reported materializations are buffered on the + # `pipes_session`. Yield them all after Databricks execution is finished. + + # Dict[str, str] with environment variables containing Pipes comms info. + env_vars = pipes_session.get_bootstrap_env_vars() + + # Some function that handles launching/monitoring of the Databricks job. + # It must ensure that the `env_vars` are set on the executing cluster. + custom_databricks_launch_code(env_vars) # type: ignore # noqa: F821 + + ##### Option (2) + # STREAMING. Pass `pipes_session` down. During execution, you can yield any + # asset materializations that have been reported by calling ` + # pipes_session.get_results()` as often as you like. `get_results` returns + # an iterator that your custom code can `yield from` to forward the + # results back to the materialize function. Note you will need to extract + # the env vars by calling `pipes_session.get_pipes_bootstrap_env_vars()`, + # and launch the Databricks job in the same way as with (1). + + # The function should return an `Iterator[MaterializeResult]`. + yield from custom_databricks_launch_code(pipes_session) # type: ignore # noqa: F821 + + # With either option (1) or (2), this is required to yield any remaining + # buffered results. + yield from pipes_session.get_results() diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script.py new file mode 100644 index 0000000000000..64ff7ba3fcef2 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script.py @@ -0,0 +1,35 @@ +### dbfs:/my_python_script.py + +# `dagster_pipes` must be available in the databricks python environment +from dagster_pipes import ( + PipesDbfsContextLoader, + PipesDbfsMessageWriter, + open_dagster_pipes, +) + +# Sets up communication channels and downloads the context data sent from Dagster. +# Note that while other `context_loader` and `message_writer` settings are +# possible, it is recommended to use `PipesDbfsContextLoader` and +# `PipesDbfsMessageWriter` for Databricks. +with open_dagster_pipes( + context_loader=PipesDbfsContextLoader(), + message_writer=PipesDbfsMessageWriter(), +) as pipes: + # Access the `extras` dict passed when launching the job from Dagster. + some_parameter_value = pipes.get_extra("some_parameter") + + # Stream log message back to Dagster + pipes.log.info(f"Using some_parameter value: {some_parameter_value}") + + # ... your code that computes and persists the asset + + # Stream asset materialization metadata and data version back to Dagster. + # This should be called after you've computed and stored the asset value. We + # omit the asset key here because there is only one asset in scope, but for + # multi-assets you can pass an `asset_key` parameter. 
+ pipes.report_asset_materialization( + metadata={ + "some_metric": {"raw_value": some_parameter_value + 1, "type": "int"} + }, + data_version="alpha", + ) diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script_existing.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script_existing.py new file mode 100644 index 0000000000000..b485729aa10b3 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/databricks/databricks_script_existing.py @@ -0,0 +1,19 @@ +from dagster_pipes import ( + PipesDbfsContextLoader, + PipesDbfsMessageWriter, + open_dagster_pipes, +) + +# ... existing code + +if __name__ == "__main__": + with open_dagster_pipes( + context_loader=PipesDbfsContextLoader(), + message_writer=PipesDbfsMessageWriter(), + ) as pipes: + # ... existing logic + pipes.report_asset_materialization( + asset_key="foo", + metadata={"some_key": "some_value"}, + data_version="alpha", + ) diff --git a/examples/docs_snippets/docs_snippets_tests/guides_tests/dagster_pipes_tests/test_databricks.py b/examples/docs_snippets/docs_snippets_tests/guides_tests/dagster_pipes_tests/test_databricks.py new file mode 100644 index 0000000000000..3cba596caca35 --- /dev/null +++ b/examples/docs_snippets/docs_snippets_tests/guides_tests/dagster_pipes_tests/test_databricks.py @@ -0,0 +1,57 @@ +import importlib.util +import os +import re + +import pytest +from dagster_databricks._test_utils import ( + databricks_client, + temp_dbfs_script, + upload_dagster_pipes_whl, +) + +IS_BUILDKITE = os.getenv("BUILDKITE") is not None + +# If we even try to import the sample code in an environment without Databricks credentials (BK), +# we'll get an error. +if not IS_BUILDKITE: + from dagster._core.definitions.events import AssetKey + from docs_snippets.guides.dagster.dagster_pipes.databricks.databricks_asset_client import ( + databricks_asset, + defs as databricks_asset_defs, + ) + + def _get_databricks_script_path(): + db_script_spec = importlib.util.find_spec( + "docs_snippets.guides.dagster.dagster_pipes.databricks.databricks_script" + ) + assert db_script_spec and db_script_spec.origin + return db_script_spec.origin + + def test_databricks_asset(databricks_client, capsys): + script_file = _get_databricks_script_path() + # with upload_dagster_pipes_whl(databricks_client) as dagster_pipes_whl_path: + with temp_dbfs_script( + databricks_client, + script_file=script_file, + dbfs_path="dbfs:/my_python_script.py", + ) as script_file: + job_def = databricks_asset_defs.get_implicit_job_def_for_assets( + [AssetKey("databricks_asset")], + ) + assert job_def + result = job_def.execute_in_process() + assert result.success + + mats = result.asset_materializations_for_node(databricks_asset.op.name) + assert mats[0].metadata["some_metric"].value == 101 + captured = capsys.readouterr() + assert re.search( + r"This will be forwarded back to Dagster stdout\n", + captured.out, + re.MULTILINE, + ) + assert re.search( + r"This will be forwarded back to Dagster stderr\n", + captured.err, + re.MULTILINE, + ) diff --git a/examples/docs_snippets/setup.py b/examples/docs_snippets/setup.py index 468b39155a8c7..278a134d46d44 100755 --- a/examples/docs_snippets/setup.py +++ b/examples/docs_snippets/setup.py @@ -24,6 +24,7 @@ "dagster-celery", "dagster-dbt", "dagster-dask", + "dagster-databricks", "dagster-duckdb", "dagster-duckdb-pandas", "dagster-embedded-elt", diff --git a/examples/docs_snippets/tox.ini 
b/examples/docs_snippets/tox.ini index 0a8256c0f34b5..32204cb67bbdf 100644 --- a/examples/docs_snippets/tox.ini +++ b/examples/docs_snippets/tox.ini @@ -14,6 +14,7 @@ deps = -e ../../python_modules/libraries/dagster-airflow -e ../../python_modules/libraries/dagster-aws -e ../../python_modules/libraries/dagster-celery + -e ../../python_modules/libraries/dagster-databricks -e ../../python_modules/libraries/dagster-dbt -e ../../python_modules/libraries/dagster-dask -e ../../python_modules/libraries/dagster-duckdb @@ -24,9 +25,11 @@ deps = -e ../../python_modules/libraries/dagster-k8s -e ../../python_modules/libraries/dagster-pandas -e ../../python_modules/libraries/dagster-postgres + -e ../../python_modules/libraries/dagster-pyspark -e ../../python_modules/libraries/dagster-slack -e ../../python_modules/libraries/dagster-gcp-pandas -e ../../python_modules/libraries/dagster-gcp-pyspark + -e ../../python_modules/libraries/dagster-spark -e ../../python_modules/libraries/dagster-snowflake -e ../../python_modules/libraries/dagster-snowflake-pandas -e ../../python_modules/libraries/dagster-snowflake-pyspark diff --git a/python_modules/libraries/dagster-databricks/README.md b/python_modules/libraries/dagster-databricks/README.md index 4952cdf8c3180..cda34ec50215f 100644 --- a/python_modules/libraries/dagster-databricks/README.md +++ b/python_modules/libraries/dagster-databricks/README.md @@ -3,185 +3,5 @@ The docs for `dagster-databricks` can be found [here](https://docs.dagster.io/_apidocs/libraries/dagster-databricks). -## Pipes example - -This package includes a prototype API for launching Databricks jobs with -Dagster's Pipes protocol. There are two ways to use the API: - -### (1) `PipesDatabricksClient` resource - -The `PipesDatabricksClient` resource provides a high-level API for launching -Databricks jobs using Dagster's Pipes protocol. - -`PipesDatabricksClient.run` takes a single -`databricks.sdk.service.jobs.SubmitTask` specification. After setting up Pipes -communications channels (which by default use DBFS), it injects the information -needed to connect to these channels from Databricks into the task -specification. It then launches a Databricks job by passing the specification -to `WorkspaceClient.jobs.submit`. It synchronously executes the job and returns -a `PipesClientCompletedInvocation` object that exposes a `get_results` method. -The output of `get_results` is a tuple of `PipesExecutionResult` objects that -you can return from the asset compute function. - - -``` -import os -from dagster import AssetExecutionContext, Definitions, asset -from dagster_databricks import PipesDatabricksClient -from databricks.sdk import WorkspaceClient -from databricks.sdk.service import jobs - -@asset -def databricks_asset(context: AssetExecutionContext, pipes_client: PipesDatabricksClient): - - # task specification will be passed to Databricks as-is, except for the - # injection of environment variables - task = jobs.SubmitTask.from_dict({ - "new_cluster": { ... }, - "libraries": [ - # must include dagster-pipes - {"pypi": {"package": "dagster-pipes"}}, - ], - "task_key": "some-key", - "spark_python_task": { - "python_file": "dbfs:/myscript.py", - "source": jobs.Source.WORKSPACE, - } - }) - - # Arbitrary json-serializable data you want access to from the `PipesSession` - # in the Databricks runtime. Assume `sample_rate` is a parameter used by - # the target job's business logic. 
- extras = {"sample_rate": 1.0} - - # synchronously execute the databricks job - return pipes_client.run( - task=task, - context=context, - extras=extras, - ).get_results() - -client = WorkspaceClient( - host=os.environ["DATABRICKS_HOST"], - token=os.environ["DATABRICKS_TOKEN"], -) - -defs = Definitions( - assets=[databricks_asset], - resources = {"pipes_client": PipesDatabricksClient(client)} -) -``` - -`PipesDatabricksClient.run` requires that the targeted python script -(`dbfs:/myscript.py` above) already exist in DBFS. Here is what it might look -like: - -``` -### dbfs:/myscript.py - -# `dagster_pipes` must be available in the databricks python environment -from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, init_dagster_pipes - -# Sets up communication channels and downloads the context data sent from Dagster. -# Note that while other `context_loader` and `message_writer` settings are -# possible, it is recommended to use the below settings for Databricks. -context = init_dagster_pipes( - context_loader=PipesDbfsContextLoader(), - message_writer=PipesDbfsMessageWriter() -) - -# Access the `extras` dict passed when launching the job from Dagster. -sample_rate = context.get_extra("sample_rate") - -# Stream log message back to Dagster -context.log(f"Using sample rate: {sample_rate}") - -# ... your code that computes and persists the asset - -# Stream asset materialization metadata and data version back to Dagster. -# This should be called after you've computed and stored the asset value. We -# omit the asset key here because there is only one asset in scope, but for -# multi-assets you can pass an `asset_key` parameter. -context.report_asset_materialization( - metadata={"some_metric", {"raw_value": get_metric(), "type": "text"}}, - data_version = get_data_version() -) -``` - -### (2) `open_pipes_session` context manager - -If you have existing code to launch/poll the job you do not want to change, you -want to stream back results, or you just want more control than is permitted by -`PipesDatabricksClient`, you can use `open_pipes_session`. All that is -necessary is that (1) your Databricks job be launched within the scope of the -`open_pipes_session` context manager; (2) your job is launched on a cluster -containing the environment variables available on the yielded `pipes_session`. - -While your Databricks code is running, any calls to -`report_asset_materialization` in the external script are streamed back to -Dagster, causing a `MaterializationResult` object to be buffered on the -`pipes_session`. You can either leave these objects buffered until execution is -complete (Option (1) in below example code) or stream them to Dagster machinery -during execution by calling `yield pipes_session.get_results()` (Option (2)). - -With either option, once the `open_pipes_session` block closes, you must call -`yield pipes_session.get_results()` to yield any remaining buffered results, -since we cannot guarantee that all communications from Databricks have been -processed until the `open_pipes_session` block closes. 
- -``` -import os - -from dagster import AssetExecutionContext, ext_protocol -from dagster_databricks import PipesDbfsContextInjector, PipesDbfsMessageReader -from databricks.sdk import WorkspaceClient - -@asset -def databricks_asset(context: AssetExecutionContext): - - client = WorkspaceClient( - host=os.environ["DATABRICKS_HOST"], - token=os.environ["DATABRICKS_TOKEN"], - ) - - # Arbitrary json-serializable data you want access to from the `PipesContext` - # in the Databricks runtime. Assume `sample_rate` is a parameter used by - # the target job's business logic. - extras = {"sample_rate": 1.0} - - # Sets up Pipes communications channels - with open_pipes_session( - context=context, - extras=extras, - context_injector=PipesDbfsContextInjector(client=client), - message_reader=PipesDbfsMessageReader(client=client), - ) as pipes_session: - - ##### Option (1) - # NON-STREAMING. Just pass the necessary environment variables down. - # During execution, all reported materializations are buffered on the - # `pipes_session`. Yield them all after Databricks execution is finished. - - # Dict[str, str] with environment variables containing Pipes comms info. - env_vars = pipes_session.get_pipes_bootstrap_env_vars() - - # Some function that handles launching/monitoring of the Databricks job. - # It must ensure that the `env_vars` are set on the executing cluster. - custom_databricks_launch_code(env_vars) - - ##### Option (2) - # STREAMING. Pass `pipes_session` down. During execution, you can yield any - # asset materializations that have been reported by calling ` - # pipes_session.get_results()` as often as you like. `get_results` returns - # an iterator that your custom code can `yield from` to forward the - # results back to the materialize function. Note you will need to extract - # the env vars by calling `pipes_session.get_pipes_bootstrap_env_vars()`, - # and launch the Databricks job in the same way as with (1). - - # The function should return an `Iterator[MaterializeResult]`. - yield from custom_databricks_launch_code(pipes_session) - - # With either option (1) or (2), this is required to yield any remaining - # buffered results. - yield from pipes_session.get_results() -``` +A guide for integrating Databricks using Dagster Pipes can be found +[here](https://docs.dagster.io/guides/dagster-pipes/databricks). diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks/_test_utils.py b/python_modules/libraries/dagster-databricks/dagster_databricks/_test_utils.py new file mode 100644 index 0000000000000..b579748323f88 --- /dev/null +++ b/python_modules/libraries/dagster-databricks/dagster_databricks/_test_utils.py @@ -0,0 +1,89 @@ +import base64 +import inspect +import os +import subprocess +import textwrap +from contextlib import contextmanager +from typing import Any, Callable, Iterator, Optional + +import dagster._check as check +import pytest +from databricks.sdk import WorkspaceClient +from databricks.sdk.service import files + +from dagster_databricks.pipes import dbfs_tempdir + +DAGSTER_PIPES_WHL_FILENAME = "dagster_pipes-1!0+dev-py3-none-any.whl" + +# This has been manually uploaded to a test DBFS workspace. +DAGSTER_PIPES_WHL_PATH = f"dbfs:/FileStore/jars/{DAGSTER_PIPES_WHL_FILENAME}" + + +def get_repo_root() -> str: + path = os.path.dirname(__file__) + while not os.path.exists(os.path.join(path, ".git")): + path = os.path.dirname(path) + return path + + +# Upload the Dagster Pipes wheel to DBFS. 
Use this fixture to avoid needing to manually reupload +# dagster-pipes if it has changed between test runs. +@contextmanager +def upload_dagster_pipes_whl(databricks_client: WorkspaceClient) -> Iterator[str]: + dbfs_client = files.DbfsAPI(databricks_client.api_client) + repo_root = get_repo_root() + orig_wd = os.getcwd() + dagster_pipes_root = os.path.join(repo_root, "python_modules", "dagster-pipes") + os.chdir(dagster_pipes_root) + subprocess.check_call(["python", "setup.py", "bdist_wheel"]) + with dbfs_tempdir(dbfs_client) as tempdir: + path = os.path.join(f"dbfs:{tempdir}", DAGSTER_PIPES_WHL_FILENAME) + subprocess.check_call( + # ["dbfs", "cp", "--overwrite", f"dist/{DAGSTER_PIPES_WHL_FILENAME}", DAGSTER_PIPES_WHL_PATH] + ["dbfs", "cp", "--overwrite", f"dist/{DAGSTER_PIPES_WHL_FILENAME}", path] + ) + os.chdir(orig_wd) + yield path + + +@pytest.fixture +def databricks_client() -> WorkspaceClient: + return WorkspaceClient( + host=os.environ["DATABRICKS_HOST"], + token=os.environ["DATABRICKS_TOKEN"], + ) + + +@contextmanager +def temp_dbfs_script( + client: WorkspaceClient, + *, + script_fn: Optional[Callable[[], Any]] = None, + script_file: Optional[str] = None, + dbfs_path: Optional[str] = None, +) -> Iterator[str]: + # drop the signature line + if script_fn is None and script_file is None: + raise ValueError("Must provide either script_fn or script_file") + elif script_fn is not None and script_file is not None: + raise ValueError("Must provide only one of script_fn or script_file") + elif script_fn is not None: + source = textwrap.dedent(inspect.getsource(script_fn).split("\n", 1)[1]) + elif script_file is not None: + with open(script_file, "rb") as f: + source = f.read().decode("utf-8") + else: + check.failed("Unreachable") + dbfs_client = files.DbfsAPI(client.api_client) + contents = base64.b64encode(source.encode("utf-8")).decode("utf-8") + if dbfs_path is None: + with dbfs_tempdir(dbfs_client) as tempdir: + script_path = os.path.join(tempdir, "script.py") + dbfs_client.put(script_path, contents=contents, overwrite=True) + yield script_path + else: + try: + dbfs_client.put(dbfs_path, contents=contents, overwrite=True) + yield dbfs_path + finally: + dbfs_client.delete(dbfs_path, recursive=False) diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py index 459569eaa2acb..cf8c00409f3e4 100644 --- a/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py +++ b/python_modules/libraries/dagster-databricks/dagster_databricks_tests/test_pipes.py @@ -1,21 +1,19 @@ -import base64 -import inspect import os import re -import subprocess -import textwrap -from contextlib import contextmanager -from typing import Any, Callable, Iterator import pytest from dagster import AssetExecutionContext, asset, materialize from dagster._core.errors import DagsterPipesExecutionError +from dagster_databricks._test_utils import ( + databricks_client, # noqa: F401 + temp_dbfs_script, + upload_dagster_pipes_whl, +) from dagster_databricks.pipes import ( PipesDatabricksClient, - dbfs_tempdir, ) from databricks.sdk import WorkspaceClient -from databricks.sdk.service import files, jobs +from databricks.sdk.service import jobs IS_BUILDKITE = os.getenv("BUILDKITE") is not None @@ -43,26 +41,6 @@ def script_fn(): ) -@contextmanager -def temp_script(script_fn: Callable[[], Any], client: WorkspaceClient) -> Iterator[str]: - # drop the signature line - source = 
textwrap.dedent(inspect.getsource(script_fn).split("\n", 1)[1]) - dbfs_client = files.DbfsAPI(client.api_client) - with dbfs_tempdir(dbfs_client) as tempdir: - script_path = os.path.join(tempdir, "script.py") - contents = base64.b64encode(source.encode("utf-8")).decode("utf-8") - dbfs_client.put(script_path, contents=contents, overwrite=True) - yield script_path - - -@pytest.fixture -def client() -> WorkspaceClient: - return WorkspaceClient( - host=os.environ["DATABRICKS_HOST"], - token=os.environ["DATABRICKS_TOKEN"], - ) - - CLUSTER_DEFAULTS = { "spark_version": "12.2.x-scala2.12", "node_type_id": "i3.xlarge", @@ -71,36 +49,10 @@ def client() -> WorkspaceClient: TASK_KEY = "DAGSTER_PIPES_TASK" -DAGSTER_PIPES_WHL_FILENAME = "dagster_pipes-1!0+dev-py3-none-any.whl" - -# This has been manually uploaded to a test DBFS workspace. -DAGSTER_PIPES_WHL_PATH = f"dbfs:/FileStore/jars/{DAGSTER_PIPES_WHL_FILENAME}" - -def get_repo_root() -> str: - path = os.path.dirname(__file__) - while not os.path.exists(os.path.join(path, ".git")): - path = os.path.dirname(path) - return path - - -# Upload the Dagster Pipes wheel to DBFS. Use this fixture to avoid needing to manually reupload -# dagster-pipes if it has changed between test runs. -@contextmanager -def upload_dagster_pipes_whl(client: WorkspaceClient) -> Iterator[None]: - repo_root = get_repo_root() - orig_wd = os.getcwd() - dagster_pipes_root = os.path.join(repo_root, "python_modules", "dagster-pipes") - os.chdir(dagster_pipes_root) - subprocess.check_call(["python", "setup.py", "bdist_wheel"]) - subprocess.check_call( - ["dbfs", "cp", "--overwrite", f"dist/{DAGSTER_PIPES_WHL_FILENAME}", DAGSTER_PIPES_WHL_PATH] - ) - os.chdir(orig_wd) - yield - - -def _make_submit_task(path: str, forward_logs: bool) -> jobs.SubmitTask: +def _make_submit_task( + script_path: str, dagster_pipes_whl_path: str, forward_logs: bool +) -> jobs.SubmitTask: cluster_settings = CLUSTER_DEFAULTS.copy() if forward_logs: cluster_settings["cluster_log_conf"] = { @@ -110,11 +62,12 @@ def _make_submit_task(path: str, forward_logs: bool) -> jobs.SubmitTask: { "new_cluster": cluster_settings, "libraries": [ - {"whl": DAGSTER_PIPES_WHL_PATH}, + # {"whl": DAGSTER_PIPES_WHL_PATH}, + {"whl": dagster_pipes_whl_path}, ], "task_key": TASK_KEY, "spark_python_task": { - "python_file": f"dbfs:{path}", + "python_file": f"dbfs:{script_path}", "source": jobs.Source.WORKSPACE, }, } @@ -123,16 +76,14 @@ def _make_submit_task(path: str, forward_logs: bool) -> jobs.SubmitTask: # Test both with and without log forwarding. 
This is important because the PipesClient spins up log # readers before it knows the task specification - - @pytest.mark.skipif(IS_BUILDKITE, reason="Not configured to run on BK yet.") @pytest.mark.parametrize("forward_logs", [True, False]) -def test_pipes_client(capsys, client: WorkspaceClient, forward_logs: bool): +def test_pipes_client(capsys, databricks_client: WorkspaceClient, forward_logs: bool): # noqa: F811 @asset def number_x(context: AssetExecutionContext, pipes_client: PipesDatabricksClient): - with upload_dagster_pipes_whl(client): - with temp_script(script_fn, client) as script_path: - task = _make_submit_task(script_path, forward_logs) + with upload_dagster_pipes_whl(databricks_client) as dagster_pipes_whl_path: + with temp_dbfs_script(databricks_client, script_fn=script_fn) as script_path: + task = _make_submit_task(script_path, dagster_pipes_whl_path, forward_logs) return pipes_client.run( task=task, context=context, @@ -143,7 +94,7 @@ def number_x(context: AssetExecutionContext, pipes_client: PipesDatabricksClient [number_x], resources={ "pipes_client": PipesDatabricksClient( - client, + databricks_client, ) }, raise_on_error=False, @@ -158,14 +109,14 @@ def number_x(context: AssetExecutionContext, pipes_client: PipesDatabricksClient @pytest.mark.skipif(IS_BUILDKITE, reason="Not configured to run on BK yet.") -def test_nonexistent_entry_point(client: WorkspaceClient): +def test_nonexistent_entry_point(databricks_client: WorkspaceClient): # noqa: F811 @asset def fake(context: AssetExecutionContext, pipes_client: PipesDatabricksClient): - task = _make_submit_task("/fake/fake", forward_logs=False) + task = _make_submit_task("/fake/fake", "/fake/fake", forward_logs=False) return pipes_client.run(task=task, context=context).get_results() with pytest.raises(DagsterPipesExecutionError, match=r"Cannot read the python file"): materialize( [fake], - resources={"pipes_client": PipesDatabricksClient(client)}, + resources={"pipes_client": PipesDatabricksClient(databricks_client)}, )