Skip to content
This repository was archived by the owner on Jul 25, 2022. It is now read-only.

Fix worker #1935

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
After discussions we've decided to restore the resource manager to its
prior functionality for the current release and rely on most clients
using the monitored worker profile.
- Revert resource manager change
- Update profile in integration tests
- Remove skip_if_satisfied logic from job_scheduler

Signed-off-by: johnsonw <wjohnson@whamcloud.com>
johnsonw committed Jun 17, 2020
commit dc1b91d0b44db656824d3d967d13aaf575dd3b97
16 changes: 2 additions & 14 deletions chroma_core/lib/job.py
Original file line number Diff line number Diff line change
@@ -46,13 +46,7 @@ def satisfied(self):

class DependOn(Dependable):
def __init__(
self,
stateful_object,
preferred_state,
acceptable_states=None,
unacceptable_states=None,
fix_state=None,
skip_if_satisfied=False,
self, stateful_object, preferred_state, acceptable_states=None, unacceptable_states=None, fix_state=None
):
"""preferred_state: what we will try to put the dependency into if
it is not already in one of acceptable_states.
@@ -86,8 +80,6 @@ def __init__(
self.fix_state = fix_state
self.stateful_object = stateful_object

self.skip_if_satisfied = skip_if_satisfied

def __str__(self):
return "%s %s %s %s" % (self.stateful_object, self.preferred_state, self.acceptable_states, self.fix_state)

@@ -100,11 +92,7 @@ def satisfied(self):
except:
self.stateful_object.__class__._base_manager.get(pk=self.stateful_object.pk)

if self.skip_if_satisfied:
satisfied = depended_object.state in self.stateful_object.get_current_route()
else:
satisfied = depended_object.state in self.acceptable_states

satisfied = depended_object.state in self.acceptable_states
if not satisfied:
job_log.warning(
"DependOn not satisfied: %s in state %s, not one of %s (preferred %s)"
3 changes: 0 additions & 3 deletions chroma_core/models/client_mount.py
Original file line number Diff line number Diff line change
@@ -144,7 +144,6 @@ class MountLustreClientJob(StateChangeJob):
stateful_object = "lustre_client_mount"
lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE)
state_verb = None
skip_if_satisfied = True

@classmethod
def long_description(cls, stateful_object):
@@ -184,7 +183,6 @@ class UnmountLustreClientMountJob(StateChangeJob):
stateful_object = "lustre_client_mount"
lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE)
state_verb = None
skip_if_satisfied = True

@classmethod
def long_description(cls, stateful_object):
@@ -221,7 +219,6 @@ class RemoveLustreClientJob(StateChangeJob):
stateful_object = "lustre_client_mount"
lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE)
state_verb = None
skip_if_satisfied = True

@classmethod
def long_description(cls, stateful_object):
28 changes: 2 additions & 26 deletions chroma_core/models/jobs.py
Original file line number Diff line number Diff line change
@@ -42,8 +42,6 @@ class Meta:
route_map = None
transition_map = None
job_class_map = None
_begin_state = None
_end_state = None

reverse_deps = {}

@@ -63,12 +61,6 @@ def set_state(self, state, intentional=False):
self.state = state
self.state_modified_at = now()

def set_begin_state(self, begin_state):
self._begin_state = begin_state

def set_end_state(self, end_state):
self._end_state = end_state

def not_state(self, state):
return list(set(self.states) - set([state]))

@@ -198,12 +190,6 @@ def get_route(cls, begin_state, end_state):
except KeyError:
raise SchedulingError("%s->%s not legal state transition for %s" % (begin_state, end_state, cls))

def get_current_route(self):
if self._begin_state and self._end_state:
return self.get_route(self._begin_state, self._end_state)

return []

def get_available_states(self, begin_state):
"""States which should be advertised externally (i.e. exclude states which
are used internally but don't make sense when requested externally, for example
@@ -399,8 +385,6 @@ def get_confirmation_string(self):
#: Whether the job can be safely cancelled
cancellable = True

skip_if_satisfied = False

class Meta:
app_label = "chroma_core"
ordering = ["id"]
@@ -438,21 +422,13 @@ def all_deps(self, dep_cache):
dependent_dependency.stateful_object == stateful_object
and not new_state in dependent_dependency.acceptable_states
):
if dependent.state != dependent_dependency.fix_state:
dependent.set_begin_state(dependent.state)
dependent.set_end_state(dependent_dependency.fix_state)

