Skip to content

Commit

Permalink
Add support for dynamic process requirements
Browse files Browse the repository at this point in the history
  • Loading branch information
plojyon committed Feb 24, 2025
1 parent 922998b commit 61a7099
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Unreleased
Changed
-------
- **BACKWARD INCOMPATIBLE:** Drop support for ``Python 3.11``
- Support dynamic calculation of processes' resource requirements.
- Add ``process_exception`` method to auditlog middleware
- Add ``deleted`` field to ``AnnotationValues`` model
- Remove not used method ``remove_delete_markers`` from the ``AnnotationValue``
Expand Down
11 changes: 11 additions & 0 deletions resolwe/flow/models/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.db import models

from resolwe.permissions.models import PermissionObject
from resolwe.process.resources import PROCESS_RESOURCES

from .base import BaseManagerWithoutVersion, BaseModel
from .data import Data
Expand Down Expand Up @@ -221,11 +222,21 @@ def get_resource_limits(self, data: Optional[Data] = None):
if environment_settings.get(resource, {}).get(self.slug)
}

dynamic_resources = {}
if data is not None:
estimators = PROCESS_RESOURCES.get(self.slug, {})
dynamic_resources = {
resource: estimators[resource](data)
for resource in resources
if resource in estimators
}

# Gather requirements for all resources from all sources.
# The order of requirements determines their priority.
resources_map = ChainMap(
data.process_resources if data is not None else {},
environment_resources,
dynamic_resources,
self.requirements.get("resources", {}),
getattr(settings, "FLOW_PROCESS_RESOURCE_DEFAULTS", {}),
fallback,
Expand Down
16 changes: 16 additions & 0 deletions resolwe/process/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Resource consumption estimators for processes."""

from collections import defaultdict

PROCESS_RESOURCES = defaultdict(dict)


def estimator(process_slug, resource):
"""Register resource consumption estimator."""

def decorator(func):
"""Register resource consumption estimator."""
PROCESS_RESOURCES[process_slug][resource] = func
return func

return decorator
26 changes: 26 additions & 0 deletions resolwe/process/tests/processes/python_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,32 @@ def run(self, inputs, outputs):
print("Storage:", outputs.storage)


class RequirementsProcess2(Process):
slug = "test-python-process-requirements2"
name = "Test Python Process Requirements 2"
version = "0.0.1"
process_type = "data:python:requirements"
requirements = {
"resources": {
"cores": 2,
"memory": 4096,
"storage": 200,
},
}

class Output:
"""Input fields."""

cores = IntegerField(label="Cores")
memory = IntegerField(label="Memory")
storage = IntegerField(label="Storage")

def run(self, inputs, outputs):
outputs.cores = self.requirements["resources"]["cores"]
outputs.memory = self.requirements["resources"]["memory"]
outputs.storage = self.requirements["resources"]["storage"]


class IterateProcess(Process):
slug = "test-python-process-iterate"
name = "Test iterating over filtered objects"
Expand Down
28 changes: 28 additions & 0 deletions resolwe/process/tests/test_python_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
AnnotationValue,
)
from resolwe.permissions.models import Permission, get_anonymous_user
from resolwe.process import resources
from resolwe.test import (
ProcessTestCase,
tag_process,
Expand Down Expand Up @@ -662,6 +663,33 @@ def test_resource_data_override(self):
self.assertEqual(data.output["memory"], 50000)
self.assertEqual(data.output["storage"], 500)

@with_docker_executor
@tag_process("test-python-process-requirements2")
def test_dynamic_resources(self):
data = self.run_process("test-python-process-requirements2")
data.refresh_from_db()
self.assertEqual(data.status, "OK")

self.assertEqual(data.output["cores"], 2)
self.assertEqual(data.output["memory"], 4096)
self.assertEqual(data.output["storage"], 200)

@resources.estimator("test-python-process-requirements2", "cores")
def cores_estimator(data):
return 3

@resources.estimator("test-python-process-requirements2", "storage")
def storage_estimator(data):
return 3.5e9

data = self.run_process("test-python-process-requirements2")
data.refresh_from_db()
self.assertEqual(data.status, "OK")

self.assertEqual(data.output["cores"], 3)
self.assertEqual(data.output["memory"], 4096) # default
self.assertEqual(data.output["storage"], 3.5e9)

@with_docker_executor
@tag_process("test-python-process-iterate")
def test_python_process_iterate(self):
Expand Down

0 comments on commit 61a7099

Please sign in to comment.