From d0a7a97341bcb1b8c5a51928fa8853f9cccbf804 Mon Sep 17 00:00:00 2001
From: Joe Alcorn
Date: Wed, 19 Aug 2015 12:13:11 +0100
Subject: [PATCH] Add DedupeTask and some preliminary tests

---
 .travis.yml               |  4 ++-
 celery_dedupe/__init__.py |  1 +
 celery_dedupe/tasks.py    | 56 +++++++++++++++++++++++++++++++++++++++
 dev-requirements.txt      |  1 +
 tests/settings.py         |  4 +++
 tests/test_task.py        | 44 ++++++++++++++++++++++++++++++
 6 files changed, 109 insertions(+), 1 deletion(-)
 create mode 100644 celery_dedupe/tasks.py
 create mode 100644 tests/settings.py
 create mode 100644 tests/test_task.py

diff --git a/.travis.yml b/.travis.yml
index a506c11..53df7b2 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,4 +7,6 @@ install: "pip install -r dev-requirements.txt"
 services:
   - redis-server
 
-script: py.test
+script:
+  - celery worker -l debug --config tests.settings &> celery.log &
+  - py.test
diff --git a/celery_dedupe/__init__.py b/celery_dedupe/__init__.py
index e69de29..d69ffc9 100644
--- a/celery_dedupe/__init__.py
+++ b/celery_dedupe/__init__.py
@@ -0,0 +1 @@
+from .tasks import DedupeTask
diff --git a/celery_dedupe/tasks.py b/celery_dedupe/tasks.py
new file mode 100644
index 0000000..ccb18ab
--- /dev/null
+++ b/celery_dedupe/tasks.py
@@ -0,0 +1,56 @@
+import hashlib
+import warnings
+
+from celery import Task
+from celery.result import EagerResult
+from celery.utils import uuid
+
+
+class DedupeTask(Task):
+    """Base task that skips publishing while an identical call holds a lock.
+
+    Calls are identified by a key built from the task class and its arguments.
+    ``storage`` must provide ``obtain_lock(key, task_id)``, ``get(key)`` and
+    ``release_lock(key)``.
+    """
+
+    # Celery does not register base classes flagged ``abstract``.
+    abstract = True
+    storage = None
+
+    def apply_async(self, args=None, kwargs=None, **kw):
+        """Publish the task unless the same call already holds the lock.
+
+        Returns the result of the normal ``apply_async`` when the lock is
+        obtained, otherwise an ``AsyncResult`` for the task id held in storage.
+        """
+        key = self._create_key(args, kwargs)
+        task_id = kw.setdefault('task_id', uuid())
+
+        if self.storage.obtain_lock(key, task_id):
+            return super(DedupeTask, self).apply_async(args=args, kwargs=kwargs, **kw)
+
+        existing_task_id = self.storage.get(key)
+        app = self._get_app()
+        if app.conf.CELERY_ALWAYS_EAGER:
+            warnings.warn('Using DedupeTask in conjunction with CELERY_ALWAYS_EAGER, can not return EagerResult')
+
+        return self.AsyncResult(existing_task_id)
+
+    def after_return(self, status, retval, task_id, args, kwargs, einfo):
+        """Release the lock for this call once the task has returned."""
+        key = self._create_key(args, kwargs)
+        self.storage.release_lock(key)
+
+    def _create_key(self, args, kwargs):
+        """Return ``'<module>.<class>:<md5 hexdigest>'`` for the given call."""
+        arg_string = ','.join([str(a) for a in args]) if args is not None else str(args)
+        # Sort the items so equal kwargs always produce the same key.
+        kwarg_string = ','.join(['%s=%s' % (k, v) for k, v in sorted(kwargs.items())]) if kwargs is not None else str(kwargs)
+        payload = arg_string + kwarg_string
+        if not isinstance(payload, bytes):
+            # hashlib needs bytes on Python 3, so encode text explicitly.
+            payload = payload.encode('utf-8')
+        arg_hash = hashlib.md5(payload).hexdigest()
+        import_path = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)
+        return '%s:%s' % (import_path, arg_hash)
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 4eb8ef7..54eb139 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,3 +1,4 @@
 exam==0.10.5
 pytest==2.7.2
 redis==2.10.3
+celery==3.1.17
diff --git a/tests/settings.py b/tests/settings.py
new file mode 100644
index 0000000..d995e8c
--- /dev/null
+++ b/tests/settings.py
@@ -0,0 +1,4 @@
+BROKER_URL = 'redis://localhost:6379/9'
+CELERY_ALWAYS_EAGER = False
+CELERY_IMPORTS = ('tests.test_task',)
+CELERY_RESULT_BACKEND = 'redis://localhost:6379/9'
diff --git a/tests/test_task.py b/tests/test_task.py
new file mode 100644
index 0000000..0e1790d
--- /dev/null
+++ b/tests/test_task.py
@@ -0,0 +1,44 @@
+from celery import current_app
+from celery.task import task
+from redis import StrictRedis
+
+from celery_dedupe import DedupeTask
+from celery_dedupe.storage.redis import RedisStorage
+from tests import settings
+
+redis = StrictRedis(db=9)
+
+
+@task(base=DedupeTask, storage=RedisStorage(redis))
+def noop_task(*a, **kw):
+    return None
+
+
+class TestDedupeTask(object):
+
+    def setup_class(self):
+        current_app.config_from_object(settings)
+
+    def test_create_key(self):
+        task = DedupeTask()
+
+        key = task._create_key(tuple(), {})
+        assert key == 'celery_dedupe.tasks.DedupeTask:d41d8cd98f00b204e9800998ecf8427e'
+
+        key = task._create_key((True, False), {})
+        assert key == 'celery_dedupe.tasks.DedupeTask:85eadac9a27eb1f6dccf00145df6003b'
+
+        key = task._create_key(tuple(), {'something': 2})
+        assert key == 'celery_dedupe.tasks.DedupeTask:a758dbbdf7ba75fc39732ab4cca53049'
+
+        key = task._create_key(tuple(), {'something_else': 2})
+        assert key == 'celery_dedupe.tasks.DedupeTask:271a7d4a14d7892b87b8fc96d50507a9'
+
+    def test_apply_task_twice(self):
+        result1 = noop_task.apply_async(countdown=10)
+        result2 = noop_task.apply_async()
+        assert result1.task_id == result2.task_id
+
+    def test_apply_task_once(self):
+        result = noop_task.delay(1)
+        assert result.get() is None