Skip to content

Commit

Permalink
feat(dbt): emit column lineage using sqlglot
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Mar 18, 2024
1 parent 826e7a3 commit 579cb49
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 111 deletions.
18 changes: 9 additions & 9 deletions docs/content/integrations/dbt/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -530,18 +530,18 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):

---

## Emit column schema as materialization metadata <Experimental />
## Emit column-level metadata as materialization metadata <Experimental />

<Note>
<strong>
Emitting column schema as materialization metadata is currently an
Emitting column-level metadata as materialization metadata is currently an
experimental feature.{" "}
</strong>{" "}
To use this feature, you'll need to be on at least `dagster==1.6.6` and
`dagster-dbt==0.22.6`.
To use this feature, you'll need to be on at least `dagster>=1.6.12` and
`dagster-dbt>=0.22.12`.
</Note>

Dagster allows you to emit column schema [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata), which includes the column names and data types of your materialized dbt models, seeds, and snapshots.
Dagster allows you to emit column-level metadata, like column schema and column dependencies, as [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata).

With this metadata, you can view documentation in Dagster for all columns, not just columns described in your dbt project.

Expand All @@ -554,20 +554,20 @@ packages:
revision: DAGSTER_VERSION # replace with the version of `dagster` you are using.
```

Then, enable the `dagster.log_columns_in_relation()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:
Then, enable the `dagster.log_column_level_metadata()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:

```yaml
models:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"

seeds:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"

