diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
index 9481ac85ab547..00209682f1bad 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
@@ -64,7 +64,7 @@ def get_asset_backfill_preview(
     asset_partitions = []

     for asset_key in asset_backfill_data.get_targeted_asset_keys_topological_order(asset_graph):
-        if asset_graph.get_partitions_def(asset_key):
+        if asset_graph.get(asset_key).partitions_def:
             partitions_subset = asset_backfill_data.target_subset.partitions_subsets_by_asset_key[
                 asset_key
             ]
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py
index 79194e39ccdec..7cfe4a8515c3f 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py
@@ -603,12 +603,12 @@ def test_launch_asset_backfill_with_upstream_anchor_asset():
         asset_graph = repo.asset_graph
         assert target_subset == AssetGraphSubset(
             partitions_subsets_by_asset_key={
-                AssetKey("hourly"): asset_graph.get_partitions_def(
+                AssetKey("hourly"): asset_graph.get(
                     AssetKey("hourly")
-                ).subset_with_partition_keys(hourly_partitions),
-                AssetKey("daily"): asset_graph.get_partitions_def(
+                ).partitions_def.subset_with_partition_keys(hourly_partitions),
+                AssetKey("daily"): asset_graph.get(
                     AssetKey("daily")
-                ).subset_with_partition_keys(["2020-01-02", "2020-01-03"]),
+                ).partitions_def.subset_with_partition_keys(["2020-01-02", "2020-01-03"]),
             },
         )
@@ -668,15 +668,15 @@ def test_launch_asset_backfill_with_two_anchor_assets():
         asset_graph = repo.asset_graph
         assert target_subset == AssetGraphSubset(
             partitions_subsets_by_asset_key={
-                AssetKey("hourly1"): asset_graph.get_partitions_def(
+                AssetKey("hourly1"): asset_graph.get(
                     AssetKey("hourly1")
-                ).subset_with_partition_keys(hourly_partitions),
-                AssetKey("hourly2"): asset_graph.get_partitions_def(
+                ).partitions_def.subset_with_partition_keys(hourly_partitions),
+                AssetKey("hourly2"): asset_graph.get(
                     AssetKey("hourly2")
-                ).subset_with_partition_keys(hourly_partitions),
-                AssetKey("daily"): asset_graph.get_partitions_def(
+                ).partitions_def.subset_with_partition_keys(hourly_partitions),
+                AssetKey("daily"): asset_graph.get(
                     AssetKey("daily")
-                ).subset_with_partition_keys(["2020-01-02", "2020-01-03"]),
+                ).partitions_def.subset_with_partition_keys(["2020-01-02", "2020-01-03"]),
             },
         )
@@ -724,13 +724,13 @@ def test_launch_asset_backfill_with_upstream_anchor_asset_and_non_partitioned_as
             non_partitioned_asset_keys={AssetKey("non_partitioned")},
             partitions_subsets_by_asset_key={
                 AssetKey("hourly"): (
-                    asset_graph.get_partitions_def(AssetKey("hourly"))
-                    .empty_subset()
+                    asset_graph.get(AssetKey("hourly"))
+                    .partitions_def.empty_subset()
                     .with_partition_keys(hourly_partitions)
                 ),
                 AssetKey("daily"): (
-                    asset_graph.get_partitions_def(AssetKey("daily"))
-                    .empty_subset()
+                    asset_graph.get(AssetKey("daily"))
+                    .partitions_def.empty_subset()
                     .with_partition_keys(["2020-01-02", "2020-01-03"])
                 ),
             },
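The change repeated throughout this diff replaces per-attribute graph helpers such as get_partitions_def(key) with a single node lookup, asset_graph.get(key), followed by plain attribute access. The sketch below uses hypothetical stand-in classes (not Dagster's real AssetGraph or asset-node types) to show the shape of the old and new call sites, including one behavioral wrinkle visible later in this diff: the removed helper performed an existence check and returned None for unknown keys, while get() does not.

# Hedged sketch of the access-pattern change; FakeAssetNode/FakeAssetGraph
# are hypothetical stand-ins for Dagster's asset-node and asset-graph types.
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeAssetNode:
    key: str
    partitions_def: Optional[str]  # stand-in for a PartitionsDefinition


class FakeAssetGraph:
    def __init__(self, nodes: Dict[str, FakeAssetNode]):
        self._nodes = nodes

    # Old style: one graph-level helper per node attribute; the removed
    # implementation did an existence check and returned None for unknown keys.
    def get_partitions_def(self, key: str) -> Optional[str]:
        node = self._nodes.get(key)
        return node.partitions_def if node else None

    # New style: fetch the node once, then read attributes off it.
    def get(self, key: str) -> FakeAssetNode:
        return self._nodes[key]  # raises KeyError for unknown keys


graph = FakeAssetGraph({"hourly": FakeAssetNode("hourly", "hourly_partitions")})
assert graph.get_partitions_def("hourly") == graph.get("hourly").partitions_def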
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
index 58ed133389699..8fc251c330397 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
@@ -256,7 +256,7 @@ def _mock_asset_backfill_runs(
     status: DagsterRunStatus,
     partition_key: Optional[str],
 ):
-    partitions_def = asset_graph.get_partitions_def(asset_key)
+    partitions_def = asset_graph.get(asset_key).partitions_def

     @asset(
         partitions_def=partitions_def,
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition_evaluation_context.py
index 4011c62781103..97942144deebf 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition_evaluation_context.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition_evaluation_context.py
@@ -88,7 +88,7 @@ def create(
         evaluation_state_by_key: Mapping[AssetKey, "AssetConditionEvaluationState"],
         expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]],
     ) -> "AssetConditionEvaluationContext":
-        partitions_def = instance_queryer.asset_graph.get_partitions_def(asset_key)
+        partitions_def = instance_queryer.asset_graph.get(asset_key).partitions_def

         return AssetConditionEvaluationContext(
             asset_key=asset_key,
@@ -136,7 +136,7 @@ def asset_graph(self) -> BaseAssetGraph:

     @property
     def partitions_def(self) -> Optional[PartitionsDefinition]:
-        return self.asset_graph.get_partitions_def(self.asset_key)
+        return self.asset_graph.get(self.asset_key).partitions_def

     @property
     def evaluation_time(self) -> datetime.datetime:
@@ -190,7 +190,7 @@ def parent_will_update_subset(self) -> ValidAssetSubset:
         can be materialized in the same run as this asset.
""" subset = self.empty_subset() - for parent_key in self.asset_graph.get_parents(self.asset_key): + for parent_key in self.asset_graph.get(self.asset_key).parent_keys: if not self.materializable_in_same_run(self.asset_key, parent_key): continue parent_info = self.evaluation_state_by_key.get(parent_key) @@ -302,17 +302,19 @@ def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) """Returns whether a child asset can be materialized in the same run as a parent asset.""" from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph + child_node = self.asset_graph.get(child_key) + parent_node = self.asset_graph.get(parent_key) return ( # both assets must be materializable - child_key in self.asset_graph.materializable_asset_keys - and parent_key in self.asset_graph.materializable_asset_keys + child_node.is_materializable + and parent_node.is_materializable # the parent must have the same partitioning - and self.asset_graph.have_same_partitioning(child_key, parent_key) + and child_node.partitions_def == parent_node.partitions_def # the parent must have a simple partition mapping to the child and ( - not self.asset_graph.is_partitioned(parent_key) + not parent_node.is_partitioned or isinstance( - self.asset_graph.get_partition_mapping(child_key, parent_key), + self.asset_graph.get_partition_mapping(child_node.key, parent_node.key), (TimeWindowPartitionMapping, IdentityPartitionMapping), ) ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 961a84e51ceca..79063c9a41d73 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -55,7 +55,7 @@ def get_implicit_auto_materialize_policy( """For backcompat with pre-auto materialize policy graphs, assume a default scope of 1 day.""" auto_materialize_policy = asset_graph.get(asset_key).auto_materialize_policy if auto_materialize_policy is None: - time_partitions_def = get_time_partitions_def(asset_graph.get_partitions_def(asset_key)) + time_partitions_def = get_time_partitions_def(asset_graph.get(asset_key).partitions_def) if time_partitions_def is None: max_materializations_per_minute = None elif time_partitions_def.schedule_type == ScheduleType.HOURLY: @@ -141,7 +141,7 @@ def auto_materialize_asset_keys_and_parents(self) -> AbstractSet[AssetKey]: return { parent for asset_key in self.auto_materialize_asset_keys - for parent in self.asset_graph.get_parents(asset_key) + for parent in self.asset_graph.get(asset_key).parent_keys } | self.auto_materialize_asset_keys @property @@ -267,9 +267,9 @@ def get_asset_condition_evaluations( # if we need to materialize any partitions of a non-subsettable multi-asset, we need to # materialize all of them - execution_unit_keys = self.asset_graph.get_execution_set_asset_keys(asset_key) - if len(execution_unit_keys) > 1 and num_requested > 0: - for neighbor_key in execution_unit_keys: + execution_set_keys = self.asset_graph.get(asset_key).execution_set_asset_keys + if len(execution_set_keys) > 1 and num_requested > 0: + for neighbor_key in execution_set_keys: expected_data_time_mapping[neighbor_key] = expected_data_time # make sure that the true_subset of the neighbor is accurate -- when it was @@ -353,7 +353,7 @@ def build_run_requests( for asset_partition in asset_partitions: assets_to_reconcile_by_partitions_def_partition_key[ - 
@@ -353,7 +353,7 @@ def build_run_requests(

     for asset_partition in asset_partitions:
         assets_to_reconcile_by_partitions_def_partition_key[
-            asset_graph.get_partitions_def(asset_partition.asset_key), asset_partition.partition_key
+            asset_graph.get(asset_partition.asset_key).partitions_def, asset_partition.partition_key
         ].add(asset_partition.asset_key)

     run_requests = []
@@ -414,7 +414,7 @@ def build_run_requests_with_backfill_policies(
     # here we are grouping assets by their partitions def and partition keys selected.
     for asset_key, partition_keys in asset_partition_keys.items():
         assets_to_reconcile_by_partitions_def_partition_keys[
-            asset_graph.get_partitions_def(asset_key),
+            asset_graph.get(asset_key).partitions_def,
             frozenset(partition_keys) if partition_keys else None,
         ].add(asset_key)

@@ -581,7 +581,7 @@ def get_auto_observe_run_requests(
     for repository_asset_keys in asset_graph.split_asset_keys_by_repository(assets_to_auto_observe):
         asset_keys_by_partitions_def = defaultdict(list)
         for asset_key in repository_asset_keys:
-            partitions_def = asset_graph.get_partitions_def(asset_key)
+            partitions_def = asset_graph.get(asset_key).partitions_def
             asset_keys_by_partitions_def[partitions_def].append(asset_key)
         partitions_def_and_asset_key_groups.extend(asset_keys_by_partitions_def.values())
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py
index 9ada07ace4d76..b8316ec6a64d5 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py
@@ -199,7 +199,7 @@ def backcompat_deserialize_asset_daemon_cursor_str(
     partition_subsets_by_asset_key = {}
     for key_str, serialized_str in data.get("handled_root_partitions_by_asset_key", {}).items():
         asset_key = AssetKey.from_user_string(key_str)
-        partitions_def = asset_graph.get_partitions_def(asset_key) if asset_graph else None
+        partitions_def = asset_graph.get(asset_key).partitions_def if asset_graph else None
         if not partitions_def:
             continue
         try:
@@ -221,7 +221,7 @@ def backcompat_deserialize_asset_daemon_cursor_str(
     latest_evaluation_by_asset_key = {}
     for key_str, serialized_evaluation in serialized_latest_evaluation_by_asset_key.items():
         key = AssetKey.from_user_string(key_str)
-        partitions_def = asset_graph.get_partitions_def(key) if asset_graph else None
+        partitions_def = asset_graph.get(key).partitions_def if asset_graph else None
         evaluation = deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids(
             serialized_evaluation, partitions_def
         )
@@ -239,7 +239,7 @@ def backcompat_deserialize_asset_daemon_cursor_str(
         latest_evaluation_result = latest_evaluation_by_asset_key.get(asset_key)
         # create a placeholder evaluation result if we don't have one
         if not latest_evaluation_result:
-            partitions_def = asset_graph.get_partitions_def(asset_key) if asset_graph else None
+            partitions_def = asset_graph.get(asset_key).partitions_def if asset_graph else None
             latest_evaluation_result = AssetConditionEvaluation(
                 condition_snapshot=AssetConditionSnapshot("", "", ""),
                 true_subset=AssetSubset.empty(asset_key, partitions_def),
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py
index 72eaf0838e998..6c8f7bf89612d 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py
@@ -128,23 +128,25 @@ def _compare_base_and_branch_assets(self, asset_key: "AssetKey") -> Sequence[Cha
         ):
             changes.append(ChangeReason.CODE_VERSION)

-        if self.branch_asset_graph.get_parents(asset_key) != self.base_asset_graph.get_parents(
-            asset_key
+        if (
+            self.branch_asset_graph.get(asset_key).parent_keys
+            != self.base_asset_graph.get(asset_key).parent_keys
         ):
             changes.append(ChangeReason.INPUTS)
         else:
             # if the set of inputs is different, then we don't need to check if the partition mappings
             # for inputs have changed since ChangeReason.INPUTS is already in the list of changes
-            for upstream_asset in self.branch_asset_graph.get_parents(asset_key):
+            for upstream_asset in self.branch_asset_graph.get(asset_key).parent_keys:
                 if self.branch_asset_graph.get_partition_mapping(
                     asset_key, upstream_asset
                 ) != self.base_asset_graph.get_partition_mapping(asset_key, upstream_asset):
                     changes.append(ChangeReason.INPUTS)
                     break

-        if self.branch_asset_graph.get_partitions_def(
-            asset_key
-        ) != self.base_asset_graph.get_partitions_def(asset_key):
+        if (
+            self.branch_asset_graph.get(asset_key).partitions_def
+            != self.base_asset_graph.get(asset_key).partitions_def
+        ):
             changes.append(ChangeReason.PARTITIONS_DEFINITION)

         return changes
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
index 40199982a4142..d2eaaf41fbdd4 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py
@@ -83,7 +83,7 @@ def get_asset_subset(self, asset_key: AssetKey, asset_graph: BaseAssetGraph) ->
         """Returns an AssetSubset representing the subset of a specific asset that this
         AssetGraphSubset contains.
         """
-        partitions_def = asset_graph.get_partitions_def(asset_key)
+        partitions_def = asset_graph.get(asset_key).partitions_def
         if partitions_def is None:
             return AssetSubset(
                 asset_key=asset_key, value=asset_key in self.non_partitioned_asset_keys
@@ -100,7 +100,7 @@ def get_partitions_subset(
         self, asset_key: AssetKey, asset_graph: Optional[BaseAssetGraph] = None
     ) -> PartitionsSubset:
         if asset_graph:
-            partitions_def = asset_graph.get_partitions_def(asset_key)
+            partitions_def = asset_graph.get(asset_key).partitions_def
             if partitions_def is None:
                 check.failed("Can only call get_partitions_subset on a partitioned asset")

@@ -148,7 +148,7 @@ def to_storage_dict(
             },
             "serializable_partitions_def_ids_by_asset_key": {
                 key.to_user_string(): check.not_none(
-                    asset_graph.get_partitions_def(key)
+                    asset_graph.get(key).partitions_def
                 ).get_serializable_unique_identifier(
                     dynamic_partitions_store=dynamic_partitions_store
                 )
@@ -156,7 +156,7 @@ def to_storage_dict(
             },
             "partitions_def_class_names_by_asset_key": {
                 key.to_user_string(): check.not_none(
-                    asset_graph.get_partitions_def(key)
+                    asset_graph.get(key).partitions_def
                 ).__class__.__name__
                 for key, _ in self.partitions_subsets_by_asset_key.items()
             },
@@ -255,7 +255,7 @@ def from_asset_partition_set(
         return AssetGraphSubset(
             partitions_subsets_by_asset_key={
                 asset_key: (
-                    cast(PartitionsDefinition, asset_graph.get_partitions_def(asset_key))
+                    cast(PartitionsDefinition, asset_graph.get(asset_key).partitions_def)
                     .empty_subset()
                     .with_partition_keys(partition_keys)
                 )
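The from_asset_partition_set hunk above uses the empty_subset()/with_partition_keys() chain off a node's partitions_def. Below is a hedged toy model of that chain; FakePartitionsDef and FakeSubset are stand-ins, not Dagster's PartitionsDefinition or PartitionsSubset:

# Toy model of building a partitions subset from a partitions definition.
from typing import FrozenSet, Iterable


class FakePartitionsDef:
    def __init__(self, keys: Iterable[str]):
        self.keys = frozenset(keys)

    def empty_subset(self) -> "FakeSubset":
        return FakeSubset(self, frozenset())


class FakeSubset:
    def __init__(self, partitions_def: FakePartitionsDef, keys: FrozenSet[str]):
        self.partitions_def = partitions_def
        self.keys = keys

    def with_partition_keys(self, keys: Iterable[str]) -> "FakeSubset":
        requested = frozenset(keys)
        # only keys that exist in the partitions definition may be added
        assert requested <= self.partitions_def.keys, "unknown partition key"
        return FakeSubset(self.partitions_def, self.keys | requested)


daily = FakePartitionsDef(["2020-01-01", "2020-01-02", "2020-01-03"])
subset = daily.empty_subset().with_partition_keys(["2020-01-02", "2020-01-03"])
assert subset.keys == {"2020-01-02", "2020-01-03"}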
@@ -278,7 +278,7 @@ def can_deserialize(

         for key, value in serialized_dict["partitions_subsets_by_asset_key"].items():
             asset_key = AssetKey.from_user_string(key)
-            partitions_def = asset_graph.get_partitions_def(asset_key)
+            partitions_def = asset_graph.get(asset_key).partitions_def

             if partitions_def is None:
                 # Asset had a partitions definition at storage time, but no longer does
@@ -320,7 +320,7 @@ def from_storage_dict(
                 )
                 continue

-            partitions_def = asset_graph.get_partitions_def(asset_key)
+            partitions_def = asset_graph.get(asset_key).partitions_def

             if partitions_def is None:
                 if not allow_partial:
@@ -382,7 +382,7 @@ def from_asset_keys(
         non_partitioned_asset_keys: Set[AssetKey] = set()

         for asset_key in asset_keys:
-            partitions_def = asset_graph.get_partitions_def(asset_key)
+            partitions_def = asset_graph.get(asset_key).partitions_def
             if partitions_def:
                 partitions_subsets_by_asset_key[asset_key] = (
                     partitions_def.empty_subset().with_partition_keys(
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py
index 6536c2cafb081..0be3175c516a2 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py
@@ -620,7 +620,7 @@ def resolve_inner(self, asset_graph: BaseAssetGraph) -> AbstractSet[AssetKey]:
         selection = self.child.resolve_inner(asset_graph)
         output = set(selection)
         for asset_key in selection:
-            output.update(asset_graph.get_execution_set_asset_keys(asset_key))
+            output.update(asset_graph.get(asset_key).execution_set_asset_keys)
         return output

     def to_serializable_asset_selection(self, asset_graph: BaseAssetGraph) -> "AssetSelection":
diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py
index c620ad2008be3..e19d14c212726 100644
--- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py
+++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py
@@ -859,7 +859,7 @@ def evaluate_for_asset(
         else:
             # At least one upstream partition in each upstream asset must be updated in order
             # for the candidate to be updated
-            parent_asset_keys = context.asset_graph.get_parents(context.asset_key)
+            parent_asset_keys = context.asset_graph.get(context.asset_key).parent_keys
             updated_parent_keys = {ap.asset_key for ap in updated_parent_partitions}
             non_updated_parent_keys = parent_asset_keys - updated_parent_keys

@@ -964,11 +964,11 @@ def get_parent_subsets_updated_since_cron_by_key(
         partitioned parents, as their partitions encode the time windows they have processed.
         """
         updated_subsets_by_key = {}
-        for parent_asset_key in context.asset_graph.get_parents(context.asset_key):
+        for parent_asset_key in context.asset_graph.get(context.asset_key).parent_keys:
             # no need to incrementally calculate updated time-window partitions definitions, as
             # their partitions encode the time windows they have processed.
             if isinstance(
-                context.asset_graph.get_partitions_def(parent_asset_key),
+                context.asset_graph.get(parent_asset_key).partitions_def,
                 TimeWindowPartitionsDefinition,
             ):
                 continue
@@ -988,7 +988,7 @@ def parent_updated_since_cron(
         """Returns if, for a given child asset partition, the given parent asset been updated with
         information from the required time window.
""" - parent_partitions_def = context.asset_graph.get_partitions_def(parent_asset_key) + parent_partitions_def = context.asset_graph.get(parent_asset_key).partitions_def if isinstance(parent_partitions_def, TimeWindowPartitionsDefinition): # for time window partitions definitions, we simply assert that all time partitions that @@ -1069,7 +1069,7 @@ def evaluate_for_asset( candidate, updated_subsets_by_key.get(parent_asset_key, context.empty_subset()), ) - for parent_asset_key in context.asset_graph.get_parents(candidate.asset_key) + for parent_asset_key in context.asset_graph.get(candidate.asset_key).parent_keys ) }, ) diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 3832eb1de04d8..7262a3af9c1a9 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -22,7 +22,6 @@ ) import toposort -from typing_extensions import Self import dagster._check as check from dagster._core.definitions.asset_check_spec import AssetCheckKey @@ -77,6 +76,10 @@ class BaseAssetNode(ABC): parent_keys: AbstractSet[AssetKey] child_keys: AbstractSet[AssetKey] + @property + def has_self_dependency(self) -> bool: + return self.key in self.parent_keys + @property @abstractmethod def group_name(self) -> Optional[str]: ... @@ -109,13 +112,6 @@ def partitions_def(self) -> Optional[PartitionsDefinition]: ... @abstractmethod def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: ... - def get_partition_mapping(self, parent_asset: Self) -> PartitionMapping: - return infer_partition_mapping( - self.partition_mappings.get(parent_asset.key), - self.partitions_def, - parent_asset.partitions_def, - ) - @property @abstractmethod def freshness_policy(self) -> Optional[FreshnessPolicy]: ... 
@@ -244,64 +240,24 @@ def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
     def all_group_names(self) -> AbstractSet[str]:
         return {a.group_name for a in self.asset_nodes if a.group_name is not None}

-    def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
-        # Performing an existence check temporarily until we change callsites
-        return self.get(asset_key).partitions_def if self.has(asset_key) else None
-
     def get_partition_mapping(
-        self, asset_key: AssetKey, in_asset_key: AssetKey
+        self, asset_key: AssetKey, parent_asset_key: AssetKey
     ) -> PartitionMapping:
-        return self.get(asset_key).get_partition_mapping(self.get(in_asset_key))
-
-    def get_partitions_in_range(
-        self,
-        asset_key: AssetKey,
-        partition_key_range: PartitionKeyRange,
-        dynamic_partitions_store: DynamicPartitionsStore,
-    ) -> Sequence[AssetKeyPartitionKey]:
-        partition_def = self.get_partitions_def(asset_key)
-        partition_keys_in_range = check.not_none(partition_def).get_partition_keys_in_range(
-            partition_key_range, dynamic_partitions_store
-        )
-        return [
-            AssetKeyPartitionKey(asset_key, partition_key)
-            for partition_key in partition_keys_in_range
-        ]
-
-    def is_partitioned(self, asset_key: AssetKey) -> bool:
-        return self.get_partitions_def(asset_key) is not None
-
-    def have_same_partitioning(self, asset_key1: AssetKey, asset_key2: AssetKey) -> bool:
-        """Returns whether the given assets have the same partitions definition."""
-        return self.get(asset_key1).partitions_def == self.get(asset_key2).partitions_def
-
-    def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool:
-        partitions_defs = []
-        for asset_key in asset_keys:
-            partitions_def = self.get(asset_key).partitions_def
-            if partitions_def:
-                partitions_defs.append(partitions_def)
-
-        return len(partitions_defs) <= 1 or all(
-            partitions_defs[i] == partitions_defs[0] for i in range(1, len(partitions_defs))
+        node = self.get(asset_key)
+        return infer_partition_mapping(
+            node.partition_mappings.get(parent_asset_key),
+            node.partitions_def,
+            self.get(parent_asset_key).partitions_def,
         )

-    def get_child_nodes(self, node: T_AssetNode) -> AbstractSet[T_AssetNode]:
+    def get_children(self, node: T_AssetNode) -> AbstractSet[T_AssetNode]:
         """Returns all asset nodes that directly depend on the given asset node."""
         return {self._asset_nodes_by_key[key] for key in self.get(node.key).child_keys}

-    def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
-        """Returns all asset keys that directly depend on the given asset key."""
-        return self.get(asset_key).child_keys
-
-    def get_parent_nodes(self, node: T_AssetNode) -> AbstractSet[T_AssetNode]:
+    def get_parents(self, node: T_AssetNode) -> AbstractSet[T_AssetNode]:
         """Returns all asset nodes that are direct dependencies on the given asset node."""
         return {self._asset_nodes_by_key[key] for key in self.get(node.key).parent_keys}

-    def get_parents(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
-        """Returns all asset keys that are direct dependencies on the given asset key."""
-        return self.get(asset_key).parent_keys
-
     def get_ancestor_asset_keys(
         self, asset_key: AssetKey, include_self: bool = False
     ) -> AbstractSet[AssetKey]:
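With the node-level get_partition_mapping gone, the graph-level version above inlines the inference: an explicitly declared mapping for the parent wins, otherwise a default is inferred from the two partitions definitions. The sketch below mirrors that control flow with stand-in types; infer_default is a hypothetical placeholder for dagster's infer_partition_mapping:

# Hedged sketch of declared-mapping lookup with inferred fallback.
from typing import Dict, Optional, Tuple


def infer_default(child_pd: Optional[str], parent_pd: Optional[str]) -> str:
    # Toy inference: identical definitions map one-to-one, otherwise map to all.
    return "identity" if child_pd == parent_pd else "all_partitions"


class FakeGraph:
    def __init__(
        self,
        partitions_defs: Dict[str, Optional[str]],
        declared: Dict[Tuple[str, str], str],
    ):
        self._pds = partitions_defs
        self._declared = declared

    def get_partition_mapping(self, asset_key: str, parent_asset_key: str) -> str:
        declared = self._declared.get((asset_key, parent_asset_key))
        return declared or infer_default(
            self._pds[asset_key], self._pds[parent_asset_key]
        )


g = FakeGraph({"child": "daily", "parent": "hourly"}, {})
assert g.get_partition_mapping("child", "parent") == "all_partitions"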
@@ -322,6 +278,21 @@ def get_ancestor_asset_keys(
                 ancestors.add(asset_key)
         return ancestors

+    def get_partitions_in_range(
+        self,
+        asset_key: AssetKey,
+        partition_key_range: PartitionKeyRange,
+        dynamic_partitions_store: DynamicPartitionsStore,
+    ) -> Sequence[AssetKeyPartitionKey]:
+        partition_def = self.get(asset_key).partitions_def
+        partition_keys_in_range = check.not_none(partition_def).get_partition_keys_in_range(
+            partition_key_range, dynamic_partitions_store
+        )
+        return [
+            AssetKeyPartitionKey(asset_key, partition_key)
+            for partition_key in partition_keys_in_range
+        ]
+
     def get_parent_asset_subset(
         self,
         child_asset_subset: ValidAssetSubset,
@@ -333,8 +304,8 @@ def get_parent_asset_subset(
         relevant PartitionMapping.
         """
         child_asset_key = child_asset_subset.asset_key
-        child_partitions_def = self.get_partitions_def(child_asset_key)
-        parent_partitions_def = self.get_partitions_def(parent_asset_key)
+        child_partitions_def = self.get(child_asset_key).partitions_def
+        parent_partitions_def = self.get(parent_asset_key).partitions_def

         if parent_partitions_def is None:
             return ValidAssetSubset(parent_asset_key, value=child_asset_subset.size > 0)
@@ -363,8 +334,8 @@ def get_child_asset_subset(
         relevant PartitionMapping.
         """
         parent_asset_key = parent_asset_subset.asset_key
-        parent_partitions_def = self.get_partitions_def(parent_asset_key)
-        child_partitions_def = self.get_partitions_def(child_asset_key)
+        parent_partitions_def = self.get(parent_asset_key).partitions_def
+        child_partitions_def = self.get(child_asset_key).partitions_def

         if parent_partitions_def is None:
             if parent_asset_subset.size > 0:
@@ -398,7 +369,7 @@ def get_children_partitions(
         partition of that asset.
         """
         result: Set[AssetKeyPartitionKey] = set()
-        for child in self.get_child_nodes(self.get(asset_key)):
+        for child in self.get_children(self.get(asset_key)):
             if child.is_partitioned:
                 for child_partition_key in self.get_child_partition_keys_of_parent(
                     dynamic_partitions_store,
@@ -434,8 +405,8 @@ def get_child_partition_keys_of_parent(
             Sequence[str]: A list of the corresponding downstream partitions in child_asset_key that
                 partition_key maps to.
""" - child_partitions_def = self.get_partitions_def(child_asset_key) - parent_partitions_def = self.get_partitions_def(parent_asset_key) + child_partitions_def = self.get(child_asset_key).partitions_def + parent_partitions_def = self.get(parent_asset_key).partitions_def if child_partitions_def is None: raise DagsterInvalidInvocationError( @@ -475,8 +446,8 @@ def get_parents_partitions( """ valid_parent_partitions: Set[AssetKeyPartitionKey] = set() required_but_nonexistent_parent_partitions: Set[AssetKeyPartitionKey] = set() - for parent_asset_key in self.get_parents(asset_key): - if self.has(parent_asset_key) and self.is_partitioned(parent_asset_key): + for parent_asset_key in self.get(asset_key).parent_keys: + if self.has(parent_asset_key) and self.get(parent_asset_key).is_partitioned: mapped_partitions_result = self.get_parent_partition_keys_for_child( partition_key, parent_asset_key, @@ -529,8 +500,8 @@ def get_parent_partition_keys_for_child( """ partition_key = check.opt_str_param(partition_key, "partition_key") - child_partitions_def = cast(PartitionsDefinition, self.get_partitions_def(child_asset_key)) - parent_partitions_def = self.get_partitions_def(parent_asset_key) + child_partitions_def = cast(PartitionsDefinition, self.get(child_asset_key).partitions_def) + parent_partitions_def = self.get(parent_asset_key).partitions_def if parent_partitions_def is None: raise DagsterInvalidInvocationError( @@ -557,7 +528,7 @@ def has_materializable_parents(self, asset_key: AssetKey) -> bool: return False return any( self.has(parent_key) and self.get(parent_key).is_materializable - for parent_key in self.get_parents(asset_key) - {asset_key} + for parent_key in self.get(asset_key).parent_keys - {asset_key} ) def get_materializable_roots(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: @@ -580,18 +551,12 @@ def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]: queue = deque([asset_key]) while queue: current_key = queue.popleft() - for parent_key in self.get_parents(current_key): + for parent_key in self.get(current_key).parent_keys: if parent_key not in visited: yield parent_key queue.append(parent_key) visited.add(parent_key) - def get_execution_set_asset_keys(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: - """For a given asset_key, return the set of asset keys that must be - materialized at the same time. 
- """ - return self.get(asset_key).execution_set_asset_keys - @abstractmethod def get_execution_set_asset_and_check_keys( self, asset_key_or_check_key: AssetKeyOrCheckKey @@ -618,9 +583,6 @@ def get_downstream_freshness_policies( return downstream_policies - def has_self_dependency(self, asset_key: AssetKey) -> bool: - return asset_key in self.get_parents(asset_key) - def bfs_filter_subsets( self, dynamic_partitions_store: DynamicPartitionsStore, @@ -648,7 +610,7 @@ def bfs_filter_subsets( queued_subsets_by_asset_key: Dict[AssetKey, Optional[PartitionsSubset]] = { initial_asset_key: ( initial_subset.get_partitions_subset(initial_asset_key, self) - if self.get_partitions_def(initial_asset_key) + if self.get(initial_asset_key).is_partitioned else None ), } @@ -666,9 +628,9 @@ def bfs_filter_subsets( ), ) - for child in self.get_children(asset_key): - partition_mapping = self.get_partition_mapping(child, asset_key) - child_partitions_def = self.get_partitions_def(child) + for child_key in self.get(asset_key).child_keys: + partition_mapping = self.get_partition_mapping(child_key, asset_key) + child_partitions_def = self.get(child_key).partitions_def if child_partitions_def: if partitions_subset is None: @@ -678,19 +640,21 @@ def bfs_filter_subsets( dynamic_partitions_store=dynamic_partitions_store, ) ) - queued_subsets_by_asset_key[child] = child_partitions_subset + queued_subsets_by_asset_key[child_key] = child_partitions_subset else: child_partitions_subset = ( partition_mapping.get_downstream_partitions_for_partitions( partitions_subset, - check.not_none(self.get_partitions_def(asset_key)), + check.not_none(self.get(asset_key).partitions_def), downstream_partitions_def=child_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, ) ) - prior_child_partitions_subset = queued_subsets_by_asset_key.get(child) - queued_subsets_by_asset_key[child] = ( + prior_child_partitions_subset = queued_subsets_by_asset_key.get( + child_key + ) + queued_subsets_by_asset_key[child_key] = ( child_partitions_subset if not prior_child_partitions_subset else child_partitions_subset | prior_child_partitions_subset @@ -698,9 +662,9 @@ def bfs_filter_subsets( else: child_partitions_subset = None - if child not in all_assets: - queue.append(child) - all_assets.add(child) + if child_key not in all_assets: + queue.append(child_key) + all_assets.add(child_key) return result @@ -770,7 +734,7 @@ def sort_key_for_asset_partition( Assets with a time window partition dimension will be sorted from newest to oldest, unless they have a self-dependency, in which case they are sorted from oldest to newest. 
""" - partitions_def = asset_graph.get_partitions_def(asset_partition.asset_key) + partitions_def = asset_graph.get(asset_partition.asset_key).partitions_def time_partitions_def = get_time_partitions_def(partitions_def) if time_partitions_def is None: return 0 @@ -781,7 +745,7 @@ def sort_key_for_asset_partition( time_partition_key ).timestamp() - if asset_graph.has_self_dependency(asset_partition.asset_key): + if asset_graph.get(asset_partition.asset_key).has_self_dependency: # sort self dependencies from oldest to newest, as older partitions must exist before # new ones can execute return partition_timestamp @@ -850,7 +814,7 @@ def _queue_item( asset_key = asset_partition.asset_key if self._include_full_execution_set: - execution_set_keys = self._asset_graph.get_execution_set_asset_keys(asset_key) + execution_set_keys = self._asset_graph.get(asset_key).execution_set_asset_keys else: execution_set_keys = {asset_key} diff --git a/python_modules/dagster/dagster/_core/definitions/data_time.py b/python_modules/dagster/dagster/_core/definitions/data_time.py index ef441f233b250..ca38f3005a1d5 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_time.py +++ b/python_modules/dagster/dagster/_core/definitions/data_time.py @@ -175,7 +175,7 @@ def _upstream_records_by_key( ) -> Mapping[AssetKey, "EventLogRecord"]: upstream_records: Dict[AssetKey, EventLogRecord] = {} - for parent_key in self.asset_graph.get_parents(asset_key): + for parent_key in self.asset_graph.get(asset_key).parent_keys: if not ( self.asset_graph.has(parent_key) and self.asset_graph.get(parent_key).is_executable ): @@ -311,7 +311,7 @@ def _calculate_data_time_by_key( return {key: None for key in self.asset_graph.get_materializable_roots(asset_key)} record_timestamp = check.not_none(record_timestamp) - partitions_def = self.asset_graph.get_partitions_def(asset_key) + partitions_def = self.asset_graph.get(asset_key).partitions_def if isinstance(partitions_def, TimeWindowPartitionsDefinition): return self._calculate_data_time_by_key_time_partitioned( asset_key=asset_key, @@ -377,7 +377,7 @@ def _get_in_progress_data_time_in_run( return current_time data_time = current_time - for parent_key in self.asset_graph.get_parents(asset_key): + for parent_key in self.asset_graph.get(asset_key).parent_keys: if parent_key not in self.asset_graph.materializable_asset_keys: continue parent_data_time = self._get_in_progress_data_time_in_run( diff --git a/python_modules/dagster/dagster/_core/definitions/data_version.py b/python_modules/dagster/dagster/_core/definitions/data_version.py index 64c587b676166..a88ea2c4813f2 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_version.py +++ b/python_modules/dagster/dagster/_core/definitions/data_version.py @@ -491,7 +491,7 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato code_version = self.asset_graph.get(key.asset_key).code_version provenance = self._get_current_data_provenance(key=key) - asset_deps = self.asset_graph.get_parents(key.asset_key) + asset_deps = self.asset_graph.get(key.asset_key).parent_keys # only used if no provenance available materialization = check.not_none(self._get_latest_data_version_record(key=key)) @@ -663,7 +663,7 @@ def _is_volatile(self, *, key: "AssetKey") -> bool: if asset.is_external: return asset.is_observable else: - deps = asset.get_parents(key) + deps = asset.get(key).parent_keys return len(deps) == 0 or any(self._is_volatile(key=dep_key) for dep_key in deps) @cached_method @@ -713,11 +713,11 @@ def 
@@ -713,11 +713,11 @@ def _get_partition_dependencies(
             AssetKeyPartitionKey,
         )

-        asset_deps = self.asset_graph.get_parents(key.asset_key)
+        asset_deps = self.asset_graph.get(key.asset_key).parent_keys

         deps = []
         for dep_asset_key in asset_deps:
-            if not self.asset_graph.is_partitioned(dep_asset_key):
+            if not self.asset_graph.get(dep_asset_key).is_partitioned:
                 deps.append(AssetKeyPartitionKey(dep_asset_key, None))
             elif key.asset_key == dep_asset_key and self._exceeds_self_partition_limit(
                 key.asset_key
@@ -744,6 +744,6 @@ def _get_partition_dependencies(

     def _exceeds_self_partition_limit(self, asset_key: "AssetKey") -> bool:
         return (
-            check.not_none(self.asset_graph.get_partitions_def(asset_key)).get_num_partitions()
+            check.not_none(self.asset_graph.get(asset_key).partitions_def).get_num_partitions()
             >= SKIP_PARTITION_DATA_VERSION_SELF_DEPENDENCY_THRESHOLD
         )
diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py
index f81085159b1aa..3dbfdf43acd3c 100644
--- a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py
+++ b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py
@@ -132,7 +132,7 @@ def get_expected_data_time_for_asset_key(
         return context.data_time_resolver.get_current_data_time(asset_key, current_time)
     elif asset_graph.has_materializable_parents(asset_key):
         expected_data_time = None
-        for parent_key in asset_graph.get_parents(asset_key):
+        for parent_key in asset_graph.get(asset_key).parent_keys:
             # if the parent will be materialized on this tick, and it's not in the same repo, then
             # we must wait for this asset to be materialized
             if isinstance(asset_graph, RemoteAssetGraph) and context.will_update_asset_partition(
@@ -168,9 +168,10 @@ def freshness_evaluation_results_for_asset_key(
     asset_key = context.asset_key
     current_time = context.evaluation_time

-    if not context.asset_graph.get_downstream_freshness_policies(
-        asset_key=asset_key
-    ) or context.asset_graph.is_partitioned(asset_key):
+    if (
+        not context.asset_graph.get_downstream_freshness_policies(asset_key=asset_key)
+        or context.asset_graph.get(asset_key).is_partitioned
+    ):
         return context.empty_subset(), []

     # figure out the current contents of this asset
diff --git a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py
index 534bc65301fe3..b28adf9103470 100644
--- a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py
+++ b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py
@@ -381,7 +381,7 @@ def get_implicit_job_name_for_assets(
         )
         # for observable assets, we need to select the job based on the partitions def
         target_partitions_defs = {
-            self.get_partitions_def(asset_key) for asset_key in asset_keys
+            self.get(asset_key).partitions_def for asset_key in asset_keys
         }
         check.invariant(len(target_partitions_defs) == 1, "Expected exactly one partitions def")
         target_partitions_def = next(iter(target_partitions_defs))
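Both get_implicit_job_name_for_assets above and get_auto_observe_run_requests earlier rely on the same idiom: group asset keys by the node's partitions_def (a set for uniqueness checks, a defaultdict(list) for batching). A minimal sketch with strings standing in for PartitionsDefinition objects:

# Hedged sketch of the group-by-partitions_def pattern.
from collections import defaultdict
from typing import Dict, List, Optional

partitions_def_by_key: Dict[str, Optional[str]] = {
    "a": "daily",
    "b": "daily",
    "c": None,  # unpartitioned
}

asset_keys_by_partitions_def: Dict[Optional[str], List[str]] = defaultdict(list)
for asset_key, partitions_def in partitions_def_by_key.items():
    # real code reads: partitions_def = asset_graph.get(asset_key).partitions_def
    asset_keys_by_partitions_def[partitions_def].append(asset_key)

assert asset_keys_by_partitions_def == {"daily": ["a", "b"], None: ["c"]}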
diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
index 5a3e9512fd072..009aef38a25bc 100644
--- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
+++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
@@ -198,7 +198,7 @@ def resolve(

     asset_keys_by_partitions_def = defaultdict(set)
     for asset_key in selected_asset_keys:
-        partitions_def = asset_graph.get_partitions_def(asset_key)
+        partitions_def = asset_graph.get(asset_key).partitions_def
         if partitions_def is not None:
             asset_keys_by_partitions_def[partitions_def].add(asset_key)

diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
index 89e787e6bd3d3..ac03be0fa50c8 100644
--- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -211,7 +211,7 @@ def _get_self_and_downstream_targeted_subset(
             for asset_key in self.target_subset.asset_keys
             if all(
                 parent not in self.target_subset.asset_keys
-                for parent in instance_queryer.asset_graph.get_parents(asset_key)
+                for parent in instance_queryer.asset_graph.get(asset_key).parent_keys
                 - {asset_key}  # Do not include an asset as its own parent
             )
         }
@@ -321,7 +321,7 @@ def get_backfill_status_per_asset_key(
         def _get_status_for_asset_key(
             asset_key: AssetKey,
         ) -> Union[PartitionedAssetBackfillStatus, UnpartitionedAssetBackfillStatus]:
-            if asset_graph.get_partitions_def(asset_key) is not None:
+            if asset_graph.get(asset_key).is_partitioned:
                 materialized_subset = self.materialized_subset.get_partitions_subset(
                     asset_key, asset_graph
                 )
@@ -456,7 +456,7 @@ def from_partitions_by_assets(
         for partitions_by_asset_selector in partitions_by_assets:
             asset_key = partitions_by_asset_selector.asset_key
             partitions = partitions_by_asset_selector.partitions
-            partition_def = asset_graph.get_partitions_def(asset_key)
+            partition_def = asset_graph.get(asset_key).partitions_def
             if partitions and partition_def:
                 if partitions.partition_range:
                     # a range of partitions is selected
@@ -512,14 +512,14 @@ def from_asset_partitions(
             partitioned_asset_keys = {
                 asset_key
                 for asset_key in asset_selection
-                if asset_graph.get_partitions_def(asset_key) is not None
+                if asset_graph.get(asset_key).is_partitioned
             }

             root_partitioned_asset_keys = (
                 AssetSelection.keys(*partitioned_asset_keys).sources().resolve(asset_graph)
             )
             root_partitions_defs = {
-                asset_graph.get_partitions_def(asset_key)
+                asset_graph.get(asset_key).partitions_def
                 for asset_key in root_partitioned_asset_keys
             }
             if len(root_partitions_defs) > 1:
@@ -631,7 +631,7 @@ def _get_requested_asset_partitions_from_run_requests(
         selected_assets = cast(Sequence[AssetKey], run_request.asset_selection)
         check.invariant(len(selected_assets) > 0)
         partitions_defs = set(
-            asset_graph.get_partitions_def(asset_key) for asset_key in selected_assets
+            asset_graph.get(asset_key).partitions_def for asset_key in selected_assets
         )
         check.invariant(
             len(partitions_defs) == 1,
@@ -775,7 +775,7 @@ def _check_target_partitions_subset_is_valid(
             f"Asset {asset_key} existed at storage-time, but no longer does"
         )

-    partitions_def = asset_graph.get_partitions_def(asset_key)
+    partitions_def = asset_graph.get(asset_key).partitions_def

     if target_partitions_subset:  # Asset was partitioned at storage time
         if partitions_def is None:
@@ -1318,20 +1318,18 @@ def can_run_with_parent(
         this tick.
""" parent_target_subset = target_subset.get_asset_subset(parent.asset_key, asset_graph) - parent_backfill_policy = asset_graph.get(parent.asset_key).backfill_policy candidate_target_subset = target_subset.get_asset_subset(candidate.asset_key, asset_graph) - candidate_backfill_policy = asset_graph.get(candidate.asset_key).backfill_policy partition_mapping = asset_graph.get_partition_mapping( - candidate.asset_key, in_asset_key=parent.asset_key + candidate.asset_key, parent_asset_key=parent.asset_key ) + parent_node = asset_graph.get(parent.asset_key) + candidate_node = asset_graph.get(candidate.asset_key) # checks if there is a simple partition mapping between the parent and the child has_identity_partition_mapping = ( # both unpartitioned - ( - not asset_graph.is_partitioned(candidate.asset_key) - and not asset_graph.is_partitioned(parent.asset_key) - ) + not candidate_node.is_partitioned + and not parent_node.is_partitioned # normal identity partition mapping or isinstance(partition_mapping, IdentityPartitionMapping) # for assets with the same time partitions definition, a non-offset partition @@ -1343,10 +1341,9 @@ def can_run_with_parent( ) ) return ( - parent_backfill_policy == candidate_backfill_policy - and asset_graph.get_repository_handle(candidate.asset_key) - is asset_graph.get_repository_handle(parent.asset_key) - and asset_graph.have_same_partitioning(parent.asset_key, candidate.asset_key) + parent_node.backfill_policy == candidate_node.backfill_policy + and parent_node.priority_repository_handle is candidate_node.priority_repository_handle + and parent_node.partitions_def == candidate_node.partitions_def and ( parent.partition_key in asset_partitions_to_request_map[parent.asset_key] or parent in candidates_unit @@ -1359,14 +1356,14 @@ def can_run_with_parent( # parent if... or ( # there is a backfill policy for the parent - parent_backfill_policy is not None + parent_node.backfill_policy is not None # the same subset of parents is targeted as the child and parent_target_subset.value == candidate_target_subset.value and ( # there is no limit on the size of a single run or... 
-                parent_backfill_policy.max_partitions_per_run is None
+                parent_node.backfill_policy.max_partitions_per_run is None
                 # a single run can materialize all requested parent partitions
-                or parent_backfill_policy.max_partitions_per_run
+                or parent_node.backfill_policy.max_partitions_per_run
                 > len(asset_partitions_to_request_map[parent.asset_key])
             )
             # all targeted parents are being requested this tick
diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py
index 1a639a672fb1f..a27e48e59b2c2 100644
--- a/python_modules/dagster/dagster/_daemon/asset_daemon.py
+++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py
@@ -860,7 +860,7 @@ def _evaluate_auto_materialize_tick(
     )
     evaluations_by_asset_key = {
         evaluation_record.asset_key: evaluation_record.get_evaluation_with_run_ids(
-            partitions_def=asset_graph.get_partitions_def(evaluation_record.asset_key)
+            partitions_def=asset_graph.get(evaluation_record.asset_key).partitions_def
         )
         for evaluation_record in evaluation_records
     }
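The rewritten can_run_with_parent reads backfill_policy, repository handle, and partitions_def directly off the two nodes. The toy below reduces just those three gates; FakeNode and its fields are stand-ins (repository models priority_repository_handle and is compared by identity, as in the real check):

# Hedged reduction of the node-vs-node gates in can_run_with_parent.
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeNode:
    backfill_policy: Optional[str]
    repository: str  # stand-in for priority_repository_handle
    partitions_def: Optional[str]


def same_run_eligible(parent: FakeNode, candidate: FakeNode) -> bool:
    return (
        parent.backfill_policy == candidate.backfill_policy
        and parent.repository is candidate.repository
        and parent.partitions_def == candidate.partitions_def
    )


repo = "repo@main"
assert same_run_eligible(FakeNode(None, repo, "daily"), FakeNode(None, repo, "daily"))
assert not same_run_eligible(FakeNode("single_run", repo, "daily"), FakeNode(None, repo, "daily"))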
""" - partitions_def = check.not_none(self.asset_graph.get_partitions_def(asset_key)) + partitions_def = check.not_none(self.asset_graph.get(asset_key).partitions_def) cache_value = self._get_updated_cache_value(asset_key=asset_key) if cache_value is None: return partitions_def.empty_subset() @@ -157,7 +157,7 @@ def get_failed_or_in_progress_subset(self, *, asset_key: AssetKey) -> Partitions @cached_method def get_materialized_asset_subset(self, *, asset_key: AssetKey) -> AssetSubset: """Returns an AssetSubset representing the subset of the asset that has been materialized.""" - partitions_def = self.asset_graph.get_partitions_def(asset_key) + partitions_def = self.asset_graph.get(asset_key).partitions_def if partitions_def: cache_value = self._get_updated_cache_value(asset_key=asset_key) if cache_value is None: @@ -243,7 +243,7 @@ def _get_latest_materialization_or_observation_storage_ids_by_asset_partition( latest_storage_ids = { asset_partition: latest_record.storage_id if latest_record is not None else None } - if self.asset_graph.is_partitioned(asset_key): + if self.asset_graph.get(asset_key).is_partitioned: latest_storage_ids.update( { AssetKeyPartitionKey(asset_key, partition_key): storage_id @@ -553,7 +553,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( AssetKeyPartitionKey(child_asset_key) ) ] - for parent_asset_key in self.asset_graph.get_parents(child_asset_key): + for parent_asset_key in self.asset_graph.get(child_asset_key).parent_keys: # ignore non-existent parents if not self.asset_graph.has(parent_asset_key): continue @@ -574,7 +574,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( ) ) - parent_partitions_def = self.asset_graph.get_partitions_def(parent_asset_key) + parent_partitions_def = self.asset_graph.get(parent_asset_key).partitions_def if parent_partitions_def is None: latest_parent_record = check.not_none( self.get_latest_materialization_or_observation_record( @@ -622,7 +622,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( # we are mapping from the partitions of the parent asset to the partitions of # the child asset partition_mapping = self.asset_graph.get_partition_mapping( - asset_key=child_asset_key, in_asset_key=parent_asset_key + asset_key=child_asset_key, parent_asset_key=parent_asset_key ) try: child_partitions_subset = ( @@ -691,7 +691,7 @@ def _asset_partitions_data_versions( after_cursor: Optional[int] = None, before_cursor: Optional[int] = None, ) -> Mapping[AssetKeyPartitionKey, Optional[DataVersion]]: - if not self.asset_graph.is_partitioned(asset_key): + if not self.asset_graph.get(asset_key).is_partitioned: asset_partition = AssetKeyPartitionKey(asset_key) latest_record = self.get_latest_materialization_or_observation_record( asset_partition, after_cursor=after_cursor, before_cursor=before_cursor @@ -822,7 +822,7 @@ def get_asset_subset_updated_after_cursor( self, *, asset_key: AssetKey, after_cursor: Optional[int] ) -> ValidAssetSubset: """Returns the AssetSubset of the given asset that has been updated after the given cursor.""" - partitions_def = self.asset_graph.get_partitions_def(asset_key) + partitions_def = self.asset_graph.get(asset_key).partitions_def if partitions_def is None: return ValidAssetSubset( asset_key, @@ -855,7 +855,7 @@ def get_asset_subset_updated_after_time( self, *, asset_key: AssetKey, after_time: datetime ) -> ValidAssetSubset: """Returns the AssetSubset of the given asset that has been updated after the given time.""" - partitions_def = 
-        partitions_def = self.asset_graph.get_partitions_def(asset_key)
+        partitions_def = self.asset_graph.get(asset_key).partitions_def

         method = (
             self.instance.fetch_materializations
@@ -893,7 +893,7 @@ def get_parent_asset_partitions_updated_after_child(
         for parent in parent_asset_partitions:
             parent_asset_partitions_by_key[parent.asset_key].add(parent)

-        partitions_def = self.asset_graph.get_partitions_def(asset_partition.asset_key)
+        partitions_def = self.asset_graph.get(asset_partition.asset_key).partitions_def
         updated_parents = set()

         for parent_key, parent_asset_partitions in parent_asset_partitions_by_key.items():
@@ -909,7 +909,7 @@ def get_parent_asset_partitions_updated_after_child(
             # historical time partitions
             if (
                 isinstance(partitions_def, TimeWindowPartitionsDefinition)
-                and not self.asset_graph.is_partitioned(parent_key)
+                and not self.asset_graph.get(parent_key).is_partitioned
                 and asset_partition.partition_key
                 != partitions_def.get_last_partition_key(
                     current_time=self.evaluation_time, dynamic_partitions_store=self
@@ -963,7 +963,7 @@ def get_outdated_ancestors(
         # the set of parent keys which we don't need to check
         ignored_parent_keys = {
             parent
-            for parent in self.asset_graph.get_parents(asset_key)
+            for parent in self.asset_graph.get(asset_key).parent_keys
             if self.have_ignorable_partition_mapping_for_outdated(asset_key, parent)
         }
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
index ebbf0980dc7c5..84a5b8698b2ba 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
@@ -83,18 +83,22 @@ def asset3(asset1, asset2): ...
     assets = [asset0, asset1, asset2, asset3]
     asset_graph = asset_graph_from_assets(assets)
+    asset0_node = asset_graph.get(asset0.key)
+    asset1_node = asset_graph.get(asset1.key)
+    asset2_node = asset_graph.get(asset2.key)
+    asset3_node = asset_graph.get(asset3.key)

     assert asset_graph.all_asset_keys == {asset0.key, asset1.key, asset2.key, asset3.key}
-    assert not asset_graph.is_partitioned(asset0.key)
-    assert asset_graph.is_partitioned(asset1.key)
-    assert asset_graph.have_same_partitioning(asset1.key, asset2.key)
-    assert not asset_graph.have_same_partitioning(asset1.key, asset3.key)
-    assert asset_graph.get_children(asset0.key) == {asset1.key, asset2.key}
-    assert asset_graph.get_parents(asset3.key) == {asset1.key, asset2.key}
-    for asset_def in assets:
-        assert asset_graph.get_execution_set_asset_keys(asset_def.key) == {asset_def.key}
-    assert asset_graph.get(asset0.key).code_version == "1"
-    assert asset_graph.get(asset1.key).code_version is None
+    assert not asset0_node.is_partitioned
+    assert asset1_node.is_partitioned
+    assert asset1_node.partitions_def == asset2_node.partitions_def
+    assert asset1_node.partitions_def != asset3_node.partitions_def
+    assert asset0_node.child_keys == {asset1.key, asset2.key}
+    assert asset3_node.parent_keys == {asset1.key, asset2.key}
+    for node in [asset0_node, asset1_node, asset2_node, asset3_node]:
+        assert node.execution_set_asset_keys == {node.key}
+    assert asset0_node.code_version == "1"
+    assert asset1_node.code_version is None


 def test_get_children_partitions_unpartitioned_parent_partitioned_child(
@@ -351,7 +355,7 @@ def non_subsettable_multi_asset(): ...
     asset_graph = asset_graph_from_assets([non_subsettable_multi_asset])
     for asset_key in non_subsettable_multi_asset.keys:
         assert (
-            asset_graph.get_execution_set_asset_keys(asset_key) == non_subsettable_multi_asset.keys
+            asset_graph.get(asset_key).execution_set_asset_keys == non_subsettable_multi_asset.keys
         )


@@ -366,7 +370,7 @@ def subsettable_multi_asset(): ...

     asset_graph = asset_graph_from_assets([subsettable_multi_asset])
     for asset_key in subsettable_multi_asset.keys:
-        assert asset_graph.get_execution_set_asset_keys(asset_key) == {asset_key}
+        assert asset_graph.get(asset_key).execution_set_asset_keys == {asset_key}


 def test_required_multi_asset_sets_graph_backed_multi_asset(
@@ -389,7 +393,7 @@ def graph1():
     graph_backed_multi_asset = AssetsDefinition.from_graph(graph1)
     asset_graph = asset_graph_from_assets([graph_backed_multi_asset])
     for asset_key in graph_backed_multi_asset.keys:
-        assert asset_graph.get_execution_set_asset_keys(asset_key) == graph_backed_multi_asset.keys
+        assert asset_graph.get(asset_key).execution_set_asset_keys == graph_backed_multi_asset.keys


 def test_required_multi_asset_sets_same_op_in_different_assets(
@@ -404,7 +408,7 @@ def op1(): ...

     asset_graph = asset_graph_from_assets(assets)
     for asset_def in assets:
-        assert asset_graph.get_execution_set_asset_keys(asset_def.key) == {asset_def.key}
+        assert asset_graph.get(asset_def.key).execution_set_asset_keys == {asset_def.key}


 def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., BaseAssetGraph]):
@@ -424,8 +428,8 @@ def downstream_of_partitioned_source():
     if isinstance(asset_graph, RemoteAssetGraph):
         pytest.xfail("not supported with RemoteAssetGraph")

-    assert asset_graph.is_partitioned(AssetKey("partitioned_source"))
-    assert asset_graph.is_partitioned(AssetKey("downstream_of_partitioned_source"))
+    assert asset_graph.get(AssetKey("partitioned_source")).is_partitioned
+    assert asset_graph.get(AssetKey("downstream_of_partitioned_source")).is_partitioned


 def test_bfs_filter_asset_subsets(asset_graph_from_assets: Callable[..., BaseAssetGraph]):
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py
index ffe974a464f88..ba53633d9bd39 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py
@@ -235,8 +235,8 @@ def test_cross_repo_dep_no_source_asset(instance):

 def test_partitioned_source_asset(instance):
     asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["partitioned_defs"]))

-    assert asset_graph.is_partitioned(AssetKey("partitioned_source"))
-    assert asset_graph.is_partitioned(AssetKey("downstream_of_partitioned_source"))
+    assert asset_graph.get(AssetKey("partitioned_source")).is_partitioned
+    assert asset_graph.get(AssetKey("downstream_of_partitioned_source")).is_partitioned


 def test_get_implicit_job_name_for_assets(instance):
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
index 766eb74fd7074..ed35d4ab834f2 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
@@ -465,7 +465,7 @@ def make_random_subset(
     # all partitions downstream of half of the partitions in each partitioned root asset
     root_asset_partitions: Set[AssetKeyPartitionKey] = set()
     for i, root_asset_key in enumerate(sorted(asset_graph.root_materializable_asset_keys)):
-        partitions_def = asset_graph.get_partitions_def(root_asset_key)
+        partitions_def = asset_graph.get(root_asset_key).partitions_def

         if partitions_def is not None:
             partition_keys = list(
@@ -498,7 +498,7 @@ def make_subset_from_partition_keys(
 ) -> AssetGraphSubset:
     root_asset_partitions: Set[AssetKeyPartitionKey] = set()
     for i, root_asset_key in enumerate(sorted(asset_graph.root_materializable_asset_keys)):
-        if asset_graph.get_partitions_def(root_asset_key) is not None:
+        if asset_graph.get(root_asset_key).is_partitioned:
             root_asset_partitions.update(
                 AssetKeyPartitionKey(root_asset_key, partition_key)
                 for partition_key in partition_keys
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py
index bfaa59b68a79c..0228cb6202493 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/asset_daemon_scenario.py
@@ -102,7 +102,7 @@ def resolve(self, asset_key: AssetKey, asset_graph: BaseAssetGraph) -> AssetSubs
         """
         subset = AssetSubset.from_asset_partitions_set(
             asset_key,
-            asset_graph.get_partitions_def(asset_key),
+            asset_graph.get(asset_key).partitions_def,
             {
                 AssetKeyPartitionKey(asset_key, partition_key)
                 for partition_key in self.partitions or [None]
@@ -253,7 +253,7 @@ def _evaluate_tick_daemon(
         ]
         new_evaluations = [
             e.get_evaluation_with_run_ids(
-                self.asset_graph.get_partitions_def(e.asset_key)
+                self.asset_graph.get(e.asset_key).partitions_def
             ).evaluation
             for e in check.not_none(
                 self.instance.schedule_storage
@@ -398,7 +398,7 @@ def _assert_evaluation_daemon(
             assert (
                 new_run_ids_for_asset
                 == evaluation_record.get_evaluation_with_run_ids(
-                    self.asset_graph.get_partitions_def(key)
+                    self.asset_graph.get(key).partitions_def
                 ).run_ids
             )
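resolve() above leans on the small idiom `self.partitions or [None]`: an unpartitioned asset is modeled as a single partition key of None, letting partitioned and unpartitioned targets share one code path. A self-contained sketch, with a plain tuple standing in for AssetKeyPartitionKey:

# Hedged sketch of the "partitions or [None]" idiom.
from typing import List, Optional, Set, Tuple


def to_asset_partitions(
    asset_key: str, partitions: Optional[List[str]]
) -> Set[Tuple[str, Optional[str]]]:
    return {(asset_key, partition_key) for partition_key in partitions or [None]}


assert to_asset_partitions("a", ["p1", "p2"]) == {("a", "p1"), ("a", "p2")}
assert to_asset_partitions("a", None) == {("a", None)}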
get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status assert cached_status.latest_storage_id @@ -173,7 +173,7 @@ def _swap_partitions_def(new_partitions_def, asset, asset_graph, asset_job): asset_job.execute_in_process(instance=created_instance, partition_key="2022-02-02") cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status assert cached_status.latest_storage_id @@ -195,7 +195,7 @@ def _swap_partitions_def(new_partitions_def, asset, asset_graph, asset_job): ) asset_job.execute_in_process(instance=created_instance, partition_key="a") cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status assert cached_status.serialized_materialized_partition_subset @@ -236,7 +236,7 @@ def asset1(): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status assert cached_status.latest_storage_id @@ -253,7 +253,7 @@ def asset1(): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status assert cached_status.serialized_materialized_partition_subset @@ -288,7 +288,7 @@ def asset1(): asset_job.execute_in_process(instance=created_instance, partition_key="2022-02-01") cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status assert cached_status.serialized_materialized_partition_subset @@ -320,7 +320,7 @@ def asset1(): asset_job.execute_in_process(instance=created_instance, partition_key="a_partition") cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status assert cached_status.serialized_materialized_partition_subset is None @@ -341,7 +341,7 @@ def asset1(context): with instance_for_test() as created_instance: # no events cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert not cached_status @@ -350,7 +350,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) # failed partition assert partitions_def.deserialize_subset( @@ -371,7 +371,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) # cache is updated with new failed partition, successful 
partition is ignored assert partitions_def.deserialize_subset( @@ -403,7 +403,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) # cache is updated after successful materialization of fail1 assert partitions_def.deserialize_subset( @@ -433,7 +433,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) # in progress materialization is ignored assert partitions_def.deserialize_subset( @@ -470,7 +470,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) # failed partition assert partitions_def.deserialize_subset( @@ -510,13 +510,13 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) created_instance.report_run_failed(run_1) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status.deserialize_failed_partition_subsets( partitions_def @@ -547,7 +547,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status.deserialize_failed_partition_subsets( partitions_def @@ -559,7 +559,7 @@ def asset1(context): created_instance.report_run_failed(run_2) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status.deserialize_failed_partition_subsets( partitions_def @@ -604,7 +604,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) early_id = cached_status.earliest_in_progress_materialization_event_id assert cached_status.deserialize_in_progress_partition_subsets( @@ -630,7 +630,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert ( partitions_def.deserialize_subset( @@ -646,7 +646,7 @@ def asset1(context): created_instance.report_run_failed(run_2) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert partitions_def.deserialize_subset( cached_status.serialized_failed_partition_subset @@ -659,7 +659,7 @@ def asset1(context): created_instance.report_run_canceled(run_1) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, 
asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert partitions_def.deserialize_subset( cached_status.serialized_failed_partition_subset @@ -723,7 +723,7 @@ def asset1(context): ) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert cached_status.deserialize_in_progress_partition_subsets( partitions_def @@ -733,7 +733,7 @@ def asset1(context): created_instance.report_run_failed(run_2) cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) assert partitions_def.deserialize_subset( cached_status.serialized_failed_partition_subset @@ -778,10 +778,10 @@ def my_asset(): asset_key = AssetKey("my_asset") cached_status = get_and_update_asset_status_cache_value( - created_instance, asset_key, asset_graph.get_partitions_def(asset_key) + created_instance, asset_key, asset_graph.get(asset_key).partitions_def ) failed_subset = cached_status.deserialize_failed_partition_subsets( - asset_graph.get_partitions_def(asset_key) + asset_graph.get(asset_key).partitions_def ) assert failed_subset.get_partition_keys() == set()
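
Note on the pattern applied throughout this diff: the flat per-key accessors on the asset graph (get_partitions_def, is_partitioned, get_execution_set_asset_keys) are replaced by a single node lookup, asset_graph.get(asset_key), whose returned node exposes partitions_def, is_partitioned, and execution_set_asset_keys as properties. A minimal sketch of the new call pattern, assuming AssetGraph.from_assets as the graph constructor (the tests above use an asset_graph_from_assets fixture instead):

    from dagster import AssetKey, DailyPartitionsDefinition, asset
    from dagster._core.definitions.asset_graph import AssetGraph

    @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
    def daily(): ...

    asset_graph = AssetGraph.from_assets([daily])

    # asset_graph.get(key) returns the node for that key; the attributes
    # below replace the removed flat accessors one-for-one.
    node = asset_graph.get(AssetKey("daily"))
    assert node.is_partitioned
    assert node.partitions_def is not None
    assert node.execution_set_asset_keys == {AssetKey("daily")}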