From 9c2119a72ff45fe270155deced364f8e9023317a Mon Sep 17 00:00:00 2001
From: Rex Ledesma
Date: Mon, 11 Mar 2024 16:04:11 -0400
Subject: [PATCH] feat(dbt): emit column lineage using `sqlglot`

---
 docs/content/integrations/dbt/reference.mdx   |  18 +-
 pyright/alt-1/requirements-pinned.txt         |  10 +-
 pyright/master/requirements-pinned.txt        |  14 +-
 .../dagster_dbt/core/resources_v2.py          | 146 +++++++-
 .../dagster-dbt/dagster_dbt_tests/conftest.py |  24 +-
 .../dbt_packages/test_columns_metadata.py     | 317 ++++++++++++++++--
 .../test_dagster_metadata/dbt_project.yml     |   4 +-
 .../models/select_star_customers.sql          |   1 +
 .../macros/log_column_level_metadata.sql      |  63 ++++
 .../macros/log_columns_in_relation.sql        |  14 -
 python_modules/libraries/dagster-dbt/setup.py |   1 +
 11 files changed, 543 insertions(+), 69 deletions(-)
 create mode 100644 python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/select_star_customers.sql
 create mode 100644 python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql
 delete mode 100644 python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_columns_in_relation.sql

diff --git a/docs/content/integrations/dbt/reference.mdx b/docs/content/integrations/dbt/reference.mdx
index 878dac7f64fc1..3385efc6ed595 100644
--- a/docs/content/integrations/dbt/reference.mdx
+++ b/docs/content/integrations/dbt/reference.mdx
@@ -530,18 +530,18 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
 
 ---
 
-## Emit column schema as materialization metadata
+## Emit column-level metadata as materialization metadata
 
 <Note>
-  Emitting column schema as materialization metadata is currently an
+  Emitting column-level metadata as materialization metadata is currently an
   experimental feature.{" "}
   {" "}
-  To use this feature, you'll need to be on at least `dagster==1.6.6` and
-  `dagster-dbt==0.22.6`.
+  To use this feature, you'll need `dagster>=1.6.12` and
+  `dagster-dbt>=0.22.12`.
 </Note>
 
-Dagster allows you to emit column schema [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata), which includes the column names and data types of your materialized dbt models, seeds, and snapshots.
+Dagster allows you to emit column-level metadata, such as column schema and column dependencies, as [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata).
 
 With this metadata, you can view documentation in Dagster for all columns, not just columns described in your dbt project.
 
@@ -554,20 +554,20 @@ packages:
     revision: DAGSTER_VERSION # replace with the version of `dagster` you are using.
 ```
 
-Then, enable the `dagster.log_columns_in_relation()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata.
+Then, enable the `dagster.log_column_level_metadata()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column-level metadata.
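+
+Under the hood, this macro uses dbt's `adapter.get_columns_in_relation()` to collect the
+column schema of the current resource and of its direct parents (its dbt refs and sources),
+and logs the result as a structured JSON blob that `dagster-dbt` parses from the dbt logs to
+build both column schema and column lineage metadata.
+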
For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots: ```yaml models: +post-hook: - - "{{ dagster.log_columns_in_relation() }}" + - "{{ dagster.log_column_level_metadata() }}" seeds: +post-hook: - - "{{ dagster.log_columns_in_relation() }}" + - "{{ dagster.log_column_level_metadata() }}" snapshots: +post-hook: - - "{{ dagster.log_columns_in_relation() }}" + - "{{ dagster.log_column_level_metadata() }}" ``` --- diff --git a/pyright/alt-1/requirements-pinned.txt b/pyright/alt-1/requirements-pinned.txt index d1aaf4e23f4c9..471216e45930a 100644 --- a/pyright/alt-1/requirements-pinned.txt +++ b/pyright/alt-1/requirements-pinned.txt @@ -17,6 +17,7 @@ asn1crypto==1.5.1 astroid==3.1.0 asttokens==2.4.1 async-lru==2.0.4 +async-timeout==4.0.3 attrs==23.2.0 babel==2.14.0 backoff==2.2.1 @@ -76,6 +77,7 @@ distlib==0.3.8 docker==7.0.0 docstring-parser==0.15 duckdb==0.10.0 +exceptiongroup==1.2.0 execnet==2.0.2 executing==2.0.1 fastjsonschema==2.19.1 @@ -110,7 +112,7 @@ httplib2==0.22.0 httptools==0.6.1 httpx==0.27.0 humanfriendly==10.0 -hypothesis==6.99.2 +hypothesis==6.99.4 idna==3.6 importlib-metadata==6.11.0 iniconfig==2.0.0 @@ -178,7 +180,7 @@ overrides==7.7.0 packaging==24.0 pandas==2.0.3 pandas-stubs==2.2.0.240218 -pandera==0.18.2 +pandera==0.18.3 pandocfilters==1.5.1 parsedatetime==2.6 parso==0.8.3 @@ -256,6 +258,8 @@ snowflake-sqlalchemy==1.5.1 sortedcontainers==2.4.0 soupsieve==2.5 sqlalchemy==1.4.52 +sqlglot==22.3.1 +sqlglotrs==0.1.2 sqlparse==0.4.4 stack-data==0.6.3 starlette==0.37.2 @@ -315,7 +319,7 @@ webcolors==1.13 webencodings==0.5.1 websocket-client==1.7.0 websockets==12.0 -wheel==0.42.0 +wheel==0.43.0 wrapt==1.16.0 yarl==1.9.4 zipp==3.17.0 diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt index aff6e50a4d701..b6280c7592ce2 100644 --- a/pyright/master/requirements-pinned.txt +++ b/pyright/master/requirements-pinned.txt @@ -35,6 +35,7 @@ asn1crypto==1.5.1 -e examples/assets_pandas_pyspark asttokens==2.4.1 async-lru==2.0.4 +async-timeout==4.0.3 attrs==23.2.0 autodocsumm==0.2.12 autoflake==2.3.0 @@ -56,8 +57,8 @@ bitmath==1.3.3.1 bleach==6.1.0 blinker==1.7.0 bokeh==3.3.4 -boto3==1.34.59 -botocore==1.34.59 +boto3==1.34.60 +botocore==1.34.60 buildkite-test-collector==0.1.7 cachecontrol==0.14.0 cached-property==1.5.2 @@ -195,6 +196,7 @@ duckdb==0.10.0 ecdsa==0.18.0 email-validator==1.3.1 entrypoints==0.4 +exceptiongroup==1.2.0 execnet==2.0.2 executing==2.0.1 expandvars==0.12.0 @@ -252,7 +254,7 @@ httplib2==0.22.0 httptools==0.6.1 httpx==0.27.0 humanfriendly==10.0 -hypothesis==6.99.2 +hypothesis==6.99.4 idna==3.6 ijson==3.2.3 imagesize==1.4.1 @@ -374,7 +376,7 @@ packaging==23.2 pandas==2.0.3 pandas-gbq==0.22.0 pandas-stubs==2.2.0.240218 -pandera==0.18.2 +pandera==0.18.3 pandocfilters==1.5.1 papermill==2.5.0 papermill-origami==0.0.30 @@ -507,6 +509,8 @@ sphinxcontrib-serializinghtml==1.1.10 sqlalchemy==1.4.52 sqlalchemy-jsonfield==1.0.2 sqlalchemy-utils==0.41.1 +sqlglot==22.3.1 +sqlglotrs==0.1.2 sqlparse==0.4.4 sshpubkeys==3.3.1 sshtunnel==0.4.0 @@ -587,7 +591,7 @@ webencodings==0.5.1 websocket-client==1.7.0 websockets==12.0 werkzeug==2.2.3 -wheel==0.42.0 +wheel==0.43.0 widgetsnbextension==4.0.10 -e examples/with_airflow -e examples/with_great_expectations diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index c4d59529f4087..b5f839cb39a8f 100644 --- 
a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -17,6 +17,8 @@ Mapping, Optional, Sequence, + Set, + Tuple, Union, cast, ) @@ -27,10 +29,12 @@ AssetCheckResult, AssetCheckSeverity, AssetExecutionContext, + AssetKey, AssetMaterialization, AssetObservation, AssetsDefinition, ConfigurableResource, + JsonMetadataValue, OpExecutionContext, Output, get_dagster_logger, @@ -43,6 +47,13 @@ from dbt.version import __version__ as dbt_version from packaging import version from pydantic import Field, validator +from sqlglot import ( + MappingSchema, + exp, + parse_one, +) +from sqlglot.lineage import lineage +from sqlglot.optimizer import optimize from typing_extensions import Literal from ..asset_utils import ( @@ -102,12 +113,18 @@ def log_level(self) -> str: """The log level of the event.""" return self.raw_event["info"]["level"] + @property + def has_column_lineage_metadata(self) -> bool: + """Whether the event has column level lineage metadata.""" + return bool(self._event_history_metadata) and "parents" in self._event_history_metadata + @public def to_default_asset_events( self, manifest: DbtManifestParam, dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(), context: Optional[OpExecutionContext] = None, + target_path: Optional[Path] = None, ) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]: """Convert a dbt CLI event to a set of corresponding Dagster events. @@ -115,6 +132,9 @@ def to_default_asset_events( manifest (Union[Mapping[str, Any], str, Path]): The dbt manifest blob. dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for linking dbt nodes to Dagster assets. + context (Optional[OpExecutionContext]): The execution context. + target_path (Optional[Path]): An explicit path to a target folder used to retrieve + dbt artifacts while generating events. Returns: Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]: @@ -173,6 +193,16 @@ def to_default_asset_events( finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"]) duration_seconds = (finished_at - started_at).total_seconds() + lineage_metadata = ( + self._build_column_lineage_metadata( + manifest=manifest, + dagster_dbt_translator=dagster_dbt_translator, + target_path=target_path, + ) + if target_path + else {} + ) + if has_asset_def: yield Output( value=None, @@ -181,6 +211,7 @@ def to_default_asset_events( **default_metadata, "Execution Duration": duration_seconds, **adapter_response_metadata, + **lineage_metadata, }, ) else: @@ -193,6 +224,7 @@ def to_default_asset_events( **default_metadata, "Execution Duration": duration_seconds, **adapter_response_metadata, + **lineage_metadata, }, ) elif manifest and node_resource_type == NodeType.Test and is_node_finished: @@ -284,6 +316,108 @@ def _process_adapter_response_metadata( return processed_adapter_response + def _build_column_lineage_metadata( + self, + manifest: Mapping[str, Any], + dagster_dbt_translator: DagsterDbtTranslator, + target_path: Path, + ) -> Dict[str, Any]: + """Process the lineage metadata for a dbt CLI event. + + Args: + manifest (Mapping[str, Any]): The dbt manifest blob. + dagster_dbt_translator (DagsterDbtTranslator): The translator for dbt nodes to Dagster assets. + target_path (Path): The path to the dbt target folder. + + Returns: + Dict[str, Any]: The lineage metadata. 
+ """ + if ( + # The dbt project name is only available from the manifest in `dbt-core>=1.6`. + version.parse(dbt_version) < version.parse("1.6.0") + # Column lineage can only be built if initial metadata is provided. + or not self.has_column_lineage_metadata + ): + return {} + + # TODO: this should be refactored as a Python object that renders the JSON metadata. + lineage_metadata: Dict[str, List[Tuple[AssetKey, str]]] = {} + sqlglot_mapping_schema = MappingSchema() + + event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info") + unique_id: str = event_node_info["unique_id"] + dbt_resource_props: Dict[str, Any] = manifest["nodes"][unique_id] + + # If the unique_id is a seed, then we don't need to process lineage. + if unique_id.startswith("seed"): + return {} + + # 1. Retrieve the current node's SQL file and its parents' column schemas. + for relation_name, relation_metadata in self._event_history_metadata["parents"].items(): + sqlglot_mapping_schema.add_table( + table=relation_name, + column_mapping={ + column_name: column_metadata["data_type"] + for column_name, column_metadata in relation_metadata["columns"].items() + }, + ) + + node_sql_path = target_path.joinpath( + "run", manifest["metadata"]["project_name"], dbt_resource_props["original_file_path"] + ) + node_ast = parse_one(sql=node_sql_path.read_text()).expression + optimized_node_ast = cast( + exp.Query, + optimize( + node_ast, + schema=sqlglot_mapping_schema, + validate_qualify_columns=False, # Don't throw an error if we can't qualify a column without ambiguity. + ), + ) + + # 2. Retrieve the column names from the current node. + column_names = cast(exp.Query, optimized_node_ast).named_selects + + # 3. For each column, retrieve its dependencies on upstream columns from direct parents. + for column_name in column_names: + dbt_parent_resource_props_by_alias: Dict[str, Dict[str, Any]] = { + parent_dbt_resource_props["alias"]: parent_dbt_resource_props + for parent_dbt_resource_props in map( + lambda parent_unique_id: manifest["nodes"][parent_unique_id], + dbt_resource_props["depends_on"]["nodes"], + ) + } + + parent_columns: Set[Tuple[AssetKey, str]] = set() + for sqlglot_lineage_node in lineage( + column=column_name, sql=optimized_node_ast, schema=sqlglot_mapping_schema + ).walk(): + column = sqlglot_lineage_node.expression.find(exp.Column) + if column and column.table in dbt_parent_resource_props_by_alias: + parent_resource_props = dbt_parent_resource_props_by_alias[column.table] + parent_asset_key = dagster_dbt_translator.get_asset_key(parent_resource_props) + + parent_columns.add((parent_asset_key, column.name)) + + lineage_metadata[column_name] = list(parent_columns) + + # 4. Render the lineage as a JSON blob. + # TODO: this should just call a method on a Python object that renders the JSON metadata. 
+ return { + "dagster/column_lineage": JsonMetadataValue( + { + column_name: [ + { + "upstream_asset_key": parent_asset_key, + "upstream_column_name": parent_column_name, + } + for parent_asset_key, parent_column_name in parent_columns + ] + for column_name, parent_columns in lineage_metadata.items() + } + ) + } + @dataclass class DbtCliInvocation: @@ -454,6 +588,7 @@ def my_dbt_assets(context, dbt: DbtCliResource): manifest=self.manifest, dagster_dbt_translator=self.dagster_dbt_translator, context=self.context, + target_path=self.target_path, ) @public @@ -487,14 +622,15 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]: if is_error_message: self._error_messages.append(str(event)) - # Attempt to parse the columns metadata from the event message. + # Attempt to parse the column level metadata from the event message. # If it exists, save it as historical metadata to attach to the NodeFinished event. if event.raw_event["info"]["name"] == "JinjaLogInfo": with contextlib.suppress(orjson.JSONDecodeError): - columns = orjson.loads(event.raw_event["info"]["msg"]) - event_history_metadata_by_unique_id[cast(str, unique_id)] = { - "columns": columns - } + column_level_metadata = orjson.loads(event.raw_event["info"]["msg"]) + + event_history_metadata_by_unique_id[cast(str, unique_id)] = ( + column_level_metadata + ) # Don't show this message in stdout continue diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py index 35383c34736a6..d567da9637ce0 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py @@ -54,12 +54,15 @@ def disable_openblas_threading_affinity_fixture() -> None: os.environ["GOTOBLAS_MAIN_FREE"] = "1" -def _create_dbt_invocation(project_dir: Path) -> DbtCliInvocation: +def _create_dbt_invocation(project_dir: Path, build_project: bool = False) -> DbtCliInvocation: dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"]) dbt.cli(["deps"]).wait() dbt_invocation = dbt.cli(["compile"]).wait() + if build_project: + dbt.cli(["build"], raise_on_error=False).wait() + return dbt_invocation @@ -83,15 +86,10 @@ def test_jaffle_shop_manifest_fixture( @pytest.fixture(name="test_asset_checks_manifest", scope="session") def test_asset_checks_manifest_fixture() -> Dict[str, Any]: # Prepopulate duckdb with jaffle shop data to support testing individual asset checks. - ( - DbtCliResource( - project_dir=os.fspath(test_asset_checks_path), global_config_flags=["--quiet"] - ) - .cli(["build"], raise_on_error=False) - .wait() - ) - - return _create_dbt_invocation(test_asset_checks_path).get_artifact("manifest.json") + return _create_dbt_invocation( + test_asset_checks_path, + build_project=True, + ).get_artifact("manifest.json") @pytest.fixture(name="test_asset_key_exceptions_manifest", scope="session") @@ -138,4 +136,8 @@ def test_meta_config_manifest_fixture() -> Dict[str, Any]: @pytest.fixture(name="test_metadata_manifest", scope="session") def test_metadata_manifest_fixture() -> Dict[str, Any]: - return _create_dbt_invocation(test_metadata_path).get_artifact("manifest.json") + # Prepopulate duckdb with jaffle shop data to support testing individual column metadata. 
+ return _create_dbt_invocation( + test_metadata_path, + build_project=True, + ).get_artifact("manifest.json") diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py index d6a9e4cf8322a..1951d9ad91905 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py @@ -1,10 +1,14 @@ import json import os import subprocess -from typing import Any, Dict, cast +from typing import Any, Dict, Optional, cast +import pytest from dagster import ( AssetExecutionContext, + AssetKey, + AssetSelection, + JsonMetadataValue, Output, TableColumn, TableSchema, @@ -13,6 +17,8 @@ from dagster._core.definitions.metadata import TableMetadataEntries from dagster_dbt.asset_decorator import dbt_assets from dagster_dbt.core.resources_v2 import DbtCliResource +from dbt.version import __version__ as dbt_version +from packaging import version from ...dbt_projects import test_jaffle_shop_path, test_metadata_path @@ -79,32 +85,303 @@ def assert_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource) assert result.success -def test_dbt_cli_no_jinja_log_info() -> None: - dbt = DbtCliResource(project_dir=os.fspath(test_metadata_path)) - dbt_cli_parse_invocation = dbt.cli(["parse"]) +@pytest.mark.skipif( + version.parse(dbt_version) < version.parse("1.6.0"), + reason="Retrieving the dbt project name from the manifest is only available in `dbt-core>=1.6`", +) +@pytest.mark.parametrize( + "asset_key_selection", + [ + None, + AssetKey(["raw_customers"]), + AssetKey(["stg_customers"]), + AssetKey(["customers"]), + AssetKey(["select_star_customers"]), + ], + ids=[ + "--select fqn:*", + "--select raw_customers", + "--select stg_customers", + "--select customers", + "--select select_star_customers", + ], +) +def test_dbt_cli_lineage_metadata( + test_metadata_manifest: Dict[str, Any], asset_key_selection: Optional[AssetKey] +) -> None: + @dbt_assets(manifest=test_metadata_manifest) + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() - assert dbt_cli_parse_invocation.is_successful() - assert not any( - event.raw_event["info"]["name"] == "JinjaLogInfo" - for event in dbt_cli_parse_invocation.stream_raw_events() + result = materialize( + [my_dbt_assets], + resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))}, + selection=asset_key_selection and AssetSelection.keys(asset_key_selection), ) + assert result.success + lineage_metadata_by_asset_key = { + event.materialization.asset_key: event.materialization.metadata.get( + "dagster/column_lineage" + ) + for event in result.get_asset_materialization_events() + } + expected_lineage_metadata_by_asset_key = { + AssetKey(["raw_payments"]): None, + AssetKey(["raw_customers"]): None, + AssetKey(["raw_orders"]): None, + AssetKey(["stg_customers"]): JsonMetadataValue( + data={ + "customer_id": [ + { + "upstream_asset_key": AssetKey(["raw_customers"]), + "upstream_column_name": "id", + } + ], + "first_name": [ + { + "upstream_asset_key": AssetKey(["raw_customers"]), + "upstream_column_name": "first_name", + } + ], + "last_name": [ + { + "upstream_asset_key": AssetKey(["raw_customers"]), + "upstream_column_name": "last_name", + } + ], + } + ), + AssetKey(["stg_orders"]): 
JsonMetadataValue(
+            data={
+                "order_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_orders"]),
+                        "upstream_column_name": "id",
+                    }
+                ],
+                "customer_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_orders"]),
+                        "upstream_column_name": "user_id",
+                    }
+                ],
+                "order_date": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_orders"]),
+                        "upstream_column_name": "order_date",
+                    }
+                ],
+                "status": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_orders"]),
+                        "upstream_column_name": "status",
+                    }
+                ],
+            }
+        ),
+        AssetKey(["stg_payments"]): JsonMetadataValue(
+            data={
+                "payment_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_payments"]),
+                        "upstream_column_name": "id",
+                    }
+                ],
+                "order_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_payments"]),
+                        "upstream_column_name": "order_id",
+                    }
+                ],
+                "payment_method": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_payments"]),
+                        "upstream_column_name": "payment_method",
+                    }
+                ],
+                "amount": [
+                    {
+                        "upstream_asset_key": AssetKey(["raw_payments"]),
+                        "upstream_column_name": "amount",
+                    }
+                ],
+            }
+        ),
+        AssetKey(["orders"]): JsonMetadataValue(
+            data={
+                "order_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_orders"]),
+                        "upstream_column_name": "order_id",
+                    }
+                ],
+                "customer_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_orders"]),
+                        "upstream_column_name": "customer_id",
+                    }
+                ],
+                "order_date": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_orders"]),
+                        "upstream_column_name": "order_date",
+                    }
+                ],
+                "status": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_orders"]),
+                        "upstream_column_name": "status",
+                    }
+                ],
+                "credit_card_amount": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_payments"]),
+                        "upstream_column_name": "amount",
+                    }
+                ],
+                "coupon_amount": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_payments"]),
+                        "upstream_column_name": "amount",
+                    }
+                ],
+                "bank_transfer_amount": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_payments"]),
+                        "upstream_column_name": "amount",
+                    }
+                ],
+                "gift_card_amount": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_payments"]),
+                        "upstream_column_name": "amount",
+                    }
+                ],
+                "amount": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_payments"]),
+                        "upstream_column_name": "amount",
+                    }
+                ],
+            }
+        ),
+        AssetKey(["customers"]): JsonMetadataValue(
+            data={
+                "customer_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_customers"]),
+                        "upstream_column_name": "customer_id",
+                    }
+                ],
+                "first_name": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_customers"]),
+                        "upstream_column_name": "first_name",
+                    }
+                ],
+                "last_name": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_customers"]),
+                        "upstream_column_name": "last_name",
+                    }
+                ],
+                "first_order": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_orders"]),
+                        "upstream_column_name": "order_date",
+                    }
+                ],
+                "most_recent_order": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_orders"]),
+                        "upstream_column_name": "order_date",
+                    }
+                ],
+                "number_of_orders": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_orders"]),
+                        "upstream_column_name": "order_id",
+                    }
+                ],
+                "customer_lifetime_value": [
+                    {
+                        "upstream_asset_key": AssetKey(["stg_payments"]),
+                        "upstream_column_name": "amount",
+                    }
+                ],
+            }
+        ),
+        AssetKey(["select_star_customers"]): JsonMetadataValue(
+            data={
+                "customer_id": [
+                    {
+                        "upstream_asset_key": AssetKey(["customers"]),
+                        "upstream_column_name": "customer_id",
+                    }
+                ],
+                "first_name": [
+                    {
+                        "upstream_asset_key": AssetKey(["customers"]),
+                        "upstream_column_name": "first_name",
+                    }
+                ],
+                "last_name": [
+                    {
+                        "upstream_asset_key": AssetKey(["customers"]),
+                        "upstream_column_name": "last_name",
+                    }
+                ],
+                "first_order": [
+                    {
+                        "upstream_asset_key": AssetKey(["customers"]),
+                        "upstream_column_name": "first_order",
+                    }
+                ],
+                "most_recent_order": [
+                    {
+                        "upstream_asset_key": AssetKey(["customers"]),
+                        "upstream_column_name": "most_recent_order",
+                    }
+                ],
+                "number_of_orders": [
+                    {
+                        "upstream_asset_key": AssetKey(["customers"]),
+                        "upstream_column_name": "number_of_orders",
+                    }
+                ],
+                "customer_lifetime_value": [
+                    {
+                        "upstream_asset_key": AssetKey(["customers"]),
+                        "upstream_column_name": "customer_lifetime_value",
+                    }
+                ],
+            }
+        ),
+    }
+    if asset_key_selection:
+        expected_lineage_metadata_by_asset_key = {
+            asset_key: lineage_metadata
+            for asset_key, lineage_metadata in expected_lineage_metadata_by_asset_key.items()
+            if asset_key == asset_key_selection
+        }
 
-def test_dbt_raw_cli_no_empty_jinja_log_info() -> None:
-    result = subprocess.check_output(
-        ["dbt", "--log-format", "json", "--no-partial-parse", "parse"],
-        text=True,
-        cwd=test_metadata_path,
-    )
-
-    assert not any(
-        json.loads(line)["info"]["name"] == "JinjaLogInfo" for line in result.splitlines()
-    )
+    assert lineage_metadata_by_asset_key == expected_lineage_metadata_by_asset_key
 
 
-def test_dbt_raw_cli_no_jinja_log_info() -> None:
+@pytest.mark.parametrize(
+    "command",
+    ["parse", "build"],
+    ids=[
+        "no empty jinja log info on parse",
+        "no jinja log info on execution",
+    ],
+)
+def test_dbt_raw_cli_no_jinja_log_info(
+    test_metadata_manifest: Dict[str, Any], command: str
+) -> None:
     result = subprocess.check_output(
-        ["dbt", "--log-format", "json", "--no-partial-parse", "build"],
+        ["dbt", "--log-format", "json", "--no-partial-parse", command],
         text=True,
         cwd=test_metadata_path,
     )
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml
index b6fe930b2d3cb..53288d76b1284 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml
@@ -21,7 +21,7 @@ require-dbt-version: [">=1.0.0", "<2.0.0"]
 models:
   +post-hook:
-    - "{{ dagster.log_columns_in_relation() }}"
+    - "{{ dagster.log_column_level_metadata() }}"
   test_dagster_metadata:
     materialized: table
     staging:
@@ -29,4 +29,4 @@ models:
 
 seeds:
   +post-hook:
-    - "{{ dagster.log_columns_in_relation() }}"
+    - "{{ dagster.log_column_level_metadata() }}"
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/select_star_customers.sql b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/select_star_customers.sql
new file mode 100644
index 0000000000000..52c20f845ef0d
--- /dev/null
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/select_star_customers.sql
@@ -0,0 +1 @@
+select * from {{ ref('customers') }}
diff --git a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql
new file mode 100644
index 0000000000000..5fcdf70e7b6cb
--- /dev/null
+++ b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql
@@ -0,0 +1,63 @@
+{% macro log_column_level_metadata() %}
+    -- This macro should only be run in the context of a `dagster-dbt` invocation.
+    {%- set is_dagster_dbt_cli = env_var('DAGSTER_DBT_CLI', '') == 'true' -%}
+
+    {%- if execute and is_dagster_dbt_cli -%}
+        -- Retrieve the column metadata of the current node.
+        {%- set columns = adapter.get_columns_in_relation(this) -%}
+        {%- set column_schema = {} -%}
+
+        {% for column in columns %}
+            {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
+            {%- set _ = column_schema.update(serializable_column) -%}
+        {%- endfor -%}
+
+        -- For column-level lineage, retrieve the column metadata of the current node's parents.
+        -- The parents are defined by the current node's dbt refs and sources.
+        {%- set parent_relations = [] -%}
+
+        {%- for ref_args in model.refs -%}
+            {%- set ref_relation = ref(ref_args['name'], package=ref_args.get('package'), version=ref_args.get('version')) -%}
+            {%- set _ = parent_relations.append(ref_relation) -%}
+        {%- endfor -%}
+
+        {%- for source_args in model.sources -%}
+            {%- set source_relation = source(source_args[0], source_args[1]) -%}
+            {%- set _ = parent_relations.append(source_relation) -%}
+        {%- endfor -%}
+
+        -- Return a structured log of
+        -- {
+        --     "relation_name": str,
+        --     "columns": {
+        --         <column_name>: {
+        --             "data_type": str
+        --         }
+        --     },
+        --     "parents": {
+        --         <parent_relation_name>: {
+        --             "columns": {
+        --                 <column_name>: {
+        --                     "data_type": str
+        --                 }
+        --             }
+        --         }
+        --     }
+        -- }
+        {%- set structured_log = {'relation_name': this.render(), 'columns': column_schema, 'parents': {}} -%}
+
+        {%- for parent_relation in parent_relations -%}
+            {%- set parent_relation_columns = adapter.get_columns_in_relation(parent_relation) -%}
+            {%- set parent_relation_column_schema = {} -%}
+            {%- for column in parent_relation_columns -%}
+                {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
+                {%- set _ = parent_relation_column_schema.update(serializable_column) -%}
+            {%- endfor -%}
+
+            {%- set structured_parent_relation_metadata = {parent_relation.render(): {'columns': parent_relation_column_schema}} -%}
+            {%- set _ = structured_log['parents'].update(structured_parent_relation_metadata) -%}
+        {%- endfor -%}
+
+        {%- do log(tojson(structured_log), info=true) -%}
+    {%- endif -%}
+{% endmacro %}
diff --git a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_columns_in_relation.sql b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_columns_in_relation.sql
deleted file mode 100644
index 821a87a81bada..0000000000000
--- a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_columns_in_relation.sql
+++ /dev/null
@@ -1,14 +0,0 @@
-{% macro log_columns_in_relation() %}
-    {%- set is_dagster_dbt_cli = env_var('DAGSTER_DBT_CLI', '') == 'true' -%}
-    {%- set columns = adapter.get_columns_in_relation(this) -%}
-    {%- set table_schema = {} -%}
-
-    {% for column in columns %}
-        {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
-        {%- set _ = table_schema.update(serializable_column) -%}
-    {% endfor %}
-
-    {% if is_dagster_dbt_cli and table_schema %}
-        {% do log(tojson(table_schema), info=true) %}
-    {% endif %}
-{% endmacro %}
diff --git a/python_modules/libraries/dagster-dbt/setup.py b/python_modules/libraries/dagster-dbt/setup.py
index 709124a517872..9f60431eb975a 100644
--- a/python_modules/libraries/dagster-dbt/setup.py
+++ b/python_modules/libraries/dagster-dbt/setup.py
@@ -43,6 +43,7 @@ def get_version() -> str:
         "orjson",
         "requests",
         "rich",
+        "sqlglot[rs]",
         "typer>=0.9.0",
         "packaging",
     ],
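Reviewer note: the following is a minimal, self-contained sketch of the column-lineage
derivation that `_build_column_lineage_metadata` performs above. The table name, column
types, and query are hypothetical stand-ins; the real implementation reads the node's
compiled SQL out of the dbt `target/run` directory and seeds the schema with the parents'
column types logged by the `log_column_level_metadata` macro.

```python
from sqlglot import MappingSchema, exp, parse_one
from sqlglot.lineage import lineage
from sqlglot.optimizer import optimize

# Register the parent relation's column schema, mirroring how the change above
# seeds a MappingSchema from the parents logged by the dbt macro.
schema = MappingSchema()
schema.add_table(
    table="stg_customers",
    column_mapping={"customer_id": "integer", "first_name": "text"},
)

# Parse and optimize the model's SQL. Optimizing against the schema qualifies
# every column and expands `select *`, which is what makes lineage resolvable
# for models like the new `select_star_customers` fixture.
ast = parse_one("select customer_id, upper(first_name) as first_name from stg_customers")
optimized = optimize(ast, schema=schema, validate_qualify_columns=False)

# For each output column, walk its lineage graph and collect the (table, column)
# pairs it depends on; the patch then maps each table back to an upstream asset key.
for column_name in optimized.named_selects:
    parents = set()
    for node in lineage(column=column_name, sql=optimized, schema=schema).walk():
        column = node.expression.find(exp.Column)
        if column:
            parents.add((column.table, column.name))
    print(column_name, sorted(parents))
```

Running this prints, roughly, `customer_id [('stg_customers', 'customer_id')]` and
`first_name [('stg_customers', 'first_name')]`, matching the shape of the
`dagster/column_lineage` entries asserted in the new tests.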