Skip to content

Commit

Permalink
Merge pull request #748 from crim-ca/job-nested-process-workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault authored Nov 16, 2024
2 parents 0cebfa0 + edd3abc commit a9bda01
Show file tree
Hide file tree
Showing 27 changed files with 959 additions and 171 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ reports

## PyCharm
*.idea
*.run

## Intellij
*.iml
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ testdata.json

## PyCharm
*.idea
*.run

## Intellij
*.iml
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Changes

Changes:
--------
- Add support of *OGC API - Processes - Part 3: Workflows and Chaining* with *Nested Process* ad-hoc workflow
definitions directly submitted for execution (fixes `#747 <https://github.com/crim-ca/weaver/issues/747>`_,
relates to `#412 <https://github.com/crim-ca/weaver/issues/412>`_).
- Add support of *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution
(fixes `#716 <https://github.com/crim-ca/weaver/issues/716>`_).
- Add `CLI` operations ``update_job``, ``trigger_job`` and ``inputs`` corresponding to the required `Job` operations
Expand Down Expand Up @@ -77,6 +80,7 @@ Fixes:
- Add the appropriate HTTP error type to respect ``/conf/dru/deploy/unsupported-content-type``
(fixes `#624 <https://github.com/crim-ca/weaver/issues/624>`_).
- Fix S3 bucket storage for result file missing the output ID in the path to match local WPS output storage structure.
- Fix rendering of the ``deprecated`` property in `OpenAPI` representation.

.. _changes_5.9.0:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# YAML representation supported by WeaverClient
processDescription:
process:
id: EchoResultsTester
version: "1.0" # must be string, avoid interpretation as float
executionUnit:
# note: This does not work by itself! The test suite injects the file dynamically.
Expand Down
175 changes: 152 additions & 23 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
ExecutionInputsMap,
ExecutionResults,
HeadersType,
JSON,
ProcessDeployment,
ProcessExecution,
SettingsType
Expand Down Expand Up @@ -119,6 +120,7 @@ class WorkflowProcesses(enum.Enum):
APP_ECHO = "Echo"
APP_ECHO_OPTIONAL = "EchoOptional"
APP_ECHO_SECRETS = "EchoSecrets"
APP_ECHO_RESULTS_TESTER = "EchoResultsTester"
APP_ICE_DAYS = "Finch_IceDays"
APP_READ_FILE = "ReadFile"
APP_SUBSET_BBOX = "ColibriFlyingpigeon_SubsetBbox"
Expand Down Expand Up @@ -166,7 +168,8 @@ def __init__(self,
application_package=None, # type: Optional[CWL]
): # type: (...) -> None
self.pid = WorkflowProcesses(process_id) # type: WorkflowProcesses
self.id = self.pid.value # type: Optional[str] # noqa
self.id = self.pid.value # type: str
self.path = f"/processes/{self.id}" # type: str
self.test_id = test_id # type: Optional[str]
self.deploy_payload = deploy_payload # type: Optional[ProcessDeployment]
self.execute_payload = execute_payload # type: Optional[ProcessExecution]
Expand Down Expand Up @@ -208,7 +211,7 @@ class WorkflowTestRunnerBase(ResourcesUtil, TestCase):
"""
Used between various TestCase runs.
"""
logger_level = logging.INFO # type: int
logger_level = logging.INFO # type: AnyLogLevel
logger_enabled = True # type: bool
logger = None # type: Optional[logging.Logger]
# setting indent to `None` disables pretty-printing of JSON payload
Expand Down Expand Up @@ -820,6 +823,7 @@ def workflow_runner(
log_full_trace, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
): # type: (...) -> ExecutionResults
...

