Skip to content

Commit

Permalink
Merge pull request #321 from lsst/tickets/DM-35396
Browse files Browse the repository at this point in the history
DM-35396: Pass quantum ID down to executor so it can be used in provenance
  • Loading branch information
timj authored Feb 4, 2025
2 parents 1d9d8a2 + ee1c935 commit 769567f
Show file tree
Hide file tree
Showing 14 changed files with 67 additions and 29 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.10.0
rev: 25.1.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -16,13 +16,13 @@ repos:
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.11
- repo: https://github.com/pycqa/isort
rev: 5.13.2
rev: 6.0.0
hooks:
- id: isort
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.7.4
rev: v0.9.3
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-35396.api.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The quantum ID is now passed through the executors, via a new optional ``quantum_id`` argument to ``QuantumExecutor.execute``, so that it can be recorded in the provenance by ``QuantumContext``.
2 changes: 2 additions & 0 deletions doc/changes/DM-35396.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* Quantum metadata outputs now record the IDs of all output datasets written by the quantum.
  The IDs are stored as strings under the ``outputs`` key.
3 changes: 1 addition & 2 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining CmdLineFwk class and related methods.
"""
"""Module defining CmdLineFwk class and related methods."""

from __future__ import annotations

Expand Down
18 changes: 15 additions & 3 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import sys
import threading
import time
import uuid
from collections.abc import Iterable
from enum import Enum
from typing import Literal
Expand Down Expand Up @@ -124,7 +125,15 @@ def start(
mp_ctx = multiprocessing.get_context(startMethod)
self.process = mp_ctx.Process( # type: ignore[attr-defined]
target=_Job._executeJob,
args=(qe_pickle, task_node_pickle, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
args=(
qe_pickle,
task_node_pickle,
quantum_pickle,
self.qnode.nodeId,
logConfigState,
snd_conn,
self._fail_fast,
),
name=f"task-{self.qnode.quantum.dataId}",
)
# mypy is getting confused by multiprocessing.
Expand All @@ -138,6 +147,7 @@ def _executeJob(
quantumExecutor_pickle: bytes,
task_node_pickle: bytes,
quantum_pickle: bytes,
quantum_id: uuid.UUID | None,
logConfigState: list,
snd_conn: multiprocessing.connection.Connection,
fail_fast: bool,
Expand Down Expand Up @@ -180,7 +190,7 @@ def _executeJob(
# Catch a few known failure modes and stop the process immediately,
# with exception-specific exit code.
try:
_, report = quantumExecutor.execute(task_node, quantum)
_, report = quantumExecutor.execute(task_node, quantum, quantum_id=quantum_id)
except RepeatableQuantumError as exc:
report = QuantumReport.from_exception(
exception=exc,
Expand Down Expand Up @@ -530,7 +540,9 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None:
# exception-specific exit code, but we still want to start
# debugger before exiting if debugging is enabled.
try:
_, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum)
_, quantum_report = self.quantumExecutor.execute(
task_node, qnode.quantum, quantum_id=qnode.nodeId
)
if quantum_report:
report.quantaReports.append(quantum_report)
successCount += 1
Expand Down
8 changes: 7 additions & 1 deletion python/lsst/ctrl/mpexec/quantumGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from .reports import QuantumReport, Report

if TYPE_CHECKING:
import uuid

from lsst.daf.butler import Quantum
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.pipeline_graph import TaskNode
Expand All @@ -50,7 +52,9 @@ class QuantumExecutor(ABC):
"""

