Skip to content

Commit

Permalink
[MO] - [refactor] -> ClientViaHttp to ClientViaExec (strimzi#1910)
Browse files Browse the repository at this point in the history
* [MO] - [refactor] - ClientViaHttp to ClientViaExec

* [MO] - [refactor] -> httpCall to podCall

* [MO] - [refactor] -> delete whole test-client

* [MO] - [system test] -> remove from refactor

* [MO] - [refactor] -> renamed method

* [MO] - [refactor] -> fix dependencies

* [MO] - [refactor] -> fixed

* [MO] - [refactor] -> fix makefile

* [MO] - [refactor] -> fix messages

* [MO] - [system test] -> MirrorMaker fix

* [MO] - [system test] -> TopicsST fixed, KafkaST fixed, MBST clean, MMST fixed

* [MO] - [system test] -> fix imports

* [MO] - [system test] -> MM fix of messages

* [MO] - [system test] -> fix#check

* [MO] - [system test] -> fix
  • Loading branch information
see-quick authored and scholzj committed Aug 28, 2019
1 parent ebf4075 commit 0d32031
Show file tree
Hide file tree
Showing 23 changed files with 252 additions and 919 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ifneq ($(RELEASE_VERSION),latest)
GITHUB_VERSION = $(RELEASE_VERSION)
endif

SUBDIRS=kafka-agent mirror-maker-agent crd-annotations test crd-generator api mockkube certificate-manager operator-common cluster-operator topic-operator user-operator kafka-init test-client docker-images helm-charts install examples metrics
SUBDIRS=kafka-agent mirror-maker-agent crd-annotations test crd-generator api mockkube certificate-manager operator-common cluster-operator topic-operator user-operator kafka-init docker-images helm-charts install examples metrics
DOCKER_TARGETS=docker_build docker_push docker_tag

all: $(SUBDIRS)
Expand Down
10 changes: 1 addition & 9 deletions docker-images/test-client/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
FROM strimzi/kafka:latest

ARG strimzi_version=1.0-SNAPSHOT
ENV STRIMZI_VERSION ${strimzi_version}
ENV JAVA_OPTS "-DLOG_LEVEL=info"

# copy scripts for starting Kafka
COPY scripts $KAFKA_HOME
COPY scripts/dynamic_resources.sh /bin/dynamic_resources.sh

ADD tmp/test-client-${STRIMZI_VERSION}.jar /test-client.jar

USER 1001

EXPOSE 4242/tcp

CMD ["/opt/kafka/launch_java.sh", "/test-client.jar"]
USER 1001
13 changes: 0 additions & 13 deletions docker-images/test-client/Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
PROJECT_NAME=test-client

include ../../Makefile.os

clean:
rm -rf lib
rm -rf tmp
rm -f .*.tmp

.test-client.tmp: ../../test-client/target/test-client*.jar
test -d tmp || mkdir tmp
$(CP) -f ../../test-client/target/test-client*.jar -d tmp
touch .test-client.tmp

docker_build: .test-client.tmp

include ../../Makefile.docker

.PHONY: build clean release
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
<module>kafka-agent</module>
<module>mirror-maker-agent</module>
<module>test</module>
<module>test-client</module>
<module>crd-annotations</module>
<module>crd-generator</module>
<module>api</module>
Expand Down
171 changes: 71 additions & 100 deletions systemtest/src/main/java/io/strimzi/systemtest/MessagingBaseST.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,15 @@
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.systemtest.clients.api.ClientArgument;
import io.strimzi.systemtest.clients.api.ClientArgumentMap;
import io.strimzi.systemtest.clients.api.MsgCliApiClient;
import io.strimzi.systemtest.clients.api.VerifiableClient;
import io.strimzi.test.TestUtils;
import io.vertx.core.json.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.strimzi.systemtest.clients.api.ClientType.CLI_KAFKA_VERIFIABLE_CONSUMER;
import static io.strimzi.systemtest.clients.api.ClientType.CLI_KAFKA_VERIFIABLE_PRODUCER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/**
Expand All @@ -31,107 +24,112 @@
public class MessagingBaseST extends AbstractST {
private static final Logger LOGGER = LogManager.getLogger(MessagingBaseST.class);

private MsgCliApiClient cliApiClient;
private JsonObject response;
private int sent = 0;
private int received = 0;

public void setResponse(JsonObject response) {
this.response = response;
/**
* Simple availability check for kafka cluster
* @param clusterName cluster name
*/
void availabilityTest(String clusterName) throws Exception {
availabilityTest(100, clusterName, false, "my-topic", null);
}

@BeforeAll
public void setUpClientBase() throws MalformedURLException {
String clientUrl = Environment.KUBERNETES_DOMAIN.equals(Environment.KUBERNETES_DOMAIN_DEFAULT) ? new URL(CONFIG.getMasterUrl()).getHost() + Environment.KUBERNETES_DOMAIN_DEFAULT : Environment.KUBERNETES_DOMAIN;
cliApiClient = new MsgCliApiClient(new URL("http://" + Constants.KAFKA_CLIENTS + "." + clientUrl + ":80"));
/**
* Simple availability check for kafka cluster
* @param messageCount message count
* @param clusterName cluster name
*/
void availabilityTest(int messageCount, String clusterName) throws Exception {
availabilityTest(messageCount, clusterName, false, "my-topic", null);
}

/**
* Simple availability check for kafka cluster
* @param messageCount message count
* @param clusterName cluster name
* @param topicName topic name
*/
void availabilityTest(String clusterName) throws Exception {
availabilityTest(100, 20000, clusterName, false, "my-topic", null);
void availabilityTest(int messageCount, String clusterName, String topicName) throws Exception {
availabilityTest(messageCount, clusterName, false, topicName, null);
}

/**
* Simple availability check for kafka cluster
* @param messageCount message count
* @param timeout timeout
* @param clusterName cluster name
* @param tlsListener option for tls listener inside kafka cluster
* @param topicName topic name
* @param user user for tls if it's used for messages
*/
void availabilityTest(int messageCount, long timeout, String clusterName) throws Exception {
availabilityTest(messageCount, timeout, clusterName, false, "my-topic", null);
void availabilityTest(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
final String defaultKafkaClientsPodName =
kubeClient().listPodsByPrefixInName(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
sent = sendMessages(messageCount, clusterName, tlsListener, topicName, user, defaultKafkaClientsPodName);
received = receiveMessages(messageCount, clusterName, tlsListener, topicName, user, defaultKafkaClientsPodName);
assertSentAndReceivedMessages(sent, received);
}

/**
* Simple availability check for kafka cluster
* @param messageCount message count
* @param timeout timeout for producer and consumer to be finished
* @param clusterName cluster name
* @param tlsListener option for tls listener inside kafka cluster
* @param topicName topic name
* @param user user for tls if it's used for messages
* @param podName name of the pod
*/
void availabilityTest(int messageCount, long timeout, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
sendMessages(messageCount, timeout, clusterName, tlsListener, topicName, user);
receiveMessages(messageCount, timeout, clusterName, tlsListener, topicName, user);
void availabilityTest(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user, String podName) throws Exception {
sent = sendMessages(messageCount, clusterName, tlsListener, topicName, user, podName);
received = receiveMessages(messageCount, clusterName, tlsListener, topicName, user, podName);
assertSentAndReceivedMessages(sent, received);
}

/**
* Method for send messages to specific kafka cluster. It uses test-client API for communication with deployed clients inside kubernetes cluster
* @param messageCount messages count
* @param timeout timeout for producer to be finished
* @param clusterName cluster name
* @param tlsListener option for tls listener inside kafka cluster
* @param topicName topic name
* @param user user for tls if it's used for messages
* @return count of send and acknowledged messages
*/
int sendMessages(int messageCount, long timeout, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
int sendMessages(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user, String podName) throws Exception {
String bootstrapServer = tlsListener ? clusterName + "-kafka-bootstrap:9093" : clusterName + "-kafka-bootstrap:9092";
ClientArgumentMap producerArguments = new ClientArgumentMap();
producerArguments.put(ClientArgument.BROKER_LIST, bootstrapServer);
producerArguments.put(ClientArgument.TOPIC, topicName);
producerArguments.put(ClientArgument.MAX_MESSAGES, Integer.toString(messageCount));

VerifiableClient producer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_PRODUCER);
VerifiableClient producer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_PRODUCER,
podName,
kubeClient().getNamespace());

if (user != null) {
producerArguments.put(ClientArgument.USER, user.getMetadata().getName().replace("-", "_"));
}

producer.setArguments(producerArguments);

LOGGER.info("Sending {} messages to {}#{}", messageCount, bootstrapServer, topicName);
response = cliApiClient.sendAndGetStatus(producer);

waitTillProcessFinish(getClientUUID(response), "producer", timeout);

assertThat(String.format("Return code of sender is not 0: %s", response),
response.getInteger("ecode"), is(0));

sent = getSentMessagesCount(response, messageCount);
boolean hasPassed = producer.run();
LOGGER.info("Producer finished correctly: {}", hasPassed);

assertThat(String.format("Sent (%s) and expected (%s) message count is not equal", sent, messageCount),
sent == messageCount);
sent = getSentMessagesCount(producer.getMessages().toString(), messageCount);

LOGGER.info("Sent {} messages", sent);
return sent;
}

/**
* Method for receive messages from specific kafka cluster. It uses test-client API for communication with deployed clients inside kubernetes cluster
* @param messageCount message count
* @param timeout timeout for consumer to be finished
* @param clusterName cluster name
* @param tlsListener option for tls listener inside kafka cluster
* @param topicName topic name
* @param user user for tls if it's used for messages
* @return count of received messages
*/
int receiveMessages(int messageCount, long timeout, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
int receiveMessages(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user, String podName) {
String bootstrapServer = tlsListener ? clusterName + "-kafka-bootstrap:9093" : clusterName + "-kafka-bootstrap:9092";
ClientArgumentMap consumerArguments = new ClientArgumentMap();
consumerArguments.put(ClientArgument.BROKER_LIST, bootstrapServer);
Expand All @@ -143,7 +141,9 @@ int receiveMessages(int messageCount, long timeout, String clusterName, boolean
consumerArguments.put(ClientArgument.TOPIC, topicName);
consumerArguments.put(ClientArgument.MAX_MESSAGES, Integer.toString(messageCount));

VerifiableClient consumer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_CONSUMER);
VerifiableClient consumer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_CONSUMER,
podName,
kubeClient().getNamespace());

if (user != null) {
consumerArguments.put(ClientArgument.USER, user.getMetadata().getName().replace("-", "_"));
Expand All @@ -152,44 +152,35 @@ int receiveMessages(int messageCount, long timeout, String clusterName, boolean
consumer.setArguments(consumerArguments);

LOGGER.info("Wait for receive {} messages from {}#{}", messageCount, bootstrapServer, topicName);
response = cliApiClient.sendAndGetStatus(consumer);

waitTillProcessFinish(getClientUUID(response), "consumer", timeout);

assertThat(String.format("Return code of receiver is not 0: %s", response),
response.getInteger("ecode"), is(0));

received = getReceivedMessagesCount(response);
boolean hasPassed = consumer.run();
LOGGER.info("Consumer finished correctly: {}", hasPassed);

assertThat(String.format("Received (%s) and expected (%s) message count is not equal", received, messageCount),
received == messageCount);
received = getReceivedMessagesCount(consumer.getMessages().toString());

LOGGER.info("Received {} messages", received);
return received;
}

private String getClientUUID(JsonObject response) {
return response.getString("UUID");
}

/**
* Checks if process containing producer/consumer inside client pod finished or not
* @param processUuid process uuid
* @param description description for wait method
* @param timeout timeout
* Assert count of sent and received messages
* @param sent count of sent messages
* @param received count of received messages
*/
private void waitTillProcessFinish(String processUuid, String description, long timeout) {
TestUtils.waitFor("Wait till " + description + " finished", Constants.GLOBAL_POLL_INTERVAL, timeout, () -> {
JsonObject out;
try {
out = cliApiClient.getClientInfo(processUuid);
setResponse(out);
return !out.getBoolean("isRunning");
} catch (Exception e) {
e.printStackTrace();
return false;
}
});
void assertSentAndReceivedMessages(int sent, int received) {
assertThat(String.format("Sent (%s) and receive (%s) message count is not equal", sent, received),
sent == received);
}

private boolean allowParameter(String minimalVersion) {
Pattern pattern = Pattern.compile("(?<major>[0-9]).(?<minor>[0-9]).(?<micro>[0-9])");
Matcher current = pattern.matcher(Environment.ST_KAFKA_VERSION);
Matcher minimal = pattern.matcher(minimalVersion);
if (current.find() && minimal.find()) {
return Integer.valueOf(current.group("major")) >= Integer.valueOf(minimal.group("major"))
&& Integer.valueOf(current.group("minor")) >= Integer.valueOf(minimal.group("minor"))
&& Integer.valueOf(current.group("micro")) >= Integer.valueOf(minimal.group("micro"));
}
return false;
}

/**
Expand All @@ -198,16 +189,16 @@ private void waitTillProcessFinish(String processUuid, String description, long
* @param messageCount expected message count
* @return count of acknowledged messages
*/
private int getSentMessagesCount(JsonObject response, int messageCount) {
private int getSentMessagesCount(String response, int messageCount) {
int sentMessages;
String sentPattern = String.format("sent\":(%s)", messageCount);
String ackPattern = String.format("acked\":(%s)", messageCount);
Pattern r = Pattern.compile(sentPattern);
Matcher m = r.matcher(response.getString("stdOut"));
Matcher m = r.matcher(response);
sentMessages = m.find() ? Integer.parseInt(m.group(1)) : -1;

r = Pattern.compile(ackPattern);
m = r.matcher(response.getString("stdOut"));
m = r.matcher(response);

if (m.find()) {
return sentMessages == Integer.parseInt(m.group(1)) ? sentMessages : -1;
Expand All @@ -222,36 +213,16 @@ private int getSentMessagesCount(JsonObject response, int messageCount) {
* @param response response
* @return count of received messages
*/
private int getReceivedMessagesCount(JsonObject response) {
private int getReceivedMessagesCount(String response) {
int receivedMessages = 0;
String pattern = String.format("records_consumed\",\"count\":([0-9]*)");
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(response.getString("stdOut"));
while (m.find()) {
receivedMessages += Integer.parseInt(m.group(1));
}
return receivedMessages;
}
Matcher m = r.matcher(response);

/**
* Assert count of sent and received messages
* @param sent count of sent messages
* @param received count of received messages
*/
void assertSentAndReceivedMessages(int sent, int received) {
assertThat(String.format("Sent (%s) and receive (%s) message count is not equal", sent, received),
sent == received);
}

private boolean allowParameter(String minimalVersion) {
Pattern pattern = Pattern.compile("(?<major>[0-9]).(?<minor>[0-9]).(?<micro>[0-9])");
Matcher current = pattern.matcher(Environment.ST_KAFKA_VERSION);
Matcher minimal = pattern.matcher(minimalVersion);
if (current.find() && minimal.find()) {
return Integer.valueOf(current.group("major")) >= Integer.valueOf(minimal.group("major"))
&& Integer.valueOf(current.group("minor")) >= Integer.valueOf(minimal.group("minor"))
&& Integer.valueOf(current.group("micro")) >= Integer.valueOf(minimal.group("micro"));
if (m.find()) {
receivedMessages = Integer.parseInt(m.group(1));
}
return false;

return receivedMessages;
}
}
Loading

0 comments on commit 0d32031

Please sign in to comment.