Skip to content

Commit

Permalink
Ignore legacy AutoMaterializePolicies in AnyDownstreamConditions (#24411
Browse files Browse the repository at this point in the history
)

## Summary & Motivation

Due to how we gate access to the legacy context object, and the fact
that the "fully resolved" automation condition is not available until we
execute the AnyDownstreamConditions condition (because we need access to
the full asset graph), this would previously cause errors.

These errors are weird, as the method says "any downstream
*conditions*", whereas the downstream causing that failure has an
AutoMaterializePolicy, not a condition.

This change just ignores AMPs that are found downstream, avoiding the
error

## How I Tested These Changes

## Changelog

Fixed issue which could cause errors when using
`AutomationCondition.any_downstream_condition()` with downstream
`AutoMaterializePolicy` objects.

- [ ] `NEW` _(added new feature or capability)_
- [x] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
  • Loading branch information
OwenKephart authored Sep 13, 2024
1 parent bf14de6 commit e7c4c96
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import AbstractSet, Optional, Sequence
from typing import AbstractSet, Mapping, Optional, Sequence

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_automation.automation_condition import (
Expand Down Expand Up @@ -73,10 +73,19 @@ def _get_ignored_conditions(
ignored_conditions.add(context.condition)
return ignored_conditions

def _get_validated_downstream_conditions(
self, downstream_conditions: Mapping[AutomationCondition, AbstractSet[AssetKey]]
) -> Mapping[AutomationCondition, AbstractSet[AssetKey]]:
return {
condition: keys
for condition, keys in downstream_conditions.items()
if not condition.has_rule_condition
}

def evaluate(self, context: AutomationContext) -> AutomationResult:
ignored_conditions = self._get_ignored_conditions(context)
downstream_conditions = context.asset_graph.get_downstream_automation_conditions(
asset_key=context.asset_key
downstream_conditions = self._get_validated_downstream_conditions(
context.asset_graph.get_downstream_automation_conditions(asset_key=context.asset_key)
)

true_slice = context.get_empty_slice()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
evaluate_automation_conditions,
)
from dagster._core.definitions.asset_key import CoercibleToAssetKey
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult
from dagster._core.definitions.declarative_automation.operators.boolean_operators import (
AndAutomationCondition,
Expand Down Expand Up @@ -39,6 +40,23 @@ def b(): ...
assert a_result.child_results[0].child_results[0].condition == cond2


def test_basic_with_amp() -> None:
cond1 = AutomationCondition.any_downstream_conditions()
cond2 = AutoMaterializePolicy.eager()

@asset(automation_condition=cond1)
def a(): ...

@asset(auto_materialize_policy=cond2, deps=[a])
def b(): ...

result = evaluate_automation_conditions([a, b], instance=DagsterInstance.ephemeral())

a_result = _get_result(a.key, result.results)
# do not pick up child result
assert len(a_result.child_results) == 0


def test_multiple_downstreams() -> None:
cond1 = AutomationCondition.any_downstream_conditions()
cond2 = AutomationCondition.in_progress()
Expand Down

0 comments on commit e7c4c96

Please sign in to comment.