Skip to content

Commit

Permalink
Scope to workers of same color
Browse files Browse the repository at this point in the history
Ensure that they don't cross over stack boundaries when multiple
stacks exist at once.
  • Loading branch information
Hector Castro authored and rajadain committed Jun 29, 2016
1 parent 61e7839 commit 83bf089
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 26 deletions.
12 changes: 8 additions & 4 deletions src/mmw/apps/modeling/mapshed/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ def nlcd_kfactor(result):
return output


def geop_tasks(geom, errback, exchange, routing_key):
def geop_tasks(geom, errback, exchange, choose_worker):
geop_worker = choose_worker()

# List of tuples of (opname, data, callback) for each geop task
definitions = [
('nlcd_streams',
Expand Down Expand Up @@ -474,10 +476,12 @@ def geop_tasks(geom, errback, exchange, routing_key):
nlcd_streams_drb))

return [(mapshed_start.s(opname, data).set(exchange=exchange,
routing_key=routing_key) |
routing_key=geop_worker) |
mapshed_finish.s().set(exchange=exchange,
routing_key=routing_key) |
callback.s().set(link_error=errback))
routing_key=geop_worker) |
callback.s().set(link_error=errback,
exchange=exchange,
routing_key=choose_worker()))
for (opname, data, callback) in definitions]


Expand Down
65 changes: 44 additions & 21 deletions src/mmw/apps/modeling/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ def start_gwlfe(request, format=None):


def _initiate_gwlfe_job_chain(model_input, inputmod_hash, job_id):
chain = (tasks.run_gwlfe.s(model_input, inputmod_hash) |
save_job_result.s(job_id, model_input))
chain = (tasks.run_gwlfe.s(model_input, inputmod_hash)
.set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()) |
save_job_result.s(job_id, model_input)
.set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()))
errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
routing_key=choose_worker())

return chain.apply_async(link_error=save_job_error.s(job_id))
return chain.apply_async(link_error=errback)


@decorators.api_view(['POST'])
Expand Down Expand Up @@ -250,17 +254,20 @@ def start_mapshed(request, format=None):


def _initiate_mapshed_job_chain(mapshed_input, job_id):
exchange = MAGIC_EXCHANGE
routing_key = choose_worker()
errback = save_job_error.s(job_id)
errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
routing_key=choose_worker())

geom = GEOSGeometry(json.dumps(mapshed_input['area_of_interest']),
srid=4326)

chain = (group(geop_tasks(geom, errback, exchange, routing_key)) |
combine.s() |
collect_data.s(geom.geojson).set(link_error=errback) |
save_job_result.s(job_id, mapshed_input))
chain = (group(geop_tasks(geom, errback, MAGIC_EXCHANGE, choose_worker)) |
combine.s().set(exchange=MAGIC_EXCHANGE,
routing_key=choose_worker()) |
collect_data.s(geom.geojson).set(link_error=errback,
exchange=MAGIC_EXCHANGE,
routing_key=choose_worker()) |
save_job_result.s(job_id, mapshed_input)
.set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()))

return chain.apply_async(link_error=errback)

Expand Down Expand Up @@ -350,24 +357,31 @@ def get_list_of_workers():
def _initiate_analyze_job_chain(area_of_interest, job_id, testing=False):
exchange = MAGIC_EXCHANGE
routing_key = choose_worker()
errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
routing_key=choose_worker())

return chain(tasks.start_histogram_job.s(area_of_interest)
.set(exchange=exchange, routing_key=routing_key),
tasks.get_histogram_job_results.s()
.set(exchange=exchange, routing_key=routing_key),
tasks.histogram_to_survey.s(),
save_job_result.s(job_id, area_of_interest)) \
.apply_async(link_error=save_job_error.s(job_id))
tasks.histogram_to_survey.s()
.set(exchange=exchange, routing_key=choose_worker()),
save_job_result.s(job_id, area_of_interest)
.set(exchange=exchange, routing_key=choose_worker())) \
.apply_async(link_error=errback)


def _initiate_rwd_job_chain(location, snapping, job_id, testing=False):
exchange = MAGIC_EXCHANGE
routing_key = choose_worker()
errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
routing_key=choose_worker())

return chain(tasks.start_rwd_job.s(location, snapping)
.set(exchange=exchange, routing_key=routing_key),
save_job_result.s(job_id, location)) \
.apply_async(link_error=save_job_error.s(job_id))
save_job_result.s(job_id, location)
.set(exchange=exchange, routing_key=choose_worker())) \
.apply_async(link_error=errback)


@decorators.api_view(['POST'])
Expand All @@ -391,8 +405,10 @@ def start_tr55(request, format=None):

def _initiate_tr55_job_chain(model_input, job_id):
job_chain = _construct_tr55_job_chain(model_input, job_id)
errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
routing_key=choose_worker())

return chain(job_chain).apply_async(link_error=save_job_error.s(job_id))
return chain(job_chain).apply_async(link_error=errback)


def _construct_tr55_job_chain(model_input, job_id):
Expand Down Expand Up @@ -421,7 +437,8 @@ def _construct_tr55_job_chain(model_input, job_id):
census_hash == current_hash) or not pieces)):
censuses = [aoi_census] + modification_census_items

job_chain.append(tasks.run_tr55.s(censuses, model_input))
job_chain.append(tasks.run_tr55.s(censuses, model_input)
.set(exhange=exchange, routing_key=choose_worker()))
else:
job_chain.append(tasks.get_histogram_job_results.s()
.set(exchange=exchange, routing_key=routing_key))
Expand All @@ -433,16 +450,22 @@ def _construct_tr55_job_chain(model_input, job_id):

job_chain.insert(0, tasks.start_histograms_job.s(polygons)
.set(exchange=exchange, routing_key=routing_key))
job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input,
cached_aoi_census=aoi_census))
job_chain.insert(len(job_chain),
tasks.run_tr55.s(model_input,
cached_aoi_census=aoi_census)
.set(exchange=exchange,
routing_key=choose_worker()))
else:
polygons = [aoi] + [m['shape']['geometry'] for m in pieces]

job_chain.insert(0, tasks.start_histograms_job.s(polygons)
.set(exchange=exchange, routing_key=routing_key))
job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input))
job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input)
.set(exchange=exchange,
routing_key=choose_worker()))

job_chain.append(save_job_result.s(job_id, model_input))
job_chain.append(save_job_result.s(job_id, model_input)
.set(exchange=exchange, routing_key=choose_worker()))

return job_chain

Expand Down
7 changes: 6 additions & 1 deletion src/mmw/mmw/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def add_unlock_chord_task_shim(app):
from celery.exceptions import ChordError
from celery.result import allow_join_result, result_from_tuple

from apps.modeling.views import choose_worker, MAGIC_EXCHANGE

logger = logging.getLogger(__name__)

MAX_RETRIES = settings.CELERY_CHORD_UNLOCK_MAX_RETRIES
Expand Down Expand Up @@ -52,10 +54,13 @@ def unlock_chord(self, group_id, callback, interval=None,
except Exception as exc:
raise self.retry(
exc=exc, countdown=interval, max_retries=max_retries,
exchange=MAGIC_EXCHANGE, routing_key=choose_worker()
)
else:
if not ready:
raise self.retry(countdown=interval, max_retries=max_retries)
raise self.retry(countdown=interval, max_retries=max_retries,
exchange=MAGIC_EXCHANGE,
routing_key=choose_worker())

callback = maybe_signature(callback, app=app)
try:
Expand Down

0 comments on commit 83bf089

Please sign in to comment.