From e27ad93c1bd93624acfa9ef9ee67239cf4301dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?= Date: Mon, 1 Apr 2024 12:23:14 +0200 Subject: [PATCH] feat: add job_id to JobDef, closing #376 (#378) --- arq/connections.py | 1 + arq/jobs.py | 6 +++++- arq/worker.py | 3 +++ tests/test_jobs.py | 24 +++++++++++++++++++----- tests/test_main.py | 9 ++++++--- 5 files changed, 34 insertions(+), 9 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index d4fc4434..69ac8ce2 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -193,6 +193,7 @@ async def _get_job_def(self, job_id: bytes, score: int) -> JobDef: assert v is not None, f'job "{key}" not found' jd = deserialize_job(v, deserializer=self.job_deserializer) jd.score = score + jd.job_id = job_id.decode() return jd async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]: diff --git a/arq/jobs.py b/arq/jobs.py index 8028cbe7..d0c0a5ef 100644 --- a/arq/jobs.py +++ b/arq/jobs.py @@ -47,6 +47,7 @@ class JobDef: job_try: int enqueue_time: datetime score: Optional[int] + job_id: Optional[str] def __post_init__(self) -> None: if isinstance(self.score, float): @@ -60,7 +61,6 @@ class JobResult(JobDef): start_time: datetime finish_time: datetime queue_name: str - job_id: Optional[str] = None class Job: @@ -238,6 +238,7 @@ def serialize_result( finished_ms: int, ref: str, queue_name: str, + job_id: str, *, serializer: Optional[Serializer] = None, ) -> Optional[bytes]: @@ -252,6 +253,7 @@ def serialize_result( 'st': start_ms, 'ft': finished_ms, 'q': queue_name, + 'id': job_id, } if serializer is None: serializer = pickle.dumps @@ -281,6 +283,7 @@ def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) -> job_try=d['t'], enqueue_time=ms_to_datetime(d['et']), score=None, + job_id=None, ) except Exception as e: raise DeserializationError('unable to deserialize job') from e @@ -315,6 +318,7 @@ def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None) start_time=ms_to_datetime(d['st']), finish_time=ms_to_datetime(d['ft']), queue_name=d.get('q', ''), + job_id=d.get('id', ''), ) except Exception as e: raise DeserializationError('unable to deserialize job result') from e diff --git a/arq/worker.py b/arq/worker.py index 398409b5..7ff5393a 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -501,6 +501,7 @@ async def job_failed(exc: BaseException) -> None: ref=f'{job_id}:{function_name}', serializer=self.job_serializer, queue_name=self.queue_name, + job_id=job_id, ) await asyncio.shield(self.finish_failed_job(job_id, result_data_)) @@ -556,6 +557,7 @@ async def job_failed(exc: BaseException) -> None: timestamp_ms(), ref, self.queue_name, + job_id=job_id, serializer=self.job_serializer, ) return await asyncio.shield(self.finish_failed_job(job_id, result_data)) @@ -649,6 +651,7 @@ async def job_failed(exc: BaseException) -> None: finished_ms, ref, self.queue_name, + job_id=job_id, serializer=self.job_serializer, ) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 634a8b03..f8f6c8c4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -2,7 +2,7 @@ import pickle import pytest -from dirty_equals import IsNow +from dirty_equals import IsNow, IsStr from arq import Worker, func from arq.connections import ArqRedis, RedisSettings, create_pool @@ -89,6 +89,7 @@ async def foobar(ctx, *args, **kwargs): finish_time=IsNow(tz='utc'), score=None, queue_name=expected_queue_name, + job_id=IsStr(), ) results = await arq_redis.all_job_results() assert results == [ @@ -139,9 +140,9 @@ class Foobar: def __getstate__(self): raise TypeError("this doesn't pickle") - r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue') + r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1') assert isinstance(r1, bytes) - r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue') + r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1') assert r2 is None @@ -154,7 +155,19 @@ def custom_serializer(x): return b'0123456789' r1 = serialize_result( - 'foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', serializer=custom_serializer + 'foobar', + (1,), + {}, + 1, + 123, + True, + Foobar(), + 123, + 123, + 'testing', + 'test-queue', + 'job_1', + serializer=custom_serializer, ) assert r1 == b'0123456789' r2 = serialize_result( @@ -169,6 +182,7 @@ def custom_serializer(x): 123, 'testing', 'test-queue', + 'job_1', serializer=custom_serializer, ) assert r2 == b'0123456789' @@ -213,7 +227,7 @@ async def test_get_job_result(arq_redis: ArqRedis): async def test_result_pole_delay_dep(arq_redis: ArqRedis): j = Job('foobar', arq_redis) - r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue') + r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue', 'job_1') await arq_redis.set(result_key_prefix + j.job_id, r) with pytest.warns( DeprecationWarning, match='"pole_delay" is deprecated, use the correct spelling "poll_delay" instead' diff --git a/tests/test_main.py b/tests/test_main.py index 7c3a9835..198c815b 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -238,11 +238,11 @@ async def foobar(ctx): async def test_get_jobs(arq_redis: ArqRedis): - await arq_redis.enqueue_job('foobar', a=1, b=2, c=3) + await arq_redis.enqueue_job('foobar', a=1, b=2, c=3, _job_id='1') await asyncio.sleep(0.01) - await arq_redis.enqueue_job('second', 4, b=5, c=6) + await arq_redis.enqueue_job('second', 4, b=5, c=6, _job_id='2') await asyncio.sleep(0.01) - await arq_redis.enqueue_job('third', 7, b=8) + await arq_redis.enqueue_job('third', 7, b=8, _job_id='3') jobs = await arq_redis.queued_jobs() assert [dataclasses.asdict(j) for j in jobs] == [ { @@ -252,6 +252,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '1', }, { 'function': 'second', @@ -260,6 +261,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '2', }, { 'function': 'third', @@ -268,6 +270,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '3', }, ] assert jobs[0].score < jobs[1].score < jobs[2].score