[DbtProject] Update dbt reference to include DbtProject (#23012)
## Summary & Motivation
This PR updates dbt reference to include `DbtProject`.
## How I Tested These Changes
make apidoc-build
make next-watch-build
1 parent e34e34a, commit 3491e5e
Showing 5 changed files with 181 additions and 70 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ For a step-by-step implementation walkthrough, refer to the [Using dbt with Dags | |
| <PyObject module="dagster_dbt" object="dbt_assets" decorator /> | A decorator used to define Dagster assets for dbt models defined in a dbt manifest. | | ||
| <PyObject module="dagster_dbt" object="DbtCliResource" /> | A class that defines a Dagster resource used to execute dbt CLI commands. | | ||
| <PyObject module="dagster_dbt" object="DbtCliInvocation" /> | A class that defines the representation of an invoked dbt command. | | ||
| <PyObject module="dagster_dbt" object="DbtProject" /> | A class that defines the representation of a dbt project and related settings that assist with managing dependencies and `manifest.json` preparation. | | ||
| <PyObject module="dagster_dbt" object="DagsterDbtTranslator" /> | A class that can be overridden to customize how Dagster asset metadata is derived from a dbt manifest. | | ||
| <PyObject module="dagster_dbt" object="DagsterDbtTranslatorSettings" /> | A class with settings to enable Dagster features for a dbt project. | | ||
| <PyObject module="dagster_dbt" object="DbtManifestAssetSelection" /> | A class that defines a selection of assets from a dbt manifest and a dbt selection string. | | ||
|
@@ -56,7 +57,7 @@ For a step-by-step implementation walkthrough, refer to the [Using dbt with Dags | |
You can create a Dagster project that wraps your dbt project by using the [`dagster-dbt project scaffold`](/\_apidocs/libraries/dagster-dbt#dagster-dbt-project-scaffold) command line interface. | ||
|
||
```shell | ||
dagster-dbt project scaffold --project-name project_dagster --dbt-project-dir path/to/dbt/project | ||
dagster-dbt project scaffold --use-dbt-project --project-name project_dagster --dbt-project-dir path/to/dbt/project | ||
``` | ||
|
||
This creates a directory called `project_dagster/` inside the current directory. The `project_dagster/` directory contains a set of files that define a Dagster project that loads the dbt project at the path defined by `--dbt-project-dir`. The path to the dbt project must contain a `dbt_project.yml`. | ||
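The requirement that the dbt project path contain a `dbt_project.yml` can be checked before running the scaffold command. The following is a minimal sketch using only the standard library; the helper name `has_dbt_project_file` is hypothetical and is not part of `dagster-dbt`.

```python
import tempfile
from pathlib import Path


def has_dbt_project_file(dbt_project_dir: Path) -> bool:
    """Return True if the directory contains a dbt_project.yml file.

    Hypothetical helper for illustration; not part of dagster-dbt.
    """
    return dbt_project_dir.joinpath("dbt_project.yml").is_file()


# Demonstrate the check against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    project_dir = Path(tmp)
    before = has_dbt_project_file(project_dir)  # no dbt_project.yml yet
    project_dir.joinpath("dbt_project.yml").write_text("name: example\n")
    after = has_dbt_project_file(project_dir)  # file now present
```

A check like this could run ahead of `dagster-dbt project scaffold` in a setup script so that a wrong `--dbt-project-dir` is caught early.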
|
@@ -82,6 +83,55 @@ The manifest can be created in two ways: | |
|
||
When deploying your Dagster project to production, **we recommend generating the manifest at build time** to avoid the overhead of recompiling your dbt project every time your Dagster code is executed. A `manifest.json` should be precompiled and included in the Python package for your Dagster code. | ||
|
||
<TabGroup> | ||
<TabItem name="Select an option"> | ||
|
||
Select one of the following to handle the creation of your manifest: | ||
|
||
- [**Option 1:**](#option-1-using-dbtproject) Using `DbtProject` | ||
- [**Option 2:**](#option-2-using-dbtcliresource) Using `DbtCliResource` | ||
|
||
</TabItem> | ||
<TabItem name="Option 1 (recommended): Using DbtProject"> | ||
|
||
### Option 1: Using DbtProject | ||
|
||
The easiest way to handle the creation of your manifest file is to use <PyObject object="DbtProject" module="dagster_dbt" />. | ||
|
||
In the Dagster project created by the [`dagster-dbt project scaffold`](/\_apidocs/libraries/dagster-dbt#dagster-dbt-project-scaffold) command, the creation of your manifest is handled during development: | ||
|
||
```python startafter=start_compile_dbt_manifest_with_dbt_project endbefore=end_compile_dbt_manifest_with_dbt_project file=/integrations/dbt/dbt.py dedent=4 | ||
"""✅ This is recommended!""" | ||
from pathlib import Path | ||
|
||
from dagster_dbt import DbtProject | ||
|
||
my_dbt_project = DbtProject( | ||
project_dir=Path(__file__).joinpath("..", "..", "..").resolve(), | ||
packaged_project_dir=Path(__file__) | ||
.joinpath("..", "..", "dbt-project") | ||
.resolve(), | ||
) | ||
my_dbt_project.prepare_if_dev() | ||
``` | ||
|
||
The manifest path can then be accessed with `my_dbt_project.manifest_path`. | ||
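As an aside on the `Path(__file__).joinpath("..", ...)` calls in the snippet above: each `".."` removes one trailing path component, and the first one removes the file name itself. The sketch below illustrates this with the standard library only. The directory layout (`my_dbt/project_dagster/project_dagster/project.py`) is an assumption made for the example, not something the scaffold command is guaranteed to produce.

```python
import tempfile
from pathlib import Path


def resolve_relative(file_path: Path, *parts: str) -> Path:
    """Join relative parts onto a file path and normalize the result."""
    return file_path.joinpath(*parts).resolve()


with tempfile.TemporaryDirectory() as tmp:
    # Resolve the temp dir first so symlinked temp locations compare equal.
    root = Path(tmp).resolve()

    # Assumed layout: <root>/my_dbt/project_dagster/project_dagster/project.py
    module_file = root / "my_dbt" / "project_dagster" / "project_dagster" / "project.py"
    module_file.parent.mkdir(parents=True)
    module_file.touch()

    # Three ".." steps: drop the file name, then two directories.
    project_dir = resolve_relative(module_file, "..", "..", "..")

    # Two ".." steps, then descend into a sibling "dbt-project" directory.
    packaged_project_dir = resolve_relative(module_file, "..", "..", "dbt-project")
```

Under this assumed layout, `project_dir` points at the dbt project root and `packaged_project_dir` at a `dbt-project` directory inside the Dagster project.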
|
||
When developing locally, you can run the following command to generate the manifest at run time for your dbt and Dagster project: | ||
|
||
```shell | ||
dagster dev | ||
``` | ||
|
||
In production, a precompiled manifest should be used. Using <PyObject object="DbtProject" module="dagster_dbt" />, the manifest can be created at build time by running the [`dagster-dbt project prepare-and-package`](/\_apidocs/libraries/dagster-dbt#dagster-dbt-project-prepare-and-package) command in your CI/CD workflow. For more information, see the [Deploying a Dagster project with a dbt project](#deploying-a-dagster-project-with-a-dbt-project) section. | ||
|
||
</TabItem> | ||
<TabItem name="Option 2: Using DbtCliResource"> | ||
|
||
### Option 2: Using DbtCliResource | ||
|
||
Creating the manifest can be done by running `dbt parse` using the CLI with <PyObject object="DbtCliResource" module="dagster_dbt" />. | ||
|
||
Creating the manifest at runtime in production is known to cause issues and is not recommended. These issues are often caused by code like the following, which should be avoided. | ||
|
||
```python startafter=start_troubleshooting_dbt_manifest endbefore=end_troubleshooting_dbt_manifest file=/integrations/dbt/dbt.py dedent=4 | ||
|
@@ -147,6 +197,9 @@ DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 dagster dev | |
|
||
In production, `DAGSTER_DBT_PARSE_PROJECT_ON_LOAD` should be unset so that your project uses the precompiled manifest. | ||
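The toggle described above can be sketched as a small decision function. This is an illustration under stated assumptions, not the `dagster-dbt` implementation: `choose_manifest_path` and `fake_parse` are hypothetical names, the parse step is passed in as a callable so the sketch runs without dbt installed, and the precompiled manifest is assumed to live at `target/manifest.json` inside the dbt project.

```python
import os
from pathlib import Path
from typing import Callable, List, Mapping


def choose_manifest_path(
    project_dir: Path,
    parse_project: Callable[[Path], Path],
    env: Mapping[str, str] = os.environ,
) -> Path:
    """Parse at load time only when DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set."""
    # Any non-empty value counts as "set", including "0".
    if env.get("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
        return parse_project(project_dir)
    # Otherwise rely on a manifest that was compiled ahead of time.
    return project_dir / "target" / "manifest.json"


# Stand-in for a real `dbt parse` invocation; records that it was called.
parse_calls: List[Path] = []


def fake_parse(project_dir: Path) -> Path:
    parse_calls.append(project_dir)
    return project_dir / "target" / "manifest.json"


dbt_project_dir = Path("path/to/dbt_project")
dev_path = choose_manifest_path(
    dbt_project_dir, fake_parse, env={"DAGSTER_DBT_PARSE_PROJECT_ON_LOAD": "1"}
)
prod_path = choose_manifest_path(dbt_project_dir, fake_parse, env={})
```

With the variable set, the parse callable runs once; with it unset, no parsing happens and the precompiled path is returned.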
|
||
</TabItem> | ||
</TabGroup> | ||
|
||
--- | ||
|
||
## Deploying a Dagster project with a dbt project | ||
|
@@ -170,8 +223,14 @@ In your CI/CD workflows for your Dagster project: | |
|
||
1. Include any secrets that are required by your dbt project in your CI/CD environment. | ||
2. Clone the dbt project repository as a subdirectory of your Dagster project. | ||
3. Run `dbt deps` to build your dbt project's dependencies. | ||
4. Run `dbt parse` to create a dbt manifest for your Dagster project. | ||
3. Depending on whether <PyObject object="DbtProject" module="dagster_dbt" /> is used, this step will vary: | ||
- **For projects using `DbtProject`**, run `dagster-dbt project prepare-and-package --file path/to/project.py` to | ||
- Build your dbt project's dependencies, | ||
- Create a dbt manifest for your Dagster project, and | ||
- Package your dbt project | ||
- **For projects using `DbtCliResource`**: | ||
- Run `dbt deps` to build your dbt project's dependencies | ||
- Run `dbt parse` to create a dbt manifest for your Dagster project | ||
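The branching in the last step can be sketched as a function that returns the commands a CI/CD job would run. The function name `ci_manifest_commands` is hypothetical, and `path/to/project.py` is the placeholder path used in the step above; the commands themselves are the ones listed in the step.

```python
from typing import List


def ci_manifest_commands(
    uses_dbt_project: bool, project_file: str = "path/to/project.py"
) -> List[List[str]]:
    """Return the commands that prepare the dbt manifest in CI/CD."""
    if uses_dbt_project:
        # One command installs dependencies, builds the manifest, and packages the project.
        return [
            ["dagster-dbt", "project", "prepare-and-package", "--file", project_file]
        ]
    # With DbtCliResource, dependencies and the manifest are handled separately.
    return [["dbt", "deps"], ["dbt", "parse"]]


with_dbt_project = ci_manifest_commands(uses_dbt_project=True)
with_cli_resource = ci_manifest_commands(uses_dbt_project=False)
```

Each returned command could then be passed to `subprocess.run(cmd, check=True)` from the appropriate working directory.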
|
||
In the CI/CD workflows for your dbt project, set up a dispatch action to trigger a deployment of your Dagster project when your dbt project changes. | ||
|
||
|
@@ -188,8 +247,14 @@ If you are managing your Dagster project in the same git repository as your dbt | |
In your CI/CD workflows for your Dagster and dbt project: | ||
|
||
1. Include any secrets that are required by your dbt project in your CI/CD environment. | ||
2. Run `dbt deps` to build your dbt project's dependencies. | ||
3. Run `dbt parse` to create a dbt manifest for your Dagster project. | ||
2. Depending on whether <PyObject object="DbtProject" module="dagster_dbt" /> is used, this step will vary: | ||
- **For projects using `DbtProject`**, run `dagster-dbt project prepare-and-package --file path/to/project.py` to | ||
- Build your dbt project's dependencies, | ||
- Create a dbt manifest for your Dagster project, and | ||
- Package your dbt project | ||
- **For projects using `DbtCliResource`**: | ||
- Run `dbt deps` to build your dbt project's dependencies | ||
- Run `dbt parse` to create a dbt manifest for your Dagster project | ||
|
||
--- | ||
|
||
|
@@ -203,12 +268,14 @@ In the context of dbt, this can be useful if you want to run commands or flags f | |
from pathlib import Path | ||
|
||
from dagster import AssetExecutionContext, Config | ||
from dagster_dbt import DbtCliResource, dbt_assets | ||
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets | ||
|
||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
|
||
class MyDbtConfig(Config): | ||
full_refresh: bool | ||
|
||
@dbt_assets(manifest=Path("target", "manifest.json")) | ||
@dbt_assets(manifest=my_dbt_project.manifest_path) | ||
def my_dbt_assets( | ||
context: AssetExecutionContext, dbt: DbtCliResource, config: MyDbtConfig | ||
): | ||
|
@@ -342,17 +409,17 @@ Alternatively, to override the asset key generation for all dbt nodes in your db | |
```python startafter=start_custom_asset_key_dagster_dbt_translator endbefore=end_custom_asset_key_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 | ||
from pathlib import Path | ||
from dagster import AssetKey, AssetExecutionContext | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
class CustomDagsterDbtTranslator(DagsterDbtTranslator): | ||
def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: | ||
return super().get_asset_key(dbt_resource_props).with_prefix("snowflake") | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=CustomDagsterDbtTranslator(), | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -390,10 +457,10 @@ Alternatively, to override the Dagster group name generation for all dbt nodes i | |
```python startafter=start_custom_group_name_dagster_dbt_translator endbefore=end_custom_group_name_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 | ||
from pathlib import Path | ||
from dagster import AssetExecutionContext | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping, Optional | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
class CustomDagsterDbtTranslator(DagsterDbtTranslator): | ||
def get_group_name( | ||
|
@@ -402,7 +469,7 @@ class CustomDagsterDbtTranslator(DagsterDbtTranslator): | |
return "snowflake" | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=CustomDagsterDbtTranslator(), | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -442,10 +509,10 @@ Alternatively, to override the Dagster generation of owners for all dbt nodes in | |
```python startafter=start_custom_owners_dagster_dbt_translator endbefore=end_custom_owners_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 | ||
from pathlib import Path | ||
from dagster import AssetExecutionContext | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping, Optional, Sequence | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
class CustomDagsterDbtTranslator(DagsterDbtTranslator): | ||
def get_owners( | ||
|
@@ -454,7 +521,7 @@ class CustomDagsterDbtTranslator(DagsterDbtTranslator): | |
return ["[email protected]", "team:[email protected]"] | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=CustomDagsterDbtTranslator(), | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -471,17 +538,17 @@ To override the Dagster description for all dbt nodes in your dbt project, you c | |
import textwrap | ||
from pathlib import Path | ||
from dagster import AssetExecutionContext | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
class CustomDagsterDbtTranslator(DagsterDbtTranslator): | ||
def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str: | ||
return textwrap.indent(dbt_resource_props.get("raw_sql", ""), "\t") | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=CustomDagsterDbtTranslator(), | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -497,10 +564,10 @@ To override the Dagster definition metadata for all dbt nodes in your dbt projec | |
```python startafter=start_custom_metadata_dagster_dbt_translator endbefore=end_custom_metadata_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 | ||
from pathlib import Path | ||
from dagster import MetadataValue, AssetExecutionContext | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
class CustomDagsterDbtTranslator(DagsterDbtTranslator): | ||
def get_metadata( | ||
|
@@ -511,7 +578,7 @@ class CustomDagsterDbtTranslator(DagsterDbtTranslator): | |
} | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=CustomDagsterDbtTranslator(), | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -576,10 +643,10 @@ To override the Dagster tags for all dbt nodes in your dbt project, you can crea | |
```python startafter=start_custom_tags_dagster_dbt_translator endbefore=end_custom_tags_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 | ||
from pathlib import Path | ||
from dagster import AssetExecutionContext | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
class CustomDagsterDbtTranslator(DagsterDbtTranslator): | ||
def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]: | ||
|
@@ -593,7 +660,7 @@ class CustomDagsterDbtTranslator(DagsterDbtTranslator): | |
return dagster_tags | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=CustomDagsterDbtTranslator(), | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -626,10 +693,10 @@ Alternatively, to override the Dagster auto-materialize policy generation for al | |
```python startafter=start_custom_auto_materialize_policy_dagster_dbt_translator endbefore=end_custom_auto_materialize_policy_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 | ||
from pathlib import Path | ||
from dagster import AssetExecutionContext, AutoMaterializePolicy | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping, Optional | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
class CustomDagsterDbtTranslator(DagsterDbtTranslator): | ||
def get_auto_materialize_policy( | ||
|
@@ -638,7 +705,7 @@ class CustomDagsterDbtTranslator(DagsterDbtTranslator): | |
return AutoMaterializePolicy.eager() | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=CustomDagsterDbtTranslator(), | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -705,16 +772,17 @@ from dagster_dbt import ( | |
DagsterDbtTranslator, | ||
DagsterDbtTranslatorSettings, | ||
DbtCliResource, | ||
DbtProject, | ||
dbt_assets, | ||
) | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
dagster_dbt_translator = DagsterDbtTranslator( | ||
settings=DagsterDbtTranslatorSettings(enable_asset_checks=False) | ||
) | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
dagster_dbt_translator=dagster_dbt_translator, | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
|
@@ -745,13 +813,13 @@ Row counts are fetched in parallel to the execution of your dbt models. To enabl | |
```python startafter=start_fetch_row_count endbefore=end_fetch_row_count file=/integrations/dbt/dbt.py dedent=4 | ||
from pathlib import Path | ||
from dagster import AssetKey, AssetExecutionContext | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets | ||
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets | ||
from typing import Any, Mapping | ||
manifest_path = Path("path/to/dbt_project/target/manifest.json") | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
@dbt_assets( | ||
manifest=manifest_path, | ||
manifest=my_dbt_project.manifest_path, | ||
) | ||
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
yield from dbt.cli(["build"], context=context).stream().fetch_row_counts() | ||
|
@@ -966,25 +1034,23 @@ Partitioned assets will be able to access the <PyObject module="dagster" object= | |
|
||
When a partition definition is passed to the <PyObject module="dagster_dbt" object="dbt_assets" decorator/> decorator, all assets are defined to operate on the same partitions. With this in mind, we can retrieve the time window from the <PyObject module="dagster" object="AssetExecutionContext" method="partition_time_window"/> property to get the start and end of the current partition. | ||
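To make the shape of the resulting dbt arguments concrete, here is a standalone sketch that builds the `--vars` payload from a start and end time. It uses only the standard library; the helper name `build_dbt_build_args` is hypothetical, the one-day UTC window stands in for what a daily partition would supply, and the variable names `min_date` and `max_date` follow this page's example.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import List


def build_dbt_build_args(start: datetime, end: datetime) -> List[str]:
    """Build `dbt build` arguments that pass the time window as dbt vars."""
    dbt_vars = {"min_date": start.isoformat(), "max_date": end.isoformat()}
    return ["build", "--vars", json.dumps(dbt_vars)]


# A one-day window, standing in for a daily partition's time window.
window_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
window_end = window_start + timedelta(days=1)
dbt_build_args = build_dbt_build_args(window_start, window_end)
```

Inside an asset function, the same list would be handed to `dbt.cli(...)`, and the dbt models can read the two values with `var("min_date")` and `var("max_date")`.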
|
||
```python | ||
```python startafter=start_build_incremental_model endbefore=end_build_incremental_model file=/integrations/dbt/dbt.py dedent=4 | ||
import json | ||
from pathlib import Path | ||
from dagster import DailyPartitionDefinition, OpExecutionContext | ||
from dagster_dbt import DbtCliResource, dbt_assets | ||
from dagster import DailyPartitionsDefinition, OpExecutionContext | ||
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets | ||
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) | ||
@dbt_assets( | ||
manifest=Path("target", "manifest.json"), | ||
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01") | ||
manifest=my_dbt_project.manifest_path, | ||
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"), | ||
) | ||
def partitionshop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): | ||
start, end = context.partition_time_window | ||
dbt_vars = { | ||
"min_date": start.isoformat(), | ||
"max_date": end.isoformat() | ||
} | ||
dbt_vars = {"min_date": start.isoformat(), "max_date": end.isoformat()} | ||
dbt_build_args = ["build", "--vars", json.dumps(dbt_vars)] | ||
yield from dbt.cli(dbt_build_args, context=context).stream() | ||
|
Deploy preview for dagster-docs ready!
✅ Preview
https://dagster-docs-h7ei9ci35-elementl.vercel.app
https://master.dagster.dagster-docs.io
Built with commit 3491e5e.
This pull request is being automatically deployed with vercel-action