Skip to content

Commit

Permalink
Merge pull request #1382 from rajadain/feature/mapshed-scale-correctly
Browse files Browse the repository at this point in the history
Scale Celery across multiple workers correctly
  • Loading branch information
rajadain authored Jun 28, 2016
2 parents db36c38 + c7abcf6 commit 61e7839
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
8 changes: 5 additions & 3 deletions src/mmw/apps/modeling/mapshed/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ def nlcd_kfactor(result):
return output


def geop_tasks(geom, errback):
def geop_tasks(geom, errback, exchange, routing_key):
# List of tuples of (opname, data, callback) for each geop task
definitions = [
('nlcd_streams',
Expand Down Expand Up @@ -473,8 +473,10 @@ def geop_tasks(geom, errback):
'vector': streams(geom, drb=True)},
nlcd_streams_drb))

return [(mapshed_start.s(opname, data) |
mapshed_finish.s() |
return [(mapshed_start.s(opname, data).set(exchange=exchange,
routing_key=routing_key) |
mapshed_finish.s().set(exchange=exchange,
routing_key=routing_key) |
callback.s().set(link_error=errback))
for (opname, data, callback) in definitions]

Expand Down
3 changes: 1 addition & 2 deletions src/mmw/apps/modeling/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ def _initiate_mapshed_job_chain(mapshed_input, job_id):
geom = GEOSGeometry(json.dumps(mapshed_input['area_of_interest']),
srid=4326)

chain = (group(geop_tasks(geom, errback)).set(exchange=exchange,
routing_key=routing_key) |
chain = (group(geop_tasks(geom, errback, exchange, routing_key)) |
combine.s() |
collect_data.s(geom.geojson).set(link_error=errback) |
save_job_result.s(job_id, mapshed_input))
Expand Down

0 comments on commit 61e7839

Please sign in to comment.