Skip to content

Commit

Permalink
Added fault tolerance to django beanstalk worker
Browse files Browse the repository at this point in the history
  • Loading branch information
matthill committed Jul 23, 2014
1 parent 647c95d commit bebbb20
Showing 1 changed file with 44 additions and 28 deletions.
72 changes: 44 additions & 28 deletions django_beanstalkd/management/commands/beanstalk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import sys
import traceback

import time
from django.conf import settings
from django.core.management.base import NoArgsCommand
from django_beanstalkd import connect_beanstalkd
from django_beanstalkd import connect_beanstalkd, BeanstalkError
from beanstalkc import SocketError


logger = logging.getLogger('django_beanstalkd')
Expand Down Expand Up @@ -106,35 +108,49 @@ def spawn_workers(self, worker_count):

def work(self):
"""children only: watch tubes for all jobs, start working"""
beanstalk = connect_beanstalkd()
for job in self.jobs.keys():
beanstalk.watch(job)
beanstalk.ignore('default')

try:

while True:
job = beanstalk.reserve()
job_name = job.stats()['tube']
if job_name in self.jobs:
logger.debug("Calling %s with arg: %s" % (job_name, job.body))
try:
self.jobs[job_name](job.body)
except Exception, e:
tp, value, tb = sys.exc_info()
logger.error('Error while calling "%s" with arg "%s": '
'%s' % (
job_name,
job.body,
e,
)
)
logger.debug("%s:%s" % (tp.__name__, value))
logger.debug("\n".join(traceback.format_tb(tb)))
job.bury()
else:
job.delete()
else:
job.release()
try:
# Reattempt Beanstalk connection if connection attempt fails or is dropped
beanstalk = connect_beanstalkd()
for job in self.jobs.keys():
beanstalk.watch(job)
beanstalk.ignore('default')

# Connected to Beanstalk queue, continually process jobs until an error occurs
self.process_jobs(beanstalk)

except (BeanstalkError, SocketError) as e:
logger.info("Beanstalk connection error: " + str(e))
time.sleep(2.0)
logger.info("retrying Beanstalk connection...")

except KeyboardInterrupt:
sys.exit(0)

def process_jobs(self, beanstalk):
while True:
logger.debug("Beanstalk connection established, waiting for jobs")
job = beanstalk.reserve()
job_name = job.stats()['tube']
if job_name in self.jobs:
logger.debug("Calling %s with arg: %s" % (job_name, job.body))
try:
self.jobs[job_name](job.body)
except Exception, e:
tp, value, tb = sys.exc_info()
logger.error('Error while calling "%s" with arg "%s": '
'%s' % (
job_name,
job.body,
e,
)
)
logger.debug("%s:%s" % (tp.__name__, value))
logger.debug("\n".join(traceback.format_tb(tb)))
job.bury()
else:
job.delete()
else:
job.release()

0 comments on commit bebbb20

Please sign in to comment.