Skip to content

Commit

Permalink
fix(dbt): use correct relation identifier when creating BaseRelation (
Browse files Browse the repository at this point in the history
#22823)

## Summary & Motivation
When creating the `BaseRelation`, we want to use the name of the table
that's created in the data warehouse so that we can properly query the
relation.

Previously, we were using `dbt_resource_props["name"]` as the
identifier, which is dbt's own name for that table. Instead, we
should use the identifier of the underlying table in the data warehouse.

For dbt sources, this is `dbt_resource_props["identifier"]`. For dbt
models and seeds, this is `dbt_resource_props["alias"]`.

## How I Tested These Changes
pytest: added dbt aliases for source and model in existing
`test_dagster_metadata` project
  • Loading branch information
rexledesma authored Jul 2, 2024
1 parent 4e0c5fe commit 58ad7b0
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from dagster._model.pydantic_compat_layer import compat_model_validator
from dagster._utils import pushd
from dagster._utils.warnings import disable_dagster_warnings
from dbt.adapters.base.impl import BaseAdapter, BaseColumn
from dbt.adapters.base.impl import BaseAdapter, BaseColumn, BaseRelation
from dbt.adapters.factory import get_adapter, register_adapter, reset_adapters
from dbt.config import RuntimeConfig
from dbt.config.runtime import load_profile, load_project
Expand Down Expand Up @@ -774,6 +774,20 @@ class EventHistoryMetadata(NamedTuple):
parents: Dict[str, Dict[str, Any]]


def _build_relation_from_dbt_resource_props(
    adapter: BaseAdapter, dbt_resource_props: Dict[str, Any]
) -> BaseRelation:
    """Create the adapter relation for the warehouse object behind a dbt resource.

    The relation identifier is taken from the name the object has in the data
    warehouse rather than from the dbt resource name, so the relation can be
    queried even when the resource is aliased.

    Args:
        adapter: The dbt adapter whose ``Relation`` class builds the relation.
        dbt_resource_props: The dbt resource properties. Must contain
            ``database``, ``schema`` and ``unique_id``, plus ``identifier``
            when ``unique_id`` starts with ``"source"`` and ``alias`` otherwise.

    Returns:
        The relation returned by ``adapter.Relation.create``.
    """
    create_relation = adapter.Relation.create

    database = dbt_resource_props["database"]
    schema = dbt_resource_props["schema"]

    # Sources record their warehouse name under "identifier"; every other
    # resource handled here records it under "alias".
    if dbt_resource_props["unique_id"].startswith("source"):
        identifier = dbt_resource_props["identifier"]
    else:
        identifier = dbt_resource_props["alias"]

    return create_relation(database=database, schema=schema, identifier=identifier)


def _build_column_lineage_metadata(
event_history_metadata: EventHistoryMetadata,
dbt_resource_props: Dict[str, Any],
Expand Down Expand Up @@ -940,10 +954,8 @@ def _fetch_column_metadata(

with adapter.connection_named(f"column_metadata_{dbt_resource_props['unique_id']}"):
try:
relation = adapter.Relation.create(
database=dbt_resource_props["database"],
schema=dbt_resource_props["schema"],
identifier=dbt_resource_props["name"],
relation = _build_relation_from_dbt_resource_props(
adapter=adapter, dbt_resource_props=dbt_resource_props
)
cols: List[BaseColumn] = adapter.get_columns_in_relation(relation=relation)
except Exception as e:
Expand Down Expand Up @@ -983,10 +995,8 @@ def _fetch_column_metadata(
parent_unique_id
) or invocation.manifest["sources"].get(parent_unique_id)

parent_relation = adapter.Relation.create(
database=dbt_parent_resource_props["database"],
schema=dbt_parent_resource_props["schema"],
identifier=dbt_parent_resource_props["name"],
parent_relation = _build_relation_from_dbt_resource_props(
adapter=adapter, dbt_resource_props=dbt_parent_resource_props
)
parent_columns: List[BaseColumn] = adapter.get_columns_in_relation(
relation=parent_relation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ models:
seeds:
+post-hook:
- "{{ dagster.log_column_level_metadata(enable_parent_relation_metadata_collection=var('dagster_enable_parent_relation_metadata_collection', 'true')) if env_var('DBT_LOG_COLUMN_METADATA', 'true') == 'true' else null }}"
test_dagster_metadata:
raw_orders:
+alias: aliased_raw_orders
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ models:
- name: orders
description: This table has basic information about orders, as well as some derived facts based on payments

config:
alias: aliased_orders

columns:
- name: order_id
tests:
Expand Down

0 comments on commit 58ad7b0

Please sign in to comment.