Skip to content

Commit

Permalink
amc
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Nov 25, 2024
1 parent 89391a6 commit 8634ced
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ Ultimately, we would like to kick off a run of `customer_metrics` whenever `load
```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_eager endbefore=end_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset,
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
automation_condition=AutomationCondition.eager(),
)
```
Expand Down Expand Up @@ -77,7 +76,6 @@ from dagster import (
MaterializeResult,
multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -114,16 +112,15 @@ load_customers_dag_asset = next(
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
# Add a dependency on the load_customers_dag_asset
),
)
# Add a dependency on the load_customers_dag_asset
).replace_attributes(
deps=[load_customers_dag_asset],
automation_condition=AutomationCondition.eager(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,7 @@ defs = Definitions(
Now that we have both DAGs loaded into Dagster, we can observe the cross-dag lineage between them. To do this, we'll use the `replace_attributes` function to add a dependency from the `load_customers` asset to the `customer_metrics` asset:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_lineage endbefore=end_lineage
from dagster._core.definitions.asset_spec import replace_attributes

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
customer_metrics_dag_asset,
deps=[load_customers],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
MaterializeResult,
multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -42,16 +41,15 @@
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
# Add a dependency on the load_customers_dag_asset
),
)
# Add a dependency on the load_customers_dag_asset
).replace_attributes(
deps=[load_customers_dag_asset],
automation_condition=AutomationCondition.eager(),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dagster import Definitions
from dagster._core.definitions.asset_spec import replace_attributes
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
Expand Down Expand Up @@ -33,16 +32,15 @@
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
# Add a dependency on the load_customers_dag_asset
),
)
# Add a dependency on the load_customers_dag_asset
).replace_attributes(
deps=[load_customers_dag_asset],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
MaterializeResult,
multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -42,15 +41,14 @@
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
),
)
).replace_attributes(
deps=[load_customers_dag_asset],
automation_condition=AutomationCondition.eager(),
)
Expand Down Expand Up @@ -88,8 +86,7 @@ def run_customer_metrics() -> MaterializeResult:
# start_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset,
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
automation_condition=AutomationCondition.eager(),
)
# end_eager
Expand Down
4 changes: 1 addition & 3 deletions examples/airlift-federation-tutorial/snippets/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@
)

# start_lineage
from dagster._core.definitions.asset_spec import replace_attributes

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
customer_metrics_dag_asset,
deps=[load_customers],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def replace_attributes(
skippable: bool = ...,
group_name: Optional[str] = ...,
code_version: Optional[str] = ...,
automation_condition: Optional[AutomationCondition] = ...,
owners: Optional[Sequence[str]] = ...,
tags: Optional[Mapping[str, str]] = ...,
kinds: Optional[Set[str]] = ...,
Expand All @@ -325,7 +326,9 @@ def replace_attributes(
group_name=group_name if group_name is not ... else self.group_name,
code_version=code_version if code_version is not ... else self.code_version,
freshness_policy=self.freshness_policy,
automation_condition=self.automation_condition,
automation_condition=automation_condition
if automation_condition is not ...
else self.automation_condition,
owners=owners if owners is not ... else self.owners,
tags=tags if tags is not ... else current_tags_without_kinds,
kinds=kinds if kinds is not ... else self.kinds,
Expand Down

0 comments on commit 8634ced

Please sign in to comment.