diff --git a/metaflow/client/core.py b/metaflow/client/core.py
index 4edbcdac00c..9bbe2a16d78 100644
--- a/metaflow/client/core.py
+++ b/metaflow/client/core.py
@@ -1191,142 +1191,161 @@ def _iter_filter(self, x):
         # exclude private data artifacts
         return x.id[0] != "_"
 
-    def _iter_matching_tasks(self, steps, metadata_key, metadata_pattern):
+    def _get_matching_pathspecs(self, steps, metadata_key, metadata_pattern):
         """
-        Yield tasks from specified steps matching a foreach path pattern.
+        Yield pathspecs of tasks from specified steps that match a given metadata pattern.
 
         Parameters
         ----------
-        steps : List[str]
-            List of step names to search for tasks
-        pattern : str
-            Regex pattern to match foreach-indices metadata
+        steps : List[Step]
+            List of Step objects to search for tasks.
+        metadata_key : str
+            Metadata key to filter tasks on (e.g., 'foreach-execution-path').
+        metadata_pattern : str
+            Regular expression pattern to match against the metadata value.
 
-        Returns
-        -------
-        Iterator[Task]
-            Tasks matching the foreach path pattern
+        Yields
+        ------
+        str
+            Pathspec of each task whose metadata value for the specified key matches the pattern.
         """
         flow_id, run_id, _, _ = self.path_components
-
         for step in steps:
             task_pathspecs = self._metaflow.metadata.filter_tasks_by_metadata(
                 flow_id, run_id, step.id, metadata_key, metadata_pattern
             )
             for task_pathspec in task_pathspecs:
-                yield Task(pathspec=task_pathspec, _namespace_check=False)
+                yield task_pathspec
 
     @property
-    def parent_tasks(self) -> Iterator["Task"]:
+    def parent_task_pathspecs(self) -> Iterator[str]:
         """
-        Yields all parent tasks of the current task if one exists.
+        Yields pathspecs of all parent tasks of the current task.
 
         Yields
         ------
-        Task
-            Parent task of the current task
-
+        str
+            Pathspec of the parent task of the current task
         """
         flow_id, run_id, _, _ = self.path_components
-
         steps = list(self.parent.parent_steps)
-        if not steps:
-            return []
-
         current_path = self.metadata_dict.get("foreach-execution-path", "")
 
         if len(steps) > 1:
             # Static join - use exact path matching
             pattern = current_path or ".*"
-            yield from self._iter_matching_tasks(
-                steps, "foreach-execution-path", pattern
-            )
-            return
-
-        # Handle single step case
-        target_task = Step(
-            f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
-        ).task
-        target_path = target_task.metadata_dict.get("foreach-execution-path")
-
-        if not target_path or not current_path:
-            # (Current task, "A:10") and (Parent task, "")
-            # Pattern: ".*"
-            pattern = ".*"
         else:
-            current_depth = len(current_path.split(","))
-            target_depth = len(target_path.split(","))
-
-            if current_depth < target_depth:
-                # Foreach join
-                # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13,C:21")
-                # Pattern: "A:10,B:13,.*"
-                pattern = f"{current_path},.*"
+            if not steps:
+                return  # No parent steps, yield nothing
+            # Handle single step case
+            target_task = Step(
+                f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
+            ).task
+            target_path = target_task.metadata_dict.get("foreach-execution-path")
+
+            if not target_path or not current_path:
+                # (Current task, "A:10") and (Parent task, "")
+                # Pattern: ".*"
+                pattern = ".*"
             else:
-                # Foreach split or linear step
-                # Option 1:
-                # (Current task, "A:10,B:13,C:21") and (Parent task, "A:10,B:13")
-                # Option 2:
-                # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13")
-                # Pattern: "A:10,B:13"
-                pattern = ",".join(current_path.split(",")[:target_depth])
-
-        yield from self._iter_matching_tasks(steps, "foreach-execution-path", pattern)
+                current_depth = len(current_path.split(","))
+                target_depth = len(target_path.split(","))
+
+                if current_depth < target_depth:
+                    # Foreach join
+                    # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13,C:21")
+                    # Pattern: "A:10,B:13,.*"
+                    pattern = f"{current_path},.*"
+                else:
+                    # Foreach split or linear step
+                    # Option 1:
+                    # (Current task, "A:10,B:13,C:21") and (Parent task, "A:10,B:13")
+                    # Option 2:
+                    # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13")
+                    # Pattern: "A:10,B:13"
+                    pattern = ",".join(current_path.split(",")[:target_depth])
+
+        metadata_key = "foreach-execution-path"
+        for pathspec in self._get_matching_pathspecs(steps, metadata_key, pattern):
+            yield pathspec
 
     @property