@abstractmethod
def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]:
def execute(
self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
) -> tuple[Quantum, QuantumReport | None]:
"""Execute single quantum.
Parameters
Expand All @@ -59,6 +63,8 @@ def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, Qu
Task definition structure.
quantum : `~lsst.daf.butler.Quantum`
Quantum for this execution.
quantum_id : `uuid.UUID` or `None`, optional
The ID of the quantum to be executed.
Returns
-------
Expand Down
33 changes: 26 additions & 7 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# -------------------------------
import logging
import time
import uuid
from collections import defaultdict
from collections.abc import Callable
from itertools import chain
Expand Down Expand Up @@ -168,18 +169,22 @@ def __init__(
collectionTypes=CollectionType.RUN,
)

def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]:
def execute(
self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
) -> tuple[Quantum, QuantumReport | None]:
# Docstring inherited from QuantumExecutor.execute
assert quantum.dataId is not None, "Quantum DataId cannot be None"

if self.butler is not None:
self.butler.registry.refresh()

result = self._execute(task_node, quantum)
result = self._execute(task_node, quantum, quantum_id=quantum_id)
report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label)
return result, report

def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
def _execute(
self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
) -> Quantum:
"""Execute the quantum.
Internal implementation of `execute()`.
Expand Down Expand Up @@ -268,7 +273,9 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
task = self.taskFactory.makeTask(task_node, limited_butler, init_input_refs)
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
try:
quantumMetadata["caveats"] = self.runQuantum(task, quantum, task_node, limited_butler).value
caveats, outputsPut = self.runQuantum(
task, quantum, task_node, limited_butler, quantum_id=quantum_id
)
except Exception as e:
_LOG.error(
"Execution of task '%s' on quantum %s failed. Exception %s: %s",
Expand All @@ -278,6 +285,11 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
str(e),
)
raise
else:
quantumMetadata["caveats"] = caveats.value
# Stringify the UUID for easier compatibility with
# PropertyList.
quantumMetadata["outputs"] = [str(output) for output in outputsPut]
logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type]
fullMetadata = task.getFullMetadata()
fullMetadata["quantum"] = quantumMetadata
Expand Down Expand Up @@ -475,7 +487,8 @@ def runQuantum(
task_node: TaskNode,
/,
limited_butler: LimitedButler,
) -> QuantumSuccessCaveats:
quantum_id: uuid.UUID | None = None,
) -> tuple[QuantumSuccessCaveats, list[uuid.UUID]]:
"""Execute task on a single quantum.
Parameters
Expand All @@ -488,16 +501,21 @@ def runQuantum(
Task definition structure.
limited_butler : `~lsst.daf.butler.LimitedButler`
Butler to use for dataset I/O.
quantum_id : `uuid.UUID` or `None`, optional
ID of the quantum being executed.
Returns
-------
flags : `QuantumSuccessCaveats`
Flags that describe qualified successes.
ids_put : list[ `uuid.UUID` ]
Record of all the dataset IDs that were written by this quantum
being executed.
"""
flags = QuantumSuccessCaveats.NO_CAVEATS

# Create a butler that operates in the context of a quantum
butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources)
butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources, quantum_id=quantum_id)

# Get the input and output references for the task
inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum)
Expand Down Expand Up @@ -546,7 +564,8 @@ def runQuantum(
flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
if not butlerQC.outputsPut == butlerQC.allOutputs:
flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
return flags
ids_put = [output[2] for output in butlerQC.outputsPut]
return flags, ids_put

def writeMetadata(
self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
Expand Down
3 changes: 1 addition & 2 deletions python/lsst/ctrl/mpexec/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Few utility methods used by the rest of a package.
"""
"""Few utility methods used by the rest of a package."""

__all__ = ["printTable", "filterTaskNodes", "subTaskIter"]

Expand Down
3 changes: 1 addition & 2 deletions tests/test_cliCmdCleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for ctrl_mpexec CLI cleanup subcommand.
"""
"""Unit tests for ctrl_mpexec CLI cleanup subcommand."""


import os
Expand Down
3 changes: 1 addition & 2 deletions tests/test_cliCmdPurge.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for ctrl_mpexec CLI purge subcommand.
"""
"""Unit tests for ctrl_mpexec CLI purge subcommand."""


import os
Expand Down
3 changes: 1 addition & 2 deletions tests/test_cliUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the daf_butler shared CLI options.
"""
"""Unit tests for the daf_butler shared CLI options."""

import unittest

Expand Down
3 changes: 1 addition & 2 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Simple unit test for cmdLineFwk module.
"""
"""Simple unit test for cmdLineFwk module."""

import contextlib
import logging
Expand Down
7 changes: 6 additions & 1 deletion tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph

if TYPE_CHECKING:
import uuid
from collections.abc import Iterator
from multiprocessing.managers import ListProxy

Expand Down Expand Up @@ -88,7 +89,11 @@ def __init__(self, mp: bool = False):
self.quanta = manager.list()

def execute( # type: ignore[override]
self, task_node: TaskNodeMock, /, quantum: QuantumMock # type: ignore[override]
self,
task_node: TaskNodeMock,
/,
quantum: QuantumMock, # type: ignore[override]
quantum_id: uuid.UUID | None = None,
) -> tuple[QuantumMock, QuantumReport | None]:
_LOG.debug("QuantumExecutorMock.execute: task_node=%s dataId=%s", task_node, quantum.dataId)
self._execute_called = True
Expand Down
3 changes: 1 addition & 2 deletions tests/test_preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Simple unit test for PreExecInit class.
"""
"""Simple unit test for PreExecInit class."""

import contextlib
import shutil
Expand Down

0 comments on commit 769567f

Please sign in to comment.