forked from arnaud-lb/php-rdkafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconf_callbacks_integration.phpt
85 lines (66 loc) · 2.1 KB
/
conf_callbacks_integration.phpt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
--TEST--
RdKafka\Conf
--SKIPIF--
<?php
// Skip unless the linked librdkafka reports at least version 0x090000.
RD_KAFKA_VERSION >= 0x090000 || die("skip librdkafka too old");

// This test is considered risky and is skipped by default. It only runs when
// TESTS_DONT_SKIP_RISKY is set to a truthy value. $_ENV is only populated when
// the variables_order ini setting contains "E", so fall back to getenv(),
// which returns false (and therefore skips) when the variable is not set.
$dontSkipRisky = isset($_ENV['TESTS_DONT_SKIP_RISKY'])
    ? $_ENV['TESTS_DONT_SKIP_RISKY']
    : getenv('TESTS_DONT_SKIP_RISKY');
$dontSkipRisky || die("skip Callbacks often fail and are skipped by default");

// Shared integration-test precondition check; its logic lives outside this
// file (presumably it skips when no test broker is configured - confirm).
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';

// Configuration for the producer that seeds the topic. The broker list is
// read from the TEST_KAFKA_BROKERS environment variable.
$producerConf = new RdKafka\Conf();
$producerConf->set('auto.offset.reset', 'smallest');
$producerConf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$producerConf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));

$kafkaProducer = new RdKafka\Producer($producerConf);

// A unique topic name per run, so runs do not see each other's messages.
$topicName = sprintf("test_rdkafka_%s", uniqid());
$producerTopic = $kafkaProducer->newTopic($topicName);

// Publish ten messages ("message 0" .. "message 9") to partition 0, polling
// the producer after each one.
foreach (range(0, 9) as $i) {
    $producerTopic->produce(0, 0, "message $i");
    $kafkaProducer->poll(0);
}

// Keep polling until the producer's outgoing queue is empty.
while ($kafkaProducer->getOutQLen() > 0) {
    $kafkaProducer->poll(50);
}

// Pause for one second before the consumer is set up; per the original
// author's note this leaves enough time for the stats_cb to pick up the
// consumer lag.
sleep(1);
// Consumer configuration: a fresh unique consumer group reading from the
// start of the topic, with statistics requested every 10 ms.
$conf = new RdKafka\Conf();
$conf->set('auto.offset.reset', 'smallest');
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));
$conf->set('statistics.interval.ms', 10);

// Offset-commit callback: print the offset of the first committed
// topic-partition. These lines are what the --EXPECT-- section matches.
$conf->setOffsetCommitCb(function ($kafka, $err, $partitions) {
    echo "Offset ", $partitions[0]->getOffset(), " committed.\n";
});

// Statistics callback: only record that it fired at least once. The flag is
// captured by reference so the script can inspect it afterwards.
$statsCbCalled = false;
$conf->setStatsCb(function ($kafka, $stats) use (&$statsCbCalled) {
    $statsCbCalled = true;
});
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$topicName]);

// The script produces exactly ten messages earlier on, and --EXPECT-- lists
// ten "Offset N committed." lines. Stop as soon as that many messages have
// been consumed and committed, instead of waiting for a partition-EOF event:
// when EOF events are not emitted, the next consume() would only time out and
// be reported as an error by the generic error branch below.
$expectedMessages = 10;
$consumedMessages = 0;

while ($consumedMessages < $expectedMessages) {
    $msg = $consumer->consume(15000);

    // No message object or end of partition: nothing more to read.
    if (!$msg || RD_KAFKA_RESP_ERR__PARTITION_EOF === $msg->err) {
        break;
    }

    // Any other error (including a timeout before all messages arrived)
    // fails the test with the error string and code.
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
        throw new Exception($msg->errstr(), $msg->err);
    }

    // Committing each message triggers the offset-commit callback, which
    // prints the "Offset N committed." line checked by --EXPECT--.
    $consumer->commit($msg);
    $consumedMessages++;
}

// The statistics callback must have fired at least once while consuming.
var_dump($statsCbCalled);
--EXPECT--
Offset 1 committed.
Offset 2 committed.
Offset 3 committed.
Offset 4 committed.
Offset 5 committed.
Offset 6 committed.
Offset 7 committed.
Offset 8 committed.
Offset 9 committed.
Offset 10 committed.
bool(true)