Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
danhunsaker committed Dec 20, 2019
2 parents fdca42b + 25ff307 commit 7dd1474
Show file tree
Hide file tree
Showing 16 changed files with 468 additions and 117 deletions.
14 changes: 11 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
language: php
php:
- 5.6
- 7.0
- 7.1
- '7.4'
- '7.3'
- '7.2'
- '7.1'
- '7.0'
- '5.6'
- hhvm
matrix:
exclude:
- php: hhvm
env: ENABLE_REDIS_EXT=1
allow_failures:
- php: '7.4'
- php: '7.3'
- php: '7.2'
- php: hhvm
env:
- ENABLE_REDIS_EXT=0
- ENABLE_REDIS_EXT=1
Expand Down
41 changes: 22 additions & 19 deletions HOWITWORKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,51 +106,54 @@ How do the workers process the queues?
8. `Resque_Job->fail()` returns control to the worker (still in
`Resque_Worker::work()`) without a value
- Job
1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as
1. `Resque_Job_PID` is created, registering the PID of the actual process
doing the job.
2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as
its only argument.
2. `Resque_Worker->perform()` sets up a `try...catch` block so it can
3. `Resque_Worker->perform()` sets up a `try...catch` block so it can
properly handle exceptions by marking jobs as failed (by calling
`Resque_Job->fail()`, as above)
3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
`afterFork` event
4. Still inside the `try...catch`, `Resque_Worker->perform()` calls
5. Still inside the `try...catch`, `Resque_Worker->perform()` calls
`Resque_Job->perform()` with no arguments
5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
arguments
6. If `Resque_Job->getInstance()` has already been called, it returns
7. If `Resque_Job->getInstance()` has already been called, it returns
the existing instance; otherwise:
7. `Resque_Job->getInstance()` checks that the job's class (type)
8. `Resque_Job->getInstance()` checks that the job's class (type)
exists and has a `perform()` method; if not, in either case, it
throws an exception which will be caught by
`Resque_Worker->perform()`
8. `Resque_Job->getInstance()` creates an instance of the job's class,
9. `Resque_Job->getInstance()` creates an instance of the job's class,
and initializes it with a reference to the `Resque_Job` itself, the
job's arguments (which it gets by calling
`Resque_Job->getArguments()`, which in turn simply returns the value
of `args[0]`, or an empty array if no arguments were passed), and
the queue name
9. `Resque_Job->getInstance()` returns control, along with the job
10. `Resque_Job->getInstance()` returns control, along with the job
class instance, to `Resque_Job->perform()`
10. `Resque_Job->perform()` sets up its own `try...catch` block to
11. `Resque_Job->perform()` sets up its own `try...catch` block to
handle `Resque_Job_DontPerform` exceptions; any other exceptions are
passed up to `Resque_Worker->perform()`
11. `Resque_Job->perform()` triggers a `beforePerform` event
12. `Resque_Job->perform()` calls `setUp()` on the instance, if it
12. `Resque_Job->perform()` triggers a `beforePerform` event
13. `Resque_Job->perform()` calls `setUp()` on the instance, if it
exists
13. `Resque_Job->perform()` calls `perform()` on the instance
14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
14. `Resque_Job->perform()` calls `perform()` on the instance
15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
exists
15. `Resque_Job->perform()` triggers an `afterPerform` event
16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
16. `Resque_Job->perform()` triggers an `afterPerform` event
17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
exceptions by returning control, and the value `FALSE`, to
`Resque_Worker->perform()`; any other situation returns the value
`TRUE` along with control, instead
17. The `try...catch` block in `Resque_Worker->perform()` ends
18. `Resque_Worker->perform()` updates the job status from `RUNNING` to
18. The `try...catch` block in `Resque_Worker->perform()` ends
19. `Resque_Worker->perform()` updates the job status from `RUNNING` to
`COMPLETE`, then returns control, with no value, to the worker
(again still in `Resque_Worker::work()`)
19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
20. `Resque_Job_PID()` is removed, the forked process will terminate soon
cleanly
21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
- SPECIAL CASE: Non-forking OS (Windows)
1. Same as the job above, except it doesn't call `exit(0)` when done
7. `Resque_Worker::work()` calls `Resque_Worker->doneWorking()` with no
Expand Down
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ Statuses are available for up to 24 hours after a job has completed or failed,
and are then automatically expired. A status can also forcefully be expired by
calling the `stop()` method on a status class.

### Obtaining job PID ###

You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the
PID of the forked process.

CAUTION: on a non-forking OS, the PID returned will be of the worker itself.

```php
echo Resque_Job_PID::get($token);
```

Function returns `0` if the `perform` hasn't started yet, or if it has already ended.

## Workers

