Skip to content

Commit

Permalink
Run integration tests in CI (arnaud-lb#223)
Browse files Browse the repository at this point in the history
Fix tests for librdkafka 1.0.0, run integration tests as part of CI
  • Loading branch information
Steveb-p authored and arnaud-lb committed Dec 6, 2019
1 parent c34e221 commit 33eea10
Show file tree
Hide file tree
Showing 17 changed files with 209 additions and 163 deletions.
5 changes: 5 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ indent_size = 4

[*.md]
trim_trailing_whitespace = false

[*.phpt]
trim_trailing_whitespace = true
indent_style = space
indent_size = 4
25 changes: 19 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,27 @@ php:
- 7.1
- 7.2
- 7.3
- 7.4snapshot
- 7.4
- nightly
matrix:
allow_failures:
- php: nightly
env:
- LIBRDKAFKA_VERSION=0.11.x
- LIBRDKAFKA_VERSION=v1.0.1
- LIBRDKAFKA_VERSION=v1.1.0
- LIBRDKAFKA_VERSION=master
script: ./.travis/build.sh
global:
- TEST_KAFKA_BROKERS=localhost:9092
- TEST_KAFKA_BROKER_VERSION=2.3
- LIBRDKAFKA_REPOSITORY_URL=https://github.com/edenhill/librdkafka.git
- LIBRDKAFKA_VERSION=v1.2.2
matrix:
- LIBRDKAFKA_VERSION=0.11.x
- LIBRDKAFKA_VERSION=v1.0.1
- LIBRDKAFKA_VERSION=v1.1.0
- LIBRDKAFKA_VERSION=v1.2.2
- LIBRDKAFKA_VERSION=master
before_script:
- ./.travis/start-kafka.sh
script:
- ./.travis/build.sh
### Issues with environment variables cause this copy to be required
- cp tests/test_env.php.sample tests/test_env.php
- ./.travis/test.sh
26 changes: 9 additions & 17 deletions .travis/build.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#!/bin/sh

set -xe
set -xve

if ! [ -d "librdkafka" ]; then
git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-v1.2.2}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}"
fi

git clone --depth 1 --branch "$LIBRDKAFKA_VERSION" https://github.com/edenhill/librdkafka.git
(
cd librdkafka
./configure
make
sudo make install
cd librdkafka
./configure
make
sudo make install
)
sudo ldconfig

Expand All @@ -17,14 +20,3 @@ phpenv config-rm xdebug.ini || true
phpize
CFLAGS='-Werror=implicit-function-declaration' ./configure
make

export PATH=$TRAVIS_BUILD_DIR/.travis:$PATH

showmem=
if grep -q 'cfgfiles.*mem' run-tests.php; then
echo "Will enable the --show-mem flag"
showmem=--show-mem
fi

PHP=$(which php)
REPORT_EXIT_STATUS=1 TEST_PHP_EXECUTABLE="$PHP" "$PHP" run-tests.php -q -m --show-diff $showmem
6 changes: 6 additions & 0 deletions .travis/start-kafka.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/sh

wget http://ftp.man.poznan.pl/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0.tgz
kafka_2.12-2.3.0/bin/zookeeper-server-start.sh -daemon kafka_2.12-2.3.0/config/zookeeper.properties
kafka_2.12-2.3.0/bin/kafka-server-start.sh -daemon kafka_2.12-2.3.0/config/server.properties
14 changes: 14 additions & 0 deletions .travis/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/sh

set -xve

export PATH=$TRAVIS_BUILD_DIR/.travis:$PATH

showmem=
if grep -q 'cfgfiles.*mem' run-tests.php; then
echo "Will enable the --show-mem flag"
showmem=--show-mem
fi

PHP=$(which php)
REPORT_EXIT_STATUS=1 TEST_PHP_EXECUTABLE="$PHP" "$PHP" run-tests.php -q -m --show-diff $showmem
11 changes: 10 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,13 @@ Pull requests should be made against the master branch, which supports both PHP

