Skip to content

Commit

Permalink
[external-assets] refactor AssetGraph
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 20, 2024
1 parent b5da3fc commit 0f657ae
Show file tree
Hide file tree
Showing 21 changed files with 497 additions and 387 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def spec(self) -> AssetCheckSpec:
def specs(self) -> Iterable[AssetCheckSpec]:
return self._specs_by_output_name.values()

@property
def keys(self) -> Iterable[AssetCheckKey]:
return self._specs_by_handle.keys()

@property
def specs_by_output_name(self) -> Mapping[str, AssetCheckSpec]:
return self._specs_by_output_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def evaluate_asset(
"""
# convert the legacy AutoMaterializePolicy to an Evaluator
asset_condition = check.not_none(
self.asset_graph.auto_materialize_policies_by_key.get(asset_key)
self.asset_graph.get_auto_materialize_policy(asset_key)
).to_asset_condition()

asset_cursor = self.cursor.get_previous_evaluation_state(asset_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def backcompat_deserialize_asset_daemon_cursor_str(

previous_evaluation_state = []
cursor_keys = (
asset_graph.auto_materialize_policies_by_key.keys()
asset_graph.materializable_asset_keys
if asset_graph
else latest_evaluation_by_asset_key.keys()
)
Expand Down
345 changes: 103 additions & 242 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
from dagster._serdes.serdes import whitelist_for_serdes

from .asset_check_spec import AssetCheckKey
from .asset_graph import AssetGraph, InternalAssetGraph
from .asset_graph import AssetGraph
from .assets import AssetsDefinition
from .events import (
AssetKey,
CoercibleToAssetKey,
CoercibleToAssetKeyPrefix,
key_prefix_from_coercible,
)
from .internal_asset_graph import InternalAssetGraph
from .source_asset import SourceAsset

CoercibleToAssetSelection: TypeAlias = Union[
Expand Down Expand Up @@ -685,9 +686,10 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
else asset_graph.materializable_asset_keys
)
return {
asset_key
for asset_key, group in asset_graph.group_names_by_key.items()
if group in self.selected_groups and asset_key in base_set
key
for group in self.selected_groups
for key in asset_graph.asset_keys_for_group(group)
if key in base_set
}

def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def get_minutes_overdue(
asset_key: AssetKey,
evaluation_time: datetime.datetime,
) -> Optional[FreshnessMinutes]:
freshness_policy = self.asset_graph.freshness_policies_by_key.get(asset_key)
freshness_policy = self.asset_graph.get_freshness_policy(asset_key)
if freshness_policy is None:
raise DagsterInvariantViolationError(
"Cannot calculate minutes late for asset without a FreshnessPolicy"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
attach_resource_id_to_key_mapping,
)
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_graph import InternalAssetGraph
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.internal_asset_graph import InternalAssetGraph
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import wrap_resources_for_execution
Expand Down
Loading

0 comments on commit 0f657ae

Please sign in to comment.