Workers work in the exact same way as the Ruby workers. For complete
Expand Down Expand Up @@ -300,6 +313,15 @@ custom prefix to separate the Resque data:
$ PREFIX=my-app-name bin/resque
```

### Setting Redis backend ###

When you have the Redis database on a different host than the one the workers
are running, you must set the `REDIS_BACKEND` environment variable:

```sh
$ REDIS_BACKEND=my-redis-ip:my-redis-port bin/resque
```

### Forking

Similarly to the Ruby versions, supported platforms will immediately fork after
Expand Down
13 changes: 6 additions & 7 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@
"role": "Creator"
}
],
"repositories": [
{
"type": "vcs",
"url": "https://github.com/chrisboulton/credis"
}
],
"require": {
"php": ">=5.3.0",
"ext-pcntl": "*",
Expand All @@ -53,5 +47,10 @@
"psr-0": {
"Resque": "lib"
}
}
},
"extra": {
"branch-alias": {
"dev-master": "1.0-dev"
}
}
}
2 changes: 1 addition & 1 deletion demo/resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
require 'job.php';
require 'php_error_job.php';

require '../bin/resque';
require __DIR__ . '/../bin/resque';
72 changes: 49 additions & 23 deletions lib/Resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ class Resque
*/
protected static $redisDatabase = 0;

/**
* @var string auth of Redis database
*/
protected static $auth;

/**
* Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to.
Expand All @@ -37,11 +42,13 @@ class Resque
* and returns a Resque_Redis instance, or
* a nested array of servers with host/port pairs.
* @param int $database
* @param string $auth
*/
public static function setBackend($server, $database = 0)
public static function setBackend($server, $database = 0, $auth = null)
{
self::$redisServer = $server;
self::$redisDatabase = $database;
self::$auth = $auth;
self::$redis = null;
}

Expand All @@ -62,6 +69,10 @@ public static function redis()
self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase);
}

if (!empty(self::$auth)) {
self::$redis->auth(self::$auth);
}

return self::$redis;
}

Expand Down Expand Up @@ -141,9 +152,9 @@ public static function pop($queue)
public static function dequeue($queue, $items = Array())
{
if(count($items) > 0) {
return self::removeItems($queue, $items);
return self::removeItems($queue, $items);
} else {
return self::removeList($queue);
return self::removeList($queue);
}
}

Expand Down Expand Up @@ -213,10 +224,11 @@ public static function size($queue)
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $trackStatus Set to true to be able to monitor the status of a job.
* @param string $prefix The prefix needs to be set for the status key
*
* @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
*/
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "")
{
$id = Resque::generateJobId();
$hookParams = array(
Expand All @@ -232,7 +244,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals
return false;
}

Resque_Job::create($queue, $class, $args, $trackStatus, $id);
Resque_Job::create($queue, $class, $args, $trackStatus, $id, $prefix);
Resque_Event::trigger('afterEnqueue', $hookParams);

return $id;
Expand Down Expand Up @@ -263,6 +275,20 @@ public static function queues()
return $queues;
}

/**
* Retrieve all the items of a queue with Redis
*
* @return array Array of items.
*/
public static function items($queue, $start = 0, $stop = -1)
{
$list = self::redis()->lrange('queue:' . $queue, $start, $stop);
if(!is_array($list)) {
$list = array();
}
return $list;
}

/**
* Remove Items from the queue
* Safely moving each item to a temporary queue before processing it
Expand Down Expand Up @@ -317,7 +343,7 @@ private static function removeItems($queue, $items = Array())

/**
* matching item
* item can be ['class'] or ['class' => 'id'] or ['class' => {:foo => 1, :bar => 2}]
* item can be ['class'] or ['class' => 'id'] or ['class' => {'foo' => 1, 'bar' => 2}]
* @private
*
* @params string $string redis result in json
Expand All @@ -330,24 +356,24 @@ private static function matchItem($string, $items)
$decoded = json_decode($string, true);

foreach($items as $key => $val) {
# class name only ex: item[0] = ['class']
if (is_numeric($key)) {
if($decoded['class'] == $val) {
return true;
}
# class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}]
} elseif (is_array($val)) {
$decodedArgs = (array)$decoded['args'][0];
if ($decoded['class'] == $key &&
count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) {
return true;
# class name only ex: item[0] = ['class']
if (is_numeric($key)) {
if($decoded['class'] == $val) {
return true;
}
# class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}]
} elseif (is_array($val)) {
$decodedArgs = (array)$decoded['args'][0];
if ($decoded['class'] == $key &&
count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) {
return true;
}
# class name with ID, example: item[0] = ['class' => 'id']
} else {
if ($decoded['class'] == $key && $decoded['id'] == $val) {
return true;
}
}
# class name with ID, example: item[0] = ['class' => 'id']
} else {
if ($decoded['class'] == $key && $decoded['id'] == $val) {
return true;
}
}
}
return false;
}
Expand Down
14 changes: 14 additions & 0 deletions lib/Resque/Failure.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ public static function create($payload, Exception $exception, Resque_Worker $wor
new $backend($payload, $exception, $worker, $queue);
}

/**
* Create a new failed job on the backend from PHP 7 errors.
*
* @param object $payload The contents of the job that has just failed.
* @param \Error $exception The PHP 7 error generated when the job failed to run.
* @param \Resque_Worker $worker Instance of Resque_Worker that was running this job when it failed.
* @param string $queue The name of the queue that this job was fetched from.
*/
public static function createFromError($payload, Error $exception, Resque_Worker $worker, $queue)
{
$backend = self::getBackend();
new $backend($payload, $exception, $worker, $queue);
}

/**
* Return an instance of the backend for saving job failures.
*
Expand Down
2 changes: 1 addition & 1 deletion lib/Resque/Failure/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Resque_Failure_Redis implements Resque_Failure_Interface
public function __construct($payload, $exception, $worker, $queue)
{
$data = new stdClass;
$data->failed_at = strftime('%a %b %d %H:%M:%S %Z %Y');
$data->failed_at = date('c');
$data->payload = $payload;
$data->exception = get_class($exception);
$data->error = $exception->getMessage();
Expand Down
Loading

0 comments on commit 7dd1474

Please sign in to comment.