snapshots:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"
```
---
Expand Down
10 changes: 7 additions & 3 deletions pyright/alt-1/requirements-pinned.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ asn1crypto==1.5.1
astroid==3.1.0
asttokens==2.4.1
async-lru==2.0.4
async-timeout==4.0.3
attrs==23.2.0
babel==2.14.0
backoff==2.2.1
Expand Down Expand Up @@ -76,6 +77,7 @@ distlib==0.3.8
docker==7.0.0
docstring-parser==0.15
duckdb==0.10.0
exceptiongroup==1.2.0
execnet==2.0.2
executing==2.0.1
fastjsonschema==2.19.1
Expand Down Expand Up @@ -110,7 +112,7 @@ httplib2==0.22.0
httptools==0.6.1
httpx==0.27.0
humanfriendly==10.0
hypothesis==6.99.2
hypothesis==6.99.4
idna==3.6
importlib-metadata==6.11.0
iniconfig==2.0.0
Expand Down Expand Up @@ -178,7 +180,7 @@ overrides==7.7.0
packaging==24.0
pandas==2.0.3
pandas-stubs==2.2.0.240218
pandera==0.18.2
pandera==0.18.3
pandocfilters==1.5.1
parsedatetime==2.6
parso==0.8.3
Expand Down Expand Up @@ -256,6 +258,8 @@ snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0
soupsieve==2.5
sqlalchemy==1.4.52
sqlglot==22.3.1
sqlglotrs==0.1.2
sqlparse==0.4.4
stack-data==0.6.3
starlette==0.37.2
Expand Down Expand Up @@ -315,7 +319,7 @@ webcolors==1.13
webencodings==0.5.1
websocket-client==1.7.0
websockets==12.0
wheel==0.42.0
wheel==0.43.0
wrapt==1.16.0
yarl==1.9.4
zipp==3.17.0
14 changes: 9 additions & 5 deletions pyright/master/requirements-pinned.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ asn1crypto==1.5.1
-e examples/assets_pandas_pyspark
asttokens==2.4.1
async-lru==2.0.4
async-timeout==4.0.3
attrs==23.2.0
autodocsumm==0.2.12
autoflake==2.3.0
Expand All @@ -56,8 +57,8 @@ bitmath==1.3.3.1
bleach==6.1.0
blinker==1.7.0
bokeh==3.3.4
boto3==1.34.59
botocore==1.34.59
boto3==1.34.60
botocore==1.34.60
buildkite-test-collector==0.1.7
cachecontrol==0.14.0
cached-property==1.5.2
Expand Down Expand Up @@ -195,6 +196,7 @@ duckdb==0.10.0
ecdsa==0.18.0
email-validator==1.3.1
entrypoints==0.4
exceptiongroup==1.2.0
execnet==2.0.2
executing==2.0.1
expandvars==0.12.0
Expand Down Expand Up @@ -252,7 +254,7 @@ httplib2==0.22.0
httptools==0.6.1
httpx==0.27.0
humanfriendly==10.0
hypothesis==6.99.2
hypothesis==6.99.4
idna==3.6
ijson==3.2.3
imagesize==1.4.1
Expand Down Expand Up @@ -374,7 +376,7 @@ packaging==23.2
pandas==2.0.3
pandas-gbq==0.22.0
pandas-stubs==2.2.0.240218
pandera==0.18.2
pandera==0.18.3
pandocfilters==1.5.1
papermill==2.5.0
papermill-origami==0.0.30
Expand Down Expand Up @@ -507,6 +509,8 @@ sphinxcontrib-serializinghtml==1.1.10
sqlalchemy==1.4.52
sqlalchemy-jsonfield==1.0.2
sqlalchemy-utils==0.41.1
sqlglot==22.3.1
sqlglotrs==0.1.2
sqlparse==0.4.4
sshpubkeys==3.3.1
sshtunnel==0.4.0
Expand Down Expand Up @@ -587,7 +591,7 @@ webencodings==0.5.1
websocket-client==1.7.0
websockets==12.0
werkzeug==2.2.3
wheel==0.42.0
wheel==0.43.0
widgetsnbextension==4.0.10
-e examples/with_airflow
-e examples/with_great_expectations
Expand Down
146 changes: 141 additions & 5 deletions python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand All @@ -27,10 +29,12 @@
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetMaterialization,
AssetObservation,
AssetsDefinition,
ConfigurableResource,
JsonMetadataValue,
OpExecutionContext,
Output,
get_dagster_logger,
Expand All @@ -43,6 +47,13 @@
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, validator
from sqlglot import (
MappingSchema,
exp,
parse_one,
)
from sqlglot.lineage import lineage
from sqlglot.optimizer import optimize
from typing_extensions import Literal

from ..asset_utils import (
Expand Down Expand Up @@ -104,19 +115,28 @@ def log_level(self) -> str:
"""The log level of the event."""
return self.raw_event["info"]["level"]

@property
def has_column_lineage_metadata(self) -> bool:
"""Whether the event has column level lineage metadata."""
return bool(self._event_history_metadata) and "parents" in self._event_history_metadata

@public
def to_default_asset_events(
self,
manifest: DbtManifestParam,
dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
context: Optional[OpExecutionContext] = None,
target_path: Optional[Path] = None,
) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
"""Convert a dbt CLI event to a set of corresponding Dagster events.
Args:
manifest (Union[Mapping[str, Any], str, Path]): The dbt manifest blob.
dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for
linking dbt nodes to Dagster assets.
context (Optional[OpExecutionContext]): The execution context.
target_path (Optional[Path]): An explicit path to a target folder used to retrieve
dbt artifacts while generating events.
Returns:
Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
Expand Down Expand Up @@ -175,6 +195,16 @@ def to_default_asset_events(
finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"])
duration_seconds = (finished_at - started_at).total_seconds()

lineage_metadata = (
self._build_column_lineage_metadata(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator,
target_path=target_path,
)
if target_path
else {}
)

if has_asset_def:
yield Output(
value=None,
Expand All @@ -183,6 +213,7 @@ def to_default_asset_events(
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
**lineage_metadata,
},
)
else:
Expand All @@ -195,6 +226,7 @@ def to_default_asset_events(
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
**lineage_metadata,
},
)
elif manifest and node_resource_type == NodeType.Test and is_node_finished:
Expand Down Expand Up @@ -286,6 +318,108 @@ def _process_adapter_response_metadata(

return processed_adapter_response

def _build_column_lineage_metadata(
self,
manifest: Mapping[str, Any],
dagster_dbt_translator: DagsterDbtTranslator,
target_path: Path,
) -> Dict[str, Any]:
"""Process the lineage metadata for a dbt CLI event.
Args:
manifest (Mapping[str, Any]): The dbt manifest blob.
dagster_dbt_translator (DagsterDbtTranslator): The translator for dbt nodes to Dagster assets.
target_path (Path): The path to the dbt target folder.
Returns:
Dict[str, Any]: The lineage metadata.
"""
if (
# The dbt project name is only available from the manifest in `dbt-core>=1.6`.
version.parse(dbt_version) < version.parse("1.6.0")
# Column lineage can only be built if initial metadata is provided.
or not self.has_column_lineage_metadata
):
return {}

# TODO: this should be refactored as a Python object that renders the JSON metadata.
lineage_metadata: Dict[str, List[Tuple[AssetKey, str]]] = {}
sqlglot_mapping_schema = MappingSchema()

event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info")
unique_id: str = event_node_info["unique_id"]
dbt_resource_props: Dict[str, Any] = manifest["nodes"][unique_id]

# If the unique_id is a seed, then we don't need to process lineage.
if unique_id.startswith("seed"):
return {}

# 1. Retrieve the current node's SQL file and its parents' column schemas.
for relation_name, relation_metadata in self._event_history_metadata["parents"].items():
sqlglot_mapping_schema.add_table(
table=relation_name,
column_mapping={
column_name: column_metadata["data_type"]
for column_name, column_metadata in relation_metadata["columns"].items()
},
)

node_sql_path = target_path.joinpath(
"run", manifest["metadata"]["project_name"], dbt_resource_props["original_file_path"]
)
node_ast = parse_one(sql=node_sql_path.read_text()).expression
optimized_node_ast = cast(
exp.Query,
optimize(
node_ast,
schema=sqlglot_mapping_schema,
validate_qualify_columns=False, # Don't throw an error if we can't qualify a column without ambiguity.
),
)

# 2. Retrieve the column names from the current node.
column_names = cast(exp.Query, optimized_node_ast).named_selects

# 3. For each column, retrieve its dependencies on upstream columns from direct parents.
for column_name in column_names:
dbt_parent_resource_props_by_alias: Dict[str, Dict[str, Any]] = {
parent_dbt_resource_props["alias"]: parent_dbt_resource_props
for parent_dbt_resource_props in map(
lambda parent_unique_id: manifest["nodes"][parent_unique_id],
dbt_resource_props["depends_on"]["nodes"],
)
}

parent_columns: Set[Tuple[AssetKey, str]] = set()
for sqlglot_lineage_node in lineage(
column=column_name, sql=optimized_node_ast, schema=sqlglot_mapping_schema
).walk():
column = sqlglot_lineage_node.expression.find(exp.Column)
if column and column.table in dbt_parent_resource_props_by_alias:
parent_resource_props = dbt_parent_resource_props_by_alias[column.table]
parent_asset_key = dagster_dbt_translator.get_asset_key(parent_resource_props)

parent_columns.add((parent_asset_key, column.name))

lineage_metadata[column_name] = list(parent_columns)

# 4. Render the lineage as a JSON blob.
# TODO: this should just call a method on a Python object that renders the JSON metadata.
return {
"dagster/column_lineage": JsonMetadataValue(
{
column_name: [
{
"upstream_asset_key": parent_asset_key,
"upstream_column_name": parent_column_name,
}
for parent_asset_key, parent_column_name in parent_columns
]
for column_name, parent_columns in lineage_metadata.items()
}
)
}


@dataclass
class DbtCliInvocation:
Expand Down Expand Up @@ -456,6 +590,7 @@ def my_dbt_assets(context, dbt: DbtCliResource):
manifest=self.manifest,
dagster_dbt_translator=self.dagster_dbt_translator,
context=self.context,
target_path=self.target_path,
)

@public
Expand Down Expand Up @@ -489,14 +624,15 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))

# Attempt to parse the columns metadata from the event message.
# Attempt to parse the column level metadata from the event message.
# If it exists, save it as historical metadata to attach to the NodeFinished event.
if event.raw_event["info"]["name"] == "JinjaLogInfo":
with contextlib.suppress(orjson.JSONDecodeError):
columns = orjson.loads(event.raw_event["info"]["msg"])
event_history_metadata_by_unique_id[cast(str, unique_id)] = {
"columns": columns
}
column_level_metadata = orjson.loads(event.raw_event["info"]["msg"])

event_history_metadata_by_unique_id[cast(str, unique_id)] = (
column_level_metadata
)

# Don't show this message in stdout
continue
Expand Down
Loading

0 comments on commit 579cb49

Please sign in to comment.