Skip to content

Commit

Permalink
Add some debug logging
Browse files Browse the repository at this point in the history
  • Loading branch information
joealcorn committed Sep 3, 2015
1 parent 4c5402a commit 000af67
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions celery_dedupe/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import hashlib
import logging
import warnings

from celery import Task
from celery.result import EagerResult
from celery.utils import uuid

logger = logging.getLogger('celery_dedupe')


class DedupeTask(Task):
abtract = True
Expand All @@ -15,17 +18,20 @@ def apply_async(self, args=None, kwargs=None, **kw):
task_id = kw.setdefault('task_id', uuid())

if self.storage.obtain_lock(key, task_id):
logger.debug('Queueing %s [%s]', task_id, key)
return super(DedupeTask, self).apply_async(args=args, kwargs=kwargs, **kw)

existing_task_id = self.storage.get(key)
if existing_task_id == task_id:
# This should be a retry, so add it to the broker anyway
logger.debug('Queueing %s for retry [%s]', task_id, key)
return super(DedupeTask, self).apply_async(args=args, kwargs=kwargs, **kw)

app = self._get_app()
if app.conf.CELERY_ALWAYS_EAGER:
warnings.warn('Using DedupeTask in conjunction with CELERY_ALWAYS_EAGER, can not return EagerResult')

logger.debug('%s already queued, returning AsyncResult [%s]', task_id, key)
return self.AsyncResult(existing_task_id)

def on_success(self, retval, task_id, args, kwargs):
Expand Down

0 comments on commit 000af67

Please sign in to comment.