Job nested process workflow #748

Merged (24 commits) on Nov 16, 2024

Commits
4c2c023
prepare workflow input parsing from nested process output
fmigneault Oct 28, 2024
cedb286
[wip] workaround conceptual nested-process recursive colander/cornice…
fmigneault Oct 29, 2024
bf4ada2
[wip] workaround conceptual nested-process recursive colander/cornice…
fmigneault Nov 6, 2024
f9fa75f
[wip] add input 'properties' modifier support (relates to https://git…
fmigneault Nov 6, 2024
e1701a6
[wip] handle recursive deserialize for nested process
fmigneault Nov 6, 2024
471713f
functional recursive nested-processes JSON schema and body deserializ…
fmigneault Nov 6, 2024
d66f07a
[wip] add nested workflow test
fmigneault Nov 6, 2024
19477d5
Merge branch 'job-nested-process-workflow' of github-perso:crim-ca/we…
fmigneault Nov 6, 2024
5005873
[wip] test nested process workflow - fixed up to nested invocation
fmigneault Nov 7, 2024
758dd3e
functional test with working nested processes chaining
fmigneault Nov 9, 2024
552949b
fix tests and undo tmp edits
fmigneault Nov 9, 2024
0c005e4
fix colander schema type for OpenAPI generation
fmigneault Nov 9, 2024
748ef59
Merge branch 'job-management-execute' into job-nested-process-workflow
fmigneault Nov 11, 2024
e1eaa8c
fix modified job patch schema definition to refer to expected execute…
fmigneault Nov 11, 2024
dcfc22e
fix again for patch job update schema
fmigneault Nov 11, 2024
1554874
revert properties field modifiers (separte PR)
fmigneault Nov 11, 2024
7216d82
update changelog with nested process execution (fixes https://github.…
fmigneault Nov 11, 2024
3224103
fix linting
fmigneault Nov 11, 2024
dbadce9
fix linting
fmigneault Nov 11, 2024
30d350f
fix doc8 linting
fmigneault Nov 13, 2024
e22c07a
fix imports linting
fmigneault Nov 13, 2024
5650777
improve coverage and raised errors by nested process execution
fmigneault Nov 14, 2024
ef93c74
fix coverage of binding over colander extra keywords
fmigneault Nov 14, 2024
edd3abc
fix typing
fmigneault Nov 14, 2024
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ reports

## PyCharm
*.idea
*.run

## Intellij
*.iml
1 change: 1 addition & 0 deletions .gitignore
@@ -63,6 +63,7 @@ testdata.json

## PyCharm
*.idea
*.run

## Intellij
*.iml
4 changes: 4 additions & 0 deletions CHANGES.rst
@@ -12,6 +12,9 @@ Changes

Changes:
--------
- Add support of *OGC API - Processes - Part 3: Workflows and Chaining* with *Nested Process* ad-hoc workflow
definitions directly submitted for execution (fixes `#747 <https://github.com/crim-ca/weaver/issues/747>`_,
relates to `#412 <https://github.com/crim-ca/weaver/issues/412>`_).
- Add support of *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution
(fixes `#716 <https://github.com/crim-ca/weaver/issues/716>`_).
- Add ``headers``, ``mode`` and ``response`` parameters along the ``inputs`` and ``outputs`` returned by
@@ -75,6 +78,7 @@ Fixes:
- Add the appropriate HTTP error type to respect ``/conf/dru/deploy/unsupported-content-type``
(fixes `#624 <https://github.com/crim-ca/weaver/issues/624>`_).
- Fix S3 bucket storage for result file missing the output ID in the path to match local WPS output storage structure.
- Fix rendering of the ``deprecated`` property in `OpenAPI` representation.

.. _changes_5.9.0:

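The new *Part 3* changelog entry above covers ad-hoc nested-process execution submitted directly in the request body. A minimal sketch of that request shape (the instance URL and process names here are hypothetical; the actual tests deploy their own processes):

```python
# Hypothetical Weaver instance and process names, for illustration only.
weaver_url = "https://weaver.example.com"

execution = {
    "process": f"{weaver_url}/processes/ParentProcess",
    "inputs": {
        "message": {
            # Nested process reference: executed first; the selected
            # "output_data" output feeds the parent's "message" input.
            "process": f"{weaver_url}/processes/ChildProcess",
            "inputs": {"message": "test"},
            "outputs": {"output_data": {}},  # inline output filtering
        },
    },
}

# This body would be POSTed to "/processes/ParentProcess/execution",
# "/processes/ParentProcess/jobs", or the Part 4 "/jobs" endpoint.
```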
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# YAML representation supported by WeaverClient
processDescription:
process:
id: EchoResultsTester
version: "1.0" # must be string, avoid interpretation as float
executionUnit:
# note: This does not work by itself! The test suite injects the file dynamically.
175 changes: 152 additions & 23 deletions tests/functional/test_workflow.py
@@ -70,6 +70,7 @@
ExecutionInputsMap,
ExecutionResults,
HeadersType,
JSON,
ProcessDeployment,
ProcessExecution,
SettingsType
@@ -119,6 +120,7 @@ class WorkflowProcesses(enum.Enum):
APP_ECHO = "Echo"
APP_ECHO_OPTIONAL = "EchoOptional"
APP_ECHO_SECRETS = "EchoSecrets"
APP_ECHO_RESULTS_TESTER = "EchoResultsTester"
APP_ICE_DAYS = "Finch_IceDays"
APP_READ_FILE = "ReadFile"
APP_SUBSET_BBOX = "ColibriFlyingpigeon_SubsetBbox"
@@ -166,7 +168,8 @@ def __init__(self,
application_package=None, # type: Optional[CWL]
): # type: (...) -> None
self.pid = WorkflowProcesses(process_id) # type: WorkflowProcesses
self.id = self.pid.value # type: Optional[str] # noqa
self.id = self.pid.value # type: str
self.path = f"/processes/{self.id}" # type: str
self.test_id = test_id # type: Optional[str]
self.deploy_payload = deploy_payload # type: Optional[ProcessDeployment]
self.execute_payload = execute_payload # type: Optional[ProcessExecution]
@@ -208,7 +211,7 @@ class WorkflowTestRunnerBase(ResourcesUtil, TestCase):
"""
Used between various TestCase runs.
"""
logger_level = logging.INFO # type: int
logger_level = logging.INFO # type: AnyLogLevel
logger_enabled = True # type: bool
logger = None # type: Optional[logging.Logger]
# setting indent to `None` disables pretty-printing of JSON payload
@@ -820,6 +823,7 @@ def workflow_runner(
log_full_trace, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
): # type: (...) -> ExecutionResults
...

@@ -831,6 +835,7 @@ def workflow_runner(
log_full_trace=False, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=True, # type: Literal[True]
): # type: (...) -> DetailedExecutionResults
...
@@ -842,6 +847,7 @@ def workflow_runner(
log_full_trace=False, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=False, # type: bool
): # type: (...) -> Union[ExecutionResults, DetailedExecutionResults]
"""
@@ -867,6 +873,9 @@ def workflow_runner(
Function to add further requests mock specifications as needed by the calling test case.
:param override_execute_body:
Alternate execution request content from the default one loaded from the referenced Workflow location.
:param override_execute_path:
Alternate execution request path from the default one employed by the workflow runner.
Must be one of the supported endpoints (``/jobs``, ``/processes/{pID}/jobs``, ``/processes/{pID}/execution``).
:param detailed_results:
If enabled, each step involved in the :term:`Workflow` chain will provide their respective details
including the :term:`Process` ID, the :term:`Job` UUID, intermediate outputs and logs.
@@ -879,29 +888,45 @@ def workflow_runner(

# deploy processes and make them visible for workflow
has_duplicate_apps = len(set(test_application_ids)) != len(list(test_application_ids))
path_deploy = "/processes"
for process_id in test_application_ids:
path_visible = f"{path_deploy}/{self.test_processes_info[process_id].id}/visibility"
data_visible = {"value": Visibility.PUBLIC}
allowed_status = [HTTPCreated.code, HTTPConflict.code] if has_duplicate_apps else HTTPCreated.code
self.request("POST", path_deploy, status=allowed_status, headers=self.headers,
json=self.test_processes_info[process_id].deploy_payload,
message="Expect deployed application process.")
self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible,
message="Expect visible application process.")
self.prepare_process(process_id, exists_ok=has_duplicate_apps)

# deploy workflow process itself and make visible
workflow_info = self.test_processes_info[test_workflow_id]
self.request("POST", path_deploy, status=HTTPCreated.code, headers=self.headers,
json=workflow_info.deploy_payload,
message="Expect deployed workflow process.")
process_path = f"{path_deploy}/{workflow_info.id}"
visible_path = f"{process_path}/visibility"
visible = {"value": Visibility.PUBLIC}
resp = self.request("PUT", visible_path, json=visible, status=HTTPOk.code, headers=self.headers)
self.assert_test(lambda: resp.json.get("value") == Visibility.PUBLIC,
message="Process should be public.")
workflow_info = self.prepare_process(test_workflow_id)
status_or_results = self.execute_monitor_process(
workflow_info,
detailed_results=detailed_results,
override_execute_body=override_execute_body,
override_execute_path=override_execute_path,
requests_mock_callback=requests_mock_callback,
)
return status_or_results

def prepare_process(self, process_id, exists_ok=False):
# type: (WorkflowProcesses, bool) -> ProcessInfo
"""
Deploys the process referenced by ID using the available :term:`Application Package` and makes it visible.
"""
proc_info = self.test_processes_info[process_id]
body_deploy = proc_info.deploy_payload
path_deploy = "/processes"
path_visible = f"{proc_info.path}/visibility"
data_visible = {"value": Visibility.PUBLIC}
allowed_status = [HTTPCreated.code, HTTPConflict.code] if exists_ok else HTTPCreated.code
self.request("POST", path_deploy, status=allowed_status, headers=self.headers, json=body_deploy,
message="Expect deployed process.")
self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible,
message="Expect visible process.")
return proc_info

def execute_monitor_process(
self,
process_info, # type: ProcessInfo
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=True, # type: Literal[True]
): # type: (...) -> Union[JSON, DetailedExecutionResults]
with contextlib.ExitStack() as stack_exec:
for data_source_use in [
"weaver.processes.sources.get_data_source_from_url",
@@ -918,9 +943,9 @@ def workflow_runner(
requests_mock_callback(mock_req)

# execute workflow
execute_body = override_execute_body or workflow_info.execute_payload
execute_body = override_execute_body or process_info.execute_payload
execute_body.setdefault("mode", ExecuteMode.ASYNC)
execute_path = f"{process_path}/jobs"
execute_path = override_execute_path or f"{process_info.path}/jobs"
self.assert_test(lambda: execute_body is not None,
message="Cannot execute workflow without a request body!")
resp = self.request("POST", execute_path, status=HTTPCreated.code,
@@ -1016,6 +1041,7 @@ def try_retrieve_logs(self, workflow_job_url, detailed_results):
if detailed_results:
details[job_id] = self.extract_job_details(workflow_job_url, workflow_logs)
log_matches = set(re.findall(r".*(https?://.+/jobs/.+(?:/logs)?).*", workflow_logs))
log_matches = {url.strip("[]") for url in log_matches}
log_matches -= {workflow_job_url}
log_matches = {url if url.rstrip("/").endswith("/logs") else f"{url}/logs" for url in log_matches}
for log_url in log_matches:
@@ -1067,6 +1093,7 @@ class WorkflowTestCase(WorkflowTestRunnerBase):
WorkflowProcesses.APP_DOCKER_STAGE_IMAGES,
WorkflowProcesses.APP_ECHO,
WorkflowProcesses.APP_ECHO_OPTIONAL,
WorkflowProcesses.APP_ECHO_RESULTS_TESTER,
WorkflowProcesses.APP_ECHO_SECRETS,
WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS,
WorkflowProcesses.APP_READ_FILE,
@@ -1603,3 +1630,105 @@ def test_workflow_optional_input_propagation(self):
with open(path, mode="r", encoding="utf-8") as out_file:
data = out_file.read().strip()
assert data == "test-message", "output from workflow should match the default resolved from input omission"

@pytest.mark.oap_part3
def test_workflow_ad_hoc_nested_process_chaining(self):
"""
Validate the execution of an ad-hoc workflow directly submitted for execution with nested process references.

The test purposely uses two different processes that have different input requirements, but that happen to have
similarly named inputs/outputs, such that we can validate that nesting chains the correct parameters at each level.
Similarly, the same process is reused more than once in a nested fashion to make sure invocations don't get mixed up.
Finally, output selection is defined using multi-output processes to make sure filtering is applied inline.
"""
passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS)
echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER)

workflow_exec = {
"process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}",
"inputs": {
"message": {
"process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}",
"inputs": {
"message": {
"process": f"{self.WEAVER_RESTAPI_URL}{echo_result_process_info.path}",
"inputs": {
"message": "test"
},
"outputs": {"output_data": {}}
},
"code": 123,
},
"outputs": {"message": {}}
},
"code": 456,
}
}
details = self.execute_monitor_process(
passthrough_process_info,
override_execute_body=workflow_exec,
override_execute_path="/jobs",
)

# note:
# Because we are running an ad-hoc nested chaining of processes rather than the typical 'workflow'
# approach that has all execution steps managed by CWL, each nested call is performed on its own.
# Therefore, the collected details will only contain the logs from the top-most process and its directly
# nested process. Further nested processes will not be embedded within those logs, as the entire nested
operation is dispatched as a "single process". The logs are therefore retrieved iteratively by crawling into the nested jobs.
details_to_process = list(details.values())
while details_to_process:
active_detail = details_to_process.pop(0)
if active_detail["inputs"] == workflow_exec["inputs"]:
continue # top-most process, already got all logs
nested_job_id = active_detail["job"]
nested_job_proc = active_detail["process"]
nested_job_url = f"/processes/{nested_job_proc}/jobs/{nested_job_id}"
_, nested_details = self.try_retrieve_logs(nested_job_url, True)
for job_id, job_detail in nested_details.items():
if job_id not in details:
details[job_id] = job_detail
details_to_process.append(job_detail)

self.assert_test(
lambda: len(details) == 3,
"The number of jobs should match the number of involved nested processes.",
)

# heuristic:
# since all nested processes contain all other definitions as input, we can sort by size deepest->highest
job_details = sorted(details.values(), key=lambda info: len(str(info["inputs"])))

self.assert_test(
lambda: job_details[0]["outputs"] == {
"output_data": {"value": "test"}
}
)
self.assert_test(
lambda: job_details[1]["outputs"] == {
"message": {"value": "test"},
}
)
self.assert_test(
lambda: job_details[2]["outputs"] == {
"message": {"value": "test"},
"code": {"value": 456},
"number": {"value": 3.1416},
"integer": {"value": 3},
}
)

for job_detail in job_details:
job_progresses = [
float(progress) for progress in
re.findall(
# Extra '.*\n' is to make sure we match only the first percent of the current job log per line.
# Because of nested processes and CWL operations, there can be sub-progress percentages as well.
r"([0-9]+(?:\.[0-9]+)?)%.*\n",
job_detail["logs"],
)
]
self.assert_test(
lambda: sorted(job_progresses) == job_progresses,
"Job log progress values should be in sequential order even when involving a nested process execution."
)
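The final assertion in the test above relies on a regex that captures only the first percentage on each log line, skipping nested sub-progress values later on the same line. A standalone sketch of that extraction, using made-up log lines:

```python
import re

# Made-up log lines mimicking job logs: each line starts with the job's
# own progress and may contain nested sub-progress further along.
logs = (
    "0.00% accepted   job queued\n"
    "25.00% running   step echo [50% internal]\n"
    "100.00% succeeded job complete\n"
)
# The trailing '.*\n' consumes the rest of each line, so only the first
# percentage per line is captured (the nested "50%" is skipped).
progresses = [float(p) for p in re.findall(r"([0-9]+(?:\.[0-9]+)?)%.*\n", logs)]
print(progresses)  # -> [0.0, 25.0, 100.0]
assert sorted(progresses) == progresses  # monotonic when steps log in order
```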
2 changes: 1 addition & 1 deletion tests/functional/utils.py
@@ -266,7 +266,7 @@ def retrieve_payload(cls, process, ref_type=None, ref_name=None, ref_found=False
with open(path_ext, mode="r", encoding="utf-8") as f:
json_payload = yaml.safe_load(f) # both JSON/YAML
return json_payload
if urlparse(path_ext).scheme != "":
if urlparse(path_ext).scheme.startswith("http"):
if ref_found:
return path
resp = cls.request("GET", path, force_requests=True, ignore_errors=True)
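The one-line change in `retrieve_payload` above tightens remote-reference detection. A sketch of why, using only the standard library: a Windows-style drive path parses with a one-letter scheme, which the previous `scheme != ""` check would mistake for a remote URL.

```python
from urllib.parse import urlparse

# (reference, old check result, new check result)
refs = [
    ("https://example.com/payload.json", True, True),
    ("C:/data/payload.json", True, False),  # drive letter parses as scheme "c"
    ("tests/payload.json", False, False),   # no scheme at all
]
for ref, old, new in refs:
    scheme = urlparse(ref).scheme
    assert (scheme != "") == old              # previous check: any scheme
    assert scheme.startswith("http") == new   # updated check: http(s) only
```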
6 changes: 6 additions & 0 deletions tests/processes/test_execute.py
@@ -7,6 +7,7 @@
import uuid
from typing import TYPE_CHECKING, List, cast

import mock
import pytest
from owslib.wps import BoundingBoxDataInput, ComplexDataInput, Input, Process

@@ -36,6 +37,11 @@ def __init__(self, inputs: List[Input]) -> None:
self.dataInputs = inputs


@mock.patch(
# avoid error on database connection not established
"weaver.processes.execution.log_and_save_update_status_handler",
lambda *_, **__: lambda *_a, **__kw: None,
)
@pytest.mark.parametrize(
["input_data", "input_definition", "expect_input"],
[
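The `mock.patch` added above swaps the status-update handler *factory* for a no-op, so no database connection is needed. The double-lambda shape matters because the patched attribute is called twice: once to build a handler, then once per status update. A standalone sketch (argument names are illustrative, not Weaver's real signature):

```python
# Outer lambda stands in for the factory (absorbs the job/settings args);
# inner lambda is the returned handler (absorbs the status-update args).
noop_factory = lambda *_, **__: (lambda *_a, **__kw: None)

handler = noop_factory("job-uuid", settings={})  # factory call: no database touched
result = handler(50, message="halfway there")    # handler call: does nothing
assert result is None
```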
5 changes: 4 additions & 1 deletion tests/processes/test_wps3_process.py
@@ -51,7 +51,10 @@ def mock_wps_request(method, url, *_, **kwargs):
if method == "PUT":
test_reached_parse_inputs = True # last operation before parsing I/O is setting visibility
return resp
if method == "POST" and url.endswith(f"{test_process}/jobs"):
if method == "POST" and (
url.endswith(f"{test_process}/jobs") or
url.endswith(f"{test_process}/execution")
):
# actual evaluation of intended handling of CWL inputs conversion to WPS-3 execute request
assert kwargs.get("json", {}).get("inputs") == expected_wps_inputs
raise TestDoneEarlyExit("Expected exception raised to skip executed job status monitoring")
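The widened condition above accepts both job-submission endpoints: the ``…/jobs`` path and the classic ``…/execution`` path. A minimal predicate mirroring the updated check (the process identifier is illustrative):

```python
test_process = "Echo"  # illustrative process ID

def is_execute_request(method, url):
    # POST on either ".../jobs" or ".../execution" counts as a job submission.
    return method == "POST" and (
        url.endswith(f"{test_process}/jobs")
        or url.endswith(f"{test_process}/execution")
    )

print(is_execute_request("POST", "https://host/processes/Echo/jobs"))       # True
print(is_execute_request("POST", "https://host/processes/Echo/execution"))  # True
print(is_execute_request("GET", "https://host/processes/Echo/jobs"))        # False
```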
4 changes: 3 additions & 1 deletion tests/wps_restapi/test_api.py
@@ -21,6 +21,8 @@
if TYPE_CHECKING:
from typing import List, Tuple

from weaver.typedefs import JSON # noqa


@pytest.mark.functional
class GenericApiRoutesTestCase(WpsConfigBase):
@@ -34,7 +36,7 @@ class GenericApiRoutesTestCase(WpsConfigBase):
def test_frontpage_format(self):
resp = self.app.get(sd.api_frontpage_service.path, headers=self.json_headers)
assert resp.status_code == 200
body = resp.json
body = cast("JSON", resp.json)
try:
sd.FrontpageSchema().deserialize(body)
except colander.Invalid as ex: