Skip to content

Commit d6372d8

Browse files
authored
Fix #516: Ensure Redis driver messages are consumed at least once
1 parent 8f4c350 commit d6372d8

File tree

4 files changed

+69
-2
lines changed

4 files changed

+69
-2
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ Yii2 Queue Extension Change Log
33

44
2.3.8 under development
55
-----------------------
6-
6+
- Enh #516: Ensure Redis driver messages are consumed at least once (soul11201)
77
- Bug #522: Fix SQS driver type error with custom value passed to `queue/listen` (flaviovs)
88

99

src/drivers/redis/Queue.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,10 @@ protected function moveExpired($from)
170170
{
171171
$now = time();
172172
if ($expired = $this->redis->zrevrangebyscore($from, $now, '-inf')) {
173-
$this->redis->zremrangebyscore($from, '-inf', $now);
174173
foreach ($expired as $id) {
175174
$this->redis->rpush("$this->channel.waiting", $id);
176175
}
176+
$this->redis->zremrangebyscore($from, '-inf', $now);
177177
}
178178
}
179179

tests/drivers/redis/QueueTest.php

+50
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use tests\app\RetryJob;
1111
use tests\drivers\CliTestCase;
1212
use Yii;
13+
use yii\di\Instance;
1314
use yii\queue\redis\Queue;
1415

1516
/**
@@ -137,4 +138,53 @@ protected function tearDown()
137138
$this->getQueue()->redis->flushdb();
138139
parent::tearDown();
139140
}
141+
142+
/**
143+
* Verify that Redis data persists when process crashes during moveExpired.
144+
*
145+
* Steps:
146+
* 1. Push a delayed job into queue
147+
* 2. Wait for the job to expire
148+
* 3. Mock Redis to simulate crash during moveExpired
149+
* 4. Successfully process job after recovery
150+
*/
151+
public function testConsumeDelayedMessageAtLeastOnce()
152+
{
153+
$job = $this->createSimpleJob();
154+
$this->getQueue()->delay(1)->push($job);
155+
// Expect a single message to be received.
156+
$messageCount = 0;
157+
$this->getQueue()->messageHandler = function () use(&$messageCount) {
158+
$messageCount++;
159+
};
160+
161+
// Ensure the delayed message can be consumed when more time passed than the delay is.
162+
sleep(2);
163+
164+
// Based on the implemention, emulate a crash when redis "rpush"
165+
// command should be executed.
166+
$mockRedis = Instance::ensure([
167+
'class' => RedisCrashMock::class,
168+
'hostname' => getenv('REDIS_HOST') ?: 'localhost',
169+
'database' => getenv('REDIS_DB') ?: 1,
170+
'crashOnCommand' => 'rpush' // Crash when trying to move job to waiting queue.
171+
], 'yii\redis\Connection');
172+
173+
$queue = $this->getQueue();
174+
$old = $queue->redis;
175+
$queue->redis = $mockRedis;
176+
177+
try {
178+
$queue->run(false);
179+
} catch (\Exception $e) {
180+
// Ignore exceptions.
181+
} finally {
182+
$queue->redis = $old;
183+
}
184+
185+
// Ensure the red lock is invalid. The red lock is valid for 1s.
186+
sleep(2);
187+
$this->getQueue()->run(false);
188+
$this->assertEquals(1, $messageCount);
189+
}
140190
}
+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
namespace tests\drivers\redis;
3+
4+
use yii\redis\Connection;
5+
6+
class RedisCrashMock extends Connection
7+
{
8+
public $crashOnCommand;
9+
10+
public function executeCommand($name, $params = [])
11+
{
12+
if ($name === $this->crashOnCommand) {
13+
throw new \RuntimeException('Simulated Redis crash');
14+
}
15+
return parent::executeCommand($name, $params);
16+
}
17+
}

0 commit comments

Comments
 (0)