Skip to content

Commit

Permalink
Merge pull request #1384 from WikiWatershed/feature/tt/pin-jobs-to-active-stack
Browse files Browse the repository at this point in the history

Scope to workers of same color
  • Loading branch information
rajadain authored Jun 29, 2016
2 parents 61e7839 + 0835021 commit d57cfbb
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
12 changes: 8 additions & 4 deletions src/mmw/apps/modeling/mapshed/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ def nlcd_kfactor(result):
return output


def geop_tasks(geom, errback, exchange, routing_key):
def geop_tasks(geom, errback, exchange, choose_worker):
geop_worker = choose_worker()

# List of tuples of (opname, data, callback) for each geop task
definitions = [
('nlcd_streams',
Expand Down Expand Up @@ -474,10 +476,12 @@ def geop_tasks(geom, errback, exchange, routing_key):
nlcd_streams_drb))

return [(mapshed_start.s(opname, data).set(exchange=exchange,
routing_key=routing_key) |
routing_key=geop_worker) |
mapshed_finish.s().set(exchange=exchange,
routing_key=routing_key) |
callback.s().set(link_error=errback))
routing_key=geop_worker) |
callback.s().set(link_error=errback,
exchange=exchange,
routing_key=choose_worker()))
for (opname, data, callback) in definitions]


Expand Down
65 changes: 44 additions & 21 deletions src/mmw/apps/modeling/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ def start_gwlfe(request, format=None):


def _initiate_gwlfe_job_chain(model_input, inputmod_hash, job_id):
    """Kick off the GWLF-E model run followed by result persistence.

    Each chain link and the error callback are pinned to the magic
    exchange, routed to a worker picked by ``choose_worker()`` so the
    whole job stays on workers of the currently active stack color.
    """
    run = tasks.run_gwlfe.s(model_input, inputmod_hash).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())
    save = save_job_result.s(job_id, model_input).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())
    on_error = save_job_error.s(job_id).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())

    return (run | save).apply_async(link_error=on_error)


@decorators.api_view(['POST'])
Expand Down Expand Up @@ -250,17 +254,20 @@ def start_mapshed(request, format=None):


def _initiate_mapshed_job_chain(mapshed_input, job_id):
    """Build and dispatch the MapShed data-gathering workflow.

    A group of geoprocessing tasks fans out first; their outputs are then
    combined, collected, and saved. Every signature is bound to the magic
    exchange with a routing key chosen by ``choose_worker()``.
    """
    on_error = save_job_error.s(job_id).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())

    aoi_json = json.dumps(mapshed_input['area_of_interest'])
    geom = GEOSGeometry(aoi_json, srid=4326)

    # geop_tasks picks its own workers via the passed choose_worker callable
    geop = group(geop_tasks(geom, on_error, MAGIC_EXCHANGE, choose_worker))
    merge = combine.s().set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())
    collect = collect_data.s(geom.geojson).set(
        link_error=on_error,
        exchange=MAGIC_EXCHANGE,
        routing_key=choose_worker())
    save = save_job_result.s(job_id, mapshed_input).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())

    workflow = geop | merge | collect | save

    return workflow.apply_async(link_error=on_error)

Expand Down Expand Up @@ -350,24 +357,31 @@ def get_list_of_workers():
def _initiate_analyze_job_chain(area_of_interest, job_id, testing=False):
    """Start the analyze (survey) task chain for an area of interest.

    The histogram start/results pair deliberately shares one routing key
    so both land on the same worker; the remaining links each get a fresh
    worker. ``testing`` is accepted for signature compatibility only.
    """
    exchange = MAGIC_EXCHANGE
    histogram_worker = choose_worker()
    on_error = save_job_error.s(job_id).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())

    steps = [
        tasks.start_histogram_job.s(area_of_interest).set(
            exchange=exchange, routing_key=histogram_worker),
        tasks.get_histogram_job_results.s().set(
            exchange=exchange, routing_key=histogram_worker),
        tasks.histogram_to_survey.s().set(
            exchange=exchange, routing_key=choose_worker()),
        save_job_result.s(job_id, area_of_interest).set(
            exchange=exchange, routing_key=choose_worker()),
    ]

    return chain(*steps).apply_async(link_error=on_error)