dependent_deps.append(
DependOn(
dependent, dependent_dependency.fix_state, skip_if_satisfied=self.skip_if_satisfied
)
)
dependent_deps.append(DependOn(dependent, dependent_dependency.fix_state))

return DependAll(
DependAll(dependent_deps),
dep_cache.get(self),
dep_cache.get(stateful_object, new_state),
DependOn(stateful_object, self.old_state, skip_if_satisfied=self.skip_if_satisfied),
DependOn(stateful_object, self.old_state),
)
else:
return dep_cache.get(self)
3 changes: 0 additions & 3 deletions chroma_core/models/lnet_configuration.py
Original file line number Diff line number Diff line change
@@ -279,7 +279,6 @@ def get_steps(self):
class EnableLNetJob(NullStateChangeJob):
target_object = models.ForeignKey(LNetConfiguration, on_delete=CASCADE)
state_transition = StateChangeJob.StateTransition(LNetConfiguration, "unconfigured", "lnet_unloaded")
skip_if_satisfied = True

class Meta:
app_label = "chroma_core"
@@ -321,7 +320,6 @@ class LoadLNetJob(LNetStateChangeJob):
stateful_object = "lnet_configuration"
lnet_configuration = models.ForeignKey(LNetConfiguration, on_delete=CASCADE)
state_verb = "Load LNet"
skip_if_satisfied = True

display_group = Job.JOB_GROUPS.COMMON
display_order = 30
@@ -361,7 +359,6 @@ class StartLNetJob(LNetStateChangeJob):
stateful_object = "lnet_configuration"
lnet_configuration = models.ForeignKey(LNetConfiguration, on_delete=CASCADE)
state_verb = "Start LNet"
skip_if_satisfied = True

