docs(databricks): update guide based on feedback (#12669)
### Summary & Motivation
Update Databricks docs based on feedback from dogfooding.

- Separate out custom op/asset examples
- Remove submit run API call from example so things work out of the box
- Clarify that `DATABRICKS_HOST` should start with `https://`

### How I Tested These Changes
eyes
rexledesma committed Mar 2, 2023
1 parent 6d17799 commit ed9e77b
Showing 2 changed files with 64 additions and 68 deletions.
81 changes: 39 additions & 42 deletions docs/content/integrations/databricks.mdx
@@ -5,7 +5,7 @@ description: Dagster can orchestrate Databricks jobs alongside other technologie

# Using Databricks with Dagster

Dagster can orchestrate your Databricks jobs, making it easy to chain together multiple Databricks jobs, as well as orchestrate Databricks alongside your other technologies.
Dagster can orchestrate your Databricks jobs and other Databricks API calls, making it easy to chain together multiple Databricks jobs, as well as orchestrate Databricks alongside your other technologies.

---

@@ -21,7 +21,7 @@ You'll also want to have a Databricks workspace with an existing project that is

To manage your Databricks job from Dagster, you'll need three values, which can be set as [environment variables in Dagster](/guides/dagster/using-environment-variables-and-secrets):

1. A `host` for connecting with your Databricks workspace, stored in an environment variable `DATABRICKS_HOST`,
1. A `host` for connecting with your Databricks workspace, starting with `https://`, stored in an environment variable `DATABRICKS_HOST`,
2. A `token` corresponding to a personal access token for your Databricks workspace, stored in an environment variable `DATABRICKS_TOKEN`, and
3. A `DATABRICKS_JOB_ID` for the Databricks job you want to run.
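
As a quick sanity check before launching Dagster, you can confirm these values are present in the environment. This is a minimal sketch, not part of the guide; the workspace URL in the comment is a made-up placeholder:

```python
import os

# The host must include the scheme, e.g. "https://dbc-1234abcd-5678.cloud.databricks.com"
assert os.environ["DATABRICKS_HOST"].startswith("https://")
assert os.environ["DATABRICKS_TOKEN"]  # personal access token for the workspace
assert os.environ["DATABRICKS_JOB_ID"]  # ID of the Databricks job you want to run
```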

@@ -31,7 +31,7 @@ You can follow the [Databricks API authentication instructions](https://docs.dat

## Step 1: Connecting to Databricks

The first step in using Databricks with Dagster is to tell Dagster how to connect to your Databricks workspace using a Databricks [resource](/concepts/resources). This resource contains information on where your Databricks workspace is located and any credentials sourced from environment variables that are needed to access it.
The first step in using Databricks with Dagster is to tell Dagster how to connect to your Databricks workspace using a Databricks [resource](/concepts/resources). This resource contains information on where your Databricks workspace is located and any credentials sourced from environment variables that are needed to access it. By configuring the resource, you can access the underlying [Databricks API client](https://docs.databricks.com/dev-tools/python-api.html) to communicate with your Databricks workspace.

For more information about the Databricks resource, see the [API reference](/\_apidocs/libraries/dagster-databricks).

@@ -48,14 +48,14 @@ databricks_client_instance = databricks_client.configured(
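
The diff cuts the snippet off here. Filled out, the resource configuration would plausibly look like the following sketch, sourcing both values from the environment variables set up above; the `{"env": ...}` form tells Dagster to read each value from the environment at runtime rather than hard-coding it:

```python
from dagster_databricks import databricks_client

# Configure the Databricks resource with the host and token from the environment
databricks_client_instance = databricks_client.configured(
    {
        "host": {"env": "DATABRICKS_HOST"},
        "token": {"env": "DATABRICKS_TOKEN"},
    }
)
```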

---

## Step 2: Create an op/asset from a Databricks job
## Step 2: Create an op/asset that connects to Databricks

In this step, we show several ways to model a Databricks job as either a Dagster [op](/concepts/ops-jobs-graphs/ops), or as the computation backing a [software-defined asset](/concepts/assets/software-defined-assets). You can either:
In this step, we show several ways to model a Databricks API call as either a Dagster [op](/concepts/ops-jobs-graphs/ops), or as the computation backing a [software-defined asset](/concepts/assets/software-defined-assets). You can either:

- Use the `dagster-databricks` op factories, which create ops that invoke the Databricks Jobs' [Run Now](https://docs.databricks.com/api-explorer/workspace/jobs/runnow) ([`create_databricks_run_now_op`](/\_apidocs/libraries/dagster-databricks)) or [Submit Run](https://docs.databricks.com/api-explorer/workspace/jobs/submit) ([`create_databricks_submit_run_op`](/\_apidocs/libraries/dagster-databricks)) APIs, or
- Manually create a Dagster op or asset that runs a Databricks job using the configured Databricks resource.
- Manually create a Dagster op or asset that connects to Databricks using the configured Databricks resource.

Afterwards, we create a Dagster [job](/concepts/ops-jobs-graphs/jobs) that either invokes the op or selects the asset in order to run the Databricks job.
Afterwards, we create a Dagster [job](/concepts/ops-jobs-graphs/jobs) that either invokes the op or selects the asset in order to run the Databricks API call.

For more information on how to decide whether to use an op or asset, see our [guide](/guides/dagster/how-assets-relate-to-ops-and-graphs) to understand how they relate.

@@ -64,47 +64,54 @@ For more information on how to decide whether to use an op or asset, see our [gu
<TabItem name="Using the op factories">

```python startafter=start_define_databricks_op_factories endbefore=end_define_databricks_op_factories file=/integrations/databricks/databricks.py dedent=4
from dagster_databricks import (
    create_databricks_run_now_op,
    create_databricks_submit_run_op,
)
from dagster_databricks import create_databricks_run_now_op

my_databricks_run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
)

my_databricks_submit_run_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "2.1.0-db3-scala2.11",
            "num_workers": 2,
        },
        "notebook_task": {
            "notebook_path": "/Users/[email protected]/PrepareData",
        },
    },
@job(resource_defs={"databricks": databricks_client_instance})
def my_databricks_job():
    my_databricks_run_now_op()
```

</TabItem>

<TabItem name="Creating a custom op using resources">

```python startafter=start_define_databricks_custom_op endbefore=end_define_databricks_custom_op file=/integrations/databricks/databricks.py dedent=4
from databricks_cli.sdk import DbfsService

from dagster import (
    OpExecutionContext,
    job,
    op,
)

@job
@op(required_resource_keys={"databricks"})
def my_databricks_op(context: OpExecutionContext) -> None:
    databricks_api_client = context.resources.databricks.api_client
    dbfs_service = DbfsService(databricks_api_client)

    dbfs_service.read(path="/tmp/HelloWorld.txt")

@job(resource_defs={"databricks": databricks_client_instance})
def my_databricks_job():
    my_databricks_run_now_op()
    my_databricks_submit_run_op()
    my_databricks_op()
```

</TabItem>

<TabItem name="Manually using resources">
<TabItem name="Creating a custom asset using resources">

```python startafter=start_define_databricks_custom_ops_and_assets endbefore=end_define_databricks_custom_ops_and_assets file=/integrations/databricks/databricks.py dedent=4
```python startafter=start_define_databricks_custom_asset endbefore=end_define_databricks_custom_asset file=/integrations/databricks/databricks.py dedent=4
from databricks_cli.sdk import JobsService

from dagster import (
    AssetSelection,
    OpExecutionContext,
    asset,
    define_asset_job,
    job,
    op,
)

@asset(required_resource_keys={"databricks"})
@@ -118,27 +125,17 @@ materialize_databricks_table = define_asset_job(
name="materialize_databricks_table",
selection=AssetSelection.keys("my_databricks_table"),
)

@op(required_resource_keys={"databricks"})
def my_databricks_op(context: OpExecutionContext) -> None:
databricks_api_client = context.resources.databricks.api_client
jobs_service = JobsService(databricks_api_client)

jobs_service.run_now(job_id=DATABRICKS_JOB_ID)

@job
def my_databricks_job():
my_databricks_op()
```
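
The hunk above truncates the body of `my_databricks_table`. Based on the `JobsService` import shown here and the run-now pattern from the removed op example, a plausible complete version of this tab's snippet is sketched below; treat it as an illustration rather than the exact file contents, and note that `DATABRICKS_JOB_ID` is a placeholder:

```python
from databricks_cli.sdk import JobsService

from dagster import AssetSelection, OpExecutionContext, asset, define_asset_job

DATABRICKS_JOB_ID = 1  # placeholder ID of an existing Databricks job

@asset(required_resource_keys={"databricks"})
def my_databricks_table(context: OpExecutionContext) -> None:
    # Reach through the Dagster resource to the underlying Databricks API client
    databricks_api_client = context.resources.databricks.api_client
    jobs_service = JobsService(databricks_api_client)

    # Trigger the Databricks job that produces the table backing this asset
    jobs_service.run_now(job_id=DATABRICKS_JOB_ID)

materialize_databricks_table = define_asset_job(
    name="materialize_databricks_table",
    selection=AssetSelection.keys("my_databricks_table"),
)
```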

</TabItem>

</TabGroup>

---

## Step 3: Schedule a Databricks job
## Step 3: Schedule your Databricks computation

Now that your Databricks job is modeled within Dagster, you can [schedule](/concepts/partitions-schedules-sensors/schedules) it to run on a regular cadence.
Now that your Databricks API calls are modeled within Dagster, you can [schedule](/concepts/partitions-schedules-sensors/schedules) them to run on a regular cadence.

In the example below, we schedule both the `materialize_databricks_table` job, which materializes the Databricks table asset, and the `my_databricks_job` job to run daily.

@@ -170,7 +167,7 @@ defs = Definitions(
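
The `Definitions` object is also truncated by the diff. A minimal sketch of what it might contain, assuming daily schedules for both jobs and the resource configured in Step 1, is:

```python
from dagster import Definitions, ScheduleDefinition

defs = Definitions(
    assets=[my_databricks_table],
    schedules=[
        # Materialize the Databricks-backed asset once a day
        ScheduleDefinition(
            job=materialize_databricks_table,
            cron_schedule="@daily",
        ),
        # Run the job wrapping the custom Databricks op once a day
        ScheduleDefinition(
            job=my_databricks_job,
            cron_schedule="@daily",
        ),
    ],
    resources={"databricks": databricks_client_instance},
)
```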

## What's next?

By now, you should have a working Databricks and Dagster integration, as well as a materialized Dagster asset!
By now, you should have a working Databricks and Dagster integration!

What's next? From here, you can:

@@ -17,17 +17,15 @@ def scope_define_instance():
DATABRICKS_JOB_ID = 1


def scope_define_databricks_custom_ops_and_assets():
    # start_define_databricks_custom_ops_and_assets
def scope_define_databricks_custom_asset():
    # start_define_databricks_custom_asset
    from databricks_cli.sdk import JobsService

    from dagster import (
        AssetSelection,
        OpExecutionContext,
        asset,
        define_asset_job,
        job,
        op,
    )

    @asset(required_resource_keys={"databricks"})
@@ -42,48 +40,49 @@ def my_databricks_table(context: OpExecutionContext) -> None:
        selection=AssetSelection.keys("my_databricks_table"),
    )

    # end_define_databricks_custom_asset


def scope_define_databricks_custom_op():
    from dagster_databricks import databricks_client as databricks_client_instance

    # start_define_databricks_custom_op
    from databricks_cli.sdk import DbfsService

    from dagster import (
        OpExecutionContext,
        job,
        op,
    )

    @op(required_resource_keys={"databricks"})
    def my_databricks_op(context: OpExecutionContext) -> None:
        databricks_api_client = context.resources.databricks.api_client
        jobs_service = JobsService(databricks_api_client)
        dbfs_service = DbfsService(databricks_api_client)

        jobs_service.run_now(job_id=DATABRICKS_JOB_ID)
        dbfs_service.read(path="/tmp/HelloWorld.txt")

    @job
    @job(resource_defs={"databricks": databricks_client_instance})
    def my_databricks_job():
        my_databricks_op()

    # end_define_databricks_custom_ops_and_assets
    # end_define_databricks_custom_op


def scope_define_databricks_op_factories():
    from dagster_databricks import databricks_client as databricks_client_instance

    DATABRICKS_JOB_ID = 1
    # start_define_databricks_op_factories
    from dagster_databricks import (
        create_databricks_run_now_op,
        create_databricks_submit_run_op,
    )
    from dagster_databricks import create_databricks_run_now_op

    my_databricks_run_now_op = create_databricks_run_now_op(
        databricks_job_id=DATABRICKS_JOB_ID,
    )

    my_databricks_submit_run_op = create_databricks_submit_run_op(
        databricks_job_configuration={
            "new_cluster": {
                "spark_version": "2.1.0-db3-scala2.11",
                "num_workers": 2,
            },
            "notebook_task": {
                "notebook_path": "/Users/[email protected]/PrepareData",
            },
        },
    )

    @job
    @job(resource_defs={"databricks": databricks_client_instance})
    def my_databricks_job():
        my_databricks_run_now_op()
        my_databricks_submit_run_op()

    # end_define_databricks_op_factories