def _initiate_rwd_job_chain(location, snapping, job_id, testing=False):
    """Dispatch the rapid-watershed-delineation job and save its result.

    ``testing`` is accepted for signature compatibility and unused here.
    """
    exchange = MAGIC_EXCHANGE
    rwd_worker = choose_worker()
    on_error = save_job_error.s(job_id).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())

    start = tasks.start_rwd_job.s(location, snapping).set(
        exchange=exchange, routing_key=rwd_worker)
    save = save_job_result.s(job_id, location).set(
        exchange=exchange, routing_key=choose_worker())

    return chain(start, save).apply_async(link_error=on_error)


@decorators.api_view(['POST'])
Expand All @@ -391,8 +405,10 @@ def start_tr55(request, format=None):

def _initiate_tr55_job_chain(model_input, job_id):
    """Assemble the TR-55 task chain and start it asynchronously.

    Chain construction happens first (it routes its own tasks); the error
    callback is then pinned to the magic exchange on a chosen worker.
    """
    steps = _construct_tr55_job_chain(model_input, job_id)
    on_error = save_job_error.s(job_id).set(
        exchange=MAGIC_EXCHANGE, routing_key=choose_worker())

    return chain(steps).apply_async(link_error=on_error)


def _construct_tr55_job_chain(model_input, job_id):
Expand Down Expand Up @@ -421,7 +437,8 @@ def _construct_tr55_job_chain(model_input, job_id):
census_hash == current_hash) or not pieces)):
censuses = [aoi_census] + modification_census_items

job_chain.append(tasks.run_tr55.s(censuses, model_input))
job_chain.append(tasks.run_tr55.s(censuses, model_input)
.set(exchange=exchange, routing_key=choose_worker()))
else:
job_chain.append(tasks.get_histogram_job_results.s()
.set(exchange=exchange, routing_key=routing_key))
Expand All @@ -433,16 +450,22 @@ def _construct_tr55_job_chain(model_input, job_id):

job_chain.insert(0, tasks.start_histograms_job.s(polygons)
.set(exchange=exchange, routing_key=routing_key))
job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input,
cached_aoi_census=aoi_census))
job_chain.insert(len(job_chain),
tasks.run_tr55.s(model_input,
cached_aoi_census=aoi_census)
.set(exchange=exchange,
routing_key=choose_worker()))
else:
polygons = [aoi] + [m['shape']['geometry'] for m in pieces]

job_chain.insert(0, tasks.start_histograms_job.s(polygons)
.set(exchange=exchange, routing_key=routing_key))
job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input))
job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input)
.set(exchange=exchange,
routing_key=choose_worker()))

job_chain.append(save_job_result.s(job_id, model_input))
job_chain.append(save_job_result.s(job_id, model_input)
.set(exchange=exchange, routing_key=choose_worker()))

return job_chain

Expand Down
7 changes: 6 additions & 1 deletion src/mmw/mmw/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def add_unlock_chord_task_shim(app):
from celery.exceptions import ChordError
from celery.result import allow_join_result, result_from_tuple

from apps.modeling.views import choose_worker, MAGIC_EXCHANGE

logger = logging.getLogger(__name__)

MAX_RETRIES = settings.CELERY_CHORD_UNLOCK_MAX_RETRIES
Expand Down Expand Up @@ -52,10 +54,13 @@ def unlock_chord(self, group_id, callback, interval=None,
except Exception as exc:
raise self.retry(
exc=exc, countdown=interval, max_retries=max_retries,
exchange=MAGIC_EXCHANGE, routing_key=choose_worker()
)
else:
if not ready:
raise self.retry(countdown=interval, max_retries=max_retries)
raise self.retry(countdown=interval, max_retries=max_retries,
exchange=MAGIC_EXCHANGE,
routing_key=choose_worker())

callback = maybe_signature(callback, app=app)
try:
Expand Down
2 changes: 1 addition & 1 deletion src/mmw/mmw/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def get_env_setting(setting):
CELERY_WORKER_DIRECT = True
CELERY_CREATE_MISSING_QUEUES = True
CELERY_CHORD_PROPAGATES = True
CELERY_CHORD_UNLOCK_MAX_RETRIES = 20
CELERY_CHORD_UNLOCK_MAX_RETRIES = 60
# END CELERY CONFIGURATION


Expand Down

0 comments on commit d57cfbb

Please sign in to comment.