From 54621bbde3274f1b1b4ebeb434f95780e6b9bbdb Mon Sep 17 00:00:00 2001 From: johnsonw Date: Fri, 22 May 2020 11:18:12 -0400 Subject: [PATCH 1/6] Updates to fix adding and removing a worker client node in both monitored and managed mode. Fixes #1860. Fixes #1858. Fixes #1859. Signed-off-by: johnsonw --- chroma_core/lib/job.py | 16 +++++- chroma_core/models/client_mount.py | 16 +++++- chroma_core/models/host.py | 39 +++++++------ chroma_core/models/jobs.py | 29 +++++++++- chroma_core/models/lnet_configuration.py | 3 + .../services/job_scheduler/job_scheduler.py | 56 +++++++++++++++++-- .../job_scheduler/job_scheduler_client.py | 4 +- .../services/lustre_audit/update_scan.py | 3 +- .../plugin_runner/resource_manager.py | 30 +++++----- 9 files changed, 147 insertions(+), 49 deletions(-) diff --git a/chroma_core/lib/job.py b/chroma_core/lib/job.py index 16a9dccbf7..c1d3ef691f 100644 --- a/chroma_core/lib/job.py +++ b/chroma_core/lib/job.py @@ -46,7 +46,13 @@ def satisfied(self): class DependOn(Dependable): def __init__( - self, stateful_object, preferred_state, acceptable_states=None, unacceptable_states=None, fix_state=None + self, + stateful_object, + preferred_state, + acceptable_states=None, + unacceptable_states=None, + fix_state=None, + skip_if_satisfied=False, ): """preferred_state: what we will try to put the dependency into if it is not already in one of acceptable_states. @@ -80,6 +86,8 @@ def __init__( self.fix_state = fix_state self.stateful_object = stateful_object + self.skip_if_satisfied = skip_if_satisfied + def __str__(self): return "%s %s %s %s" % (self.stateful_object, self.preferred_state, self.acceptable_states, self.fix_state) @@ -92,7 +100,11 @@ def satisfied(self): except: self.stateful_object.__class__._base_manager.get(pk=self.stateful_object.pk) - satisfied = depended_object.state in self.acceptable_states + if self.skip_if_satisfied: + satisfied = depended_object.state in self.stateful_object.get_current_route() + else: + satisfied = depended_object.state in self.acceptable_states + if not satisfied: job_log.warning( "DependOn not satisfied: %s in state %s, not one of %s (preferred %s)" diff --git a/chroma_core/models/client_mount.py b/chroma_core/models/client_mount.py index 5772f6e241..521f830633 100644 --- a/chroma_core/models/client_mount.py +++ b/chroma_core/models/client_mount.py @@ -26,6 +26,7 @@ class LustreClientMount(DeletableStatefulObject): states = ["unmounted", "mounted", "removed"] initial_state = "unmounted" + skip_if_satisfied = True def __str__(self): return self.get_label() @@ -42,6 +43,10 @@ def get_deps(self, state=None): state = self.state deps = [] + + if self.host.immutable_state: + return DependAll(deps) + if state == "mounted": # Depend on this mount's host having LNet up. If LNet is stopped # on the host, this filesystem will be unmounted first. 
@@ -140,6 +145,7 @@ class MountLustreClientJob(StateChangeJob): stateful_object = "lustre_client_mount" lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE) state_verb = None + skip_if_satisfied = True @classmethod def long_description(cls, stateful_object): @@ -179,6 +185,7 @@ class UnmountLustreClientMountJob(StateChangeJob): stateful_object = "lustre_client_mount" lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE) state_verb = None + skip_if_satisfied = True @classmethod def long_description(cls, stateful_object): @@ -215,6 +222,7 @@ class RemoveLustreClientJob(StateChangeJob): stateful_object = "lustre_client_mount" lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE) state_verb = None + skip_if_satisfied = True @classmethod def long_description(cls, stateful_object): @@ -353,5 +361,11 @@ def description(self): def get_steps(self): search = lambda cm: (cm.host == self.host and cm.state == "mounted") mounted = ObjectCache.get(LustreClientMount, search) - args = dict(host=self.host, filesystems=[(m.filesystem.mount_path(), m.mountpoint) for m in mounted]) + args = dict( + host=self.host, + filesystems=[ + (ObjectCache.get_one(ManagedFilesystem, lambda mf, mtd=m: mf.name == mtd.filesystem).mount_path(), m.mountpoint) + for m in mounted + ], + ) return [(UnmountLustreFilesystemsStep, args)] diff --git a/chroma_core/models/host.py b/chroma_core/models/host.py index 9c6391e6af..23055e8778 100644 --- a/chroma_core/models/host.py +++ b/chroma_core/models/host.py @@ -797,7 +797,7 @@ class BaseSetupHostJob(NullStateChangeJob): class Meta: abstract = True - def _common_deps(self, lnet_state_required, lnet_acceptable_states, lnet_unacceptable_states): + def _common_deps(self): # It really does not feel right that this is in here, but it does sort of work. These are the things # it is dependent on so create them. Also I can't work out with today's state machine anywhere else to # put them that works. @@ -826,23 +826,13 @@ def _common_deps(self, lnet_state_required, lnet_acceptable_states, lnet_unaccep deps = [] - if self.target_object.lnet_configuration: - deps.append( - DependOn( - self.target_object.lnet_configuration, - lnet_state_required, - lnet_acceptable_states, - lnet_unacceptable_states, - ) - ) - if self.target_object.pacemaker_configuration: deps.append(DependOn(self.target_object.pacemaker_configuration, "started")) if self.target_object.ntp_configuration: deps.append(DependOn(self.target_object.ntp_configuration, "configured")) - return DependAll(deps) + return deps class InitialiseBlockDeviceDriversStep(Step): @@ -871,7 +861,12 @@ def description(self): return help_text["setup_managed_host_on"] % self.target_object def get_deps(self): - return self._common_deps("lnet_up", None, None) + deps = self._common_deps() + + if self.target_object.lnet_configuration: + deps.append(DependOn(self.target_object.lnet_configuration, "lnet_up")) + + return DependAll(deps) def get_steps(self): return [(InitialiseBlockDeviceDriversStep, {"host": self.target_object})] @@ -891,9 +886,9 @@ class Meta: ordering = ["id"] def get_deps(self): - # Moving out of unconfigured into lnet_unloaded will mean that lnet will start monitoring and responding to - # the state. Once we start monitoring any state other than unconfigured is acceptable. 
- return self._common_deps("lnet_unloaded", None, ["unconfigured"]) + deps = self._common_deps() + + return DependAll(deps) def description(self): return help_text["setup_monitored_host_on"] % self.target_object @@ -913,14 +908,19 @@ class Meta: ordering = ["id"] def get_deps(self): - return self._common_deps("lnet_up", None, None) + deps = self._common_deps() + + if self.target_object.lnet_configuration and not self.target_object.immutable_state: + deps.append(DependOn(self.target_object.lnet_configuration, "lnet_up")) + + return DependAll(deps) def description(self): return help_text["setup_worker_host_on"] % self.target_object @classmethod def can_run(cls, host): - return host.is_managed and host.is_worker and (host.state != "unconfigured") + return host.is_worker and (host.state != "unconfigured") class DetectTargetsStep(Step): @@ -1174,6 +1174,9 @@ def description(self): def get_deps(self): deps = [] + if self.host.immutable_state: + return DependAll(deps) + if self.host.lnet_configuration: deps.append(DependOn(self.host.lnet_configuration, "unconfigured")) diff --git a/chroma_core/models/jobs.py b/chroma_core/models/jobs.py index 6698ca6208..4791f28381 100644 --- a/chroma_core/models/jobs.py +++ b/chroma_core/models/jobs.py @@ -42,6 +42,9 @@ class Meta: route_map = None transition_map = None job_class_map = None + skip_if_satisfied = False + _begin_state = None + _end_state = None reverse_deps = {} @@ -61,6 +64,12 @@ def set_state(self, state, intentional=False): self.state = state self.state_modified_at = now() + def set_begin_state(self, begin_state): + self._begin_state = begin_state + + def set_end_state(self, end_state): + self._end_state = end_state + def not_state(self, state): return list(set(self.states) - set([state])) @@ -190,6 +199,12 @@ def get_route(cls, begin_state, end_state): except KeyError: raise SchedulingError("%s->%s not legal state transition for %s" % (begin_state, end_state, cls)) + def get_current_route(self): + if self._begin_state and self._end_state: + return self.get_route(self._begin_state, self._end_state) + + return [] + def get_available_states(self, begin_state): """States which should be advertised externally (i.e. 
exclude states which are used internally but don't make sense when requested externally, for example @@ -385,6 +400,8 @@ def get_confirmation_string(self): #: Whether the job can be safely cancelled cancellable = True + skip_if_satisfied = False + class Meta: app_label = "chroma_core" ordering = ["id"] @@ -422,13 +439,21 @@ def all_deps(self, dep_cache): dependent_dependency.stateful_object == stateful_object and not new_state in dependent_dependency.acceptable_states ): - dependent_deps.append(DependOn(dependent, dependent_dependency.fix_state)) + if dependent.state != dependent_dependency.fix_state: + dependent.set_begin_state(dependent.state) + dependent.set_end_state(dependent_dependency.fix_state) + + dependent_deps.append( + DependOn( + dependent, dependent_dependency.fix_state, skip_if_satisfied=dependent.skip_if_satisfied + ) + ) return DependAll( DependAll(dependent_deps), dep_cache.get(self), dep_cache.get(stateful_object, new_state), - DependOn(stateful_object, self.old_state), + DependOn(stateful_object, self.old_state, skip_if_satisfied=self.skip_if_satisfied), ) else: return dep_cache.get(self) diff --git a/chroma_core/models/lnet_configuration.py b/chroma_core/models/lnet_configuration.py index aa6df0705c..78591f5414 100644 --- a/chroma_core/models/lnet_configuration.py +++ b/chroma_core/models/lnet_configuration.py @@ -279,6 +279,7 @@ def get_steps(self): class EnableLNetJob(NullStateChangeJob): target_object = models.ForeignKey(LNetConfiguration, on_delete=CASCADE) state_transition = StateChangeJob.StateTransition(LNetConfiguration, "unconfigured", "lnet_unloaded") + skip_if_satisfied = True class Meta: app_label = "chroma_core" @@ -320,6 +321,7 @@ class LoadLNetJob(LNetStateChangeJob): stateful_object = "lnet_configuration" lnet_configuration = models.ForeignKey(LNetConfiguration, on_delete=CASCADE) state_verb = "Load LNet" + skip_if_satisfied = True display_group = Job.JOB_GROUPS.COMMON display_order = 30 @@ -359,6 +361,7 @@ class StartLNetJob(LNetStateChangeJob): stateful_object = "lnet_configuration" lnet_configuration = models.ForeignKey(LNetConfiguration, on_delete=CASCADE) state_verb = "Start LNet" + skip_if_satisfied = True display_group = Job.JOB_GROUPS.COMMON display_order = 40 diff --git a/chroma_core/services/job_scheduler/job_scheduler.py b/chroma_core/services/job_scheduler/job_scheduler.py index 2d22c2037f..8d9d4a0e44 100644 --- a/chroma_core/services/job_scheduler/job_scheduler.py +++ b/chroma_core/services/job_scheduler/job_scheduler.py @@ -36,7 +36,7 @@ from chroma_core.models import ManagedMdt from chroma_core.models import FilesystemMember from chroma_core.models import ConfigureLNetJob -from chroma_core.models import ManagedTarget, ApplyConfParams, ManagedOst, Job, DeletableStatefulObject +from chroma_core.models import ManagedTarget, ApplyConfParams, ManagedOst, Job, DeletableStatefulObject, StatefulObject from chroma_core.models import StepResult from chroma_core.models import ( ManagedMgs, @@ -60,6 +60,7 @@ StratagemConfiguration, ) from chroma_core.models import Task, CreateTaskJob +from chroma_core.models.jobs import StateChangeJob from chroma_core.services.job_scheduler.dep_cache import DepCache from chroma_core.services.job_scheduler.lock_cache import LockCache, lock_change_receiver, to_lock_json from chroma_core.services.job_scheduler.command_plan import CommandPlan @@ -402,6 +403,14 @@ def add_command(self, command, jobs): """ for job in jobs: self.add(job) + + stateful_objects = list( + set([x.get_stateful_object() for x in 
self._jobs.values() if isinstance(x, StateChangeJob)]) + ) + for so in stateful_objects: + so.set_begin_state(self.get_stateful_object_begin_state(so)) + so.set_end_state(self.get_stateful_object_end_state(so)) + self._commands[command.id] = command self._command_to_jobs[command.id] |= set([j.id for j in jobs]) for job in jobs: @@ -454,6 +463,23 @@ def update_many(self, jobs, new_state): Job.objects.filter(id__in=[j.id for j in jobs]).update(state=new_state) + def get_jobs_by_stateful_object(self, so): + return [ + x + for x in self._jobs.values() + if isinstance(x, StateChangeJob) and type(x.get_stateful_object()) is type(so) + ] + + def get_stateful_object_begin_state(self, so): + first_job = self.get_jobs_by_stateful_object(so)[:+1] + if first_job: + return first_job[0].state_transition.old_state + + def get_stateful_object_end_state(self, so): + last_job = self.get_jobs_by_stateful_object(so)[-1:] + if last_job: + return last_job[0].state_transition.new_state + @property def ready_jobs(self): result = [] @@ -567,7 +593,20 @@ def _check_jobs(self, jobs, dep_cache): log.error("Job %d: exception in get_steps: %s" % (job.id, traceback.format_exc())) cancel_jobs.append(job) else: - ok_jobs.append(job) + if isinstance(job, StateChangeJob): + so = job.get_stateful_object() + stateful_object = so.__class__._base_manager.get(pk=so.pk) + state = stateful_object.state + + if state != job.old_state and job.skip_if_satisfied: + if state in so.get_current_route(): + self._complete_job(job, False, False) + else: + cancel_jobs.append(job) + else: + ok_jobs.append(job) + else: + ok_jobs.append(job) return ok_jobs, cancel_jobs @@ -1106,14 +1145,16 @@ def key(td): # if we have an entry with 'root'=true then move it to the front of the list before returning the result return sorted(sorted_list, key=lambda entry: entry.get("root", False), reverse=True) - def create_client_mount(self, host_id, filesystem_name, mountpoint): + def create_client_mount(self, host_id, filesystem_name, mountpoint, existing): # RPC-callable host = ObjectCache.get_one(ManagedHost, lambda mh: mh.id == host_id) - mount = self._create_client_mount(host, filesystem_name, mountpoint) + + mount = self._create_client_mount(host, filesystem_name, mountpoint, existing) + self.progress.advance() return mount.id - def _create_client_mount(self, host, filesystem_name, mountpoint): + def _create_client_mount(self, host, filesystem_name, mountpoint, existing=False): # Used for intra-JobScheduler calls log.debug("Creating client mount for %s as %s:%s" % (filesystem_name, host, mountpoint)) @@ -1122,6 +1163,9 @@ def _create_client_mount(self, host, filesystem_name, mountpoint): with transaction.atomic(): mount, created = LustreClientMount.objects.get_or_create(host=host, filesystem=filesystem_name) + if existing: + mount.state = "mounted" + mount.mountpoint = mountpoint mount.save() @@ -1889,7 +1933,7 @@ def run_stratagem(self, mdts, fs_id, stratagem_data): mountpoint = "/mnt/{}".format(filesystem.name) if not client_mount_exists: - self._create_client_mount(client_host, filesystem, mountpoint) + self._create_client_mount(client_host, filesystem.name, mountpoint) client_mount = ObjectCache.get_one( LustreClientMount, lambda mnt: mnt.host_id == client_host.id and mnt.filesystem == filesystem.name diff --git a/chroma_core/services/job_scheduler/job_scheduler_client.py b/chroma_core/services/job_scheduler/job_scheduler_client.py index 8091f3f986..1c7c567059 100644 --- a/chroma_core/services/job_scheduler/job_scheduler_client.py +++ 
b/chroma_core/services/job_scheduler/job_scheduler_client.py @@ -242,10 +242,10 @@ def create_targets(cls, targets_data): return (list(ManagedTarget.objects.filter(id__in=target_ids)), Command.objects.get(pk=command_id)) @classmethod - def create_client_mount(cls, host, filesystem_name, mountpoint): + def create_client_mount(cls, host, filesystem_name, mountpoint, existing): from chroma_core.models import LustreClientMount - client_mount_id = JobSchedulerRpc().create_client_mount(host.id, filesystem_name, mountpoint) + client_mount_id = JobSchedulerRpc().create_client_mount(host.id, filesystem_name, mountpoint, existing) return LustreClientMount.objects.get(id=client_mount_id) @classmethod diff --git a/chroma_core/services/lustre_audit/update_scan.py b/chroma_core/services/lustre_audit/update_scan.py index a0f375f101..db8da38ce1 100755 --- a/chroma_core/services/lustre_audit/update_scan.py +++ b/chroma_core/services/lustre_audit/update_scan.py @@ -162,7 +162,8 @@ def update_client_mounts(self): log.info("updated mount %s on %s -> active" % (actual_mount["mountpoint"], self.host)) except IndexError: log.info("creating new mount %s on %s" % (actual_mount["mountpoint"], self.host)) - JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"]) + filesystem = ManagedFilesystem.objects.get(name=fsname) + JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"], True) def update_target_mounts(self): # If mounts is None then nothing changed since the last update and so we can just return. diff --git a/chroma_core/services/plugin_runner/resource_manager.py b/chroma_core/services/plugin_runner/resource_manager.py index c4015ac436..608a4f5bd0 100644 --- a/chroma_core/services/plugin_runner/resource_manager.py +++ b/chroma_core/services/plugin_runner/resource_manager.py @@ -901,23 +901,19 @@ def _persist_nid_updates(self, scannable_id, changed_resource_id, changed_attrs) for lnet_state_resource in node_resources[LNETModules]: lnet_state = lnet_state_resource.to_resource() - # Really this code should be more tightly tied to the lnet_configuration classes, but in a one step - # at a time approach. Until lnet is !unconfigured we should not be updating it's state. - # Double if because the first if should be removed really, in some more perfect future. - if host.lnet_configuration.state != "unconfigured": - if lnet_state.host_id == host.id: - lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id) - - # Truthfully this should use the notify which I've added as a comment to show the correct way. The problem is that - # during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way the is probably - # as planned but prevents it being set back. What we really need is to somehow get a single command that goes - # to a state and then to another state. post_dependencies almost. At present I can't see how to do this so I am leaving - # this code as is. - lnet_configuration.set_state(lnet_state.state) - lnet_configuration.save() - # JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state}) - - log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration) + if lnet_state.host_id == host.id: + lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id) + + # Truthfully this should use the notify which I've added as a comment to show the correct way. 
The problem is that + # during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way the is probably + # as planned but prevents it being set back. What we really need is to somehow get a single command that goes + # to a state and then to another state. post_dependencies almost. At present I can't see how to do this so I am leaving + # this code as is. + lnet_configuration.set_state(lnet_state.state) + lnet_configuration.save() + # JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state}) + + log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration) # Only get the lnet_configuration if we actually have a LNetInterface (nid) to add. if len(node_resources[LNETInterface]) > 0: From bf72ff441f3ee226fa70f0168a6f2d62a80b990c Mon Sep 17 00:00:00 2001 From: johnsonw Date: Wed, 10 Jun 2020 19:24:54 -0400 Subject: [PATCH 2/6] - Revert change in stratagem.py; this will be done in a separate patch - Fix unit tests Signed-off-by: johnsonw --- chroma_core/models/stratagem.py | 2 +- chroma_core/services/lustre_audit/update_scan.py | 1 - tests/unit/services/job_scheduler/job_test_case.py | 4 ++++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/chroma_core/models/stratagem.py b/chroma_core/models/stratagem.py index dfd2ca4834..6166c839c4 100644 --- a/chroma_core/models/stratagem.py +++ b/chroma_core/models/stratagem.py @@ -679,7 +679,7 @@ def get_steps(self): client_host = ManagedHost.objects.get( Q(server_profile_id="stratagem_client") | Q(server_profile_id="stratagem_existing_client") ) - client_mount = LustreClientMount.objects.get(host_id=client_host.id, filesystem=self.filesystem.name) + client_mount = LustreClientMount.objects.get(host_id=client_host.id, filesystem=filesystem.name) return [ ( diff --git a/chroma_core/services/lustre_audit/update_scan.py b/chroma_core/services/lustre_audit/update_scan.py index db8da38ce1..9758bf84e4 100755 --- a/chroma_core/services/lustre_audit/update_scan.py +++ b/chroma_core/services/lustre_audit/update_scan.py @@ -162,7 +162,6 @@ def update_client_mounts(self): log.info("updated mount %s on %s -> active" % (actual_mount["mountpoint"], self.host)) except IndexError: log.info("creating new mount %s on %s" % (actual_mount["mountpoint"], self.host)) - filesystem = ManagedFilesystem.objects.get(name=fsname) JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"], True) def update_target_mounts(self): diff --git a/tests/unit/services/job_scheduler/job_test_case.py b/tests/unit/services/job_scheduler/job_test_case.py index 6f3c7e789b..5487a98d1d 100644 --- a/tests/unit/services/job_scheduler/job_test_case.py +++ b/tests/unit/services/job_scheduler/job_test_case.py @@ -8,6 +8,7 @@ from chroma_core.models import ManagedTarget from chroma_core.models import ManagedTargetMount from chroma_core.models import Nid +from chroma_core.models import StateChangeJob from chroma_core.services.plugin_runner.agent_daemon_interface import AgentDaemonRpcInterface from chroma_core.services.queue import ServiceQueue from chroma_core.services.rpc import ServiceRpcInterface @@ -190,6 +191,9 @@ def _spawn_job(job): def run_next(): while True: runnable_jobs = self.job_scheduler._job_collection.ready_jobs + for job in runnable_jobs: + if isinstance(job, StateChangeJob): + job.skip_if_satisfied = False log.info( "run_next: %d runnable jobs of (%d pending, %d tasked)" From c80450592839b9f11058fd18a8ade6442a83880b Mon Sep 17 00:00:00 2001 From: johnsonw Date: Wed, 10 Jun 2020 
21:57:12 -0400
Subject: [PATCH 3/6] - remove `skip_if_satisfied` from the stateful object

Signed-off-by: johnsonw
---
 chroma_core/models/client_mount.py | 1 -
 chroma_core/models/jobs.py         | 3 +--
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/chroma_core/models/client_mount.py b/chroma_core/models/client_mount.py
index 521f830633..5091e99de9 100644
--- a/chroma_core/models/client_mount.py
+++ b/chroma_core/models/client_mount.py
@@ -26,7 +26,6 @@ class LustreClientMount(DeletableStatefulObject):
     states = ["unmounted", "mounted", "removed"]
     initial_state = "unmounted"
-    skip_if_satisfied = True

     def __str__(self):
         return self.get_label()

diff --git a/chroma_core/models/jobs.py b/chroma_core/models/jobs.py
index 4791f28381..f291f20618 100644
--- a/chroma_core/models/jobs.py
+++ b/chroma_core/models/jobs.py
@@ -42,7 +42,6 @@ class Meta:
     route_map = None
     transition_map = None
     job_class_map = None
-    skip_if_satisfied = False
     _begin_state = None
     _end_state = None

@@ -445,7 +444,7 @@ def all_deps(self, dep_cache):
                 dependent_deps.append(
                     DependOn(
-                        dependent, dependent_dependency.fix_state, skip_if_satisfied=dependent.skip_if_satisfied
+                        dependent, dependent_dependency.fix_state, skip_if_satisfied=self.skip_if_satisfied
                     )
                 )

From f6da0c153a6fb4a08f0f7e84e961565d36d6b746 Mon Sep 17 00:00:00 2001
From: johnsonw
Date: Mon, 15 Jun 2020 15:18:29 -0400
Subject: [PATCH 4/6] - Fix formatting in client_mount

Signed-off-by: johnsonw
---
 chroma_core/models/client_mount.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/chroma_core/models/client_mount.py b/chroma_core/models/client_mount.py
index 5091e99de9..e260e09c07 100644
--- a/chroma_core/models/client_mount.py
+++ b/chroma_core/models/client_mount.py
@@ -363,7 +363,10 @@ def get_steps(self):
         args = dict(
             host=self.host,
             filesystems=[
-                (ObjectCache.get_one(ManagedFilesystem, lambda mf, mtd=m: mf.name == mtd.filesystem).mount_path(), m.mountpoint)
+                (
+                    ObjectCache.get_one(ManagedFilesystem, lambda mf, mtd=m: mf.name == mtd.filesystem).mount_path(),
+                    m.mountpoint,
+                )
                 for m in mounted
             ],
         )

From fa202d512c9b37da58efafee56058e1530deb2fe Mon Sep 17 00:00:00 2001
From: Nathaniel Clark
Date: Tue, 16 Jun 2020 11:05:57 -0400
Subject: [PATCH 5/6] Fix MountLustreFilesystemsJob

Signed-off-by: Nathaniel Clark
---
 chroma_core/models/client_mount.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/chroma_core/models/client_mount.py b/chroma_core/models/client_mount.py
index e260e09c07..e5830e5d42 100644
--- a/chroma_core/models/client_mount.py
+++ b/chroma_core/models/client_mount.py
@@ -300,7 +300,16 @@ def description(self):
     def get_steps(self):
         search = lambda cm: (cm.host == self.host and cm.state == "unmounted")
         unmounted = ObjectCache.get(LustreClientMount, search)
-        args = dict(host=self.host, filesystems=[(m.filesystem.mount_path(), m.mountpoint) for m in unmounted])
+        args = dict(
+            host=self.host,
+            filesystems=[
+                (
+                    ObjectCache.get_one(ManagedFilesystem, lambda mf, mtd=m: mf.name == mtd.filesystem).mount_path(),
+                    m.mountpoint,
+                )
+                for m in unmounted
+            ],
+        )
         return [(MountLustreFilesystemsStep, args)]

From dc1b91d0b44db656824d3d967d13aaf575dd3b97 Mon Sep 17 00:00:00 2001
From: johnsonw
Date: Tue, 16 Jun 2020 20:22:20 -0400
Subject: [PATCH 6/6] After discussions we've decided to restore the resource
 manager to its prior functionality for the current release and rely on most
 clients using the monitored worker profile.
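In monitored mode the manager only observes a worker's client mounts rather
than driving them, so the scheduler no longer has to reason about partially
completed state transitions on those nodes. For reference, the dependency
shape this leaves in place for worker hosts is the one introduced in PATCH 1
(a simplified excerpt of the worker host setup job's get_deps() in
chroma_core/models/host.py, shown for illustration, not as a drop-in
change):

    def get_deps(self):
        deps = self._common_deps()

        # Monitored hosts (immutable_state == True) are observed, never
        # driven, so no LNet state is enforced for them.
        if self.target_object.lnet_configuration and not self.target_object.immutable_state:
            deps.append(DependOn(self.target_object.lnet_configuration, "lnet_up"))

        return DependAll(deps)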
- Revert resource manager change - Update profile in integration tests - Remove skip_if_satisfied logic from job_scheduler Signed-off-by: johnsonw --- chroma_core/lib/job.py | 16 +------ chroma_core/models/client_mount.py | 3 -- chroma_core/models/jobs.py | 28 +----------- chroma_core/models/lnet_configuration.py | 3 -- .../services/job_scheduler/job_scheduler.py | 43 +------------------ .../plugin_runner/resource_manager.py | 30 +++++++------ iml-system-test-utils/src/lib.rs | 12 +++--- .../services/job_scheduler/job_test_case.py | 4 -- 8 files changed, 28 insertions(+), 111 deletions(-) diff --git a/chroma_core/lib/job.py b/chroma_core/lib/job.py index c1d3ef691f..16a9dccbf7 100644 --- a/chroma_core/lib/job.py +++ b/chroma_core/lib/job.py @@ -46,13 +46,7 @@ def satisfied(self): class DependOn(Dependable): def __init__( - self, - stateful_object, - preferred_state, - acceptable_states=None, - unacceptable_states=None, - fix_state=None, - skip_if_satisfied=False, + self, stateful_object, preferred_state, acceptable_states=None, unacceptable_states=None, fix_state=None ): """preferred_state: what we will try to put the dependency into if it is not already in one of acceptable_states. @@ -86,8 +80,6 @@ def __init__( self.fix_state = fix_state self.stateful_object = stateful_object - self.skip_if_satisfied = skip_if_satisfied - def __str__(self): return "%s %s %s %s" % (self.stateful_object, self.preferred_state, self.acceptable_states, self.fix_state) @@ -100,11 +92,7 @@ def satisfied(self): except: self.stateful_object.__class__._base_manager.get(pk=self.stateful_object.pk) - if self.skip_if_satisfied: - satisfied = depended_object.state in self.stateful_object.get_current_route() - else: - satisfied = depended_object.state in self.acceptable_states - + satisfied = depended_object.state in self.acceptable_states if not satisfied: job_log.warning( "DependOn not satisfied: %s in state %s, not one of %s (preferred %s)" diff --git a/chroma_core/models/client_mount.py b/chroma_core/models/client_mount.py index e5830e5d42..017466bfb1 100644 --- a/chroma_core/models/client_mount.py +++ b/chroma_core/models/client_mount.py @@ -144,7 +144,6 @@ class MountLustreClientJob(StateChangeJob): stateful_object = "lustre_client_mount" lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE) state_verb = None - skip_if_satisfied = True @classmethod def long_description(cls, stateful_object): @@ -184,7 +183,6 @@ class UnmountLustreClientMountJob(StateChangeJob): stateful_object = "lustre_client_mount" lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE) state_verb = None - skip_if_satisfied = True @classmethod def long_description(cls, stateful_object): @@ -221,7 +219,6 @@ class RemoveLustreClientJob(StateChangeJob): stateful_object = "lustre_client_mount" lustre_client_mount = models.ForeignKey(LustreClientMount, on_delete=CASCADE) state_verb = None - skip_if_satisfied = True @classmethod def long_description(cls, stateful_object): diff --git a/chroma_core/models/jobs.py b/chroma_core/models/jobs.py index f291f20618..6698ca6208 100644 --- a/chroma_core/models/jobs.py +++ b/chroma_core/models/jobs.py @@ -42,8 +42,6 @@ class Meta: route_map = None transition_map = None job_class_map = None - _begin_state = None - _end_state = None reverse_deps = {} @@ -63,12 +61,6 @@ def set_state(self, state, intentional=False): self.state = state self.state_modified_at = now() - def set_begin_state(self, begin_state): - self._begin_state = begin_state - - def set_end_state(self, 
end_state): - self._end_state = end_state - def not_state(self, state): return list(set(self.states) - set([state])) @@ -198,12 +190,6 @@ def get_route(cls, begin_state, end_state): except KeyError: raise SchedulingError("%s->%s not legal state transition for %s" % (begin_state, end_state, cls)) - def get_current_route(self): - if self._begin_state and self._end_state: - return self.get_route(self._begin_state, self._end_state) - - return [] - def get_available_states(self, begin_state): """States which should be advertised externally (i.e. exclude states which are used internally but don't make sense when requested externally, for example @@ -399,8 +385,6 @@ def get_confirmation_string(self): #: Whether the job can be safely cancelled cancellable = True - skip_if_satisfied = False - class Meta: app_label = "chroma_core" ordering = ["id"] @@ -438,21 +422,13 @@ def all_deps(self, dep_cache): dependent_dependency.stateful_object == stateful_object and not new_state in dependent_dependency.acceptable_states ): - if dependent.state != dependent_dependency.fix_state: - dependent.set_begin_state(dependent.state) - dependent.set_end_state(dependent_dependency.fix_state) - - dependent_deps.append( - DependOn( - dependent, dependent_dependency.fix_state, skip_if_satisfied=self.skip_if_satisfied - ) - ) + dependent_deps.append(DependOn(dependent, dependent_dependency.fix_state)) return DependAll( DependAll(dependent_deps), dep_cache.get(self), dep_cache.get(stateful_object, new_state), - DependOn(stateful_object, self.old_state, skip_if_satisfied=self.skip_if_satisfied), + DependOn(stateful_object, self.old_state), ) else: return dep_cache.get(self) diff --git a/chroma_core/models/lnet_configuration.py b/chroma_core/models/lnet_configuration.py index 78591f5414..aa6df0705c 100644 --- a/chroma_core/models/lnet_configuration.py +++ b/chroma_core/models/lnet_configuration.py @@ -279,7 +279,6 @@ def get_steps(self): class EnableLNetJob(NullStateChangeJob): target_object = models.ForeignKey(LNetConfiguration, on_delete=CASCADE) state_transition = StateChangeJob.StateTransition(LNetConfiguration, "unconfigured", "lnet_unloaded") - skip_if_satisfied = True class Meta: app_label = "chroma_core" @@ -321,7 +320,6 @@ class LoadLNetJob(LNetStateChangeJob): stateful_object = "lnet_configuration" lnet_configuration = models.ForeignKey(LNetConfiguration, on_delete=CASCADE) state_verb = "Load LNet" - skip_if_satisfied = True display_group = Job.JOB_GROUPS.COMMON display_order = 30 @@ -361,7 +359,6 @@ class StartLNetJob(LNetStateChangeJob): stateful_object = "lnet_configuration" lnet_configuration = models.ForeignKey(LNetConfiguration, on_delete=CASCADE) state_verb = "Start LNet" - skip_if_satisfied = True display_group = Job.JOB_GROUPS.COMMON display_order = 40 diff --git a/chroma_core/services/job_scheduler/job_scheduler.py b/chroma_core/services/job_scheduler/job_scheduler.py index 8d9d4a0e44..aad5eabdd1 100644 --- a/chroma_core/services/job_scheduler/job_scheduler.py +++ b/chroma_core/services/job_scheduler/job_scheduler.py @@ -36,7 +36,7 @@ from chroma_core.models import ManagedMdt from chroma_core.models import FilesystemMember from chroma_core.models import ConfigureLNetJob -from chroma_core.models import ManagedTarget, ApplyConfParams, ManagedOst, Job, DeletableStatefulObject, StatefulObject +from chroma_core.models import ManagedTarget, ApplyConfParams, ManagedOst, Job, DeletableStatefulObject from chroma_core.models import StepResult from chroma_core.models import ( ManagedMgs, @@ -60,7 +60,6 @@ 
StratagemConfiguration, ) from chroma_core.models import Task, CreateTaskJob -from chroma_core.models.jobs import StateChangeJob from chroma_core.services.job_scheduler.dep_cache import DepCache from chroma_core.services.job_scheduler.lock_cache import LockCache, lock_change_receiver, to_lock_json from chroma_core.services.job_scheduler.command_plan import CommandPlan @@ -403,14 +402,6 @@ def add_command(self, command, jobs): """ for job in jobs: self.add(job) - - stateful_objects = list( - set([x.get_stateful_object() for x in self._jobs.values() if isinstance(x, StateChangeJob)]) - ) - for so in stateful_objects: - so.set_begin_state(self.get_stateful_object_begin_state(so)) - so.set_end_state(self.get_stateful_object_end_state(so)) - self._commands[command.id] = command self._command_to_jobs[command.id] |= set([j.id for j in jobs]) for job in jobs: @@ -463,23 +454,6 @@ def update_many(self, jobs, new_state): Job.objects.filter(id__in=[j.id for j in jobs]).update(state=new_state) - def get_jobs_by_stateful_object(self, so): - return [ - x - for x in self._jobs.values() - if isinstance(x, StateChangeJob) and type(x.get_stateful_object()) is type(so) - ] - - def get_stateful_object_begin_state(self, so): - first_job = self.get_jobs_by_stateful_object(so)[:+1] - if first_job: - return first_job[0].state_transition.old_state - - def get_stateful_object_end_state(self, so): - last_job = self.get_jobs_by_stateful_object(so)[-1:] - if last_job: - return last_job[0].state_transition.new_state - @property def ready_jobs(self): result = [] @@ -593,20 +567,7 @@ def _check_jobs(self, jobs, dep_cache): log.error("Job %d: exception in get_steps: %s" % (job.id, traceback.format_exc())) cancel_jobs.append(job) else: - if isinstance(job, StateChangeJob): - so = job.get_stateful_object() - stateful_object = so.__class__._base_manager.get(pk=so.pk) - state = stateful_object.state - - if state != job.old_state and job.skip_if_satisfied: - if state in so.get_current_route(): - self._complete_job(job, False, False) - else: - cancel_jobs.append(job) - else: - ok_jobs.append(job) - else: - ok_jobs.append(job) + ok_jobs.append(job) return ok_jobs, cancel_jobs diff --git a/chroma_core/services/plugin_runner/resource_manager.py b/chroma_core/services/plugin_runner/resource_manager.py index 608a4f5bd0..c4015ac436 100644 --- a/chroma_core/services/plugin_runner/resource_manager.py +++ b/chroma_core/services/plugin_runner/resource_manager.py @@ -901,19 +901,23 @@ def _persist_nid_updates(self, scannable_id, changed_resource_id, changed_attrs) for lnet_state_resource in node_resources[LNETModules]: lnet_state = lnet_state_resource.to_resource() - if lnet_state.host_id == host.id: - lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id) - - # Truthfully this should use the notify which I've added as a comment to show the correct way. The problem is that - # during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way the is probably - # as planned but prevents it being set back. What we really need is to somehow get a single command that goes - # to a state and then to another state. post_dependencies almost. At present I can't see how to do this so I am leaving - # this code as is. 
- lnet_configuration.set_state(lnet_state.state) - lnet_configuration.save() - # JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state}) - - log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration) + # Really this code should be more tightly tied to the lnet_configuration classes, but in a one step + # at a time approach. Until lnet is !unconfigured we should not be updating it's state. + # Double if because the first if should be removed really, in some more perfect future. + if host.lnet_configuration.state != "unconfigured": + if lnet_state.host_id == host.id: + lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id) + + # Truthfully this should use the notify which I've added as a comment to show the correct way. The problem is that + # during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way the is probably + # as planned but prevents it being set back. What we really need is to somehow get a single command that goes + # to a state and then to another state. post_dependencies almost. At present I can't see how to do this so I am leaving + # this code as is. + lnet_configuration.set_state(lnet_state.state) + lnet_configuration.save() + # JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state}) + + log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration) # Only get the lnet_configuration if we actually have a LNetInterface (nid) to add. if len(node_resources[LNETInterface]) > 0: diff --git a/iml-system-test-utils/src/lib.rs b/iml-system-test-utils/src/lib.rs index b57952231b..149771057d 100644 --- a/iml-system-test-utils/src/lib.rs +++ b/iml-system-test-utils/src/lib.rs @@ -75,22 +75,20 @@ pub const STRATAGEM_SERVER_PROFILE: &str = r#"{ pub const STRATAGEM_CLIENT_PROFILE: &str = r#"{ "ui_name": "Stratagem Client Node", - "managed": true, + "managed": false, "worker": true, "name": "stratagem_client", - "initial_state": "managed", - "ntp": true, + "initial_state": "monitored", + "ntp": false, "corosync": false, "corosync2": false, "pacemaker": false, "ui_description": "A client that can receive stratagem data", "packages": [ - "python2-iml-agent-management", - "lustre-client" + "python2-iml-agent-management" ], "repolist": [ - "base", - "lustre-client" + "base" ] } "#; diff --git a/tests/unit/services/job_scheduler/job_test_case.py b/tests/unit/services/job_scheduler/job_test_case.py index 5487a98d1d..6f3c7e789b 100644 --- a/tests/unit/services/job_scheduler/job_test_case.py +++ b/tests/unit/services/job_scheduler/job_test_case.py @@ -8,7 +8,6 @@ from chroma_core.models import ManagedTarget from chroma_core.models import ManagedTargetMount from chroma_core.models import Nid -from chroma_core.models import StateChangeJob from chroma_core.services.plugin_runner.agent_daemon_interface import AgentDaemonRpcInterface from chroma_core.services.queue import ServiceQueue from chroma_core.services.rpc import ServiceRpcInterface @@ -191,9 +190,6 @@ def _spawn_job(job): def run_next(): while True: runnable_jobs = self.job_scheduler._job_collection.ready_jobs - for job in runnable_jobs: - if isinstance(job, StateChangeJob): - job.skip_if_satisfied = False log.info( "run_next: %d runnable jobs of (%d pending, %d tasked)"
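Taken together, the series briefly taught DependOn to treat a dependency as
satisfied whenever its object already sits on the planned transition route
(PATCH 1) and then backed that logic out again (PATCH 6). A minimal
self-contained sketch of the rule that was tried, with StatefulObject reduced
to a stand-in for the real Django model and the acceptable_states default
assumed from the docstring in chroma_core/lib/job.py:

    class StatefulObject(object):
        def __init__(self, state, route):
            self.state = state
            self._route = route  # planned begin -> end state route

        def get_current_route(self):
            # The real implementation derives this from route_map via
            # get_route(self._begin_state, self._end_state).
            return self._route


    class DependOn(object):
        def __init__(self, stateful_object, preferred_state, acceptable_states=None, skip_if_satisfied=False):
            self.stateful_object = stateful_object
            self.preferred_state = preferred_state
            # Assumption: acceptable_states defaults to the preferred state alone.
            self.acceptable_states = acceptable_states or [preferred_state]
            self.skip_if_satisfied = skip_if_satisfied

        def satisfied(self):
            state = self.stateful_object.state
            if self.skip_if_satisfied:
                # Count the dependency as met when the object is anywhere on
                # the route already planned for it, so an in-flight
                # transition is not scheduled a second time.
                return state in self.stateful_object.get_current_route()
            return state in self.acceptable_states


    # A client mount that already reached "mounted" along the planned route
    # satisfies the relaxed check but not the strict one:
    mount = StatefulObject("mounted", ["unmounted", "mounted"])
    assert DependOn(mount, "unmounted", skip_if_satisfied=True).satisfied()
    assert not DependOn(mount, "unmounted").satisfied()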