forked from arnaud-lb/php-rdkafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproduce_consume_queue.phpt
107 lines (87 loc) · 2.89 KB
/
produce_consume_queue.phpt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
--TEST--
Produce, consume queue
--SKIPIF--
<?php
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';
$delivered = 0;
$conf = new RdKafka\Conf();
// Required to detect actual reaching of partition EOF for both topics
$conf->set('enable.partition.eof', 'true');
if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
$conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
}
$conf->setErrorCb(function ($producer, $err, $errstr) {
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
exit;
});
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
if ($msg->err) {
throw new Exception("Message delivery failed: " . $msg->errstr());
}
$delivered++;
});
$producer = new RdKafka\Producer($conf);
if ($producer->addBrokers(getenv('TEST_KAFKA_BROKERS')) < 1) {
echo "Failed adding brokers\n";
exit;
}
$topicNames = [
sprintf("test_rdkafka_0_%s", uniqid()),
sprintf("test_rdkafka_1_%s", uniqid()),
];
$topics = array_map(function ($topicName) use ($producer) {
return $producer->newTopic($topicName);
}, $topicNames);
if (!$producer->getMetadata(false, reset($topics), 2*1000)) {
echo "Failed to get metadata, is broker down?\n";
}
for ($i = 0; $i < 10; $i++) {
$topics[$i%2]->produce(0, 0, "message $i");
$producer->poll(0);
}
while ($producer->getOutQLen()) {
$producer->poll(50);
}
printf("%d messages delivered\n", $delivered);
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers(getenv('TEST_KAFKA_BROKERS'));
$queue = $consumer->newQueue();
array_walk($topicNames, function ($topicName) use ($consumer, $queue) {
$topic = $consumer->newTopic($topicName);
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
});
$messages = [];
$receivedTopicEofs = [];
while (count($receivedTopicEofs) < 2) {
$msg = $queue->consume(15000);
if (!$msg) {
// Still waiting for messages
continue;
}
if (RD_KAFKA_RESP_ERR__PARTITION_EOF === $msg->err) {
// Reached actual EOF
$receivedTopicEofs[$msg->topic_name] = true;
continue;
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
throw new Exception($msg->errstr(), $msg->err);
}
$messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
}
sort($messages);
echo implode("\n", $messages), "\n";
--EXPECTF--
10 messages delivered
Got message: message 0 from test_rdkafka_0_%s
Got message: message 1 from test_rdkafka_1_%s
Got message: message 2 from test_rdkafka_0_%s
Got message: message 3 from test_rdkafka_1_%s
Got message: message 4 from test_rdkafka_0_%s
Got message: message 5 from test_rdkafka_1_%s
Got message: message 6 from test_rdkafka_0_%s
Got message: message 7 from test_rdkafka_1_%s
Got message: message 8 from test_rdkafka_0_%s
Got message: message 9 from test_rdkafka_1_%s