-    def child_tasks(self) -> Iterator["Task"]:
+    def child_task_pathspecs(self) -> Iterator[str]:
         """
-        Yield all child tasks of the current task if one exists.
+        Yields pathspecs of all child tasks of the current task.
 
         Yields
         ------
-        Task
-            Child task of the current task
+        str
+            Pathspec of the child task of the current task
         """
         flow_id, run_id, _, _ = self.path_components
         steps = list(self.parent.child_steps)
-        if not steps:
-            return []
-
         current_path = self.metadata_dict.get("foreach-execution-path", "")
 
         if len(steps) > 1:
             # Static split - use exact path matching
             pattern = current_path or ".*"
-            yield from self._iter_matching_tasks(
-                steps, "foreach-execution-path", pattern
-            )
-            return
-
-        # Handle single step case
-        target_task = Step(
-            f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
-        ).task
-        target_path = target_task.metadata_dict.get("foreach-execution-path")
-
-        if not target_path or not current_path:
-            # (Current task, "A:10") and (Child task, "")
-            # Pattern: ".*"
-            pattern = ".*"
         else:
-            current_depth = len(current_path.split(","))
-            target_depth = len(target_path.split(","))
-
-            if current_depth < target_depth:
-                # Foreach split
-                # (Current task, "A:10,B:13") and (Child task, "A:10,B:13,C:21")
-                # Pattern: "A:10,B:13,.*"
-                pattern = f"{current_path},.*"
+            if not steps:
+                return  # No child steps, yield nothing
+            # Handle single step case
+            target_task = Step(
+                f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
+            ).task
+            target_path = target_task.metadata_dict.get("foreach-execution-path")
+
+            if not target_path or not current_path:
+                # (Current task, "A:10") and (Child task, "")
+                # Pattern: ".*"
+                pattern = ".*"
             else:
-                # Foreach join or linear step
-                # Option 1:
-                # (Current task, "A:10,B:13,C:21") and (Child task, "A:10,B:13")
-                # Option 2:
-                # (Current task, "A:10,B:13") and (Child task, "A:10,B:13")
-                # Pattern: "A:10,B:13"
-                pattern = ",".join(current_path.split(",")[:target_depth])
-
-        yield from self._iter_matching_tasks(steps, "foreach-execution-path", pattern)
+                current_depth = len(current_path.split(","))
+                target_depth = len(target_path.split(","))
+
+                if current_depth < target_depth:
+                    # Foreach split
+                    # (Current task, "A:10,B:13") and (Child task, "A:10,B:13,C:21")
+                    # Pattern: "A:10,B:13,.*"
+                    pattern = f"{current_path},.*"
+                else:
+                    # Foreach join or linear step
+                    # Option 1:
+                    # (Current task, "A:10,B:13,C:21") and (Child task, "A:10,B:13")
+                    # Option 2:
+                    # (Current task, "A:10,B:13") and (Child task, "A:10,B:13")
+                    # Pattern: "A:10,B:13"
+                    pattern = ",".join(current_path.split(",")[:target_depth])
+
+        metadata_key = "foreach-execution-path"
+        for pathspec in self._get_matching_pathspecs(steps, metadata_key, pattern):
+            yield pathspec
+
+    @property
+    def parent_tasks(self) -> Iterator["Task"]:
+        """
+        Yields all parent tasks of the current task if one exists.
+
+        Yields
+        ------
+        Task
+            Parent task of the current task
+        """
+        for pathspec in self.parent_task_pathspecs:
+            yield Task(pathspec=pathspec, _namespace_check=False)
+
+    @property
+    def child_tasks(self) -> Iterator["Task"]:
+        """
+        Yields all child tasks of the current task if one exists.
+
+        Yields
+        ------
+        Task
+            Child task of the current task
+        """
+        for pathspec in self.child_task_pathspecs:
+            yield Task(pathspec=pathspec, _namespace_check=False)
 
     @property
     def metadata(self) -> List[Metadata]:
diff --git a/test/core/tests/runtime_dag.py b/test/core/tests/runtime_dag.py
index da623b967f7..0e52baf7a61 100644
--- a/test/core/tests/runtime_dag.py
+++ b/test/core/tests/runtime_dag.py
@@ -59,7 +59,15 @@ def _equals_task(task1, task2):
                 for name, value in type(task1).__dict__.items()
                 if isinstance(value, property)
                 if name
-                not in ["parent_tasks", "child_tasks", "metadata", "data", "artifacts"]
+                not in [
+                    "parent_tasks",
+                    "parent_task_pathspecs",
+                    "child_tasks",
+                    "child_task_pathspecs",
+                    "metadata",
+                    "data",
+                    "artifacts",
+                ]
             ]
 
             for prop_name in properties: