Skip to content

Commit

Permalink
add functional test validating process 'ID:revision' resolution durin…
Browse files Browse the repository at this point in the history
…g execution (fixes #799) + fix 'jobControlOptions' mishandling against invalid 'mode' requested by the job
  • Loading branch information
fmigneault committed Feb 28, 2025
1 parent 3e49be5 commit 071392c
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Fixes:
- Fix resolution of `Process` revisions when queried by multiple ID and/or version combinations on the `WPS` endpoint.
- Fix resolution of `Process` revisions by ``{processID}:{version}`` for execution by `OGC API - Processes` endpoint
(fixes `#799 <https://github.com/crim-ca/weaver/issues/799>`_).
- Fix ``jobControlOptions`` not respected in cases where resolution occurs against a restricted set of capabilities
for a given `Process` when the submitted `Job` requests an invalid combination by execution ``mode`` body parameter.

.. _changes_6.3.0:

Expand Down
6 changes: 3 additions & 3 deletions tests/functional/test_wps_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
if TYPE_CHECKING:
from typing import List

from weaver.typedefs import JSON
from weaver.typedefs import AnyUUID, AnyVersion, JSON


@pytest.mark.wps
Expand Down Expand Up @@ -231,8 +231,8 @@ class WpsAppTestWithProcessRevisions(WpsConfigBase, ResourcesUtil):
"weaver.wps_metadata_identification_title": "Weaver WPS Test Server",
"weaver.wps_metadata_provider_name": "WpsAppTest"
}
process_ids = None
process_revisions = None
process_ids = [] # type: List[AnyUUID]
process_revisions = [] # type: List[List[AnyVersion]]

@classmethod
def setUpClass(cls):
Expand Down
81 changes: 81 additions & 0 deletions tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from weaver.execute import (
ExecuteCollectionFormat,
ExecuteControlOption,
ExecuteMode,
ExecuteResponse,
ExecuteReturnPreference,
Expand Down Expand Up @@ -1870,6 +1871,86 @@ def test_deploy_merge_wps_io_as_mappings(self):
assert proc["outputs"]["out_file"]["title"] == "Result File"
assert proc["outputs"]["out_file"]["formats"][0]["mediaType"] == "text/csv"

def test_execute_process_revision(self):
    """
    Validate that job execution resolves the process revision requested with the ``{processID}:{version}`` reference.

    The test proceeds as follows:

    1. Deploy a CWL ``echo`` process tagged with version ``1.0.0``.
    2. Update it by ``PATCH`` to version ``1.1.0`` with ``jobControlOptions`` limited to synchronous execution.
    3. Submit an asynchronous execution against ``{proc}:1.1.0`` and expect HTTP 422.
    4. Submit the same asynchronous execution against ``{proc}:1.0.0`` and expect HTTP 201,
       with the job, its status location and its outputs all referring to the older revision.
    5. Submit a synchronous execution against ``{proc}:1.1.0`` and expect HTTP 200,
       with the job and its results location referring to the newer revision.
    """
    proc = self.fully_qualified_test_name()
    old = "1.0.0"
    rev = "1.1.0"
    # minimal application package that echoes its 'message' input back as the 'output' value
    cwl = {
        "s:version": old,
        "cwlVersion": "v1.0",
        "class": "CommandLineTool",
        "baseCommand": "echo",
        "inputs": {"message": {"type": "string", "inputBinding": {"position": 1}}},
        "outputs": {"output": {"type": "string", "outputBinding": {"outputEval": "$(inputs.message)"}}}
    }
    body = {
        "processDescription": {"process": {"id": proc}},
        "deploymentProfileName": "http://www.opengis.net/profiles/eoc/wpsApplication",
        "executionUnit": [{"unit": cwl}],
    }
    self.deploy_process(body)

    # create the newer revision, which only permits synchronous execution
    path = f"/processes/{proc}"
    data = {"version": rev, "title": "updated", "jobControlOptions": [ExecuteControlOption.SYNC]}
    resp = self.app.patch_json(path, params=data, headers=self.json_headers)
    assert resp.status_code == 200

    exec_value = "test"
    exec_body = {
        # because it is updated above to be sync-only,
        # using async below MUST fail with correctly resolved newer revision
        "mode": ExecuteMode.ASYNC,
        "response": ExecuteResponse.DOCUMENT,
        "inputs": [{"id": "message", "value": exec_value}],
        "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.VALUE}]
    }

    with contextlib.ExitStack() as stack_exec:
        # keep every execution mock active for all the requests submitted below
        for mock_exec in mocked_execute_celery():
            stack_exec.enter_context(mock_exec)

        # test the newer revision with async, failure expected
        path = f"/processes/{proc}:{rev}/execution"
        resp = mocked_sub_requests(
            self.app, "post_json", path,
            data=exec_body, headers=self.json_headers,
            timeout=5, only_local=True, expect_errors=True
        )
        assert resp.status_code == 422, f"Failed with: [{resp.status_code}]\nReason:\n{resp.text}"

        # use the older revision which allowed async, validating that the right process version is invoked
        path = f"/processes/{proc}:{old}/execution"
        resp = mocked_sub_requests(
            self.app, "post_json", path,
            data=exec_body, headers=self.json_headers,
            timeout=5, only_local=True, expect_errors=True
        )
        assert resp.status_code == 201, f"Failed with: [{resp.status_code}]\nReason:\n{resp.text}"
        status_url = resp.headers.get("Location")
        assert f"{proc}:{old}" in status_url
        job_id = resp.json.get("jobID")
        job = self.job_store.fetch_by_id(job_id)
        assert job.process == f"{proc}:{old}"
        data = self.get_outputs(status_url, schema=JobInputsOutputsSchema.OGC)
        assert data["outputs"]["output"]["value"] == exec_value

        # counter-validate the assumption with the latest revision using permitted sync
        # use the explicit version to be sure
        exec_body["mode"] = ExecuteMode.SYNC
        path = f"/processes/{proc}:{rev}/execution"
        resp = mocked_sub_requests(
            self.app, "post_json", path,
            data=exec_body, headers=self.json_headers,
            timeout=5, only_local=True, expect_errors=True
        )
        assert resp.status_code == 200, f"Failed with: [{resp.status_code}]\nReason:\n{resp.text}"
        results_url = resp.headers.get("Content-Location")  # because sync returns directly, no 'Location' header
        assert f"{proc}:{rev}" in results_url
        # the job ID is the last path segment located before the '/results' suffix
        job_id = results_url.split("/results", 1)[0].rsplit("/", 1)[-1]
        job = self.job_store.fetch_by_id(job_id)
        assert job.process == f"{proc}:{rev}"
        assert resp.json["output"] == exec_value

def test_execute_job_with_accept_languages(self):
"""
Test that different accept language matching supported languages all successfully execute and apply them.
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ def check_job_status(_resp, running=False):
assert resp.status_code == 200, f"Error job info:\n{resp.text}"
return resp.json

def get_outputs(self, status_url):
path = get_path_kvp(f"{status_url}/outputs", schema=JobInputsOutputsSchema.OLD)
def get_outputs(self, status_url, schema=JobInputsOutputsSchema.OLD):
path = get_path_kvp(f"{status_url}/outputs", schema=schema)
resp = self.app.get(path, headers=dict(self.json_headers))
body = resp.json
pretty = json.dumps(body, indent=2, ensure_ascii=False)
Expand Down
7 changes: 4 additions & 3 deletions tests/wps_restapi/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def setUp(self):
self.process_public,
processDescriptionURL=f"{weaver_api_url}/processes/{self.process_public.identifier}",
processEndpointWPS1=weaver_wps_url,
jobControlOptions=ExecuteControlOption.values(),
)
self.process_store.save_process(public_process)
self.process_store.save_process(self.process_private)
Expand Down Expand Up @@ -223,7 +224,7 @@ def test_get_processes(self):
assert "version" in process and isinstance(process["version"], str)
assert "keywords" in process and isinstance(process["keywords"], list)
assert "metadata" in process and isinstance(process["metadata"], list)
assert len(process["jobControlOptions"]) == 1
assert len(process["jobControlOptions"]) == 2
assert ExecuteControlOption.ASYNC in process["jobControlOptions"]

processes_id = [p["id"] for p in resp.json["processes"]]
Expand Down Expand Up @@ -432,7 +433,7 @@ def test_get_processes_with_tagged_revisions(self):
body = resp.json
assert len(body["processes"]) == proc_total
proc_result = [(proc["id"], proc["version"]) for proc in body["processes"]]
proc_expect = [(proc_id, "0.0.0") for proc_id in proc_no_revs]
proc_expect = [(proc_id, self.process_public.version) for proc_id in proc_no_revs]
proc_expect += [(tag, ver) for tag, ver in zip(proc1_tags, proc1_versions)]
proc_expect += [(tag, ver) for tag, ver in zip(proc2_tags, proc2_versions)]
assert proc_result == sorted(proc_expect)
Expand Down Expand Up @@ -2449,7 +2450,7 @@ def test_execute_process_revision(self):
rev = "1.1.0"
proc = self.process_public.identifier
path = f"/processes/{proc}"
data = {"version": rev, "title": "updated", "jobControlOptions": [ExecuteControlOption.SYNC]}
data = {"version": rev, "title": "updated", "jobControlOptions": [ExecuteControlOption.ASYNC]}
resp = self.app.patch_json(path, params=data, headers=self.json_headers)
assert resp.status_code == 200

Expand Down
7 changes: 7 additions & 0 deletions weaver/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def values(cls):
"""
return [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC]

@classmethod
def from_mode(cls, mode):
    # type: (Optional[AnyExecuteMode]) -> Optional[ExecuteControlOption]
    """
    Obtain the control option that corresponds to the specified execution mode.

    The mode is first normalized with :meth:`ExecuteMode.get`, and the control option
    is then looked up by its ``{mode}-execute`` name.

    :param mode: Execution mode for which to find the matching control option.
    :return:
        Result of the control option lookup by name.
        Presumably ``None`` when the mode has no ``{mode}-execute`` counterpart
        (e.g.: omitted mode), to be confirmed against the ``get`` implementation.
    """
    resolved_mode = ExecuteMode.get(mode)
    return cls.get(f"{resolved_mode}-execute")


class ExecuteReturnPreference(Constants):
MINIMAL = "minimal" # type: ExecuteReturnPreferenceMinimalType
Expand Down
46 changes: 43 additions & 3 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
from pywps.inout.inputs import BoundingBoxInput, ComplexInput

from weaver.datatype import Job
from weaver.execute import AnyExecuteMode
from weaver.execute import AnyExecuteControlOption, AnyExecuteMode
from weaver.processes.convert import OWS_Input_Type, ProcessOWS
from weaver.status import AnyStatusType, StatusType
from weaver.typedefs import (
Expand Down Expand Up @@ -195,16 +195,18 @@ def execute_process(task, job_id, wps_url, headers=None):
# if process refers to a remote WPS provider, pass it down to avoid unnecessary re-fetch request
if job.is_local:
process = None # already got all the information needed pre-loaded in PyWPS service
local_process_id = wps_process.identifier
else:
service = Service(name=job.service, url=wps_url)
process = Process.from_ows(wps_process, service, settings)
local_process_id = None

job.progress = JobProgress.EXECUTE_REQUEST
job.save_log(logger=task_logger, message="Starting job process execution.")
job.save_log(logger=task_logger,
message="Following updates could take a while until the Application Package answers...")

wps_worker = get_pywps_service(environ=settings, is_worker=True)
wps_worker = get_pywps_service(environ=settings, is_worker=True, process_id=local_process_id)
execution = wps_worker.execute_job(job,
wps_inputs=wps_inputs, wps_outputs=wps_outputs,
remote_process=process, headers=headers)
Expand Down Expand Up @@ -931,6 +933,8 @@ def submit_job_handler(payload, # type: ProcessExecution
# as per https://datatracker.ietf.org/doc/html/rfc7240#section-2
# Prefer header not resolved with a valid value should still resume without error
execute_mode = mode
validate_process_exec_mode(job_ctl_opts, execute_mode)

accept_type = validate_job_accept_header(headers, mode)
exec_resp, exec_return = get_job_return(job=None, body=json_body, headers=headers) # job 'None' since still parsing
req_headers = copy.deepcopy(headers or {})
Expand Down Expand Up @@ -1089,7 +1093,7 @@ def update_job_parameters(job, request):
if value == ExecuteMode.AUTO:
continue # don't override previously set value that resolved with default value by omission
if value in [ExecuteMode.ASYNC, ExecuteMode.SYNC]:
job_ctrl_exec = ExecuteControlOption.get(f"{value}-execute")
job_ctrl_exec = ExecuteControlOption.from_mode(value)
if job_ctrl_exec not in job_process.jobControlOptions:
raise HTTPBadRequest(
json=sd.ErrorJsonResponseBodySchema(schema_include=True).deserialize({
Expand Down Expand Up @@ -1252,6 +1256,42 @@ def validate_job_accept_header(headers, execution_mode):
)


def validate_process_exec_mode(job_control_options, execution_mode):
    # type: (List[AnyExecuteControlOption], Optional[AnyExecuteMode]) -> None
    """
    Ensure that the requested :term:`Job` execution mode is permitted by the :term:`Process` ``jobControlOptions``.

    Any resolution of the :term:`Job` execution mode (header, query, body, etc.) and of the relevant
    control options is expected to have been performed beforehand by the applicable upstream operations.

    An execution mode of ``auto`` or ``None`` is always accepted, since no explicit mode is enforced.

    .. seealso::
        - :ref:`proc_exec_mode`
        - :func:`parse_prefer_header_execute_mode`

    :param job_control_options: Control options permitted by the :term:`Process`.
    :param execution_mode: Execution mode resolved for the :term:`Job`.
    :raises HTTPUnprocessableEntity: If the execution mode is not permitted by the :term:`Process`.
    """
    requested_option = ExecuteControlOption.from_mode(execution_mode)
    if requested_option in job_control_options or execution_mode in [ExecuteMode.AUTO, None]:
        return  # explicit mode allowed by the process, or no explicit mode to enforce

    error_schema = sd.ErrorJsonResponseBodySchema(schema_include=True)
    error_body = {
        "type": "InvalidJobControlOptions",
        "title": "Invalid Job Execution Mode specified against permitted Process Job Control Options.",
        "detail": "Any hint of a job execution strategy must respect the process prerequisites.",
        "status": HTTPUnprocessableEntity.code,
        "cause": {"name": "process.jobControlOptions"},
        "value": repr_json(
            {
                "process.jobControlOptions": job_control_options,
                "job.mode": execution_mode,
            },
            force_string=False,
        ),
    }
    raise HTTPUnprocessableEntity(json=error_schema.deserialize(error_body))


def validate_process_id(job_process, payload):
# type: (Process, ProcessExecution) -> None
"""
Expand Down
4 changes: 2 additions & 2 deletions weaver/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ def list_processes(self,
raise NotImplementedError

@abc.abstractmethod
def fetch_by_id(self, process_id, visibility=None):
# type: (AnyProcessRef, Optional[AnyVisibility]) -> Process
def fetch_by_id(self, process_id, visibility=None, revision=False):
# type: (AnyProcessRef, Optional[AnyVisibility], bool) -> Process
raise NotImplementedError

@abc.abstractmethod
Expand Down
12 changes: 10 additions & 2 deletions weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,15 +683,21 @@ def _get_revision_search(self, process_id):
search = {"$or": [{"identifier": sane_tag}, {"identifier": sane_name, "version": version}]}
return search, version

def fetch_by_id(self, process_id, visibility=None):
# type: (AnyProcessRef, Optional[AnyVisibility]) -> Process
def fetch_by_id(self, process_id, visibility=None, revision=False):
# type: (AnyProcessRef, Optional[AnyVisibility], bool) -> Process
"""
Get process for given :paramref:`process_id` from storage, optionally filtered by :paramref:`visibility`.
If ``visibility=None``, the process is retrieved (if existing) regardless of its visibility value.
:param process_id: Process identifier (optionally with version tag).
:param visibility: One value amongst :py:mod:`weaver.visibility`.
:param revision:
Request that the specified 'ID:revision' tag be applied to the retrieved process ID.
Applies only when an explicit 'revision' part is provided in :paramref:`process_id`,
and that the retrieved :class:`Process` corresponds to the latest revision that is
stored without the ``version`` within its ID. For other revisions, they would already
be applied, and must be provided explicitly either way to retrieve them.
:return: An instance of :class:`weaver.datatype.Process`.
"""
process_id = self._get_process_id(process_id)
Expand All @@ -702,6 +708,8 @@ def fetch_by_id(self, process_id, visibility=None):
process = Process(process)
if version:
process.version = version # ensure version was applied just in case
if revision:
process.identifier = process_id # apply revision in case it was requested explicitly for latest ID
if visibility is not None and process.visibility != visibility:
raise ProcessNotAccessible(f"Process '{process_id}' cannot be accessed.")
return process
Expand Down
22 changes: 16 additions & 6 deletions weaver/wps/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from configparser import ConfigParser
from typing import TYPE_CHECKING
from urllib.parse import urlparse, unquote
from urllib.parse import unquote, urlparse

import colander
from owslib.wps import WPSExecution
Expand Down Expand Up @@ -51,9 +51,9 @@
LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING:
from typing import Any, Deque, Dict, List, Optional, Union
from uuid import UUID

from pywps.inout.basic import ComplexInput
from uuid import UUID

from weaver.datatype import Job
from weaver.typedefs import (
Expand Down Expand Up @@ -426,10 +426,17 @@ def check_invalid_ids(identifiers, content_type):
raise OWSInvalidParameterValue(desc, locator="identifier", json=body)


def get_pywps_service(environ=None, is_worker=False):
# type: (SettingsType, bool) -> WorkerService
def get_pywps_service(environ=None, is_worker=False, process_id=None):
# type: (SettingsType, bool, str) -> WorkerService
"""
Generates the PyWPS Service that provides WPS-1/2 XML endpoint.
:param environ: Environment variables containing application settings and the contextual HTTP request.
:param is_worker: Hint for managing active processes against :class:`WorkerService`.
:param process_id:
Pre-resolved :term:`Process` to use, skipping loading and resolution from the database.
This is typically employed when a previous :term:`WPS` request occurred to identify
the relevant process (i.e.: ``DescribeProcess``), and a following ``Execute`` request.
"""
environ = environ or {}
try:
Expand All @@ -453,8 +460,11 @@ def get_pywps_service(environ=None, is_worker=False):
# call pywps application with processes filtered according to the adapter's definition
process_store = get_db(registry).get_store(StoreProcesses) # type: StoreProcesses
processes_wps = [
process.wps() for process in
process_store.list_processes(visibility=Visibility.PUBLIC, identifiers=proc_ids)
process.wps() for process in (
[process_store.fetch_by_id(visibility=Visibility.PUBLIC, process_id=process_id, revision=True)]
if process_id else
process_store.list_processes(visibility=Visibility.PUBLIC, identifiers=proc_ids)
)
]
service = WorkerService(processes_wps, is_worker=is_worker, settings=settings)
except OWSException:
Expand Down
1 change: 0 additions & 1 deletion weaver/wps_restapi/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ def create_job(request):
ref = get_service(request, provider_id=prov_id)
else:
ref = get_process(process_id=proc_id)
proc_id = None # ensure ref is used, process ID needed only for provider
return submit_job(request, ref, process_id=proc_id, tags=["wps-rest", "ogc-api"])


Expand Down

0 comments on commit 071392c

Please sign in to comment.