from types import TracebackType
from typing import (
Any,
cast,
Dict,
List,
Type,
Tuple,
Generic,
TypeVar,
Callable,
Optional,
)

from .job import Job
from .types import QueueName
from .utils import get_backend, contribute_implied_queue_name
from .app_settings import app_settings

TCallable = TypeVar('TCallable', bound=Callable[..., Any])


class task:
def __init__(
self,
queue: str = 'default',
timeout: Optional[int] = None,
sigkill_on_stop: bool = False,
atomic: Optional[bool] = None,
) -> None:
"""
Define a task to be run.
@task()
def fn(arg1, arg2):
# <do something with arg1, arg2, etc.>
>>> fn("hello", 1)
Note that arguments must be JSON serialisable and, therefore, cannot be
Django model instances. This is entirely deliberate due to:
a) Transparency when inspecting the queue
b) Avoiding state/caching/etc issues on this kinds of instances
generally.
You can set some default options on `@task()`s:
`queue` -- Which queue to run the task in. You can invent queue
names; the worker's will be created automatically.
`timeout` -- The task will be SIGKILL'd after it has run for *at
least* this many seconds. Note that in most circumstances you want
`sigkill_on_stop` (below) instead.
`sigkill_on_stop` -- The task will be SIGKILL'd when the queue
processor is shut down. The default behaviour is to let it run to
completion.
`atomic` -- The task will be run inside a database transaction.
For example::
@task(sigkill_on_stop=True, timeout=60)
def slow_fn(arg):
time.sleep(600)
>>> slow_fn(1)
You can also dynamically override values at call time if you provide a
`django_lightweight_queue_` prefix::
@task(sigkill_on_stop=True, timeout=60)
def slow_fn(arg):
time.sleep(600)
>>> slow_fn(2, django_lightweight_queue_timeout=30)
(NB. You cannot yet invent dynamic queue names here; a queue with that
name must already be running.)
"""
if atomic is None:
atomic = app_settings.ATOMIC_JOBS
self.queue = QueueName(queue)
self.timeout = timeout
self.sigkill_on_stop = sigkill_on_stop
self.atomic = atomic
        contribute_implied_queue_name(self.queue)

    def __call__(self, fn: TCallable) -> 'TaskWrapper[TCallable]':
        return TaskWrapper(fn, self.queue, self.timeout, self.sigkill_on_stop, self.atomic)
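
# A short usage sketch (illustrative only; `send_welcome_email` and the
# 'emails' queue are hypothetical names, not part of this module):
#
#     @task(queue='emails', atomic=True)
#     def send_welcome_email(user_id):
#         ...  # runs on a worker, inside a database transaction
#
#     # Calling the decorated function enqueues a job; the body does not
#     # run inline:
#     send_welcome_email(42)
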
class BulkEnqueueHelper(Generic[TCallable]):
def __init__(
self,
task_wrapper: 'TaskWrapper[TCallable]',
batch_size: int,
queue_override: Optional[QueueName],
) -> None:
self._to_create: List[Job] = []
self._task_wrapper = task_wrapper
self.batch_size = batch_size
        self.queue_override = queue_override

    def __enter__(self) -> TCallable:
        # Present `_create` as though it were the task function itself, so
        # calls inside the `with` block use the task's own signature.
        return cast(TCallable, self._create)

    def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
        self.flush()

    def _create(self, *args: Any, **kwargs: Any) -> None:
self._to_create.append(
self._task_wrapper._build_job(args, kwargs),
        )

        if len(self._to_create) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
if not self._to_create:
            return

        self._task_wrapper._enqueue_job_instances(
self._to_create,
queue_override=self.queue_override,
)
self._to_create = []
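
# Sketch of the batching behaviour (`process_item` is a hypothetical task):
#
#     with process_item.bulk_enqueue(batch_size=2) as enqueue:
#         enqueue(1)  # buffered
#         enqueue(2)  # buffer reaches batch_size and is flushed
#         enqueue(3)  # buffered; flushed by __exit__ when the block closes
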
class TaskWrapper(Generic[TCallable]):
def __init__(
self,
fn: TCallable,
queue: QueueName,
timeout: Optional[int],
sigkill_on_stop: bool,
atomic: bool,
):
self.fn = fn
self.queue = queue
self.timeout = timeout
self.sigkill_on_stop = sigkill_on_stop
        self.atomic = atomic

        self.path = '{}.{}'.format(fn.__module__, fn.__name__)

    def __repr__(self) -> str:
        return "<TaskWrapper: {}>".format(self.path)

    def _build_job(self, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Job:
        # Allow us to override the default values dynamically
        timeout = kwargs.pop('django_lightweight_queue_timeout', self.timeout)
        sigkill_on_stop = kwargs.pop(
            'django_lightweight_queue_sigkill_on_stop',
            self.sigkill_on_stop,
        )

        job = Job(self.path, args, kwargs, timeout, sigkill_on_stop)
        job.validate()

        return job
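
    # For example (sketch; `my_task` is a hypothetical wrapped task):
    #
    #     my_task(1, django_lightweight_queue_timeout=30)
    #
    # pops the override before building the job, so the resulting Job runs
    # with a 30 second timeout and the task function never sees the kwarg.
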
def _enqueue_job_instances(
self,
new_jobs: List[Job],
queue_override: Optional[QueueName],
) -> None:
queue = queue_override if queue_override is not None else self.queue
        get_backend(queue).bulk_enqueue(new_jobs, queue)

    def __call__(self, *args: Any, **kwargs: Any) -> None:
        # Allow the queue to be overridden at call time; the caller must
        # ensure that the target queue actually exists (i.e. has workers).
        queue = kwargs.pop('django_lightweight_queue_queue', self.queue)
job = self._build_job(args, kwargs)
get_backend(queue).enqueue(job, queue)
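
    # Call-time queue override sketch (the 'realtime' queue name is
    # hypothetical; a queue with that name must already be running):
    #
    #     my_task(1, django_lightweight_queue_queue='realtime')
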
def bulk_enqueue(
self,
batch_size: int = 1000,
queue_override: Optional[QueueName] = None,
) -> BulkEnqueueHelper[TCallable]:
"""
Enqueue jobs in bulk.
Use like:
with my_task.bulk_enqueue() as enqueue:
enqueue(the_ids=[42, 43])
enqueue(the_ids=[45, 46])
This is equivalent to:
my_task(the_ids=[42, 43])
my_task(the_ids=[45, 46])
The target queue for the whole batch may be overridden, however the
caller must ensure that the queue actually exists (i.e: has workers).
"""
return BulkEnqueueHelper(self, batch_size, queue_override)
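

# Overriding the queue for a whole batch (sketch; the 'offline' queue name
# is hypothetical and must already have workers):
#
#     with my_task.bulk_enqueue(queue_override=QueueName('offline')) as enqueue:
#         enqueue(the_ids=[42, 43])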