Skip to content
This repository has been archived by the owner on Apr 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #28 from Carlosedo/add-job-by-name
Browse files Browse the repository at this point in the history
Add job by name
  • Loading branch information
igalarzab authored Nov 13, 2016
2 parents 96d0457 + f1038b1 commit 745f1a8
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 10 deletions.
6 changes: 3 additions & 3 deletions sqjobs/brokers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ def gen_job_id(self):
"""
return str(uuid4())

def serialize_job(self, job_class, job_id, args, kwargs):
def serialize_job(self, job_name, job_id, args, kwargs):
"""
Serialize a job into a string to be sent to the broker
:param job_class: python class of the payload job
:param job_name: python class of the payload job
:param job_id: the ID of the job
:param args: arguments of the job
:param kwargs: keyword arguments of the job
"""
return self.connector.serialize_job(job_class, job_id, args, kwargs)
return self.connector.serialize_job(job_name, job_id, args, kwargs)

def unserialize_job(self, job_class, queue_name, payload):
"""
Expand Down
9 changes: 7 additions & 2 deletions sqjobs/brokers/standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ def __repr__(self):
)

def add_job(self, job_class, *args, **kwargs):
job_id = self.gen_job_id()
job_name = job_class._task_name()
queue_name = kwargs.pop('queue_name', job_class.default_queue_name)

payload = self.serialize_job(job_class, job_id, args, kwargs)
return self.add_job_by_name(job_name, queue_name, *args, **kwargs)

def add_job_by_name(self, job_name, queue_name, *args, **kwargs):
job_id = self.gen_job_id()

payload = self.serialize_job(job_name, job_id, args, kwargs)
self.connector.enqueue(queue_name, payload)

result = JobResult()
Expand Down
4 changes: 2 additions & 2 deletions sqjobs/connectors/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ def set_retry_time(self, queue_name, message_id, delay):
self.retried_jobs.setdefault(queue_name, []).append((message_id, delay))
self.num_retried_jobs += 1

def serialize_job(self, job_class, job_id, args, kwargs):
def serialize_job(self, job_name, job_id, args, kwargs):
return {
'id': job_id,
'name': job_class._task_name(),
'name': job_name,
'args': args,
'kwargs': kwargs
}
Expand Down
4 changes: 2 additions & 2 deletions sqjobs/connectors/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ def set_retry_time(self, queue_name, message_id, delay):

logger.info('Changed retry time of a message from queue %s', queue_name)

def serialize_job(self, job_class, job_id, args, kwargs):
def serialize_job(self, job_name, job_id, args, kwargs):
return {
'id': job_id,
'name': job_class._task_name(),
'name': job_name,
'args': args,
'kwargs': kwargs
}
Expand Down
39 changes: 39 additions & 0 deletions sqjobs/tests/broker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ def test_add_job_to_broker(self):
messages = broker.connector.jobs['sqjobs']
assert len(messages) == 1

def test_add_job_by_name_to_broker(self):
broker = StandardBroker(self.connector)
broker.add_job_by_name('Adder', 'sqjobs', 2, 3)

queues = list(broker.connector.jobs.keys())
assert len(queues) == 1
assert queues[0] == 'sqjobs'
assert broker.connector.num_jobs == 1

messages = broker.connector.jobs['sqjobs']
assert len(messages) == 1

def test_right_payload_args_when_job_is_added(self):
broker = StandardBroker(self.connector)
broker.add_job(Adder, 2, 3)
Expand All @@ -45,6 +57,15 @@ def test_right_payload_args_when_job_is_added(self):

assert message == {'args': (2, 3), 'kwargs': {}, 'name': 'adder'}

def test_right_payload_args_when_job_is_added_by_name(self):
broker = StandardBroker(self.connector)
broker.add_job_by_name('adder', 'sqjobs', 2, 3)

message = broker.connector.jobs['sqjobs'][0]
del message['id'] # Check that exists, but we don't care about the value

assert message == {'args': (2, 3), 'kwargs': {}, 'name': 'adder'}

def test_right_payload_kwargs_when_job_is_added(self):
broker = StandardBroker(self.connector)
broker.add_job(Adder, num2=2, num1=3)
Expand All @@ -54,6 +75,15 @@ def test_right_payload_kwargs_when_job_is_added(self):

assert message == {'args': (), 'kwargs': {'num1': 3, 'num2': 2}, 'name': 'adder'}

def test_right_payload_kwargs_when_job_is_added_by_name(self):
broker = StandardBroker(self.connector)
broker.add_job_by_name('adder', 'sqjobs', num2=2, num1=3)

message = broker.connector.jobs['sqjobs'][0]
del message['id'] # Check that exists, but we don't care about the value

assert message == {'args': (), 'kwargs': {'num1': 3, 'num2': 2}, 'name': 'adder'}

def test_right_payload_both_when_job_is_added(self):
broker = StandardBroker(self.connector)
broker.add_job(Adder, 2, num2=3)
Expand All @@ -63,6 +93,15 @@ def test_right_payload_both_when_job_is_added(self):

assert message == {'args': (2,), 'kwargs': {'num2': 3}, 'name': 'adder'}

def test_right_payload_both_when_job_is_added_by_name(self):
broker = StandardBroker(self.connector)
broker.add_job_by_name('adder', 'sqjobs', 2, num2=3)

message = broker.connector.jobs['sqjobs'][0]
del message['id'] # Check that exists, but we don't care about the value

assert message == {'args': (2,), 'kwargs': {'num2': 3}, 'name': 'adder'}

def test_multiple_jobs_are_stored_correctly_by_the_broker(self):
broker = StandardBroker(self.connector)
job_ids, jobs = [], []
Expand Down
2 changes: 1 addition & 1 deletion sqjobs/tests/connector_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def test_serialize_job_returns_valid_json(self):
}

serializer = sqs_connector.serialize_job(
job_class=Adder,
job_name='adder',
job_id=15,
args=[3, 4],
kwargs={'first_param': 1, 'second_param': 'two'}
Expand Down

0 comments on commit 745f1a8

Please sign in to comment.