Expand All @@ -831,6 +835,7 @@ def workflow_runner(
log_full_trace=False, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=True, # type: Literal[True]
): # type: (...) -> DetailedExecutionResults
...
Expand All @@ -842,6 +847,7 @@ def workflow_runner(
log_full_trace=False, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=False, # type: bool
): # type: (...) -> Union[ExecutionResults, DetailedExecutionResults]
"""
Expand All @@ -867,6 +873,9 @@ def workflow_runner(
Function to add further requests mock specifications as needed by the calling test case.
:param override_execute_body:
Alternate execution request content from the default one loaded from the referenced Workflow location.
:param override_execute_path:
Alternate execution request path from the default one employed by the workflow runner.
            Must be a supported endpoint (``/jobs``, ``/processes/{pID}/jobs``, ``/processes/{pID}/execution``).
:param detailed_results:
If enabled, each step involved in the :term:`Workflow` chain will provide their respective details
including the :term:`Process` ID, the :term:`Job` UUID, intermediate outputs and logs.
Expand All @@ -879,29 +888,45 @@ def workflow_runner(

# deploy processes and make them visible for workflow
has_duplicate_apps = len(set(test_application_ids)) != len(list(test_application_ids))
path_deploy = "/processes"
for process_id in test_application_ids:
path_visible = f"{path_deploy}/{self.test_processes_info[process_id].id}/visibility"
data_visible = {"value": Visibility.PUBLIC}
allowed_status = [HTTPCreated.code, HTTPConflict.code] if has_duplicate_apps else HTTPCreated.code
self.request("POST", path_deploy, status=allowed_status, headers=self.headers,
json=self.test_processes_info[process_id].deploy_payload,
message="Expect deployed application process.")
self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible,
message="Expect visible application process.")
self.prepare_process(process_id, exists_ok=has_duplicate_apps)

# deploy workflow process itself and make visible
workflow_info = self.test_processes_info[test_workflow_id]
self.request("POST", path_deploy, status=HTTPCreated.code, headers=self.headers,
json=workflow_info.deploy_payload,
message="Expect deployed workflow process.")
process_path = f"{path_deploy}/{workflow_info.id}"
visible_path = f"{process_path}/visibility"
visible = {"value": Visibility.PUBLIC}
resp = self.request("PUT", visible_path, json=visible, status=HTTPOk.code, headers=self.headers)
self.assert_test(lambda: resp.json.get("value") == Visibility.PUBLIC,
message="Process should be public.")
workflow_info = self.prepare_process(test_workflow_id)
status_or_results = self.execute_monitor_process(
workflow_info,
detailed_results=detailed_results,
override_execute_body=override_execute_body,
override_execute_path=override_execute_path,
requests_mock_callback=requests_mock_callback,
)
return status_or_results

def prepare_process(self, process_id, exists_ok=False):
# type: (WorkflowProcesses, bool) -> ProcessInfo
"""
Deploys the process referenced by ID using the available :term:`Application Package` and makes it visible.
"""
proc_info = self.test_processes_info[process_id]
body_deploy = proc_info.deploy_payload
path_deploy = "/processes"
path_visible = f"{proc_info.path}/visibility"
data_visible = {"value": Visibility.PUBLIC}
allowed_status = [HTTPCreated.code, HTTPConflict.code] if exists_ok else HTTPCreated.code
self.request("POST", path_deploy, status=allowed_status, headers=self.headers, json=body_deploy,
message="Expect deployed process.")
self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible,
message="Expect visible process.")
return proc_info

def execute_monitor_process(
self,
process_info, # type: ProcessInfo
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=True, # type: Literal[True]
): # type: (...) -> Union[JSON, DetailedExecutionResults]
with contextlib.ExitStack() as stack_exec:
for data_source_use in [
"weaver.processes.sources.get_data_source_from_url",
Expand All @@ -918,9 +943,9 @@ def workflow_runner(
requests_mock_callback(mock_req)

# execute workflow
execute_body = override_execute_body or workflow_info.execute_payload
execute_body = override_execute_body or process_info.execute_payload
execute_body.setdefault("mode", ExecuteMode.ASYNC)
execute_path = f"{process_path}/jobs"
execute_path = override_execute_path or f"{process_info.path}/jobs"
self.assert_test(lambda: execute_body is not None,
message="Cannot execute workflow without a request body!")
resp = self.request("POST", execute_path, status=HTTPCreated.code,
Expand Down Expand Up @@ -1016,6 +1041,7 @@ def try_retrieve_logs(self, workflow_job_url, detailed_results):
if detailed_results:
details[job_id] = self.extract_job_details(workflow_job_url, workflow_logs)
log_matches = set(re.findall(r".*(https?://.+/jobs/.+(?:/logs)?).*", workflow_logs))
log_matches = {url.strip("[]") for url in log_matches}
log_matches -= {workflow_job_url}
log_matches = {url if url.rstrip("/").endswith("/logs") else f"{url}/logs" for url in log_matches}
for log_url in log_matches:
Expand Down Expand Up @@ -1067,6 +1093,7 @@ class WorkflowTestCase(WorkflowTestRunnerBase):
WorkflowProcesses.APP_DOCKER_STAGE_IMAGES,
WorkflowProcesses.APP_ECHO,
WorkflowProcesses.APP_ECHO_OPTIONAL,
WorkflowProcesses.APP_ECHO_RESULTS_TESTER,
WorkflowProcesses.APP_ECHO_SECRETS,
WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS,
WorkflowProcesses.APP_READ_FILE,
Expand Down Expand Up @@ -1603,3 +1630,105 @@ def test_workflow_optional_input_propagation(self):
with open(path, mode="r", encoding="utf-8") as out_file:
data = out_file.read().strip()
assert data == "test-message", "output from workflow should match the default resolved from input omission"

@pytest.mark.oap_part3
def test_workflow_ad_hoc_nested_process_chaining(self):
"""
Validate the execution of an ad-hoc workflow directly submitted for execution with nested process references.
The test purposely uses two different processes that have different input requirements, but that happen to have
similarly named inputs/outputs, such that we can validate nesting chains the correct parameters at each level.
Similarly, the same process is reused more than once in a nested fashion to make sure they don't get mixed.
Finally, output selection is defined using multi-output processes to make sure filtering is applied inline.
"""
passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS)
echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER)

workflow_exec = {
"process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}",
"inputs": {
"message": {
"process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}",
"inputs": {
"message": {
"process": f"{self.WEAVER_RESTAPI_URL}{echo_result_process_info.path}",
"inputs": {
"message": "test"
},
"outputs": {"output_data": {}}
},
"code": 123,
},
"outputs": {"message": {}}
},
"code": 456,
}
}
details = self.execute_monitor_process(
passthrough_process_info,
override_execute_body=workflow_exec,
override_execute_path="/jobs",
)

# note:
# Because we are running an ad-hoc nested chaining of processes rather than the typical 'workflow'
# approach that has all execution steps managed by CWL, each nested call is performed on its own.
# Therefore, the collected details will only contain the logs from the top-most process and its directly
# nested process. Further nested processes will not be embedded within those logs, as the entire nested
        #     operation is dispatched as a "single process". Retrieve the logs iteratively by crawling into the nested jobs.
details_to_process = list(details.values())
while details_to_process:
active_detail = details_to_process.pop(0)
if active_detail["inputs"] == workflow_exec["inputs"]:
continue # top-most process, already got all logs
nested_job_id = active_detail["job"]
nested_job_proc = active_detail["process"]
nested_job_url = f"/processes/{nested_job_proc}/jobs/{nested_job_id}"
_, nested_details = self.try_retrieve_logs(nested_job_url, True)
for job_id, job_detail in nested_details.items():
if job_id not in details:
details[job_id] = job_detail
details_to_process.append(job_detail)

self.assert_test(
lambda: len(details) == 3,
"Jobs amount should match the number of involved nested processes.",
)

# heuristic:
# since all nested processes contain all other definitions as input, we can sort by size deepest->highest
job_details = sorted(details.values(), key=lambda info: len(str(info["inputs"])))

self.assert_test(
lambda: job_details[0]["outputs"] == {
"output_data": {"value": "test"}
}
)
self.assert_test(
lambda: job_details[1]["outputs"] == {
"message": {"value": "test"},
}
)
self.assert_test(
lambda: job_details[2]["outputs"] == {
"message": {"value": "test"},
"code": {"value": 456},
"number": {"value": 3.1416},
"integer": {"value": 3},
}
)

for job_detail in job_details:
job_progresses = [
float(progress) for progress in
re.findall(
# Extra '.*\n' is to make sure we match only the first percent of the current job log per line.
# Because of nested processes and CWL operations, there can be sub-progress percentages as well.
r"([0-9]+(?:\.[0-9]+)?)%.*\n",
job_detail["logs"],
)
]
self.assert_test(
lambda: sorted(job_progresses) == job_progresses,
"Job log progress values should be in sequential order even when involving a nested process execution."
)
2 changes: 1 addition & 1 deletion tests/functional/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def retrieve_payload(cls, process, ref_type=None, ref_name=None, ref_found=False
with open(path_ext, mode="r", encoding="utf-8") as f:
json_payload = yaml.safe_load(f) # both JSON/YAML
return json_payload
if urlparse(path_ext).scheme != "":
if urlparse(path_ext).scheme.startswith("http"):
if ref_found:
return path
resp = cls.request("GET", path, force_requests=True, ignore_errors=True)
Expand Down
6 changes: 6 additions & 0 deletions tests/processes/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid
from typing import TYPE_CHECKING, List, cast

import mock
import pytest
from owslib.wps import BoundingBoxDataInput, ComplexDataInput, Input, Process

Expand Down Expand Up @@ -36,6 +37,11 @@ def __init__(self, inputs: List[Input]) -> None:
self.dataInputs = inputs


@mock.patch(
# avoid error on database connection not established
"weaver.processes.execution.log_and_save_update_status_handler",
lambda *_, **__: lambda *_a, **__kw: None,
)
@pytest.mark.parametrize(
["input_data", "input_definition", "expect_input"],
[
Expand Down
5 changes: 4 additions & 1 deletion tests/processes/test_wps3_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ def mock_wps_request(method, url, *_, **kwargs):
if method == "PUT":
test_reached_parse_inputs = True # last operation before parsing I/O is setting visibility
return resp
if method == "POST" and url.endswith(f"{test_process}/jobs"):
if method == "POST" and (
url.endswith(f"{test_process}/jobs") or
url.endswith(f"{test_process}/execution")
):
# actual evaluation of intended handling of CWL inputs conversion to WPS-3 execute request
assert kwargs.get("json", {}).get("inputs") == expected_wps_inputs
raise TestDoneEarlyExit("Expected exception raised to skip executed job status monitoring")
Expand Down
4 changes: 3 additions & 1 deletion tests/wps_restapi/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
if TYPE_CHECKING:
from typing import List, Tuple

from weaver.typedefs import JSON # noqa


@pytest.mark.functional
class GenericApiRoutesTestCase(WpsConfigBase):
Expand All @@ -34,7 +36,7 @@ class GenericApiRoutesTestCase(WpsConfigBase):
def test_frontpage_format(self):
resp = self.app.get(sd.api_frontpage_service.path, headers=self.json_headers)
assert resp.status_code == 200
body = resp.json
body = cast("JSON", resp.json)
try:
sd.FrontpageSchema().deserialize(body)
except colander.Invalid as ex:
Expand Down
Loading

0 comments on commit a9bda01

Please sign in to comment.