From 9d0a95065ad9e695fcb6ec8a4c33ba2cf5367732 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Sep 2023 12:12:09 -0400 Subject: [PATCH] update dbt examples and internal context to align with AssetExecutionContext --- .../docs_snippets/integrations/dbt/dbt.py | 40 +++++++++---------- .../dbt/tutorial/downstream_assets/assets.py | 4 +- .../dbt/tutorial/load_dbt_models/assets.py | 4 +- .../dbt/tutorial/upstream_assets/assets.py | 4 +- .../dagster_dbt/core/resources_v2.py | 10 ++--- 5 files changed, 31 insertions(+), 31 deletions(-) diff --git a/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py b/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py index c99c0a86b3f68..fe90f900be6f7 100644 --- a/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py +++ b/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py @@ -63,11 +63,11 @@ def my_dbt_assets(): def scope_downstream_asset(): - from dagster import OpExecutionContext, DbtCliResource + from dagster import AssetExecutionContext, DbtCliResource from dagster_dbt import dbt_assets @dbt_assets(manifest=MANIFEST_PATH) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): ... # start_downstream_asset @@ -82,11 +82,11 @@ def my_downstream_asset(): def scope_downstream_asset_pandas_df_manager(): - from dagster import OpExecutionContext, DbtCliResource + from dagster import AssetExecutionContext, DbtCliResource from dagster_dbt import dbt_assets @dbt_assets(manifest=MANIFEST_PATH) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): ... # start_downstream_asset_pandas_df_manager @@ -110,11 +110,11 @@ def my_downstream_asset(my_dbt_model): def scope_upstream_asset(): # start_upstream_asset - from dagster import asset, OpExecutionContext + from dagster import asset, AssetExecutionContext from dagster_dbt import DbtCliResource, get_asset_key_for_source, dbt_assets @dbt_assets(manifest=MANIFEST_PATH) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): ... @asset(key=get_asset_key_for_source([my_dbt_assets], "jaffle_shop")) @@ -125,11 +125,11 @@ def orders(): def scope_upstream_multi_asset(): - from dagster import OpExecutionContext + from dagster import AssetExecutionContext from dagster_dbt import DbtCliResource, dbt_assets @dbt_assets(manifest=MANIFEST_PATH) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): ... # start_upstream_multi_asset @@ -166,7 +166,7 @@ def upstream(): def scope_custom_asset_key_dagster_dbt_translator(): # start_custom_asset_key_dagster_dbt_translator from pathlib import Path - from dagster import AssetKey, OpExecutionContext + from dagster import AssetKey, AssetExecutionContext from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets from typing import Any, Mapping @@ -180,7 +180,7 @@ def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: manifest=manifest_path, dagster_dbt_translator=CustomDagsterDbtTranslator(), ) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() # end_custom_asset_key_dagster_dbt_translator @@ -189,7 +189,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): def scope_custom_group_name_dagster_dbt_translator(): # start_custom_group_name_dagster_dbt_translator from pathlib import Path - from dagster import OpExecutionContext + from dagster import AssetExecutionContext from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets from typing import Any, Mapping, Optional @@ -205,7 +205,7 @@ def get_group_name( manifest=manifest_path, dagster_dbt_translator=CustomDagsterDbtTranslator(), ) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() # end_custom_group_name_dagster_dbt_translator @@ -215,7 +215,7 @@ def scope_custom_description_dagster_dbt_translator(): # start_custom_description_dagster_dbt_translator import textwrap from pathlib import Path - from dagster import OpExecutionContext + from dagster import AssetExecutionContext from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets from typing import Any, Mapping @@ -229,7 +229,7 @@ def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str: manifest=manifest_path, dagster_dbt_translator=CustomDagsterDbtTranslator(), ) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() # end_custom_description_dagster_dbt_translator @@ -238,7 +238,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): def scope_custom_metadata_dagster_dbt_translator(): # start_custom_metadata_dagster_dbt_translator from pathlib import Path - from dagster import MetadataValue, OpExecutionContext + from dagster import MetadataValue, AssetExecutionContext from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets from typing import Any, Mapping @@ -256,7 +256,7 @@ def get_metadata( manifest=manifest_path, dagster_dbt_translator=CustomDagsterDbtTranslator(), ) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() # end_custom_metadata_dagster_dbt_translator @@ -265,7 +265,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): def scope_custom_auto_materialize_policy_dagster_dbt_translator(): # start_custom_auto_materialize_policy_dagster_dbt_translator from pathlib import Path - from dagster import OpExecutionContext, AutoMaterializePolicy + from dagster import AssetExecutionContext, AutoMaterializePolicy from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets from typing import Any, Mapping, Optional @@ -281,7 +281,7 @@ def get_auto_materialize_policy( manifest=manifest_path, dagster_dbt_translator=CustomDagsterDbtTranslator(), ) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() # end_custom_auto_materialize_policy_dagster_dbt_translator @@ -290,7 +290,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): def scope_custom_freshness_policy_dagster_dbt_translator(): # start_custom_freshness_policy_dagster_dbt_translator from pathlib import Path - from dagster import OpExecutionContext, FreshnessPolicy + from dagster import AssetExecutionContext, FreshnessPolicy from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets from typing import Any, Mapping, Optional @@ -306,7 +306,7 @@ def get_freshness_policy( manifest=manifest_path, dagster_dbt_translator=CustomDagsterDbtTranslator(), ) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() # end_custom_freshness_policy_dagster_dbt_translator diff --git a/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/downstream_assets/assets.py b/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/downstream_assets/assets.py index 5c329952019fa..ba88530bbab23 100644 --- a/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/downstream_assets/assets.py +++ b/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/downstream_assets/assets.py @@ -5,7 +5,7 @@ import duckdb import pandas as pd import plotly.express as px -from dagster import MetadataValue, OpExecutionContext, asset +from dagster import MetadataValue, AssetExecutionContext, asset from dagster_dbt import DbtCliResource, dbt_assets, get_asset_key_for_model from .constants import dbt_manifest_path, dbt_project_dir @@ -29,7 +29,7 @@ def raw_customers(context) -> None: @dbt_assets(manifest=dbt_manifest_path) -def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): +def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() diff --git a/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/load_dbt_models/assets.py b/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/load_dbt_models/assets.py index c8453de1a60ea..47e51faf7d251 100644 --- a/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/load_dbt_models/assets.py +++ b/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/load_dbt_models/assets.py @@ -1,13 +1,13 @@ # ruff: noqa: I001 # start_dbt_assets -from dagster import OpExecutionContext +from dagster import AssetExecutionContext from dagster_dbt import DbtCliResource, dbt_assets from .constants import dbt_manifest_path @dbt_assets(manifest=dbt_manifest_path) -def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): +def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() diff --git a/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/upstream_assets/assets.py b/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/upstream_assets/assets.py index 04995e20ca5e5..432bb8d486e15 100644 --- a/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/upstream_assets/assets.py +++ b/examples/docs_snippets/docs_snippets/integrations/dbt/tutorial/upstream_assets/assets.py @@ -4,7 +4,7 @@ import duckdb import pandas as pd -from dagster import OpExecutionContext, asset +from dagster import AssetExecutionContext, asset from dagster_dbt import DbtCliResource, dbt_assets from .constants import dbt_manifest_path, dbt_project_dir @@ -26,7 +26,7 @@ def raw_customers(context) -> None: @dbt_assets(manifest=dbt_manifest_path) -def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): +def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index cdf2896167b8f..5327dd08eec73 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -507,7 +507,7 @@ def _get_unique_target_path(self, *, context: Optional[AssetExecutionContext]) - unique_id = str(uuid.uuid4())[:7] path = unique_id if context: - path = f"{context.op.name}-{context.run_id[:7]}-{unique_id}" + path = f"{context.op_execution_context.op.name}-{context.run_id[:7]}-{unique_id}" return f"target/{path}" @@ -680,8 +680,8 @@ def my_dbt_op(dbt: DbtCliResource): selection_args = get_subset_selection_for_context( context=context, manifest=manifest, - select=context.op.tags.get("dagster-dbt/select"), - exclude=context.op.tags.get("dagster-dbt/exclude"), + select=context.op_execution_context.op.tags.get("dagster-dbt/select"), + exclude=context.op_execution_context.op.tags.get("dagster-dbt/exclude"), ) else: manifest = validate_manifest(manifest) if manifest else {} @@ -741,7 +741,7 @@ def get_subset_selection_for_context( # TODO: this should be a property on the context if this is a permanent indicator for # determining whether the current execution context is performing a subsetted execution. - is_subsetted_execution = len(context.selected_output_names) != len( + is_subsetted_execution = len(context.op_execution_context.selected_output_names) != len( context.assets_def.node_keys_by_output_name ) if not is_subsetted_execution: @@ -752,7 +752,7 @@ def get_subset_selection_for_context( return default_dbt_selection selected_dbt_resources = [] - for output_name in context.selected_output_names: + for output_name in context.op_execution_context.selected_output_names: dbt_resource_props = dbt_resource_props_by_output_name[output_name] # Explicitly select a dbt resource by its fully qualified name (FQN).