## Testing

Tests are in phpt format in the tests directory. They can be run by executing `make test`.
Tests are in phpt file format in the tests directory.

### Using your own machine for building and testing.

Tests can be run by following compilation and installation procedure
and executing `make test`.

To run integration tests, make sure you have a Kafka instance running.
Then, rename `test_env.php.sample` to `test_env.php` and adjust it
with the values appropriate for your Kafka instance.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ while (true) {
// The first argument is the partition (again).
// The second argument is the timeout.
$msg = $topic->consume(0, 1000);
if (null === $msg) {
if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue;
} elseif ($msg->err) {
echo $msg->errstr(), "\n";
Expand Down Expand Up @@ -185,7 +186,8 @@ Next, retrieve the consumed messages from the queue:
while (true) {
// The only argument is the timeout.
$msg = $queue->consume(1000);
if (null === $msg) {
if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue;
} elseif ($msg->err) {
echo $msg->errstr(), "\n";
Expand Down
21 changes: 10 additions & 11 deletions tests/allow_null_payload.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
Allow null payload
--SKIPIF--
<?php
file_exists(__DIR__."/test_env.php") || die("skip");
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php

require __DIR__."/test_env.php";
require __DIR__ . '/integration-tests-check.php';

$topicName = sprintf('test_rdkafka_%s', uniqid());

$producer = new RdKafka\Producer();
$producer->addBrokers(TEST_KAFKA_BROKERS);
$producer->addBrokers(getenv('TEST_KAFKA_BROKERS'));
$topic = $producer->newTopic($topicName);

$topic->produce(0, 0, NULL, 'message_key_1');
Expand All @@ -21,7 +20,7 @@ while ($producer->getOutQLen() > 0) {
}

$consumer = new RdKafka\Consumer();
$consumer->addBrokers(TEST_KAFKA_BROKERS);
$consumer->addBrokers(getenv('TEST_KAFKA_BROKERS'));

$topic = $consumer->newTopic($topicName);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
Expand All @@ -31,16 +30,16 @@ while (true) {
if ($message === null) {
continue;
}
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message->payload);
var_dump($message->key);
break 2;

if (RD_KAFKA_RESP_ERR_NO_ERROR === $message->err) {
var_dump($message->payload);
var_dump($message->key);
break;
}
}

$topic->consumeStop(0);

--EXPECTF--
NULL
string(13) "message_key_1"
string(13) "message_key_1"
21 changes: 10 additions & 11 deletions tests/allow_null_payload_and_key.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
Allow null payload
--SKIPIF--
<?php
file_exists(__DIR__."/test_env.php") || die("skip");
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php

require __DIR__."/test_env.php";
require __DIR__ . '/integration-tests-check.php';

$topicName = sprintf('test_rdkafka_%s', uniqid());

$producer = new RdKafka\Producer();
$producer->addBrokers(TEST_KAFKA_BROKERS);
$producer->addBrokers(getenv('TEST_KAFKA_BROKERS'));
$topic = $producer->newTopic($topicName);

$topic->produce(0, 0);
Expand All @@ -21,7 +20,7 @@ while ($producer->getOutQLen() > 0) {
}

$consumer = new RdKafka\Consumer();
$consumer->addBrokers(TEST_KAFKA_BROKERS);
$consumer->addBrokers(getenv('TEST_KAFKA_BROKERS'));

$topic = $consumer->newTopic($topicName);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
Expand All @@ -31,16 +30,16 @@ while (true) {
if ($message === null) {
continue;
}
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message->payload);
var_dump($message->key);
break 2;

if (RD_KAFKA_RESP_ERR_NO_ERROR === $message->err) {
var_dump($message->payload);
var_dump($message->key);
break;
}
}

$topic->consumeStop(0);

--EXPECTF--
NULL
NULL
NULL
15 changes: 7 additions & 8 deletions tests/bug115.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
Bug 115
--SKIPIF--
<?php
file_exists(__DIR__."/test_env.php") || die("skip");
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php

require __DIR__."/test_env.php";
require __DIR__ . '/integration-tests-check.php';

$delivered = 0;

$conf = new RdKafka\Conf();
if (RD_KAFKA_VERSION >= 0x090000) {
$conf->set('broker.version.fallback', TEST_KAFKA_BROKER_VERSION);
if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
$conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
}
$conf->setErrorCb(function ($producer, $err, $errstr) {
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
Expand All @@ -28,18 +27,18 @@ $conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
$topicName = sprintf("test_rdkafka_%s", uniqid());

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers(TEST_KAFKA_BROKERS);
$consumer->addBrokers(getenv('TEST_KAFKA_BROKERS'));

$topic = $consumer->newTopic($topicName);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
$msg = $topic->consume(0, 1000);
if ($msg && $msg->err) {
// librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
if (!$msg || RD_KAFKA_RESP_ERR__PARTITION_EOF === $msg->err) {
break;
}
}

$topic->consumeStop(0);
--EXPECT--

2 changes: 1 addition & 1 deletion tests/conf_callbacks.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x090000 || die("skip");
RD_KAFKA_VERSION >= 0x090000 || die("skip librdkafka too old");
--FILE--
<?php

Expand Down
73 changes: 34 additions & 39 deletions tests/conf_callbacks_integration.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,18 @@
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x090000 || die("skip");
file_exists(__DIR__."/test_env.php") || die("skip");
RD_KAFKA_VERSION >= 0x090000 || die("skip librdkafka too old");
(!isset($_ENV['TESTS_DONT_SKIP_RISKY']) || $_ENV['TESTS_DONT_SKIP_RISKY']) && die("skip Callbacks often fail and are skipped by default");
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php

require __DIR__."/test_env.php";
require __DIR__ . '/integration-tests-check.php';

$conf = new RdKafka\Conf();

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf->setDefaultTopicConf($topicConf);
$conf->set('metadata.broker.list', TEST_KAFKA_BROKERS);
$conf->set('auto.offset.reset', 'smallest');
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));
$conf->set('statistics.interval.ms', 10);

$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) {
echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n";
});

