Skip to content

Commit

Permalink
Pull in dbt tags onto asset definitions (#20549)
Browse files Browse the repository at this point in the history
## Summary & Motivation

## How I Tested These Changes

---------

Co-authored-by: Rex Ledesma <[email protected]>
  • Loading branch information
2 people authored and PedramNavid committed Mar 28, 2024
1 parent 66cb31e commit 3533cfb
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 0 deletions.
41 changes: 41 additions & 0 deletions docs/content/integrations/dbt/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ In Dagster, each asset definition has attributes. Dagster automatically generate
- [Customizing group names](#customizing-group-names)
- [Customizing descriptions](#customizing-descriptions)
- [Customizing metadata](#customizing-metadata)
- [Customizing tags](#customizing-tags)
- [Customizing auto-materialize policies](#customizing-auto-materialize-policies)
- [Customizing freshness policies](#customizing-freshness-policies)

Expand Down Expand Up @@ -390,6 +391,46 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
```

### Customizing tags

<Note>
In Dagster, tags are key-value pairs. However, in dbt, tags are strings. To bridge this divide, the dbt tag string is used as the Dagster tag key, and the Dagster tag value is set to the special sentinel value `"__dagster_no_value"`.

Any dbt tags that don't match Dagster's supported tag key format (e.g. they contain unsupported characters) will be ignored by default. </Note>

For dbt models, seeds, and snapshots, the default Dagster tags will be the dbt node's configured tags.

Any dbt tags that don't match Dagster's supported tag key format (e.g. they contain unsupported characters) will be ignored.

To override the Dagster tags for all dbt nodes in your dbt project, you can create a custom <PyObject module="dagster_dbt" object="DagsterDbtTranslator" /> and implement <PyObject module="dagster_dbt" object="DagsterDbtTranslator" method="get_tags"/>. The following converts dbt tags of the form "foo=bar" to key/value pairs:

```python startafter=start_custom_tags_dagster_dbt_translator endbefore=end_custom_tags_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

manifest_path = Path("path/to/dbt_project/target/manifest.json")


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        # Split each dbt tag on its first "="; a tag with no value part
        # falls back to the sentinel value.
        split_tags = (tag.partition("=") for tag in dbt_resource_props.get("tags", []))
        return {name: val or "__dagster_no_value" for name, _, val in split_tags}


@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```

### Customizing auto-materialize policies

For dbt models, seeds, and snapshots, the default <PyObject object="AutoMaterializePolicy"/> will be `None`.
Expand Down
30 changes: 30 additions & 0 deletions examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,36 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
# end_custom_metadata_dagster_dbt_translator


def scope_custom_tags_dagster_dbt_translator():
    """Hold the "customizing tags" docs example inside its own function scope.

    The region between the ``start_``/``end_`` marker comments is the part
    meant to be embedded in the docs, so the markers must stay in place.
    """
    # start_custom_tags_dagster_dbt_translator
    from pathlib import Path
    from dagster import AssetExecutionContext
    from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
    from typing import Any, Mapping

    manifest_path = Path("path/to/dbt_project/target/manifest.json")

    class CustomDagsterDbtTranslator(DagsterDbtTranslator):
        def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
            # Split each dbt tag on its first "="; a tag with no value part
            # falls back to the sentinel value.
            split_tags = (tag.partition("=") for tag in dbt_resource_props.get("tags", []))
            return {name: val or "__dagster_no_value" for name, _, val in split_tags}

    @dbt_assets(
        manifest=manifest_path,
        dagster_dbt_translator=CustomDagsterDbtTranslator(),
    )
    def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
        yield from dbt.cli(["build"], context=context).stream()

    # end_custom_tags_dagster_dbt_translator


def scope_custom_auto_materialize_policy_dagster_dbt_translator():
# start_custom_auto_materialize_policy_dagster_dbt_translator
from pathlib import Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ def get_dbt_multi_asset_args(
DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest),
DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
},
tags=dagster_dbt_translator.get_tags(dbt_resource_props),
group_name=dagster_dbt_translator.get_group_name(dbt_resource_props),
code_version=default_code_version_fn(dbt_resource_props),
freshness_policy=dagster_dbt_translator.get_freshness_policy(dbt_resource_props),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
CoercibleToAssetKeyPrefix,
check_opt_coercible_to_asset_key_prefix_param,
)
from dagster._core.definitions.utils import is_valid_definition_tag_key
from dagster._core.storage.tags import TAG_NO_VALUE

from .asset_utils import (
default_asset_key_fn,
Expand Down Expand Up @@ -218,6 +220,48 @@ def get_metadata(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any
"""
return default_metadata_from_dbt_resource_props(dbt_resource_props)

@classmethod
@public
def get_tags(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
    """Return the Dagster tags for a dbt resource, given its properties.

    Note that a dbt resource is unrelated to Dagster's resource concept, and simply
    represents a model, seed, snapshot or source in a given dbt project. You can learn
    more about dbt resources and the properties available in this dictionary here:
    https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

    dbt tags are plain strings, whereas Dagster tags are key-value pairs. Each dbt tag
    string becomes a Dagster tag key whose value is the special sentinel value
    `"__dagster_no_value"`.

    Any dbt tags that don't match Dagster's supported tag key format (e.g. they contain
    unsupported characters) will be ignored.

    This method can be overridden to provide custom tags for a dbt resource.

    Args:
        dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.

    Returns:
        Mapping[str, str]: A dictionary representing the Dagster tags for the dbt resource.

    Examples:
        .. code-block:: python

            from typing import Any, Mapping

            from dagster_dbt import DagsterDbtTranslator


            class CustomDagsterDbtTranslator(DagsterDbtTranslator):
                @classmethod
                def get_tags(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
                    return {"custom": "tag"}
    """
    # Keep only the dbt tags usable as Dagster tag keys, in their original order.
    usable_tags = filter(is_valid_definition_tag_key, dbt_resource_props.get("tags", []))
    return dict.fromkeys(usable_tags, TAG_NO_VALUE)

@classmethod
@public
def get_group_name(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.storage.tags import TAG_NO_VALUE
from dagster._core.types.dagster_type import DagsterType
from dagster_dbt.asset_decorator import DUPLICATE_ASSET_KEY_ERROR_MESSAGE, dbt_assets
from dagster_dbt.core.resources_v2 import DbtCliResource
Expand Down Expand Up @@ -495,6 +496,23 @@ def my_dbt_assets(): ...
assert metadata["customized"] == "metadata"


def test_with_tag_replacements(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
    """Check that tags from an overridden ``get_tags`` reach every dbt asset.

    A translator subclass returns a fixed tag mapping for every dbt resource;
    each asset produced by ``@dbt_assets`` must then carry those tags.
    """
    expected_tags = {"customized": "tag"}

    class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
        @classmethod
        def get_tags(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
            return expected_tags

    @dbt_assets(
        manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomizedDagsterDbtTranslator()
    )
    def my_dbt_assets(): ...

    # Guard against a vacuous pass: with no assets the loop below asserts nothing.
    assert my_dbt_assets.tags_by_key

    for tags in my_dbt_assets.tags_by_key.values():
        # Compare against expected_tags itself so the expectation lives in one place.
        for tag_key, tag_value in expected_tags.items():
            assert tags[tag_key] == tag_value


def test_with_group_replacements(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
expected_group = "customized_group"

Expand Down Expand Up @@ -612,6 +630,18 @@ def my_dbt_assets(): ...
}


def test_dbt_config_tags(test_meta_config_manifest: Dict[str, Any]) -> None:
    """Check the default translation of dbt tags into Dagster tags.

    Only the ``customers`` asset is expected to carry tags, each mapped to
    ``TAG_NO_VALUE``; every other asset is expected to have no tags at all.
    """

    @dbt_assets(manifest=test_meta_config_manifest)
    def my_dbt_assets(): ...

    customers_key = AssetKey("customers")
    tags_by_key = my_dbt_assets.tags_by_key

    assert tags_by_key[customers_key] == {
        "foo": TAG_NO_VALUE,
        "bar-baz": TAG_NO_VALUE,
    }

    untagged_keys = my_dbt_assets.keys - {customers_key}
    assert all(tags_by_key[untagged_key] == {} for untagged_key in untagged_keys)


def test_dbt_with_downstream_asset_via_definition(test_meta_config_manifest: Dict[str, Any]):
@dbt_assets(manifest=test_meta_config_manifest)
def my_dbt_assets(): ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ sources:
models:
- name: customers
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
config:
tags:
- foo
- bar-baz
- invalid_tag_key=fdjskl
meta:
dagster:
has_self_dependency: True
Expand Down

0 comments on commit 3533cfb

Please sign in to comment.