Skip to content

Commit

Permalink
Move things around a little in the main loop.
Browse files Browse the repository at this point in the history
Should take care of two things:

1. When looking at the DEBUG log level, the status should appear before we
   wait for WQ, giving the user time to digest it, and not have to look for
   it, scrolling through tons of task creation output.
2. Plotting should also be triggered before WQ is polled for tasks, which
   minimizes the potential database access overlap.
  • Loading branch information
matz-e committed May 24, 2016
1 parent 1082b4e commit d8fc92c
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions lobster/commands/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,6 @@ def sprint(self):
logger.info("terminating gracefully")
break

stats = self.queue.stats_hierarchy
logger.info("{0} out of {1} workers busy; {3} tasks running, {4} waiting; {2} units left".format(
stats.workers_busy,
stats.workers_busy + stats.workers_ready,
units_left,
stats.tasks_running,
stats.tasks_waiting))

expiry = None
if proxy:
left = proxy.getTimeLeft()
Expand All @@ -266,6 +258,7 @@ def sprint(self):
have[c] = cstats.tasks_running + cstats.tasks_waiting

t = time.time()
stats = self.queue.stats_hierarchy
tasks = task_src.obtain(stats.total_cores, have)

for category, cmd, id, inputs, outputs, env, dir in tasks:
Expand Down Expand Up @@ -297,7 +290,22 @@ def sprint(self):
self.queue.submit(task)
creation_time += int((time.time() - t) * 1e6)

stats = self.queue.stats_hierarchy
logger.info("{0} out of {1} workers busy; {3} tasks running, {4} waiting; {2} units left".format(
stats.workers_busy,
stats.workers_busy + stats.workers_ready,
units_left,
stats.tasks_running,
stats.tasks_waiting))

task_src.update(self.queue)

# recurring actions are triggered here; plotting etc should run
# while we have WQ hand us back tasks w/o any database
# interaction
if action:
action.take()

starttime = time.time()
task = self.queue.wait(interval)
tasks = []
Expand Down Expand Up @@ -329,10 +337,6 @@ def sprint(self):
logger.info("activating fast abort with multiplier: {0}".format(abort_multiplier))
abort_active = True
self.queue.activate_fast_abort(abort_multiplier)

# recurring actions are triggered here
if action:
action.take()
if units_left == 0:
logger.info("no more work left to do")
if action:
Expand Down

0 comments on commit d8fc92c

Please sign in to comment.