Skip to content

Commit

Permalink
update dbt examples and internal context to align with AssetExecution…
Browse files Browse the repository at this point in the history
…Context
  • Loading branch information
jamiedemaria committed Sep 14, 2023
1 parent e8fa9da commit 9d0a950
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 31 deletions.
40 changes: 20 additions & 20 deletions examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ def my_dbt_assets():


def scope_downstream_asset():
from dagster import OpExecutionContext, DbtCliResource
from dagster import AssetExecutionContext, DbtCliResource
from dagster_dbt import dbt_assets

@dbt_assets(manifest=MANIFEST_PATH)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
...

# start_downstream_asset
Expand All @@ -82,11 +82,11 @@ def my_downstream_asset():


def scope_downstream_asset_pandas_df_manager():
from dagster import OpExecutionContext, DbtCliResource
from dagster import AssetExecutionContext, DbtCliResource
from dagster_dbt import dbt_assets

@dbt_assets(manifest=MANIFEST_PATH)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
...

# start_downstream_asset_pandas_df_manager
Expand All @@ -110,11 +110,11 @@ def my_downstream_asset(my_dbt_model):

def scope_upstream_asset():
# start_upstream_asset
from dagster import asset, OpExecutionContext
from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource, get_asset_key_for_source, dbt_assets

@dbt_assets(manifest=MANIFEST_PATH)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
...

@asset(key=get_asset_key_for_source([my_dbt_assets], "jaffle_shop"))
Expand All @@ -125,11 +125,11 @@ def orders():


def scope_upstream_multi_asset():
from dagster import OpExecutionContext
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=MANIFEST_PATH)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
...

# start_upstream_multi_asset
Expand Down Expand Up @@ -166,7 +166,7 @@ def upstream():
def scope_custom_asset_key_dagster_dbt_translator():
# start_custom_asset_key_dagster_dbt_translator
from pathlib import Path
from dagster import AssetKey, OpExecutionContext
from dagster import AssetKey, AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

Expand All @@ -180,7 +180,7 @@ def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
manifest=manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

# end_custom_asset_key_dagster_dbt_translator
Expand All @@ -189,7 +189,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def scope_custom_group_name_dagster_dbt_translator():
# start_custom_group_name_dagster_dbt_translator
from pathlib import Path
from dagster import OpExecutionContext
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

Expand All @@ -205,7 +205,7 @@ def get_group_name(
manifest=manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

# end_custom_group_name_dagster_dbt_translator
Expand All @@ -215,7 +215,7 @@ def scope_custom_description_dagster_dbt_translator():
# start_custom_description_dagster_dbt_translator
import textwrap
from pathlib import Path
from dagster import OpExecutionContext
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

Expand All @@ -229,7 +229,7 @@ def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
manifest=manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

# end_custom_description_dagster_dbt_translator
Expand All @@ -238,7 +238,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def scope_custom_metadata_dagster_dbt_translator():
# start_custom_metadata_dagster_dbt_translator
from pathlib import Path
from dagster import MetadataValue, OpExecutionContext
from dagster import MetadataValue, AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

Expand All @@ -256,7 +256,7 @@ def get_metadata(
manifest=manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

# end_custom_metadata_dagster_dbt_translator
Expand All @@ -265,7 +265,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def scope_custom_auto_materialize_policy_dagster_dbt_translator():
# start_custom_auto_materialize_policy_dagster_dbt_translator
from pathlib import Path
from dagster import OpExecutionContext, AutoMaterializePolicy
from dagster import AssetExecutionContext, AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

Expand All @@ -281,7 +281,7 @@ def get_auto_materialize_policy(
manifest=manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

# end_custom_auto_materialize_policy_dagster_dbt_translator
Expand All @@ -290,7 +290,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def scope_custom_freshness_policy_dagster_dbt_translator():
# start_custom_freshness_policy_dagster_dbt_translator
from pathlib import Path
from dagster import OpExecutionContext, FreshnessPolicy
from dagster import AssetExecutionContext, FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

Expand All @@ -306,7 +306,7 @@ def get_freshness_policy(
manifest=manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

# end_custom_freshness_policy_dagster_dbt_translator
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import duckdb
import pandas as pd
import plotly.express as px
from dagster import MetadataValue, OpExecutionContext, asset
from dagster import MetadataValue, AssetExecutionContext, asset
from dagster_dbt import DbtCliResource, dbt_assets, get_asset_key_for_model

from .constants import dbt_manifest_path, dbt_project_dir
Expand All @@ -29,7 +29,7 @@ def raw_customers(context) -> None:


@dbt_assets(manifest=dbt_manifest_path)
def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# ruff: noqa: I001
# start_dbt_assets
from dagster import OpExecutionContext
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

from .constants import dbt_manifest_path


@dbt_assets(manifest=dbt_manifest_path)
def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import duckdb
import pandas as pd
from dagster import OpExecutionContext, asset
from dagster import AssetExecutionContext, asset
from dagster_dbt import DbtCliResource, dbt_assets

from .constants import dbt_manifest_path, dbt_project_dir
Expand All @@ -26,7 +26,7 @@ def raw_customers(context) -> None:


@dbt_assets(manifest=dbt_manifest_path)
def jaffle_shop_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def _get_unique_target_path(self, *, context: Optional[AssetExecutionContext]) -
unique_id = str(uuid.uuid4())[:7]
path = unique_id
if context:
path = f"{context.op.name}-{context.run_id[:7]}-{unique_id}"
path = f"{context.op_execution_context.op.name}-{context.run_id[:7]}-{unique_id}"

return f"target/{path}"

Expand Down Expand Up @@ -680,8 +680,8 @@ def my_dbt_op(dbt: DbtCliResource):
selection_args = get_subset_selection_for_context(
context=context,
manifest=manifest,
select=context.op.tags.get("dagster-dbt/select"),
exclude=context.op.tags.get("dagster-dbt/exclude"),
select=context.op_execution_context.op.tags.get("dagster-dbt/select"),
exclude=context.op_execution_context.op.tags.get("dagster-dbt/exclude"),
)
else:
manifest = validate_manifest(manifest) if manifest else {}
Expand Down Expand Up @@ -741,7 +741,7 @@ def get_subset_selection_for_context(

# TODO: this should be a property on the context if this is a permanent indicator for
# determining whether the current execution context is performing a subsetted execution.
is_subsetted_execution = len(context.selected_output_names) != len(
is_subsetted_execution = len(context.op_execution_context.selected_output_names) != len(
context.assets_def.node_keys_by_output_name
)
if not is_subsetted_execution:
Expand All @@ -752,7 +752,7 @@ def get_subset_selection_for_context(
return default_dbt_selection

selected_dbt_resources = []
for output_name in context.selected_output_names:
for output_name in context.op_execution_context.selected_output_names:
dbt_resource_props = dbt_resource_props_by_output_name[output_name]

# Explicitly select a dbt resource by its fully qualified name (FQN).
Expand Down

0 comments on commit 9d0a950

Please sign in to comment.