From 83bf089f5a2fd5e00711f63094e39b3d4111335b Mon Sep 17 00:00:00 2001
From: Hector Castro
Date: Wed, 29 Jun 2016 10:46:32 -0400
Subject: [PATCH 1/2] Scope to workers of same color

Ensure that they don't cross over stack boundaries when multiple stacks
exist at once.
---
 src/mmw/apps/modeling/mapshed/tasks.py | 12 +++--
 src/mmw/apps/modeling/views.py         | 65 +++++++++++++++++---------
 src/mmw/mmw/celery.py                  |  7 ++-
 3 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/src/mmw/apps/modeling/mapshed/tasks.py b/src/mmw/apps/modeling/mapshed/tasks.py
index afd047524..8082e3eb6 100644
--- a/src/mmw/apps/modeling/mapshed/tasks.py
+++ b/src/mmw/apps/modeling/mapshed/tasks.py
@@ -445,7 +445,9 @@ def nlcd_kfactor(result):
     return output


-def geop_tasks(geom, errback, exchange, routing_key):
+def geop_tasks(geom, errback, exchange, choose_worker):
+    geop_worker = choose_worker()
+
     # List of tuples of (opname, data, callback) for each geop task
     definitions = [
         ('nlcd_streams',
@@ -474,10 +476,12 @@ def geop_tasks(geom, errback, exchange, routing_key):
                            nlcd_streams_drb))

     return [(mapshed_start.s(opname, data).set(exchange=exchange,
-                                               routing_key=routing_key) |
+                                               routing_key=geop_worker) |
             mapshed_finish.s().set(exchange=exchange,
-                                   routing_key=routing_key) |
-            callback.s().set(link_error=errback))
+                                   routing_key=geop_worker) |
+            callback.s().set(link_error=errback,
+                             exchange=exchange,
+                             routing_key=choose_worker()))
            for (opname, data, callback) in definitions]


diff --git a/src/mmw/apps/modeling/views.py b/src/mmw/apps/modeling/views.py
index beda954cd..49a959c7c 100644
--- a/src/mmw/apps/modeling/views.py
+++ b/src/mmw/apps/modeling/views.py
@@ -217,10 +217,14 @@ def start_gwlfe(request, format=None):


 def _initiate_gwlfe_job_chain(model_input, inputmod_hash, job_id):
-    chain = (tasks.run_gwlfe.s(model_input, inputmod_hash) |
-             save_job_result.s(job_id, model_input))
+    chain = (tasks.run_gwlfe.s(model_input, inputmod_hash)
+             .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()) |
+             save_job_result.s(job_id, model_input)
+             .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()))
+    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
+                                           routing_key=choose_worker())

-    return chain.apply_async(link_error=save_job_error.s(job_id))
+    return chain.apply_async(link_error=errback)


 @decorators.api_view(['POST'])
@@ -250,17 +254,20 @@ def start_mapshed(request, format=None):


 def _initiate_mapshed_job_chain(mapshed_input, job_id):
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()
-    errback = save_job_error.s(job_id)
+    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
+                                           routing_key=choose_worker())

     geom = GEOSGeometry(json.dumps(mapshed_input['area_of_interest']),
                         srid=4326)

-    chain = (group(geop_tasks(geom, errback, exchange, routing_key)) |
-             combine.s() |
-             collect_data.s(geom.geojson).set(link_error=errback) |
-             save_job_result.s(job_id, mapshed_input))
+    chain = (group(geop_tasks(geom, errback, MAGIC_EXCHANGE, choose_worker)) |
+             combine.s().set(exchange=MAGIC_EXCHANGE,
+                             routing_key=choose_worker()) |
+             collect_data.s(geom.geojson).set(link_error=errback,
+                                              exchange=MAGIC_EXCHANGE,
+                                              routing_key=choose_worker()) |
+             save_job_result.s(job_id, mapshed_input)
+             .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()))

     return chain.apply_async(link_error=errback)

@@ -350,24 +357,31 @@ def get_list_of_workers():
 def _initiate_analyze_job_chain(area_of_interest, job_id, testing=False):
     exchange = MAGIC_EXCHANGE
     routing_key = choose_worker()
+    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
+                                           routing_key=choose_worker())

     return chain(tasks.start_histogram_job.s(area_of_interest)
                  .set(exchange=exchange, routing_key=routing_key),
                  tasks.get_histogram_job_results.s()
                  .set(exchange=exchange, routing_key=routing_key),
-                 tasks.histogram_to_survey.s(),
-                 save_job_result.s(job_id, area_of_interest)) \
-        .apply_async(link_error=save_job_error.s(job_id))
+                 tasks.histogram_to_survey.s()
+                 .set(exchange=exchange, routing_key=choose_worker()),
+                 save_job_result.s(job_id, area_of_interest)
+                 .set(exchange=exchange, routing_key=choose_worker())) \
+        .apply_async(link_error=errback)


 def _initiate_rwd_job_chain(location, snapping, job_id, testing=False):
     exchange = MAGIC_EXCHANGE
     routing_key = choose_worker()
+    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
+                                           routing_key=choose_worker())

     return chain(tasks.start_rwd_job.s(location, snapping)
                  .set(exchange=exchange, routing_key=routing_key),
-                 save_job_result.s(job_id, location)) \
-        .apply_async(link_error=save_job_error.s(job_id))
+                 save_job_result.s(job_id, location)
+                 .set(exchange=exchange, routing_key=choose_worker())) \
+        .apply_async(link_error=errback)


 @decorators.api_view(['POST'])
@@ -391,8 +405,10 @@ def start_tr55(request, format=None):

 def _initiate_tr55_job_chain(model_input, job_id):
     job_chain = _construct_tr55_job_chain(model_input, job_id)
+    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
+                                           routing_key=choose_worker())

-    return chain(job_chain).apply_async(link_error=save_job_error.s(job_id))
+    return chain(job_chain).apply_async(link_error=errback)


 def _construct_tr55_job_chain(model_input, job_id):
@@ -421,7 +437,8 @@ def _construct_tr55_job_chain(model_input, job_id):
                 census_hash == current_hash) or not pieces)):
            censuses = [aoi_census] + modification_census_items

-            job_chain.append(tasks.run_tr55.s(censuses, model_input))
+            job_chain.append(tasks.run_tr55.s(censuses, model_input)
+                             .set(exchange=exchange, routing_key=choose_worker()))
        else:
            job_chain.append(tasks.get_histogram_job_results.s()
                             .set(exchange=exchange, routing_key=routing_key))
@@ -433,16 +450,22 @@ def _construct_tr55_job_chain(model_input, job_id):

            job_chain.insert(0, tasks.start_histograms_job.s(polygons)
                             .set(exchange=exchange, routing_key=routing_key))
-            job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input,
-                             cached_aoi_census=aoi_census))
+            job_chain.insert(len(job_chain),
+                             tasks.run_tr55.s(model_input,
+                                              cached_aoi_census=aoi_census)
+                             .set(exchange=exchange,
+                                  routing_key=choose_worker()))
        else:
            polygons = [aoi] + [m['shape']['geometry'] for m in pieces]

            job_chain.insert(0, tasks.start_histograms_job.s(polygons)
                             .set(exchange=exchange, routing_key=routing_key))
-            job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input))
+            job_chain.insert(len(job_chain), tasks.run_tr55.s(model_input)
+                             .set(exchange=exchange,
+                                  routing_key=choose_worker()))

-    job_chain.append(save_job_result.s(job_id, model_input))
+    job_chain.append(save_job_result.s(job_id, model_input)
+                     .set(exchange=exchange, routing_key=choose_worker()))

     return job_chain


diff --git a/src/mmw/mmw/celery.py b/src/mmw/mmw/celery.py
index fbe620421..d5f0249af 100644
--- a/src/mmw/mmw/celery.py
+++ b/src/mmw/mmw/celery.py
@@ -24,6 +24,8 @@ def add_unlock_chord_task_shim(app):
     from celery.exceptions import ChordError
     from celery.result import allow_join_result, result_from_tuple

+    from apps.modeling.views import choose_worker, MAGIC_EXCHANGE
+
     logger = logging.getLogger(__name__)

     MAX_RETRIES = settings.CELERY_CHORD_UNLOCK_MAX_RETRIES
@@ -52,10 +54,13 @@ def unlock_chord(self, group_id, callback, interval=None,
         except Exception as exc:
             raise self.retry(
                 exc=exc, countdown=interval, max_retries=max_retries,
+                exchange=MAGIC_EXCHANGE, routing_key=choose_worker()
             )
         else:
             if not ready:
-                raise self.retry(countdown=interval, max_retries=max_retries)
+                raise self.retry(countdown=interval, max_retries=max_retries,
+                                 exchange=MAGIC_EXCHANGE,
+                                 routing_key=choose_worker())

         callback = maybe_signature(callback, app=app)
         try:

From 0835021e695c2ab812a5468a2eafacb733629a28 Mon Sep 17 00:00:00 2001
From: Terence Tuhinanshu
Date: Wed, 29 Jun 2016 16:02:24 -0400
Subject: [PATCH 2/2] Increase Celery Chord Unlock Max Retries

In some cases, having multiple workers exhausts this limit quicker than
anticipated. The setting exists purely to protect against an infinite
loop, so having a slightly larger limit will not affect performance.
---
 src/mmw/mmw/settings/base.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/mmw/mmw/settings/base.py b/src/mmw/mmw/settings/base.py
index b966acf3d..e56ca2efc 100644
--- a/src/mmw/mmw/settings/base.py
+++ b/src/mmw/mmw/settings/base.py
@@ -129,7 +129,7 @@ def get_env_setting(setting):
 CELERY_WORKER_DIRECT = True
 CELERY_CREATE_MISSING_QUEUES = True
 CELERY_CHORD_PROPAGATES = True
-CELERY_CHORD_UNLOCK_MAX_RETRIES = 20
+CELERY_CHORD_UNLOCK_MAX_RETRIES = 60
 # END CELERY CONFIGURATION
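
A note for reviewers on the routing pattern used in these patches: each signature in the chains is pinned to a single worker by setting an exchange and a routing key, where choose_worker() and get_list_of_workers() live in apps/modeling/views.py (get_list_of_workers() appears as hunk context in the second views.py hunk) and MAGIC_EXCHANGE is imported alongside them. The snippet below is only a minimal, self-contained sketch of that idea using Celery's worker-direct queues and Celery 3.1-era setting names; the broker URL, task names, helper bodies, and the 'C.dq' value assumed for MAGIC_EXCHANGE are illustrative and are not taken from this repository.

# Illustrative sketch only: broker URL, task names, and helper bodies are
# assumptions for demonstration, not code from this repository.
import random

from celery import Celery, chain

app = Celery('sketch', broker='amqp://guest@localhost//')
app.conf.CELERY_WORKER_DIRECT = True   # each worker also consumes <nodename>.dq

MAGIC_EXCHANGE = 'C.dq'                # assumed: Celery's worker-direct exchange


def get_list_of_workers():
    """Return worker-direct routing keys for every worker that answers a
    ping, e.g. ['celery@stack-blue-1.dq', 'celery@stack-blue-2.dq']."""
    responses = app.control.ping(timeout=1) or []
    return [nodename + '.dq'
            for response in responses
            for nodename in response]


def choose_worker():
    """Pick one live worker at random; raises IndexError if none respond."""
    return random.choice(get_list_of_workers())


@app.task
def step_one(data):
    return data


@app.task
def step_two(result):
    return result


if __name__ == '__main__':
    # Pin every link of the chain to an explicitly chosen worker, the same
    # pattern the patch applies to the GWLF-E, MapShed, analyze, and TR-55
    # chains so a job never leaves the workers its own stack can see.
    job = chain(
        step_one.s({'area_of_interest': None})
                .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()),
        step_two.s()
                .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()),
    ).apply_async()
    print(job.id)

With CELERY_WORKER_DIRECT enabled (as in settings/base.py above), each worker consumes from a dedicated <nodename>.dq queue bound to the worker-direct exchange, so a signature routed with that queue name can only run on that one worker; drawing every routing key from a single stack's worker list is what keeps a job from crossing stack boundaries.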