Skip to content

Commit

Permalink
[dagster-dbt] Add get_partitions_def() method to DagsterDbtTranslator (
Browse files Browse the repository at this point in the history
…#26625)

## Summary & Motivation

As title. This lets individual dbt models have distinct partitions definitions

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Dec 20, 2024
1 parent c79ff2e commit 1c4a630
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def to_spec(
key=key,
tags={**additional_tags, **self.tags} if self.tags else additional_tags,
deps=[*self._spec.deps, *deps],
partitions_def=partitions_def,
partitions_def=partitions_def if partitions_def is not None else ...,
)

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,14 +829,14 @@ def build_dbt_multi_asset_args(
automation_condition=dagster_dbt_translator.get_automation_condition(
dbt_resource_props
),
partitions_def=dagster_dbt_translator.get_partitions_def(dbt_resource_props),
)
if io_manager_key:
spec = spec.with_io_manager_key(io_manager_key)

outs[output_name] = AssetOut.from_spec(
spec=spec,
dagster_type=Nothing,
is_required=False,
io_manager_key=io_manager_key,
)

test_unique_ids = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
_check as check,
)
from dagster._annotations import experimental, public
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._utils.tags import is_valid_tag_key

from dagster_dbt.asset_utils import (
Expand Down Expand Up @@ -520,6 +521,40 @@ def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Opt
auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None
)

def get_partitions_def(
self, dbt_resource_props: Mapping[str, Any]
) -> Optional[PartitionsDefinition]:
"""[INTERNAL] A function that takes a dictionary representing properties of a dbt resource, and
returns the Dagster :py:class:`dagster.PartitionsDefinition` for that resource.
This method can be overridden to provide a custom PartitionsDefinition for a dbt resource.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Returns:
Optional[PartitionsDefinition]: A Dagster partitions definition.
Examples:
Set a custom AutomationCondition for dbt resources with a specific tag:
.. code-block:: python
from typing import Any, Mapping
from dagster import DailyPartitionsDefinition
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_partitions_def(self, dbt_resource_props: Mapping[str, Any]) -> Optional[PartitionsDefinition]:
if "my_custom_tag" in dbt_resource_props.get("tags", []):
return DailyPartitionsDefinition(start_date="2022-01-01")
else:
return None
"""
return None


@dataclass
class DbtManifestWrapper:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,35 @@ def my_dbt_assets(): ...
)


def test_with_varying_partitions_defs(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")
override_keys = {AssetKey("customers"), AssetKey("orders")}

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_partitions_def(
self, dbt_resource_props: Mapping[str, Any]
) -> Optional[PartitionsDefinition]:
asset_key = super().get_asset_key(dbt_resource_props)
if asset_key in override_keys:
return daily_partitions
else:
return None

@dbt_assets(
manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator()
)
def my_dbt_assets(): ...

assert set(my_dbt_assets.keys) > override_keys

for spec in my_dbt_assets.specs:
partitions_def = spec.partitions_def
if spec.key in override_keys:
assert partitions_def == daily_partitions, spec.key
else:
assert partitions_def is None, spec.key


def test_dbt_meta_auto_materialize_policy(test_meta_config_manifest: Dict[str, Any]) -> None:
expected_auto_materialize_policy = AutoMaterializePolicy.eager()
expected_specs_by_key = {
Expand Down

0 comments on commit 1c4a630

Please sign in to comment.