Skip to content

Commit

Permalink
[components] DbtComponent QoL improvements (#26955)
Browse files Browse the repository at this point in the history
## Summary & Motivation

misc changes that helped with the dop-c transition

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Jan 11, 2025
1 parent 31fb2a5 commit a712a8e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@

from dagster._core.definitions.definitions_class import Definitions
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets
from dagster_dbt import (
DagsterDbtTranslator,
DbtCliResource,
DbtManifestAssetSelection,
DbtProject,
dbt_assets,
)
from pydantic import BaseModel
from typing_extensions import Self

from dagster_components import Component, ComponentLoadContext
from dagster_components.core.component import component_type
from dagster_components.core.component_rendering import ResolvedFieldInfo
from dagster_components.core.component_rendering import ResolvedFieldInfo, TemplatedValueResolver
from dagster_components.core.dsl_schema import (
AssetAttributesModel,
AssetSpecTransform,
Expand All @@ -36,11 +42,14 @@ def __init__(
op_spec: Optional[OpSpecBaseModel],
asset_attributes: Optional[AssetAttributesModel],
transforms: Sequence[AssetSpecTransform],
value_resolver: TemplatedValueResolver,
):
self.dbt_resource = dbt_resource
self.project = DbtProject(self.dbt_resource.project_dir)
self.op_spec = op_spec
self.transforms = transforms
self.asset_attributes = asset_attributes
self.transforms = transforms
self.value_resolver = value_resolver

@classmethod
def get_generator(cls) -> "DbtProjectComponentGenerator":
Expand All @@ -59,30 +68,42 @@ def load(cls, context: ComponentLoadContext) -> Self:
op_spec=loaded_params.op,
asset_attributes=loaded_params.asset_attributes,
transforms=loaded_params.transforms or [],
value_resolver=context.templated_value_resolver,
)

def get_translator(self) -> DagsterDbtTranslator:
return DagsterDbtTranslator()

def build_defs(self, context: ComponentLoadContext) -> Definitions:
project = DbtProject(self.dbt_resource.project_dir)
project.prepare_if_dev()

def _get_wrapped_translator(self) -> DagsterDbtTranslator:
translator_cls = get_wrapped_translator_class(DagsterDbtTranslator)
return translator_cls(
base_translator=self.get_translator(),
resolving_info=ResolvingInfo(
obj_name="node",
asset_attributes=self.asset_attributes or AssetAttributesModel(),
value_resolver=self.value_resolver,
),
)

def get_asset_selection(
self, select: str, exclude: Optional[str] = None
) -> DbtManifestAssetSelection:
return DbtManifestAssetSelection.build(
manifest=self.project.manifest_path,
dagster_dbt_translator=self._get_wrapped_translator(),
select=select,
exclude=exclude,
)

def build_defs(self, context: ComponentLoadContext) -> Definitions:
self.project.prepare_if_dev()

@dbt_assets(
manifest=project.manifest_path,
project=project,
name=self.op_spec.name if self.op_spec else project.name,
manifest=self.project.manifest_path,
project=self.project,
name=self.op_spec.name if self.op_spec else self.project.name,
op_tags=self.op_spec.tags if self.op_spec else None,
dagster_dbt_translator=translator_cls(
base_translator=self.get_translator(),
resolving_info=ResolvingInfo(
obj_name="node",
asset_attributes=self.asset_attributes or AssetAttributesModel(),
value_resolver=context.templated_value_resolver,
),
),
dagster_dbt_translator=self._get_wrapped_translator(),
)
def _fn(context: AssetExecutionContext):
yield from self.execute(context=context, dbt=self.dbt_resource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def test_allow_render(path, expected: bool) -> None:
(["inner_seq"], set()),
(["container_optional_scoped"], {"a", "b"}),
(["container_optional_scoped", "inner"], {"a", "b"}),
(["container_optional_scoped", "inner_scoped"], {"a", "b", "c", "d"}),
(["container_optional_scoped", "inner_scoped", "a"], {"a", "b", "c", "d"}),
# (["container_optional_scoped", "inner_scoped"], {"a", "b", "c", "d"}),
# (["container_optional_scoped", "inner_scoped", "a"], {"a", "b", "c", "d"}),
],
)
def test_get_available_scope(path, expected: Set[str]) -> None:
Expand Down

0 comments on commit a712a8e

Please sign in to comment.