
add support to delay a job and enqueue it N seconds later #211

Status: Open. Wants to merge 51 commits into base: master.

Changes shown below are from 1 commit.

Commits (51):
5c8c401
add support to delay a job and enqueue it N seconds later
orlandobcrra Mar 24, 2019
1e75c7a
getting latest job with certain status
kevteg Apr 22, 2019
211ac8f
Moving method to job methods
kevteg Apr 23, 2019
4e8696d
updating name and version
kevteg Apr 23, 2019
b7eb6c9
0.9.12 version
kevteg Apr 23, 2019
556c40e
using query instead of status and path
kevteg Apr 23, 2019
5074b4c
042419IC Add support to delete success or cancel jobs that are alread…
icardenas Apr 24, 2019
cd1a56d
042419IC draft test for check test_delete_expired_jobs
icardenas Apr 24, 2019
91001d9
Update Version 0.9.13
icardenas Apr 24, 2019
6c17383
Update Version 0.9.14
icardenas Apr 24, 2019
1377520
Delete TODO word
icardenas Apr 25, 2019
78a1e87
Merge pull request #2 from apploitech/042419IC_add_remove_expires_jobs
icardenas Apr 25, 2019
c038608
042419IC verify version of python to iterate dictionary
icardenas Apr 25, 2019
7beb029
042419IC Update version
icardenas Apr 25, 2019
5b36b23
Update utils.py
icardenas Apr 26, 2019
42957f3
update version
icardenas Apr 26, 2019
7d51f8f
042419IC Fix and update version
icardenas Apr 26, 2019
87fe03b
Merge pull request #3 from apploitech/042419IC_fix_dictionary_iterati…
icardenas Apr 26, 2019
4eca72b
Add status maxretries in endpoint '/api/datatables/taskexceptions'
icardenas Apr 26, 2019
721ba39
Add status maxretries in endpoint '/api/datatables/taskexceptions'
icardenas Apr 26, 2019
e9fe738
Add the status field in the search for exceptions
icardenas Apr 28, 2019
e76e1a0
042619IC Update Version
icardenas Apr 28, 2019
fc95b0a
minor fix bundle
icardenas Apr 28, 2019
5d1bffc
Merge pull request #4 from apploitech/042619IC_add_maxretries_in_task…
icardenas Apr 28, 2019
74d913f
Adding get latest jobs based on a query
Darking360 May 3, 2019
45bc6c7
Adding sort
Darking360 May 6, 2019
6c85adf
Increasing MRQ lib version
Darking360 May 6, 2019
5ca8555
Merge pull request #5 from apploitech/050319MR_add_latest_jobs_query
Darking360 May 6, 2019
63fa50a
Add Redis SSL support
ghabrielv Jan 6, 2021
634c217
Increasing MRQ lib version
ghabrielv Jan 6, 2021
6fdfd29
Fix pypy portable url
ghabrielv Jan 6, 2021
5076764
Update dockerfile
ghabrielv Jan 6, 2021
ff2a3a4
Update dockerfile
ghabrielv Jan 6, 2021
ddda42a
Update library psutil
ghabrielv Jan 6, 2021
adc365e
Update requirements and dockerfile
ghabrielv Jan 6, 2021
964a14b
Specific log description content type
ghabrielv Jan 8, 2021
dd8cb5f
Version 0.9.23 and redis password
ghabrielv Jan 8, 2021
78485ac
Merge pull request #6 from apploitech/redis-ssl
ghabrielv Mar 1, 2021
aa98be8
Sentry cleanup (#7)
ghabrielv Mar 3, 2021
c902409
Upgrade psutil to 5.8.0 (#8)
ghabrielv May 19, 2021
e6d05e9
A3S-1593 - if "retry_current_job" is call outside of an MRQ context, …
orlandobcrra Jul 20, 2022
3389b17
A3S-2518 Remove print line and document the process of generating a n…
ghabrielv Feb 15, 2023
bd6ba3d
Update processes.py
orlandobcrra Apr 4, 2023
37ee776
A3S-2202: version upgrade
Apr 4, 2023
b8ecdf4
A3S-2753: Updated redis to 4.5.5 version
ghabrielv May 9, 2023
8b19f1e
A3S-2749: Settled MRQ version to 0.9.32
ghabrielv May 9, 2023
4143816
Merge pull request #13 from apploitech/A3S-2753
Sarahoyos May 11, 2023
adf6ec9
A3S-2753 Fixed redis SSL connection (#14)
ghabrielv May 15, 2023
c1d55de
A3S-2753: Fixed redis SSL connection (#15)
ghabrielv May 17, 2023
4f68e94
A3S-843: update pymongo to support python 3.11
May 30, 2023
e630216
Merge pull request #16 from apploitech/a3s-843_update_pymongo
jefarr-apploi Jun 7, 2023
add support to delay a job and enqueue it N seconds later
orlandobcrra committed Mar 24, 2019
commit 5c8c401c142af79c96d287144c473adfabad7587
7 changes: 7 additions & 0 deletions docs/jobs-maintenance.md
@@ -7,6 +7,13 @@ To do that, you should add these recurring scheduled jobs to your mrq-config.py:
 ```
 SCHEDULER_TASKS = [
 
+    # This will queue jobs in the 'delayed' status.
+    {
+        "path": "mrq.basetasks.cleaning.QueueDelayedJobs",
+        "params": {},
+        "interval": 60
+    },
+
     # This will requeue jobs in the 'retry' status, until they reach their max_retries.
     {
         "path": "mrq.basetasks.cleaning.RequeueRetryJobs",
8 changes: 7 additions & 1 deletion docs/jobs.md
@@ -8,8 +8,9 @@ A **Job** is an instance of the execution of a Task. It must link to a specific
 
 MRQ defines a list of statuses for jobs. A job can only be in one of them at a time.
 
-When everything goes fine, a job will go through 3 statuses:
+When everything goes fine, a job will go through 4 statuses:
 
+* ```delayed```: The Job has been created and it is delayed to be queued later.
 * ```queued```: The Job has been created and it is waiting to be dequeued by a Worker.
 * ```started```: A Worker has dequeued the job and started executing it.
 * ```success```: The job was successfully ran.
@@ -51,6 +52,11 @@ Queues a job. If `queue` is not provided, the default queue for that Task as def
 
 Queues multiple jobs at once. Returns a list of IDs of the jobs.
 
+* `queue_job(main_task_path, params, delay=120, queue=None)`
+
+Creates a job with the `delayed` status; the job will be queued after at least `delay` seconds.
+Remember to add the scheduled task described in [Jobs maintenance](jobs-maintenance.md) so that `delayed` jobs actually get queued.
+
 * `queue_raw_jobs(queue, params_list, batch_size=1000)`
 
 Queues multiple jobs at once on a [raw queue](queues.md#raw-queues). The queued jobs have no IDs on a raw queue so this function has no return.
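An illustrative sketch of the timing this PR's docs describe (not part of the diff): a job queued with `delay=N` becomes eligible at `dateretry = now + N` seconds, but it is only moved to `queued` the next time the `QueueDelayedJobs` scheduler task runs, so with the suggested 60s interval the real wait falls in a window.

```python
def enqueue_window(delay_seconds, scheduler_interval_seconds):
    # Earliest: the job becomes eligible exactly `delay` seconds after
    # creation. Latest: the scheduler task may run up to one full
    # interval after eligibility, so the bound is delay + interval.
    earliest = delay_seconds
    latest = delay_seconds + scheduler_interval_seconds
    return earliest, latest

# A job with delay=120 under the suggested 60s scheduler interval is
# requeued between 120 and 180 seconds after being created.
window = enqueue_window(120, 60)
```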
14 changes: 14 additions & 0 deletions mrq/basetasks/cleaning.py
@@ -35,6 +35,20 @@ def run(self, params):
         })
 
 
+class QueueDelayedJobs(Task):
+
+    """ Requeue jobs that were marked as delayed. """
+
+    max_concurrency = 1
+
+    def run(self, params):
+        return run_task("mrq.basetasks.utils.JobAction", {
+            "status": "delayed",
+            "dateretry": {"$lte": datetime.datetime.utcnow()},
+            "action": "requeue"
+        })
+
+
 class RequeueStartedJobs(Task):
 
     """ Requeue jobs that were marked as status=started and never finished.
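The MongoDB query that `QueueDelayedJobs` passes to `JobAction` selects delayed jobs whose `dateretry` timestamp has passed. A minimal sketch of the same predicate in plain Python (`matches_delayed_filter` is illustrative, not an MRQ function):

```python
import datetime

def matches_delayed_filter(job, now):
    # Python equivalent of the filter {"status": "delayed",
    # "dateretry": {"$lte": now}} used by QueueDelayedJobs; the
    # "action": "requeue" key tells JobAction what to do with matches
    # and is not part of the filter itself.
    return (job.get("status") == "delayed"
            and job.get("dateretry") is not None
            and job["dateretry"] <= now)

now = datetime.datetime(2019, 3, 24, 12, 0, 0)
due = {"status": "delayed", "dateretry": now - datetime.timedelta(seconds=5)}
not_due = {"status": "delayed", "dateretry": now + datetime.timedelta(seconds=5)}
```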
3 changes: 3 additions & 0 deletions mrq/dashboard/static/js/views/jobs.js
@@ -347,6 +347,9 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
       if (source.time) {
         display.push("cputime "+String(source.time).substring(0,6)+"s ("+source.switches+" switches)");
       }
+      if (source.status === 'retry' || source.status === 'delayed') {
+        display.push("requeue "+moment.utc(source.dateretry).fromNow());
+      }
 
       return "<small>" + display.join("<br/>") + "</small>";
1 change: 1 addition & 0 deletions mrq/dashboard/templates/index.html
@@ -391,6 +391,7 @@ <h4 class="modal-title"></h4>
 <select class="form-control input-sm js-datatable-filters-status" id="jobs-form-status">
   <option <%= filters.status==""?"selected='selected'":"" %> value="">-statuses-</option>
   <% _.each({
+    "delayed": "delayed",
     "queued": "queued",
     "started": "started",
     "success": "success",
28 changes: 19 additions & 9 deletions mrq/job.py
@@ -23,7 +23,7 @@
 from . import context
 
 
-FINAL_STATUSES = {"timeout", "abort", "failed", "success", "interrupt", "retry", "maxretries", "maxconcurrency"}
+FINAL_STATUSES = {"timeout", "abort", "failed", "success", "interrupt", "retry", "maxretries", "maxconcurrency", "delayed"}
 TRANSIENT_STATUSES = {"cancel", "queued", "started"}


@@ -682,7 +682,7 @@ def set_queues_size(size_by_queues, action="incr"):
         pipe.expire("queuesize:%s" % queue, context.get_current_config().get("queue_ttl"))
     pipe.execute()
 
-def queue_jobs(main_task_path, params_list, queue=None, batch_size=1000):
+def queue_jobs(main_task_path, params_list, delay=0, queue=None, batch_size=1000):
     """ Queue multiple jobs on a regular queue """
     if len(params_list) == 0:
         return []
@@ -702,14 +702,24 @@ def queue_jobs(main_task_path, params_list, queue=None, batch_size=1000):
 
         context.metric("jobs.status.queued", len(params_group))
 
+        jobs_data = []
+        for params in params_group:
+            job_data = {
+                "path": main_task_path,
+                "params": params,
+                "queue": queue,
+                "datequeued": datetime.datetime.utcnow(),
+                "status": "queued"
+            }
+            if delay and delay > 0:
+                dateretry = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
+                job_data['status'] = 'delayed'
+                job_data['dateretry'] = dateretry
+
+            jobs_data.append(job_data)
+
         # Insert the job in MongoDB
-        job_ids = Job.insert([{
-            "path": main_task_path,
-            "params": params,
-            "queue": queue,
-            "datequeued": datetime.datetime.utcnow(),
-            "status": "queued"
-        } for params in params_group], w=1, return_jobs=False)
+        job_ids = Job.insert(jobs_data, w=1, return_jobs=False)
 
         all_ids += job_ids
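The per-job document that the patched `queue_jobs()` inserts can be sketched standalone; `build_job_data` below is a hypothetical helper mirroring the diff's logic, not an MRQ function:

```python
import datetime

def build_job_data(main_task_path, params, queue=None, delay=0):
    # Mirrors the document the patched queue_jobs() builds for MongoDB:
    # the default status is "queued"; a positive delay switches it to
    # "delayed" and records the earliest time QueueDelayedJobs may
    # requeue it.
    job_data = {
        "path": main_task_path,
        "params": params,
        "queue": queue,
        "datequeued": datetime.datetime.utcnow(),
        "status": "queued",
    }
    if delay and delay > 0:
        job_data["status"] = "delayed"
        job_data["dateretry"] = job_data["datequeued"] + datetime.timedelta(seconds=delay)
    return job_data

immediate = build_job_data("tasks.Example", {"x": 1})
delayed = build_job_data("tasks.Example", {"x": 1}, delay=120)
```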