Skip to content

Commit

Permalink
Make existing conditions work with Asset Checks
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 11, 2024
1 parent 0790ae2 commit aa4cfb6
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.instance import DagsterInstance
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer


U_EntityKey = TypeVar("U_EntityKey", AssetKey, AssetCheckKey, EntityKey)


Expand Down Expand Up @@ -343,21 +345,38 @@ def compute_latest_time_window_subset(
else:
check.failed(f"Unsupported partitions_def: {partitions_def}")

def compute_missing_subset(
self, asset_key: "AssetKey", from_subset: EntitySubset[AssetKey]
) -> EntitySubset[AssetKey]:
def compute_subset_with_status(
self, key: AssetCheckKey, status: Optional["AssetCheckExecutionResolvedStatus"]
):
"""Returns the subset of an asset check that matches a given status."""
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord

latest_record = AssetCheckExecutionRecord.blocking_get(self, key)
resolved_status = (
latest_record.resolve_status(self)
if latest_record and latest_record.targets_latest_materialization(self)
else None
)
if resolved_status == status:
return self.get_full_subset(key=key)
else:
return self.get_empty_subset(key=key)

def compute_missing_subset(self, key: EntityKey, from_subset: EntitySubset) -> EntitySubset:
"""Returns a subset which is the subset of the input subset that has never been materialized
(if it is a materializable asset) or observered (if it is an observable asset).
"""
if isinstance(key, AssetCheckKey):
return self.compute_subset_with_status(key, status=None)
# TODO: this logic should be simplified once we have a unified way of detecting both
# materializations and observations through the parittion status cache. at that point, the
# definition will slightly change to search for materializations and observations regardless
# of the materializability of the asset
if self.asset_graph.get(asset_key).is_materializable:
elif self.asset_graph.get(key).is_materializable:
# cheap call which takes advantage of the partition status cache
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=asset_key)
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=key)
materialized_subset = EntitySubset(
self, key=asset_key, value=_ValidatedEntitySubsetValue(materialized_subset.value)
self, key=key, value=_ValidatedEntitySubsetValue(materialized_subset.value)
)
return from_subset.compute_difference(materialized_subset)
else:
Expand All @@ -368,19 +387,36 @@ def compute_missing_subset(
if not self._queryer.asset_partition_has_materialization_or_observation(ap)
}
return self.get_asset_subset_from_asset_partitions(
key=asset_key, asset_partitions=missing_asset_partitions
key=key, asset_partitions=missing_asset_partitions
)

@cached_method
def compute_in_progress_asset_subset(self, *, asset_key: AssetKey) -> EntitySubset[AssetKey]:
# part of in progress run
value = self._queryer.get_in_progress_asset_subset(asset_key=asset_key).value
return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value))
def compute_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionResolvedStatus,
)

if isinstance(key, AssetCheckKey):
return self.compute_subset_with_status(
key, AssetCheckExecutionResolvedStatus.IN_PROGRESS
)
else:
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

@cached_method
def compute_failed_asset_subset(self, *, asset_key: "AssetKey") -> EntitySubset[AssetKey]:
value = self._queryer.get_failed_asset_subset(asset_key=asset_key).value
return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value))
def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset:
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionResolvedStatus,
)

if isinstance(key, AssetCheckKey):
return self.compute_subset_with_status(
key, AssetCheckExecutionResolvedStatus.EXECUTION_FAILED
)
else:
value = self._queryer.get_failed_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

@cached_method
def compute_updated_since_cursor_subset(
Expand All @@ -391,6 +427,25 @@ def compute_updated_since_cursor_subset(
).value
return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value))

@cached_method
def compute_updated_since_time_subset(
self, *, key: AssetCheckKey, time: datetime
) -> EntitySubset[AssetCheckKey]:
from dagster._core.events import DagsterEventType
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord

# intentionally left unimplemented for AssetKey, as this is a less performant query
record = AssetCheckExecutionRecord.blocking_get(self, key)
if (
record is None
or record.event is None
or record.event.dagster_event_type != DagsterEventType.ASSET_CHECK_EVALUATION
or record.event.timestamp < time.timestamp()
):
return self.get_empty_subset(key=key)
else:
return self.get_full_subset(key=key)

class MultiDimInfo(NamedTuple):
tw_dim: PartitionDimensionDefinition
secondary_dim: PartitionDimensionDefinition
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dagster._core.definitions.declarative_automation.operands import (
CodeVersionChangedCondition as CodeVersionChangedCondition,
CronTickPassedCondition as CronTickPassedCondition,
FailedAutomationCondition as FailedAutomationCondition,
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressAutomationCondition as InProgressAutomationCondition,
MissingAutomationCondition as MissingAutomationCondition,
Expand Down
Loading

0 comments on commit aa4cfb6

Please sign in to comment.