forked from arnaud-lb/php-rdkafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbug115.phpt
44 lines (37 loc) · 1.18 KB
/
bug115.phpt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
--TEST--
Bug 115
--SKIPIF--
<?php
// Skip-check section of the phpt file: all of its logic lives in the shared include.
// NOTE(review): presumably the include emits a "skip" line when the integration-test
// environment is not available — confirm in integration-tests-check.php.
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php
// Regression script for "Bug 115": builds an RdKafka\Conf with an error callback and a
// delivery-report callback. The same $conf is later handed to an RdKafka\Consumer.
require __DIR__ . '/integration-tests-check.php';

// Counter captured by reference by the delivery-report callback registered further down.
// NOTE(review): nothing visible in this script reads it back; it is kept because it may be
// part of the scenario the regression test reproduces — confirm before removing.
$delivered = 0;

$conf = new RdKafka\Conf();

// The broker version fallback is only configured when the librdkafka version constant is
// at least 0x090000 and the TEST_KAFKA_BROKER_VERSION environment variable is defined.
if (RD_KAFKA_VERSION >= 0x090000) {
    $brokerVersion = getenv('TEST_KAFKA_BROKER_VERSION');
    if ($brokerVersion !== false) {
        $conf->set('broker.version.fallback', $brokerVersion);
    }
}

// On any reported error: print "<error name>: <reason>" and terminate the script.
$conf->setErrorCb(function ($kafka, $code, $reason) {
    echo sprintf("%s: %s\n", rd_kafka_err2str($code), $reason);
    exit();
});

// Delivery report: count successful deliveries, throw on a failed one.
$conf->setDrMsgCb(function ($kafka, $message) use (&$delivered) {
    if (!$message->err) {
        $delivered++;
        return;
    }

    throw new Exception("Message delivery failed: " . $message->errstr());
});
// Topic name made unique per run via uniqid().
$uniqueTopic = 'test_rdkafka_' . uniqid();

// Low-level consumer built from the configuration prepared earlier in this script.
$kafkaConsumer = new RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers(getenv('TEST_KAFKA_BROKERS'));

$consumerTopic = $kafkaConsumer->newTopic($uniqueTopic);
$consumerTopic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

// Keep consuming from partition 0 (timeout argument 1000) until consume() yields a falsy
// result or a message flagged RD_KAFKA_RESP_ERR__PARTITION_EOF. librdkafka before 1.0
// returns such a message when the end of the topic is reached.
do {
    $received = $consumerTopic->consume(0, 1000);
} while ($received && RD_KAFKA_RESP_ERR__PARTITION_EOF !== $received->err);

$consumerTopic->consumeStop(0);
--EXPECT--