Skip to content

Commit

Permalink
Add DedupeTask and some preliminary tests
Browse files Browse the repository at this point in the history
  • Loading branch information
joealcorn committed Aug 19, 2015
1 parent 27e1152 commit d0a7a97
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ install: "pip install -r dev-requirements.txt"
services:
- redis-server

script: py.test
script:
- celery worker -l debug --config tests.settings &> celery.log &
- py.test
1 change: 1 addition & 0 deletions celery_dedupe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .tasks import DedupeTask
36 changes: 36 additions & 0 deletions celery_dedupe/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import hashlib
import warnings

from celery import Task
from celery.result import EagerResult
from celery.utils import uuid


class DedupeTask(Task):
abtract = True
storage = None

def apply_async(self, args=None, kwargs=None, **kw):
key = self._create_key(args, kwargs)
task_id = kw.setdefault('task_id', uuid())

if self.storage.obtain_lock(key, task_id):
return super(DedupeTask, self).apply_async(args=args, kwargs=kwargs, **kw)

existing_task_id = self.storage.get(key)
app = self._get_app()
if app.conf.CELERY_ALWAYS_EAGER:
warnings.warn('Using DedupeTask in conjunction with CELERY_ALWAYS_EAGER, can not return EagerResult')

return self.AsyncResult(existing_task_id)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
key = self._create_key(args, kwargs)
self.storage.release_lock(key)

def _create_key(self, args, kwargs):
arg_string = ','.join([str(a) for a in args]) if args is not None else str(args)
kwarg_string = ','.join(['%s=%s' % (k, v) for k, v in kwargs.iteritems()]) if kwargs is not None else str(kwargs)
arg_hash = hashlib.md5(arg_string + kwarg_string).hexdigest()
import_path = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)
return '%s:%s' % (import_path, arg_hash)
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
exam==0.10.5
pytest==2.7.2
redis==2.10.3
celery==3.1.17
4 changes: 4 additions & 0 deletions tests/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
BROKER_URL = 'redis://localhost:6379/9'
CELERY_ALWAYS_EAGER = False
CELERY_IMPORTS = ('tests.test_task',)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/9'
44 changes: 44 additions & 0 deletions tests/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from celery import current_app
from celery.task import task
from redis import StrictRedis

from celery_dedupe import DedupeTask
from celery_dedupe.storage.redis import RedisStorage
from tests import settings

redis = StrictRedis(db=9)


@task(base=DedupeTask, storage=RedisStorage(redis))
def noop_task(*a, **kw):
return None


class TestDedupeTask(object):

def setup_class(self):
current_app.config_from_object(settings)

def test_create_key(self):
task = DedupeTask()

key = task._create_key(tuple(), {})
assert key == 'celery_dedupe.tasks.DedupeTask:d41d8cd98f00b204e9800998ecf8427e'

key = task._create_key((True, False), {})
assert key == 'celery_dedupe.tasks.DedupeTask:85eadac9a27eb1f6dccf00145df6003b'

key = task._create_key(tuple(), {'something': 2})
assert key == 'celery_dedupe.tasks.DedupeTask:a758dbbdf7ba75fc39732ab4cca53049'

key = task._create_key(tuple(), {'something_else': 2})
assert key == 'celery_dedupe.tasks.DedupeTask:271a7d4a14d7892b87b8fc96d50507a9'

def test_apply_task_twice(self):
result1 = noop_task.apply_async(countdown=10)
result2 = noop_task.apply_async()
assert result1.task_id == result2.task_id

def test_apply_task_once(self):
result = noop_task.delay(1)
assert result.get() == None

0 comments on commit d0a7a97

Please sign in to comment.