$consumerLagFound = false;
$conf->setStatsCb(function ($consumer, $json) use (&$consumerLagFound) {
if ($consumerLagFound) {
return;
}

// At some point there should be a consumer lag of 9
if (false !== strpos($json, 'consumer_lag":9')) {
$consumerLagFound = true;
}
});

$producer = new RdKafka\Producer($conf);

Expand All @@ -52,29 +32,44 @@ while ($producer->getOutQLen()) {
// Make sure there is enough time for the stats_cb to pick up the consumer lag
sleep(1);

$conf = new RdKafka\Conf();

$conf->set('auto.offset.reset', 'smallest');
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));
$conf->set('statistics.interval.ms', 10);

$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) {
echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n";
});

$statsCbCalled = false;
$conf->setStatsCb(function ($consumer, $json) use (&$statsCbCalled) {
if ($statsCbCalled) {
return;
}

$statsCbCalled = true;
});

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$topicName]);

while (true) {
$msg = $consumer->consume(60 * 1000);
$msg = $consumer->consume(15000);

if (!$msg) {
continue;
if (!$msg || RD_KAFKA_RESP_ERR__PARTITION_EOF === $msg->err) {
break;
}

switch ($msg->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$consumer->commit($msg);

break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
break 2;
default:
throw new Exception($msg->errstr());
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
throw new Exception($msg->errstr(), $msg->err);
}

$consumer->commit($msg);
}

var_dump($consumerLagFound);
var_dump($statsCbCalled);

--EXPECT--
Offset 1 committed.
Expand Down
Loading

0 comments on commit 33eea10

Please sign in to comment.