display_group = Job.JOB_GROUPS.COMMON
display_order = 40
43 changes: 2 additions & 41 deletions chroma_core/services/job_scheduler/job_scheduler.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@
from chroma_core.models import ManagedMdt
from chroma_core.models import FilesystemMember
from chroma_core.models import ConfigureLNetJob
from chroma_core.models import ManagedTarget, ApplyConfParams, ManagedOst, Job, DeletableStatefulObject, StatefulObject
from chroma_core.models import ManagedTarget, ApplyConfParams, ManagedOst, Job, DeletableStatefulObject
from chroma_core.models import StepResult
from chroma_core.models import (
ManagedMgs,
@@ -60,7 +60,6 @@
StratagemConfiguration,
)
from chroma_core.models import Task, CreateTaskJob
from chroma_core.models.jobs import StateChangeJob
from chroma_core.services.job_scheduler.dep_cache import DepCache
from chroma_core.services.job_scheduler.lock_cache import LockCache, lock_change_receiver, to_lock_json
from chroma_core.services.job_scheduler.command_plan import CommandPlan
@@ -403,14 +402,6 @@ def add_command(self, command, jobs):
"""
for job in jobs:
self.add(job)

stateful_objects = list(
set([x.get_stateful_object() for x in self._jobs.values() if isinstance(x, StateChangeJob)])
)
for so in stateful_objects:
so.set_begin_state(self.get_stateful_object_begin_state(so))
so.set_end_state(self.get_stateful_object_end_state(so))

self._commands[command.id] = command
self._command_to_jobs[command.id] |= set([j.id for j in jobs])
for job in jobs:
@@ -463,23 +454,6 @@ def update_many(self, jobs, new_state):

Job.objects.filter(id__in=[j.id for j in jobs]).update(state=new_state)

def get_jobs_by_stateful_object(self, so):
return [
x
for x in self._jobs.values()
if isinstance(x, StateChangeJob) and type(x.get_stateful_object()) is type(so)
]

def get_stateful_object_begin_state(self, so):
first_job = self.get_jobs_by_stateful_object(so)[:+1]
if first_job:
return first_job[0].state_transition.old_state

def get_stateful_object_end_state(self, so):
last_job = self.get_jobs_by_stateful_object(so)[-1:]
if last_job:
return last_job[0].state_transition.new_state

@property
def ready_jobs(self):
result = []
@@ -593,20 +567,7 @@ def _check_jobs(self, jobs, dep_cache):
log.error("Job %d: exception in get_steps: %s" % (job.id, traceback.format_exc()))
cancel_jobs.append(job)
else:
if isinstance(job, StateChangeJob):
so = job.get_stateful_object()
stateful_object = so.__class__._base_manager.get(pk=so.pk)
state = stateful_object.state

if state != job.old_state and job.skip_if_satisfied:
if state in so.get_current_route():
self._complete_job(job, False, False)
else:
cancel_jobs.append(job)
else:
ok_jobs.append(job)
else:
ok_jobs.append(job)
ok_jobs.append(job)

return ok_jobs, cancel_jobs

30 changes: 17 additions & 13 deletions chroma_core/services/plugin_runner/resource_manager.py
Original file line number Diff line number Diff line change
@@ -901,19 +901,23 @@ def _persist_nid_updates(self, scannable_id, changed_resource_id, changed_attrs)
for lnet_state_resource in node_resources[LNETModules]:
lnet_state = lnet_state_resource.to_resource()

if lnet_state.host_id == host.id:
lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id)

# Truthfully this should use the notify, which I've added as a comment to show the correct way. The problem is that
# during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way that is probably
# as planned but prevents it being set back. What we really need is to somehow get a single command that goes
# to a state and then to another state; post_dependencies, almost. At present I can't see how to do this so I am leaving
# this code as is.
lnet_configuration.set_state(lnet_state.state)
lnet_configuration.save()
# JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state})

log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration)
# Really this code should be more tightly tied to the lnet_configuration classes, but we are taking a one step
# at a time approach. Until lnet is no longer unconfigured we should not be updating its state.
# There is a double if because the first if should really be removed, in some more perfect future.
if host.lnet_configuration.state != "unconfigured":
if lnet_state.host_id == host.id:
lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id)

# Truthfully this should use the notify, which I've added as a comment to show the correct way. The problem is that
# during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way that is probably
# as planned but prevents it being set back. What we really need is to somehow get a single command that goes
# to a state and then to another state; post_dependencies, almost. At present I can't see how to do this so I am leaving
# this code as is.
lnet_configuration.set_state(lnet_state.state)
lnet_configuration.save()
# JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state})

log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration)

# Only get the lnet_configuration if we actually have a LNetInterface (nid) to add.
if len(node_resources[LNETInterface]) > 0:
12 changes: 5 additions & 7 deletions iml-system-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -75,22 +75,20 @@ pub const STRATAGEM_SERVER_PROFILE: &str = r#"{

pub const STRATAGEM_CLIENT_PROFILE: &str = r#"{
"ui_name": "Stratagem Client Node",
"managed": true,
"managed": false,
"worker": true,
"name": "stratagem_client",
"initial_state": "managed",
"ntp": true,
"initial_state": "monitored",
"ntp": false,
"corosync": false,
"corosync2": false,
"pacemaker": false,
"ui_description": "A client that can receive stratagem data",
"packages": [
"python2-iml-agent-management",
"lustre-client"
"python2-iml-agent-management"
],
"repolist": [
"base",
"lustre-client"
"base"
]
}
"#;
4 changes: 0 additions & 4 deletions tests/unit/services/job_scheduler/job_test_case.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@
from chroma_core.models import ManagedTarget
from chroma_core.models import ManagedTargetMount
from chroma_core.models import Nid
from chroma_core.models import StateChangeJob
from chroma_core.services.plugin_runner.agent_daemon_interface import AgentDaemonRpcInterface
from chroma_core.services.queue import ServiceQueue
from chroma_core.services.rpc import ServiceRpcInterface
@@ -191,9 +190,6 @@ def _spawn_job(job):
def run_next():
while True:
runnable_jobs = self.job_scheduler._job_collection.ready_jobs
for job in runnable_jobs:
if isinstance(job, StateChangeJob):
job.skip_if_satisfied = False

log.info(
"run_next: %d runnable jobs of (%d pending, %d tasked)"