Skip to content

Commit

Permalink
Merge pull request #1225 from onlyann/fetch-jobs-indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Oct 19, 2024
2 parents ea37f88 + c815369 commit 98fd11e
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from django.db import migrations

from .. import migrations_utils


class Migration(migrations.Migration):
operations = [
migrations_utils.RunProcrastinateSQL(
name="02.14.01_01_add_indexes_for_fetch_job.sql"
)
]
name = "0031_add_indexes_for_fetch_job"
dependencies = [("procrastinate", "0030_alter_procrastinateevent_options")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- recreate procrastinate_jobs_id_lock_idx index by adding aborting status to the filter so that it can be used by the fetch job function
CREATE INDEX procrastinate_jobs_id_lock_idx_temp ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]);
DROP INDEX procrastinate_jobs_id_lock_idx;
ALTER INDEX procrastinate_jobs_id_lock_idx_temp RENAME TO procrastinate_jobs_id_lock_idx;

-- add index to avoid seq scan of outer query in the fetch job function
CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status);
4 changes: 3 additions & 1 deletion procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (
CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing';

CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name);
CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]);
CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]);
CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status);


CREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id);

Expand Down
8 changes: 4 additions & 4 deletions tests/acceptance/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ async def test_abort(async_app):
@async_app.task(queue="default", name="task1", pass_context=True)
async def task1(context):
while True:
await asyncio.sleep(0.1)
await asyncio.sleep(0.02)
if await context.should_abort_async():
raise JobAborted

@async_app.task(queue="default", name="task2", pass_context=True)
def task2(context):
while True:
time.sleep(0.1)
time.sleep(0.02)
if context.should_abort():
raise JobAborted

Expand All @@ -130,11 +130,11 @@ def task2(context):
async_app.run_worker_async(queues=["default"], wait=False)
)

await asyncio.sleep(0.1)
await asyncio.sleep(0.05)
result = await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True)
assert result is True

await asyncio.sleep(0.1)
await asyncio.sleep(0.05)
result = await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True)
assert result is True

Expand Down

0 comments on commit 98fd11e

Please sign in to comment.