From 0d32031c9b3c1ba52792d10a6f806c31b6bf0421 Mon Sep 17 00:00:00 2001
From: Orsák Maroš <30839163+Seequick1@users.noreply.github.com>
Date: Wed, 28 Aug 2019 14:05:10 +0200
Subject: [PATCH] [MO] - [refactor] -> ClientViaHttp to ClientViaExec (#1910)
* [MO] - [refactor] -> ClientViaHttp to ClientViaExec
* [MO] - [refactor] -> httpCall to podCall
* [MO] - [refactor] -> delete whole test-client
* [MO] - [system test] -> remove from refactor
* [MO] - [refactor] -> renamed method
* [MO] - [refactor] -> fix dependencies
* [MO] - [refactor] -> fixed
* [MO] - [refactor] -> fix makefile
* [MO] - [refactor] -> fix messages
* [MO] - [system test] -> MirrorMaker fix
* [MO] - [system test] -> TopicsST fixed, KafkaST fixed, MBST clean, MMST fixed
* [MO] - [system test] -> fix imports
* [MO] - [system test] -> MM fix of messages
* [MO] - [system test] -> fix#check
* [MO] - [system test] -> fix
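
The essence of the refactor: instead of POSTing client arguments to an HTTP listener
running inside the test-client pod, the tests now exec the Kafka verifiable-client
scripts directly in a plain kafka-clients pod and parse stdout. A minimal sketch of
the new call path using only the JDK (pod, namespace, broker and topic names are
hypothetical; the real implementation goes through Exec and cmdKubeClient() in
VerifiableClient below):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class ExecSketch {
        public static void main(String[] args) throws IOException, InterruptedException {
            // old flow: HTTP POST {"command": [...], "count": 1} to the test-client listener
            // new flow: run the client inside the pod and read its output directly
            List<String> cmd = Arrays.asList(
                    "kubectl", "exec", "my-cluster-kafka-clients-abc123", "-n", "my-namespace", "--",
                    "bin/kafka-verifiable-producer.sh",
                    "--broker-list", "my-cluster-kafka-bootstrap:9092",
                    "--topic", "my-topic", "--max-messages", "100");
            Process p = new ProcessBuilder(cmd).inheritIO().start();
            System.exit(p.waitFor()); // exit code 0 == producer finished cleanly
        }
    }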
---
Makefile | 2 +-
docker-images/test-client/Dockerfile | 10 +-
docker-images/test-client/Makefile | 13 --
pom.xml | 1 -
.../strimzi/systemtest/MessagingBaseST.java | 171 ++++++--------
.../java/io/strimzi/systemtest/Resources.java | 41 ++--
.../clients/api/MsgCliApiClient.java | 211 ------------------
.../clients/api/VerifiableClient.java | 124 ++++++++--
.../java/io/strimzi/systemtest/KafkaST.java | 41 ++--
.../io/strimzi/systemtest/MirrorMakerST.java | 71 +++---
.../io/strimzi/systemtest/SecurityST.java | 2 +-
.../java/io/strimzi/systemtest/TopicST.java | 2 +-
test-client/Makefile | 11 -
test-client/pom.xml | 65 ------
test-client/scripts/consumer.sh | 67 ------
test-client/scripts/dynamic_resources.sh | 17 --
.../scripts/kafka_tls_prepare_certificates.sh | 45 ----
test-client/scripts/launch_java.sh | 37 ---
test-client/scripts/log4j.properties | 8 -
test-client/scripts/producer.sh | 62 -----
.../test_client/HttpClientsListener.java | 141 ------------
.../java/io/strimzi/test_client/Main.java | 14 --
.../src/main/resources/log4j2.properties | 15 --
23 files changed, 252 insertions(+), 919 deletions(-)
delete mode 100644 systemtest/src/main/java/io/strimzi/systemtest/clients/api/MsgCliApiClient.java
delete mode 100644 test-client/Makefile
delete mode 100644 test-client/pom.xml
delete mode 100755 test-client/scripts/consumer.sh
delete mode 100755 test-client/scripts/dynamic_resources.sh
delete mode 100755 test-client/scripts/kafka_tls_prepare_certificates.sh
delete mode 100755 test-client/scripts/launch_java.sh
delete mode 100644 test-client/scripts/log4j.properties
delete mode 100755 test-client/scripts/producer.sh
delete mode 100644 test-client/src/main/java/io/strimzi/test_client/HttpClientsListener.java
delete mode 100644 test-client/src/main/java/io/strimzi/test_client/Main.java
delete mode 100644 test-client/src/main/resources/log4j2.properties
diff --git a/Makefile b/Makefile
index bc543c161db..3d13a70fb39 100644
--- a/Makefile
+++ b/Makefile
@@ -11,7 +11,7 @@ ifneq ($(RELEASE_VERSION),latest)
GITHUB_VERSION = $(RELEASE_VERSION)
endif
-SUBDIRS=kafka-agent mirror-maker-agent crd-annotations test crd-generator api mockkube certificate-manager operator-common cluster-operator topic-operator user-operator kafka-init test-client docker-images helm-charts install examples metrics
+SUBDIRS=kafka-agent mirror-maker-agent crd-annotations test crd-generator api mockkube certificate-manager operator-common cluster-operator topic-operator user-operator kafka-init docker-images helm-charts install examples metrics
DOCKER_TARGETS=docker_build docker_push docker_tag
all: $(SUBDIRS)
diff --git a/docker-images/test-client/Dockerfile b/docker-images/test-client/Dockerfile
index 71ac52115f4..13094f7d9aa 100644
--- a/docker-images/test-client/Dockerfile
+++ b/docker-images/test-client/Dockerfile
@@ -1,17 +1,9 @@
FROM strimzi/kafka:latest
-ARG strimzi_version=1.0-SNAPSHOT
-ENV STRIMZI_VERSION ${strimzi_version}
ENV JAVA_OPTS "-DLOG_LEVEL=info"
# copy scripts for starting Kafka
COPY scripts $KAFKA_HOME
COPY scripts/dynamic_resources.sh /bin/dynamic_resources.sh
-ADD tmp/test-client-${STRIMZI_VERSION}.jar /test-client.jar
-
-USER 1001
-
-EXPOSE 4242/tcp
-
-CMD ["/opt/kafka/launch_java.sh", "/test-client.jar"]
\ No newline at end of file
+USER 1001
\ No newline at end of file
diff --git a/docker-images/test-client/Makefile b/docker-images/test-client/Makefile
index a921d32d010..e57c507e365 100644
--- a/docker-images/test-client/Makefile
+++ b/docker-images/test-client/Makefile
@@ -1,19 +1,6 @@
PROJECT_NAME=test-client
include ../../Makefile.os
-
-clean:
- rm -rf lib
- rm -rf tmp
- rm -f .*.tmp
-
-.test-client.tmp: ../../test-client/target/test-client*.jar
- test -d tmp || mkdir tmp
- $(CP) -f ../../test-client/target/test-client*.jar -d tmp
- touch .test-client.tmp
-
-docker_build: .test-client.tmp
-
include ../../Makefile.docker
.PHONY: build clean release
diff --git a/pom.xml b/pom.xml
index 5b799e6f55d..1fe59aafd56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,7 +116,6 @@
kafka-agent
mirror-maker-agent
test
- test-client
crd-annotations
crd-generator
api
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/MessagingBaseST.java b/systemtest/src/main/java/io/strimzi/systemtest/MessagingBaseST.java
index 5bd2b28b8cb..7daf89f4b95 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/MessagingBaseST.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/MessagingBaseST.java
@@ -7,22 +7,15 @@
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.systemtest.clients.api.ClientArgument;
import io.strimzi.systemtest.clients.api.ClientArgumentMap;
-import io.strimzi.systemtest.clients.api.MsgCliApiClient;
import io.strimzi.systemtest.clients.api.VerifiableClient;
-import io.strimzi.test.TestUtils;
-import io.vertx.core.json.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.junit.jupiter.api.BeforeAll;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.strimzi.systemtest.clients.api.ClientType.CLI_KAFKA_VERIFIABLE_CONSUMER;
import static io.strimzi.systemtest.clients.api.ClientType.CLI_KAFKA_VERIFIABLE_PRODUCER;
-import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
/**
@@ -31,107 +24,112 @@
public class MessagingBaseST extends AbstractST {
private static final Logger LOGGER = LogManager.getLogger(MessagingBaseST.class);
- private MsgCliApiClient cliApiClient;
- private JsonObject response;
private int sent = 0;
private int received = 0;
- public void setResponse(JsonObject response) {
- this.response = response;
+ /**
+ * Simple availability check for kafka cluster
+ * @param clusterName cluster name
+ */
+ void availabilityTest(String clusterName) throws Exception {
+ availabilityTest(100, clusterName, false, "my-topic", null);
}
- @BeforeAll
- public void setUpClientBase() throws MalformedURLException {
- String clientUrl = Environment.KUBERNETES_DOMAIN.equals(Environment.KUBERNETES_DOMAIN_DEFAULT) ? new URL(CONFIG.getMasterUrl()).getHost() + Environment.KUBERNETES_DOMAIN_DEFAULT : Environment.KUBERNETES_DOMAIN;
- cliApiClient = new MsgCliApiClient(new URL("http://" + Constants.KAFKA_CLIENTS + "." + clientUrl + ":80"));
+ /**
+ * Simple availability check for kafka cluster
+ * @param messageCount message count
+ * @param clusterName cluster name
+ */
+ void availabilityTest(int messageCount, String clusterName) throws Exception {
+ availabilityTest(messageCount, clusterName, false, "my-topic", null);
}
/**
* Simple availability check for kafka cluster
+ * @param messageCount message count
* @param clusterName cluster name
+ * @param topicName topic name
*/
- void availabilityTest(String clusterName) throws Exception {
- availabilityTest(100, 20000, clusterName, false, "my-topic", null);
+ void availabilityTest(int messageCount, String clusterName, String topicName) throws Exception {
+ availabilityTest(messageCount, clusterName, false, topicName, null);
}
/**
* Simple availability check for kafka cluster
* @param messageCount message count
- * @param timeout timeout
* @param clusterName cluster name
+ * @param tlsListener option for tls listener inside kafka cluster
+ * @param topicName topic name
+ * @param user user for tls if it's used for messages
*/
- void availabilityTest(int messageCount, long timeout, String clusterName) throws Exception {
- availabilityTest(messageCount, timeout, clusterName, false, "my-topic", null);
+ void availabilityTest(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
+ final String defaultKafkaClientsPodName =
+ kubeClient().listPodsByPrefixInName(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
+ sent = sendMessages(messageCount, clusterName, tlsListener, topicName, user, defaultKafkaClientsPodName);
+ received = receiveMessages(messageCount, clusterName, tlsListener, topicName, user, defaultKafkaClientsPodName);
+ assertSentAndReceivedMessages(sent, received);
}
/**
* Simple availability check for kafka cluster
* @param messageCount message count
- * @param timeout timeout for producer and consumer to be finished
* @param clusterName cluster name
* @param tlsListener option for tls listener inside kafka cluster
* @param topicName topic name
* @param user user for tls if it's used for messages
+ * @param podName name of the pod
*/
- void availabilityTest(int messageCount, long timeout, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
- sendMessages(messageCount, timeout, clusterName, tlsListener, topicName, user);
- receiveMessages(messageCount, timeout, clusterName, tlsListener, topicName, user);
+ void availabilityTest(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user, String podName) throws Exception {
+ sent = sendMessages(messageCount, clusterName, tlsListener, topicName, user, podName);
+ received = receiveMessages(messageCount, clusterName, tlsListener, topicName, user, podName);
assertSentAndReceivedMessages(sent, received);
}
/**
- * Method for send messages to specific kafka cluster. It uses test-client API for communication with deployed clients inside kubernetes cluster
+ * Method for sending messages to a specific kafka cluster. The client is executed via exec inside a deployed kafka-clients pod in the kubernetes cluster
* @param messageCount messages count
- * @param timeout timeout for producer to be finished
* @param clusterName cluster name
* @param tlsListener option for tls listener inside kafka cluster
* @param topicName topic name
* @param user user for tls if it's used for messages
- * @return count of send and acknowledged messages
+ * @param podName name of the pod where the client runs
+ * @return count of sent and acknowledged messages
*/
- int sendMessages(int messageCount, long timeout, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
+ int sendMessages(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user, String podName) throws Exception {
String bootstrapServer = tlsListener ? clusterName + "-kafka-bootstrap:9093" : clusterName + "-kafka-bootstrap:9092";
ClientArgumentMap producerArguments = new ClientArgumentMap();
producerArguments.put(ClientArgument.BROKER_LIST, bootstrapServer);
producerArguments.put(ClientArgument.TOPIC, topicName);
producerArguments.put(ClientArgument.MAX_MESSAGES, Integer.toString(messageCount));
- VerifiableClient producer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_PRODUCER);
+ VerifiableClient producer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_PRODUCER,
+ podName,
+ kubeClient().getNamespace());
if (user != null) {
producerArguments.put(ClientArgument.USER, user.getMetadata().getName().replace("-", "_"));
}
producer.setArguments(producerArguments);
-
LOGGER.info("Sending {} messages to {}#{}", messageCount, bootstrapServer, topicName);
- response = cliApiClient.sendAndGetStatus(producer);
-
- waitTillProcessFinish(getClientUUID(response), "producer", timeout);
-
- assertThat(String.format("Return code of sender is not 0: %s", response),
- response.getInteger("ecode"), is(0));
- sent = getSentMessagesCount(response, messageCount);
+ boolean hasPassed = producer.run();
+ LOGGER.info("Producer finished correctly: {}", hasPassed);
- assertThat(String.format("Sent (%s) and expected (%s) message count is not equal", sent, messageCount),
- sent == messageCount);
+ sent = getSentMessagesCount(producer.getMessages().toString(), messageCount);
- LOGGER.info("Sent {} messages", sent);
return sent;
}
/**
- * Method for receive messages from specific kafka cluster. It uses test-client API for communication with deployed clients inside kubernetes cluster
+ * Method for receiving messages from a specific kafka cluster. The client is executed via exec inside a deployed kafka-clients pod in the kubernetes cluster
* @param messageCount message count
- * @param timeout timeout for consumer to be finished
* @param clusterName cluster name
* @param tlsListener option for tls listener inside kafka cluster
* @param topicName topic name
* @param user user for tls if it's used for messages
+ * @param podName name of the pod where the client runs
* @return count of received messages
*/
- int receiveMessages(int messageCount, long timeout, String clusterName, boolean tlsListener, String topicName, KafkaUser user) throws Exception {
+ int receiveMessages(int messageCount, String clusterName, boolean tlsListener, String topicName, KafkaUser user, String podName) {
String bootstrapServer = tlsListener ? clusterName + "-kafka-bootstrap:9093" : clusterName + "-kafka-bootstrap:9092";
ClientArgumentMap consumerArguments = new ClientArgumentMap();
consumerArguments.put(ClientArgument.BROKER_LIST, bootstrapServer);
@@ -143,7 +141,9 @@ int receiveMessages(int messageCount, long timeout, String clusterName, boolean
consumerArguments.put(ClientArgument.TOPIC, topicName);
consumerArguments.put(ClientArgument.MAX_MESSAGES, Integer.toString(messageCount));
- VerifiableClient consumer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_CONSUMER);
+ VerifiableClient consumer = new VerifiableClient(CLI_KAFKA_VERIFIABLE_CONSUMER,
+ podName,
+ kubeClient().getNamespace());
if (user != null) {
consumerArguments.put(ClientArgument.USER, user.getMetadata().getName().replace("-", "_"));
@@ -152,44 +152,35 @@ int receiveMessages(int messageCount, long timeout, String clusterName, boolean
consumer.setArguments(consumerArguments);
LOGGER.info("Wait for receive {} messages from {}#{}", messageCount, bootstrapServer, topicName);
- response = cliApiClient.sendAndGetStatus(consumer);
-
- waitTillProcessFinish(getClientUUID(response), "consumer", timeout);
-
- assertThat(String.format("Return code of receiver is not 0: %s", response),
- response.getInteger("ecode"), is(0));
- received = getReceivedMessagesCount(response);
+ boolean hasPassed = consumer.run();
+ LOGGER.info("Consumer finished correctly: {}", hasPassed);
- assertThat(String.format("Received (%s) and expected (%s) message count is not equal", received, messageCount),
- received == messageCount);
+ received = getReceivedMessagesCount(consumer.getMessages().toString());
- LOGGER.info("Received {} messages", received);
return received;
}
- private String getClientUUID(JsonObject response) {
- return response.getString("UUID");
- }
-
/**
- * Checks if process containing producer/consumer inside client pod finished or not
- * @param processUuid process uuid
- * @param description description for wait method
- * @param timeout timeout
+ * Assert count of sent and received messages
+ * @param sent count of sent messages
+ * @param received count of received messages
*/
- private void waitTillProcessFinish(String processUuid, String description, long timeout) {
- TestUtils.waitFor("Wait till " + description + " finished", Constants.GLOBAL_POLL_INTERVAL, timeout, () -> {
- JsonObject out;
- try {
- out = cliApiClient.getClientInfo(processUuid);
- setResponse(out);
- return !out.getBoolean("isRunning");
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- });
+ void assertSentAndReceivedMessages(int sent, int received) {
+ assertThat(String.format("Sent (%s) and receive (%s) message count is not equal", sent, received),
+ sent == received);
+ }
+
+ private boolean allowParameter(String minimalVersion) {
+ Pattern pattern = Pattern.compile("(?<major>[0-9]).(?<minor>[0-9]).(?<micro>[0-9])");
+ Matcher current = pattern.matcher(Environment.ST_KAFKA_VERSION);
+ Matcher minimal = pattern.matcher(minimalVersion);
+ if (current.find() && minimal.find()) {
+ return Integer.valueOf(current.group("major")) >= Integer.valueOf(minimal.group("major"))
+ && Integer.valueOf(current.group("minor")) >= Integer.valueOf(minimal.group("minor"))
+ && Integer.valueOf(current.group("micro")) >= Integer.valueOf(minimal.group("micro"));
+ }
+ return false;
}
/**
@@ -198,16 +189,16 @@ private void waitTillProcessFinish(String processUuid, String description, long
* @param messageCount expected message count
* @return count of acknowledged messages
*/
- private int getSentMessagesCount(JsonObject response, int messageCount) {
+ private int getSentMessagesCount(String response, int messageCount) {
int sentMessages;
String sentPattern = String.format("sent\":(%s)", messageCount);
String ackPattern = String.format("acked\":(%s)", messageCount);
Pattern r = Pattern.compile(sentPattern);
- Matcher m = r.matcher(response.getString("stdOut"));
+ Matcher m = r.matcher(response);
sentMessages = m.find() ? Integer.parseInt(m.group(1)) : -1;
r = Pattern.compile(ackPattern);
- m = r.matcher(response.getString("stdOut"));
+ m = r.matcher(response);
if (m.find()) {
return sentMessages == Integer.parseInt(m.group(1)) ? sentMessages : -1;
@@ -222,36 +213,16 @@ private int getSentMessagesCount(JsonObject response, int messageCount) {
* @param response response
* @return count of received messages
*/
- private int getReceivedMessagesCount(JsonObject response) {
+ private int getReceivedMessagesCount(String response) {
int receivedMessages = 0;
String pattern = String.format("records_consumed\",\"count\":([0-9]*)");
Pattern r = Pattern.compile(pattern);
- Matcher m = r.matcher(response.getString("stdOut"));
- while (m.find()) {
- receivedMessages += Integer.parseInt(m.group(1));
- }
- return receivedMessages;
- }
+ Matcher m = r.matcher(response);
- /**
- * Assert count of sent and received messages
- * @param sent count of sent messages
- * @param received count of received messages
- */
- void assertSentAndReceivedMessages(int sent, int received) {
- assertThat(String.format("Sent (%s) and receive (%s) message count is not equal", sent, received),
- sent == received);
- }
-
- private boolean allowParameter(String minimalVersion) {
- Pattern pattern = Pattern.compile("(?<major>[0-9]).(?<minor>[0-9]).(?<micro>[0-9])");
- Matcher current = pattern.matcher(Environment.ST_KAFKA_VERSION);
- Matcher minimal = pattern.matcher(minimalVersion);
- if (current.find() && minimal.find()) {
- return Integer.valueOf(current.group("major")) >= Integer.valueOf(minimal.group("major"))
- && Integer.valueOf(current.group("minor")) >= Integer.valueOf(minimal.group("minor"))
- && Integer.valueOf(current.group("micro")) >= Integer.valueOf(minimal.group("micro"));
+ if (m.find()) {
+ receivedMessages = Integer.parseInt(m.group(1));
}
- return false;
+
+ return receivedMessages;
}
}
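
With the HTTP JSON envelope gone, MessagingBaseST scrapes the sent/received counts
straight from the verifiable clients' stdout. A self-contained sketch of the same
regex approach used by getSentMessagesCount() (the sample line is an assumption
modeled on the verifiable producer's JSON status output):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class CountSketch {
        public static void main(String[] args) {
            String out = "{\"name\":\"tool_data\",\"sent\":100,\"acked\":100}";
            int expected = 100;
            Matcher sent = Pattern.compile(String.format("sent\":(%s)", expected)).matcher(out);
            Matcher acked = Pattern.compile(String.format("acked\":(%s)", expected)).matcher(out);
            int sentCount = sent.find() ? Integer.parseInt(sent.group(1)) : -1;
            // like getSentMessagesCount(), trust the count only when sent == acked
            System.out.println(acked.find() && sentCount == Integer.parseInt(acked.group(1))
                    ? sentCount : -1); // prints 100
        }
    }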
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/Resources.java b/systemtest/src/main/java/io/strimzi/systemtest/Resources.java
index e8ac3834078..473aaca7ae1 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/Resources.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/Resources.java
@@ -752,7 +752,7 @@ public DoneableDeployment clusterOperator(String namespace) {
}
public DoneableDeployment clusterOperator(String namespace, String operationTimeout) {
- return createNewDeployment(defaultCLusterOperator(namespace, operationTimeout).build(), namespace);
+ return createNewDeployment(defaultCLusterOperator(namespace, operationTimeout).build());
}
private DeploymentBuilder defaultCLusterOperator(String namespace, String operationTimeout) {
@@ -809,7 +809,7 @@ private DeploymentBuilder defaultCLusterOperator(String namespace, String operat
.endSpec();
}
- private DoneableDeployment createNewDeployment(Deployment deployment, String namespace) {
+ private DoneableDeployment createNewDeployment(Deployment deployment) {
return new DoneableDeployment(deployment, co -> {
TestUtils.waitFor("Deployment creation", Constants.POLL_INTERVAL_FOR_RESOURCE_CREATION, Constants.TIMEOUT_FOR_RESOURCE_CREATION,
() -> {
@@ -944,33 +944,34 @@ DoneableClusterRoleBinding clusterRoleBinding(ClusterRoleBinding clusterRoleBind
return new DoneableClusterRoleBinding(clusterRoleBinding);
}
- DoneableDeployment deployKafkaClients(String clusterName, String namespace) {
- return deployKafkaClients(false, clusterName, namespace, null);
+ DoneableDeployment deployKafkaClients(String kafkaClientsName) {
+ return deployKafkaClients(false, kafkaClientsName, null);
}
- DoneableDeployment deployKafkaClients(boolean tlsListener, String clusterName, String namespace) {
- return deployKafkaClients(tlsListener, clusterName, namespace, null);
+ DoneableDeployment deployKafkaClients(boolean tlsListener, String kafkaClientsName) {
+ return deployKafkaClients(tlsListener, kafkaClientsName, null);
}
- DoneableDeployment deployKafkaClients(boolean tlsListener, String clusterName, String namespace, KafkaUser... kafkaUsers) {
+
+ DoneableDeployment deployKafkaClients(boolean tlsListener, String kafkaClientsName, KafkaUser... kafkaUsers) {
Deployment kafkaClient = new DeploymentBuilder()
.withNewMetadata()
- .withName(clusterName + "-" + Constants.KAFKA_CLIENTS)
+ .withName(kafkaClientsName)
.endMetadata()
.withNewSpec()
.withNewSelector()
- .addToMatchLabels("app", Constants.KAFKA_CLIENTS)
+ .addToMatchLabels("app", kafkaClientsName)
.endSelector()
.withReplicas(1)
.withNewTemplate()
.withNewMetadata()
- .addToLabels("app", Constants.KAFKA_CLIENTS)
+ .addToLabels("app", kafkaClientsName)
.endMetadata()
- .withSpec(createClientSpec(tlsListener, kafkaUsers))
+ .withSpec(createClientSpec(tlsListener, kafkaClientsName, kafkaUsers))
.endTemplate()
.endSpec()
.build();
- return createNewDeployment(kafkaClient, namespace);
+ return createNewDeployment(kafkaClient);
}
public static ServiceBuilder getSystemtestsServiceResource(String appName, int port, String namespace) {
@@ -1037,21 +1038,13 @@ DoneableIngress createIngress(String appName, int port, String url, String clien
return new DoneableIngress(ingress);
}
- private PodSpec createClientSpec(boolean tlsListener, KafkaUser... kafkaUsers) {
+ private PodSpec createClientSpec(boolean tlsListener, String kafkaClientsName, KafkaUser... kafkaUsers) {
PodSpecBuilder podSpecBuilder = new PodSpecBuilder();
ContainerBuilder containerBuilder = new ContainerBuilder()
- .withName(Constants.KAFKA_CLIENTS)
+ .withName(kafkaClientsName)
.withImage(Environment.TEST_CLIENT_IMAGE)
- .addNewPort()
- .withContainerPort(Environment.KAFKA_CLIENTS_DEFAULT_PORT)
- .endPort()
- .withNewLivenessProbe()
- .withNewTcpSocket()
- .withNewPort(Environment.KAFKA_CLIENTS_DEFAULT_PORT)
- .endTcpSocket()
- .withInitialDelaySeconds(10)
- .withPeriodSeconds(5)
- .endLivenessProbe()
+ .withCommand("sleep")
+ .withArgs("infinity")
.withImagePullPolicy("IfNotPresent");
if (kafkaUsers == null) {
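
The kafka-clients Deployment loses its port and liveness probe: the pod only has to
stay alive as an exec target. A condensed sketch using the same fabric8 builders as
the patch (the image and name here are placeholders; the patch takes them from
Environment.TEST_CLIENT_IMAGE and the callers):

    import io.fabric8.kubernetes.api.model.Container;
    import io.fabric8.kubernetes.api.model.ContainerBuilder;
    import io.fabric8.kubernetes.api.model.PodSpec;
    import io.fabric8.kubernetes.api.model.PodSpecBuilder;
    import io.fabric8.kubernetes.api.model.apps.Deployment;
    import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;

    public class ClientDeploymentSketch {
        static Deployment kafkaClients(String kafkaClientsName) {
            Container container = new ContainerBuilder()
                    .withName(kafkaClientsName)
                    .withImage("strimzi/test-client:latest")  // placeholder image
                    .withCommand("sleep")                     // no listener, no probe:
                    .withArgs("infinity")                     // the pod just has to stay up
                    .withImagePullPolicy("IfNotPresent")
                    .build();
            PodSpec spec = new PodSpecBuilder().withContainers(container).build();
            return new DeploymentBuilder()
                    .withNewMetadata().withName(kafkaClientsName).endMetadata()
                    .withNewSpec()
                        .withNewSelector().addToMatchLabels("app", kafkaClientsName).endSelector()
                        .withReplicas(1)
                        .withNewTemplate()
                            .withNewMetadata().addToLabels("app", kafkaClientsName).endMetadata()
                            .withSpec(spec)
                        .endTemplate()
                    .endSpec()
                    .build();
        }
    }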
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/clients/api/MsgCliApiClient.java b/systemtest/src/main/java/io/strimzi/systemtest/clients/api/MsgCliApiClient.java
deleted file mode 100644
index e1a8ac1e8ba..00000000000
--- a/systemtest/src/main/java/io/strimzi/systemtest/clients/api/MsgCliApiClient.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Copyright 2018, Strimzi authors.
- * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
- */
-package io.strimzi.systemtest.clients.api;
-
-import io.strimzi.test.TestUtils;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Vertx;
-import io.vertx.core.json.JsonArray;
-import io.vertx.core.json.JsonObject;
-import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.client.WebClientOptions;
-import io.vertx.ext.web.codec.BodyCodec;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-
-/**
- * API for communication with kafka clients deployed as a Deployment inside Kubernetes
- */
-public class MsgCliApiClient {
- private static final Logger LOGGER = LogManager.getLogger(MsgCliApiClient.class);
- private WebClient webClient;
- private URL endpoint;
- private Vertx vertx;
- private final int initRetry = 10;
-
- /**
- * Constructor for api clients
- * @param endpoint URL on which deployed clients listening
- */
- public MsgCliApiClient(URL endpoint) {
- this.endpoint = endpoint;
- this.vertx = Vertx.vertx();
- this.connect();
- }
-
- private String apiClientName() {
- return "Kafka Clients";
- }
-
- protected void connect() {
- this.webClient = WebClient.create(vertx, new WebClientOptions()
- .setSsl(false)
- .setTrustAll(true)
- .setVerifyHost(false));
- }
-
- protected <T> T doRequestTillSuccess(int retry, Callable<T> fn, Optional<Runnable> reconnect) throws Exception {
- return TestUtils.doRequestTillSuccess(retry, fn, reconnect);
- }
-
- private <T> void responseHandler(AsyncResult<HttpResponse<T>> ar, CompletableFuture<T> promise, int expectedCode,
- String warnMessage) {
- try {
- if (ar.succeeded()) {
- HttpResponse<T> response = ar.result();
- T body = response.body();
- if (response.statusCode() != expectedCode) {
- LOGGER.error("expected-code: {}, response-code: {}, body: {}", expectedCode, response.statusCode(), response.body());
- promise.completeExceptionally(new RuntimeException("Status " + response.statusCode() + " body: " + (body != null ? body.toString() : null)));
- } else if (response.statusCode() < HTTP_OK || response.statusCode() >= HttpURLConnection.HTTP_MULT_CHOICE) {
- promise.completeExceptionally(new RuntimeException(body.toString()));
- } else {
- promise.complete(ar.result().body());
- }
- } else {
- LOGGER.warn(warnMessage);
- promise.completeExceptionally(ar.cause());
- }
- } catch (io.vertx.core.json.DecodeException decEx) {
- if (ar.result().bodyAsString().contains("application is not available")) {
- LOGGER.warn("'{}' is not available.", apiClientName(), ar.cause());
- throw new IllegalStateException(String.format("'%s' is not available.", apiClientName()));
- } else {
- LOGGER.warn("Unexpected object received", ar.cause());
- throw new IllegalStateException("JsonObject expected, but following object was received: " + ar.result().bodyAsString());
- }
- }
- }
-
- /**
- * Start new messaging kafka client(s)
- *
- * @param clientArguments list of arguments for kafka client (together with kafka client name!)
- * @param count count of clients that will be started
- * @return Some JSON.
- * @throws Exception If something went wrong.
- */
- public JsonObject startClients(List<String> clientArguments, int count) throws Exception {
- CompletableFuture<JsonObject> responsePromise = new CompletableFuture<>();
- JsonObject request = new JsonObject();
- request.put("command", new JsonArray(clientArguments));
- request.put("count", count);
-
- return doRequestTillSuccess(initRetry, () -> {
- webClient.post(endpoint.getPort(), endpoint.getHost(), "")
- .as(BodyCodec.jsonObject())
- .timeout(120_000)
- .sendJson(request,
- ar -> responseHandler(ar, responsePromise, HttpURLConnection.HTTP_OK, "Error starting messaging clients"));
- return responsePromise.get(150_000, TimeUnit.SECONDS);
- },
- Optional.empty());
- }
-
- /**
- * Get all info about messaging kafka client (uuid, stdOut, stdErr, code, isRunning)
- *
- * @param uuid kafka client id
- * @return client info
- * @throws InterruptedException
- * @throws ExecutionException
- * @throws TimeoutException
- */
- public JsonObject getClientInfo(String uuid) throws Exception {
- CompletableFuture<JsonObject> responsePromise = new CompletableFuture<>();
-
- JsonObject request = new JsonObject();
- request.put("id", uuid);
-
- return doRequestTillSuccess(initRetry, () -> {
- webClient.get(endpoint.getPort(), endpoint.getHost(), "")
- .as(BodyCodec.jsonObject())
- .timeout(120000)
- .sendJson(request,
- ar -> responseHandler(ar, responsePromise, HttpURLConnection.HTTP_OK, "Error getting messaging clients info"));
- return responsePromise.get(150000, TimeUnit.SECONDS);
- },
- Optional.empty());
-
- }
-
- /**
- * Stop messaging kafka client and get all information about them (uuid, stdOut, stdErr, code, isRunning)
- *
- * @param uuid kafka client id
- * @return cline info
- * @throws InterruptedException
- * @throws ExecutionException
- * @throws TimeoutException
- */
- public JsonObject stopClient(String uuid) throws Exception {
- CompletableFuture<JsonObject> responsePromise = new CompletableFuture<>();
- JsonObject request = new JsonObject();
- request.put("id", uuid);
-
- return doRequestTillSuccess(initRetry, () -> {
- webClient.delete(endpoint.getPort(), endpoint.getHost(), "")
- .as(BodyCodec.jsonObject())
- .timeout(120000)
- .sendJson(request,
- ar -> responseHandler(ar, responsePromise, HttpURLConnection.HTTP_OK, "Error removing messaging clients"));
- return responsePromise.get(150000, TimeUnit.SECONDS);
- },
- Optional.empty());
- }
-
- /***
- * Send request with one kafka client and receive result
- * @param client kafka client
- * @return result of kafka client
- * @throws Exception If something went wrong.
- */
- public JsonObject sendAndGetStatus(VerifiableClient client) throws Exception {
- List<String> apiArgument = new LinkedList<>();
- apiArgument.add(client.getExecutable());
- apiArgument.addAll(client.getArguments());
-
- JsonObject response = startClients(apiArgument, 1);
- JsonArray ids = response.getJsonArray("clients");
- String uuid = ids.getString(0);
-
- Thread.sleep(5000);
-
- response = getClientInfo(uuid);
- response.put("UUID", uuid);
- return response;
- }
-
- /***
- * Send request with one kafka client and receive id
- * @param client kafka client
- * @return id of kafka client
- * @throws Exception If something went wrong.
- */
- public String sendAndGetId(VerifiableClient client) throws Exception {
- List<String> apiArgument = new LinkedList<>();
- apiArgument.add(client.getExecutable());
- apiArgument.addAll(client.getArguments());
-
- JsonObject response = startClients(apiArgument, 1);
-
- JsonArray ids = response.getJsonArray("clients");
- return ids.getString(0);
- }
-}
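
For contrast, the protocol the deleted listener spoke: a JSON request carrying the
client command, answered with a status object polled via getClientInfo(uuid). Field
names are taken from the deleted code; the values here are invented:

    import io.vertx.core.json.JsonArray;
    import io.vertx.core.json.JsonObject;

    public class ProtocolSketch {
        public static void main(String[] args) {
            JsonObject request = new JsonObject()
                    .put("command", new JsonArray()
                            .add("bin/kafka-verifiable-producer.sh")
                            .add("--topic").add("my-topic"))
                    .put("count", 1);
            // shape of the answer the tests used to poll for
            JsonObject status = new JsonObject()
                    .put("UUID", "hypothetical-id")
                    .put("isRunning", false)
                    .put("ecode", 0)
                    .put("stdOut", "{\"sent\":100,\"acked\":100}");
            System.out.println(request.encodePrettily());
            System.out.println(status.encodePrettily());
        }
    }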
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/clients/api/VerifiableClient.java b/systemtest/src/main/java/io/strimzi/systemtest/clients/api/VerifiableClient.java
index 766beb0e1cd..a8117a2aa53 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/clients/api/VerifiableClient.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/clients/api/VerifiableClient.java
@@ -1,32 +1,46 @@
/*
- * Copyright 2018, Strimzi authors.
+ * Copyright 2019, Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.systemtest.clients.api;
-import io.vertx.core.json.JsonArray;
+import io.strimzi.test.executor.Exec;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static io.strimzi.test.BaseITST.cmdKubeClient;
/**
* Class represent verifiable kafka client which keeps common features of kafka clients
*/
public class VerifiableClient {
+
private static final Logger LOGGER = LogManager.getLogger(VerifiableClient.class);
- protected ArrayList<ClientArgument> allowedArguments = new ArrayList<>();
- private JsonArray messages = new JsonArray();
- private ArrayList<String> arguments = new ArrayList<>();
+ private List<ClientArgument> allowedArguments = new ArrayList<>();
+ private final Object lock = new Object();
+ private List<String> messages = new ArrayList<>();
+ private List<String> arguments = new ArrayList<>();
private String executable;
+ private ClientType clientType;
+ private Exec executor;
+ private String podName;
+ private String podNamespace;
/**
* Constructor of verifiable kafka client
*
* @param clientType type of kafka client
*/
- public VerifiableClient(ClientType clientType) {
+ public VerifiableClient(ClientType clientType, String podName, String podNamespace) {
this.setAllowedArguments(clientType);
+ this.clientType = clientType;
+ this.podName = podName;
+ this.podNamespace = podNamespace;
this.executable = ClientType.getCommand(clientType);
}
@@ -35,23 +49,10 @@ public VerifiableClient(ClientType clientType) {
*
- * @return Json array of messages;
+ * @return list of messages
     */
-    public JsonArray getMessages() {
+    public List<String> getMessages() {
return messages;
}
- /**
- * Get all kafka client arguments.
- *
- * @return The kafka client arguments.
- */
- public ArrayList getArguments() {
- return arguments;
- }
-
- public String getExecutable() {
- return this.executable;
- }
-
/**
* Set arguments of kafka client
*
@@ -81,6 +82,89 @@ public void setArguments(ClientArgumentMap args) {
}
}
+ /**
+ * Run the client
+ *
+ * @param timeout kill timeout in ms
+ * @param logToOutput whether to log the client's stdout/stderr
+ * @return true if the command ends with exit code 0
+ */
+ private boolean runClient(int timeout, boolean logToOutput) {
+ messages.clear();
+ try {
+ executor = new Exec();
+ int ret = executor.execute(null, prepareCommand(), timeout);
+ synchronized (lock) {
+ LOGGER.info("{} {} Return code - {}", this.getClass().getName(), clientType, ret);
+ if (logToOutput) {
+ LOGGER.info("{} {} stdout : {}", this.getClass().getName(), clientType, executor.out());
+ if (!executor.err().isEmpty()) {
+ LOGGER.error("{} {} stderr : {}", this.getClass().getName(), clientType, executor.err());
+ }
+ if (ret == 0) {
+ parseToList(executor.out());
+ }
+ }
+ }
+ return ret == 0;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return false;
+ }
+ }
+
+ /**
+ * Method for parsing string output into a list of lines
+ *
+ * @param data string data output
+ */
+ private void parseToList(String data) {
+ if (data != null) {
+ for (String line : data.split(System.getProperty("line.separator"))) {
+ if (!Objects.equals(line, "") && !line.trim().isEmpty()) {
+ messages.add(line);
+ }
+ }
+ }
+ }
+
+ /**
+ * Merge command and arguments
+ *
+ * @return merged array of command and args
+ */
+ private ArrayList<String> prepareCommand() {
+ ArrayList<String> command = new ArrayList<>(arguments);
+ ArrayList<String> executableCommand = new ArrayList<>();
+ executableCommand.addAll(Arrays.asList(cmdKubeClient().toString(), "exec", podName, "-n", podNamespace, "--"));
+ executableCommand.add(executable);
+ executableCommand.addAll(command);
+ return executableCommand;
+ }
+
+ /**
+ * Run the client synchronously
+ *
+ * @return true if the client finished with exit code 0
+ */
+ public boolean run() {
+ return runClient(60000, true);
+ }
+
+ /**
+ * Method for stopping the client
+ */
+ public void stop() {
+ try {
+ executor.stop();
+ } catch (Exception ex) {
+ LOGGER.warn("Client stop raise exception: " + ex.getMessage());
+ }
+ }
+
/**
* Validates that kafka client support this arg
*
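
prepareCommand() above boils down to prefixing the client invocation with the kube
CLI. A sketch of the assembly order (pod, namespace and arguments are hypothetical;
cmdKubeClient().toString() resolves to kubectl or oc), with the resulting command
in the comment:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PrepareCommandSketch {
        public static void main(String[] args) {
            List<String> arguments = Arrays.asList(
                    "--broker-list", "my-cluster-kafka-bootstrap:9092",
                    "--topic", "my-topic", "--max-messages", "100");
            List<String> command = new ArrayList<>(Arrays.asList(
                    "kubectl", "exec", "my-cluster-kafka-clients-abc123", "-n", "my-namespace", "--"));
            command.add("bin/kafka-verifiable-producer.sh");
            command.addAll(arguments);
            // kubectl exec my-cluster-kafka-clients-abc123 -n my-namespace -- \
            //     bin/kafka-verifiable-producer.sh --broker-list ... --max-messages 100
            System.out.println(String.join(" ", command));
        }
    }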
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java b/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java
index fc47f2aa29b..092f23da5f5 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/KafkaST.java
@@ -26,8 +26,6 @@
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.api.kafka.model.ZookeeperClusterSpec;
import io.strimzi.api.kafka.model.listener.KafkaListenerAuthenticationScramSha512;
-import io.strimzi.api.kafka.model.listener.KafkaListenerAuthenticationTls;
-import io.strimzi.api.kafka.model.listener.KafkaListenerPlain;
import io.strimzi.api.kafka.model.listener.KafkaListenerTls;
import io.strimzi.api.kafka.model.listener.NodePortListenerBrokerOverride;
import io.strimzi.api.kafka.model.storage.JbodStorage;
@@ -649,9 +647,9 @@ void testSendMessagesPlainAnonymous() throws Exception {
testMethodResources().kafkaEphemeral(CLUSTER_NAME, 3).done();
testMethodResources().topic(CLUSTER_NAME, topicName).done();
- testMethodResources().deployKafkaClients(CLUSTER_NAME, NAMESPACE).done();
+ testMethodResources().deployKafkaClients(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).done();
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, CLUSTER_NAME, false, topicName, null);
+ availabilityTest(messagesCount, CLUSTER_NAME, false, topicName, null);
}
/**
@@ -663,17 +661,14 @@ void testSendMessagesTlsAuthenticated() throws Exception {
int messagesCount = 200;
String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
- KafkaListenerAuthenticationTls auth = new KafkaListenerAuthenticationTls();
- KafkaListenerTls listenerTls = new KafkaListenerTls();
- listenerTls.setAuth(auth);
-
// Use a Kafka with plain listener disabled
testMethodResources().kafka(testMethodResources().defaultKafka(CLUSTER_NAME, 3)
.editSpec()
.editKafka()
.withNewListeners()
- .withTls(listenerTls)
.withNewTls()
+ .withNewKafkaListenerAuthenticationTlsAuth()
+ .endKafkaListenerAuthenticationTlsAuth()
.endTls()
.endListeners()
.endKafka()
@@ -682,8 +677,8 @@ void testSendMessagesTlsAuthenticated() throws Exception {
KafkaUser user = testMethodResources().tlsUser(CLUSTER_NAME, kafkaUser).done();
StUtils.waitForSecretReady(kafkaUser);
- testMethodResources().deployKafkaClients(true, CLUSTER_NAME, NAMESPACE, user).done();
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, CLUSTER_NAME, true, topicName, user);
+ testMethodResources().deployKafkaClients(true, CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS, user).done();
+ availabilityTest(messagesCount, CLUSTER_NAME, true, topicName, user);
}
/**
@@ -696,19 +691,18 @@ void testSendMessagesPlainScramSha() throws Exception {
int messagesCount = 200;
String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
- KafkaListenerAuthenticationScramSha512 auth = new KafkaListenerAuthenticationScramSha512();
- KafkaListenerPlain listenerTls = new KafkaListenerPlain();
- listenerTls.setAuthentication(auth);
-
// Use a Kafka with plain listener disabled
- testMethodResources().kafka(testMethodResources().defaultKafka(CLUSTER_NAME, 1)
+ testMethodResources().kafkaEphemeral(CLUSTER_NAME, 1)
.editSpec()
.editKafka()
.withNewListeners()
- .withPlain(listenerTls)
+ .withNewPlain()
+ .withNewKafkaListenerAuthenticationScramSha512()
+ .endKafkaListenerAuthenticationScramSha512()
+ .endPlain()
.endListeners()
.endKafka()
- .endSpec().build()).done();
+ .endSpec().done();
testMethodResources().topic(CLUSTER_NAME, topicName).done();
KafkaUser user = testMethodResources().scramShaUser(CLUSTER_NAME, kafkaUser).done();
StUtils.waitForSecretReady(kafkaUser);
@@ -725,8 +719,8 @@ void testSendMessagesPlainScramSha() throws Exception {
LOGGER.info("Broker pod log:\n----\n{}\n----\n", brokerPodLog);
}
- testMethodResources().deployKafkaClients(false, CLUSTER_NAME, NAMESPACE, user).done();
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, CLUSTER_NAME, false, topicName, user);
+ testMethodResources().deployKafkaClients(false, CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS, user).done();
+ availabilityTest(messagesCount, CLUSTER_NAME, false, topicName, user);
}
/**
@@ -754,8 +748,8 @@ void testSendMessagesTlsScramSha() throws Exception {
KafkaUser user = testMethodResources().scramShaUser(CLUSTER_NAME, kafkaUser).done();
StUtils.waitForSecretReady(kafkaUser);
- testMethodResources().deployKafkaClients(true, CLUSTER_NAME, NAMESPACE, user).done();
- availabilityTest(messagesCount, 180000, CLUSTER_NAME, true, topicName, user);
+ testMethodResources().deployKafkaClients(true, CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS, user).done();
+ availabilityTest(messagesCount, CLUSTER_NAME, true, topicName, user);
}
@Test
@@ -1703,6 +1697,7 @@ void testLabelModificationDoesNotBreakCluster() throws Exception {
.editKafka()
.editListeners()
.withNewKafkaListenerExternalNodePort()
+ .withTls(false)
.endKafkaListenerExternalNodePort()
.endListeners()
.endKafka()
@@ -1955,8 +1950,6 @@ void testUOListeningOnlyUsersInSameCluster() {
@BeforeEach
void createTestResources() throws Exception {
createTestMethodResources();
- testMethodResources.createServiceResource(Constants.KAFKA_CLIENTS, Environment.KAFKA_CLIENTS_DEFAULT_PORT, NAMESPACE).done();
- testMethodResources.createIngress(Constants.KAFKA_CLIENTS, Environment.KAFKA_CLIENTS_DEFAULT_PORT, CONFIG.getMasterUrl(), NAMESPACE).done();
}
@AfterEach
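
The listener changes above swap hand-built auth objects for the inline builder DSL.
A sketch of the resulting shape (the nested builder methods are the ones the patch
itself uses; a standalone KafkaListenersBuilder from the same API is an assumption):

    import io.strimzi.api.kafka.model.listener.KafkaListeners;
    import io.strimzi.api.kafka.model.listener.KafkaListenersBuilder;

    public class ListenerSketch {
        // tls listener with mutual-TLS client auth, as in testSendMessagesTlsAuthenticated
        static KafkaListeners tlsAuthListeners() {
            return new KafkaListenersBuilder()
                    .withNewTls()
                        .withNewKafkaListenerAuthenticationTlsAuth()
                        .endKafkaListenerAuthenticationTlsAuth()
                    .endTls()
                    .build();
        }
    }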
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/MirrorMakerST.java b/systemtest/src/test/java/io/strimzi/systemtest/MirrorMakerST.java
index 8ef7ac82acc..8ece1aa8956 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/MirrorMakerST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/MirrorMakerST.java
@@ -13,7 +13,6 @@
import io.strimzi.api.kafka.model.listener.KafkaListenerAuthenticationTls;
import io.strimzi.api.kafka.model.listener.KafkaListenerTls;
import io.strimzi.systemtest.utils.StUtils;
-import io.strimzi.test.TimeoutException;
import io.strimzi.test.timemeasuring.Operation;
import io.strimzi.test.timemeasuring.TimeMeasuringSystem;
import org.apache.logging.log4j.LogManager;
@@ -30,7 +29,8 @@
import static io.strimzi.systemtest.Constants.ACCEPTANCE;
import static io.strimzi.systemtest.Constants.REGRESSION;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
@Tag(REGRESSION)
public class MirrorMakerST extends MessagingBaseST {
@@ -57,11 +57,11 @@ void testMirrorMaker() throws Exception {
// Deploy Topic
testMethodResources().topic(kafkaClusterSourceName, topicSourceName).done();
- testMethodResources().deployKafkaClients(CLUSTER_NAME, NAMESPACE).done();
+ testMethodResources().deployKafkaClients(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).done();
// Check brokers availability
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterSourceName);
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterTargetName);
+ availabilityTest(messagesCount, kafkaClusterSourceName);
+ availabilityTest(messagesCount, kafkaClusterTargetName);
// Deploy Mirror Maker
testMethodResources().kafkaMirrorMaker(CLUSTER_NAME, kafkaClusterSourceName, kafkaClusterTargetName, "my-group" + rng.nextInt(Integer.MAX_VALUE), 1, false).
@@ -94,9 +94,11 @@ void testMirrorMaker() throws Exception {
TimeMeasuringSystem.stopOperation(getOperationID());
- int sent = sendMessages(messagesCount, Constants.TIMEOUT_SEND_MESSAGES, kafkaClusterSourceName, false, topicSourceName, null);
- int receivedSource = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterSourceName, false, topicSourceName, null);
- int receivedTarget = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterTargetName, false, topicSourceName, null);
+ final String kafkaClientsPodName = kubeClient().listPodsByPrefixInName(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
+
+ int sent = sendMessages(messagesCount, kafkaClusterSourceName, false, topicSourceName, null, kafkaClientsPodName);
+ int receivedSource = receiveMessages(messagesCount, kafkaClusterSourceName, false, topicSourceName, null, kafkaClientsPodName);
+ int receivedTarget = receiveMessages(messagesCount, kafkaClusterTargetName, false, topicSourceName, null, kafkaClientsPodName);
assertSentAndReceivedMessages(sent, receivedSource);
assertSentAndReceivedMessages(sent, receivedTarget);
@@ -161,11 +163,11 @@ void testMirrorMakerTlsAuthenticated() throws Exception {
certSecretTarget.setCertificate("ca.crt");
certSecretTarget.setSecretName(clusterCaCertSecretName(kafkaClusterTargetName));
- testMethodResources().deployKafkaClients(true, CLUSTER_NAME, NAMESPACE, userSource, userTarget).done();
+ testMethodResources().deployKafkaClients(true, CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS, userSource, userTarget).done();
// Check brokers availability
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterSourceName, true, "my-topic-test-1", userSource);
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterTargetName, true, "my-topic-test-2", userTarget);
+ availabilityTest(messagesCount, kafkaClusterSourceName, true, "my-topic-test-1", userSource);
+ availabilityTest(messagesCount, kafkaClusterTargetName, true, "my-topic-test-2", userTarget);
// Deploy Mirror Maker with tls listener and mutual tls auth
testMethodResources().kafkaMirrorMaker(CLUSTER_NAME, kafkaClusterSourceName, kafkaClusterTargetName, "my-group" + rng.nextInt(Integer.MAX_VALUE), 1, true)
@@ -185,9 +187,11 @@ void testMirrorMakerTlsAuthenticated() throws Exception {
TimeMeasuringSystem.stopOperation(getOperationID());
- int sent = sendMessages(messagesCount, Constants.TIMEOUT_SEND_MESSAGES, kafkaClusterSourceName, true, topicSourceName, userSource);
- int receivedSource = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterSourceName, true, topicSourceName, userSource);
- int receivedTarget = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterTargetName, true, topicSourceName, userTarget);
+ final String kafkaClientsPodName = kubeClient().listPodsByPrefixInName(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
+
+ int sent = sendMessages(messagesCount, kafkaClusterSourceName, true, topicSourceName, userSource, kafkaClientsPodName);
+ int receivedSource = receiveMessages(messagesCount, kafkaClusterSourceName, true, topicSourceName, userSource, kafkaClientsPodName);
+ int receivedTarget = receiveMessages(messagesCount, kafkaClusterTargetName, true, topicSourceName, userTarget, kafkaClientsPodName);
assertSentAndReceivedMessages(sent, receivedSource);
assertSentAndReceivedMessages(sent, receivedTarget);
@@ -199,9 +203,9 @@ void testMirrorMakerTlsAuthenticated() throws Exception {
@Test
void testMirrorMakerTlsScramSha() throws Exception {
setOperationID(startTimeMeasuring(Operation.MM_DEPLOYMENT));
+ String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
String kafkaUserSource = "my-user-source";
String kafkaUserTarget = "my-user-target";
- String topicName = TOPIC_NAME + "-" + rng.nextInt(Integer.MAX_VALUE);
// Deploy source kafka with tls listener and SCRAM-SHA authentication
testMethodResources().kafka(testMethodResources().defaultKafka(kafkaClusterSourceName, 1, 1)
@@ -252,11 +256,11 @@ void testMirrorMakerTlsScramSha() throws Exception {
certSecretTarget.setSecretName(clusterCaCertSecretName(kafkaClusterTargetName));
// Deploy client
- testMethodResources().deployKafkaClients(true, CLUSTER_NAME, NAMESPACE, userSource, userTarget).done();
+ testMethodResources().deployKafkaClients(true, CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS, userSource, userTarget).done();
// Check brokers availability
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterSourceName, true, "my-topic-test-1", userSource);
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterTargetName, true, "my-topic-test-2", userTarget);
+ availabilityTest(messagesCount, kafkaClusterSourceName, true, "my-topic-test-1", userSource);
+ availabilityTest(messagesCount, kafkaClusterTargetName, true, "my-topic-test-2", userTarget);
// Deploy Mirror Maker with TLS and ScramSha512
testMethodResources().kafkaMirrorMaker(CLUSTER_NAME, kafkaClusterSourceName, kafkaClusterTargetName, "my-group" + rng.nextInt(Integer.MAX_VALUE), 1, true)
@@ -286,9 +290,11 @@ void testMirrorMakerTlsScramSha() throws Exception {
TimeMeasuringSystem.stopOperation(getOperationID());
- int sent = sendMessages(messagesCount, Constants.TIMEOUT_SEND_MESSAGES, kafkaClusterSourceName, true, topicName, userSource);
- int receivedSource = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterSourceName, true, topicName, userSource);
- int receivedTarget = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterTargetName, true, topicName, userTarget);
+ final String kafkaClientsPodName = kubeClient().listPodsByPrefixInName(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
+
+ int sent = sendMessages(messagesCount, kafkaClusterSourceName, true, topicName, userSource, kafkaClientsPodName);
+ int receivedSource = receiveMessages(messagesCount, kafkaClusterSourceName, true, topicName, userSource, kafkaClientsPodName);
+ int receivedTarget = receiveMessages(messagesCount, kafkaClusterTargetName, true, topicName, userTarget, kafkaClientsPodName);
assertSentAndReceivedMessages(sent, receivedSource);
assertSentAndReceivedMessages(sent, receivedTarget);
@@ -311,11 +317,11 @@ void testWhiteList() throws Exception {
StUtils.waitForKafkaTopicCreation(topicName);
StUtils.waitForKafkaTopicCreation(topicNotInWhitelist);
- testMethodResources().deployKafkaClients(CLUSTER_NAME, NAMESPACE).done();
+ testMethodResources().deployKafkaClients(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).done();
// Check brokers availability
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterSourceName);
- availabilityTest(messagesCount, Constants.TIMEOUT_AVAILABILITY_TEST, kafkaClusterTargetName);
+ availabilityTest(messagesCount, kafkaClusterSourceName);
+ availabilityTest(messagesCount, kafkaClusterTargetName);
testMethodResources().kafkaMirrorMaker(CLUSTER_NAME, kafkaClusterSourceName, kafkaClusterTargetName, "my-group" + rng.nextInt(Integer.MAX_VALUE), 1, false)
.editSpec()
@@ -323,25 +329,26 @@ void testWhiteList() throws Exception {
.endSpec()
.done();
- int sent = sendMessages(messagesCount, Constants.TIMEOUT_SEND_MESSAGES, kafkaClusterSourceName, false, topicName, null);
- int receivedSource = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterSourceName, false, topicName, null);
- int receivedTarget = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterTargetName, false, topicName, null);
+ final String kafkaClientsPodName = kubeClient().listPodsByPrefixInName(CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
+
+ int sent = sendMessages(messagesCount, kafkaClusterSourceName, false, topicName, null, kafkaClientsPodName);
+ int receivedSource = receiveMessages(messagesCount, kafkaClusterSourceName, false, topicName, null, kafkaClientsPodName);
+ int receivedTarget = receiveMessages(messagesCount, kafkaClusterTargetName, false, topicName, null, kafkaClientsPodName);
assertSentAndReceivedMessages(sent, receivedSource);
assertSentAndReceivedMessages(sent, receivedTarget);
- sent = sendMessages(messagesCount, Constants.TIMEOUT_SEND_MESSAGES, kafkaClusterSourceName, false, topicNotInWhitelist, null);
- receivedSource = receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterSourceName, false, topicNotInWhitelist, null);
+ sent = sendMessages(messagesCount, kafkaClusterSourceName, false, topicNotInWhitelist, null, kafkaClientsPodName);
+ receivedSource = receiveMessages(messagesCount, kafkaClusterSourceName, false, topicNotInWhitelist, null, kafkaClientsPodName);
- assertThrows(TimeoutException.class, () -> receiveMessages(messagesCount, Constants.TIMEOUT_RECV_MESSAGES, kafkaClusterTargetName, false, topicNotInWhitelist, null));
assertSentAndReceivedMessages(sent, receivedSource);
+ assertThat("Received 0 messages in target kafka because topic " + topicNotInWhitelist + " is not in whitelist",
+ receiveMessages(messagesCount, kafkaClusterTargetName, false, topicNotInWhitelist, null, kafkaClientsPodName), is(0));
}
@BeforeEach
void createTestResources() throws Exception {
createTestMethodResources();
- testMethodResources.createServiceResource(Constants.KAFKA_CLIENTS, Environment.KAFKA_CLIENTS_DEFAULT_PORT, NAMESPACE).done();
- testMethodResources.createIngress(Constants.KAFKA_CLIENTS, Environment.KAFKA_CLIENTS_DEFAULT_PORT, CONFIG.getMasterUrl(), NAMESPACE).done();
}
@AfterEach
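
Every messaging test now starts by resolving the client pod's generated name from
its deployment-name prefix; the framework wraps this as
kubeClient().listPodsByPrefixInName(...). A direct fabric8 equivalent (namespace
and prefix are hypothetical):

    import io.fabric8.kubernetes.api.model.Pod;
    import io.fabric8.kubernetes.client.DefaultKubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import java.util.List;

    public class PodLookupSketch {
        public static void main(String[] args) {
            try (KubernetesClient client = new DefaultKubernetesClient()) {
                List<Pod> pods = client.pods().inNamespace("my-namespace").list().getItems();
                String prefix = "my-cluster-kafka-clients";
                String podName = pods.stream()
                        .map(pod -> pod.getMetadata().getName())
                        .filter(name -> name.startsWith(prefix))
                        .findFirst()
                        .orElseThrow(() -> new IllegalStateException("no kafka-clients pod found"));
                System.out.println(podName);
            }
        }
    }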
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/SecurityST.java b/systemtest/src/test/java/io/strimzi/systemtest/SecurityST.java
index cc80f91c3eb..320720b2b09 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/SecurityST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/SecurityST.java
@@ -48,7 +48,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
@Tag(REGRESSION)
-class SecurityST extends AbstractST {
+class SecurityST extends MessagingBaseST {
public static final String NAMESPACE = "security-cluster-test";
private static final Logger LOGGER = LogManager.getLogger(SecurityST.class);
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/TopicST.java b/systemtest/src/test/java/io/strimzi/systemtest/TopicST.java
index 90a865a3749..962bb91feec 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/TopicST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/TopicST.java
@@ -127,7 +127,7 @@ void testBigAmountOfTopicsCreatingViaKafka() {
for (int i = 0; i < numberOfTopics; i++) {
currentTopic = topicName + i;
StUtils.waitForKafkaTopicCreation(currentTopic);
- KafkaTopic kafkaTopic = testMethodResources().kafkaTopic().withName(currentTopic).get();
+ KafkaTopic kafkaTopic = testMethodResources().kafkaTopic().inNamespace(NAMESPACE).withName(currentTopic).get();
verifyTopicViaKafkaTopicCRK8s(kafkaTopic, currentTopic, topicPartitions);
}
diff --git a/test-client/Makefile b/test-client/Makefile
deleted file mode 100644
index 8a9c8fe76ec..00000000000
--- a/test-client/Makefile
+++ /dev/null
@@ -1,11 +0,0 @@
-PROJECT_NAME=test-client
-
-docker_build: java_install
-docker_push:
-docker_tag:
-all: docker_build docker_push
-clean: java_clean
-
-include ../Makefile.maven
-
-.PHONY: build clean release
diff --git a/test-client/pom.xml b/test-client/pom.xml
deleted file mode 100644
index ac078f4a5cd..00000000000
--- a/test-client/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>strimzi</artifactId>
-        <groupId>io.strimzi</groupId>
-        <version>0.14.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>test-client</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>io.strimzi</groupId>
-            <artifactId>test</artifactId>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>io.vertx</groupId>
-            <artifactId>vertx-core</artifactId>
-            <version>3.6.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-slf4j-impl</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>${maven.shade.version}</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <shadedArtifactAttached>false</shadedArtifactAttached>
-                            <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>io.strimzi.test_client.Main</mainClass>
-                                </transformer>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
\ No newline at end of file
diff --git a/test-client/scripts/consumer.sh b/test-client/scripts/consumer.sh
deleted file mode 100755
index 423ab4a52e5..00000000000
--- a/test-client/scripts/consumer.sh
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/usr/bin/env bash
-set -x
-
-. ./set_kafka_gc_options.sh
-
-# Defaults
-CONFIG_REGEX="USER=([A-Za-z\_]*)"
-CONSUMER_CONFIGURATION_ENV="CONSUMER_CONFIGURATION"
-
-# Create options for consumer
-if [ -z "$CONSUMER_OPTS" ]; then
- CONSUMER_OPTS_TMP=( "${@}" )
- for i in ${CONSUMER_OPTS_TMP[@]};do
- if [[ ${i} =~ $CONFIG_REGEX ]]
- then
- USER="${BASH_REMATCH[1]}"
- TMP_ENV="CONSUMER_CONFIGURATION_${USER}"
- CONSUMER_CONFIGURATION=="${!TMP_ENV}"
- CONSUMER_TLS=$(eval "echo \$$(echo CONSUMER_TLS_${USER})")
- KEYSTORE_LOCATION=$(eval "echo \$$(echo KEYSTORE_LOCATION_${USER})")
- TRUSTSTORE_LOCATION=$(eval "echo \$$(echo TRUSTSTORE_LOCATION_${USER})")
- else
- CONSUMER_OPTS+=("${i}")
- fi
- done
-fi
-
-if [ -z "$KAFKA_LOG4J_OPTS" ]; then
- export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$KAFKA_HOME/log4j.properties"
-fi
-
-# We don't need LOG_DIR because we write no log files, but setting it to a
-# directory avoids trying to create it (and logging a permission denied error)
-export LOG_DIR="$KAFKA_HOME"
-
-CONSUMER_CONFIGURATION="${!CONSUMER_CONFIGURATION_ENV}"
-
-if [ "$CONSUMER_TLS"="TRUE" ]; then
- if [ -z "$CERTS_STORE_PASSWORD" ]; then
- export CERTS_STORE_PASSWORD=$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c32)
- fi
- if [ -n "${KEYSTORE_LOCATION}" ]; then
- CONSUMER_CONFIGURATION="${CONSUMER_CONFIGURATION}
-ssl.keystore.password=${CERTS_STORE_PASSWORD}"
- fi
- if [ -n "${TRUSTSTORE_LOCATION}" ]; then
- CONSUMER_CONFIGURATION="${CONSUMER_CONFIGURATION}
-ssl.truststore.password=${CERTS_STORE_PASSWORD}"
- fi
- ./kafka_tls_prepare_certificates.sh "${USER}"
-fi
-
-PROPERTIES_FILE="/tmp/${USER}.properties"
-echo $PROPERTIES_FILE
-
-# Generate config file for consumer
-echo "Starting Consumer with configuration:"
-echo "${CONSUMER_CONFIGURATION}" | tee ${PROPERTIES_FILE}
-
-# starting Kafka server with final configuration
-$KAFKA_HOME/bin/kafka-verifiable-consumer.sh --consumer.config $PROPERTIES_FILE "${CONSUMER_OPTS[@]}"
-
-RET=$?
-
-echo $RET
-
-exit $RET
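
consumer.sh resolves per-user settings by indirect expansion: a USER=foo argument selects environment variables such as CONSUMER_CONFIGURATION_foo, CONSUMER_TLS_foo and so on. The same lookup expressed in Java, for comparison (the variable names follow the script's convention; the fallback mirrors the generic CONSUMER_CONFIGURATION default):

    public class PerUserConfigSketch {
        /**
         * Resolves CONSUMER_CONFIGURATION_<user>, falling back to the generic
         * CONSUMER_CONFIGURATION variable when no per-user override is set.
         */
        static String consumerConfiguration(String user) {
            String perUser = System.getenv("CONSUMER_CONFIGURATION_" + user);
            return perUser != null ? perUser : System.getenv("CONSUMER_CONFIGURATION");
        }

        public static void main(String[] args) {
            System.out.println(consumerConfiguration("alice")); // "alice" is illustrative
        }
    }
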
diff --git a/test-client/scripts/dynamic_resources.sh b/test-client/scripts/dynamic_resources.sh
deleted file mode 100755
index 6da7908d96d..00000000000
--- a/test-client/scripts/dynamic_resources.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/usr/bin/env bash
-
-function get_heap_size {
- CONTAINER_MEMORY_IN_BYTES=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes)
- # Cap at 31 GiB: below that the JVM can use Compressed Ordinary Object Pointers, which perform much better
- DEFAULT_MEMORY_CEILING=$((31 * 2**30))
- if [ "${CONTAINER_MEMORY_IN_BYTES}" -lt "${DEFAULT_MEMORY_CEILING}" ]; then
- if [ -z $CONTAINER_HEAP_PERCENT ]; then
- CONTAINER_HEAP_PERCENT=0.50
- fi
-
- CONTAINER_MEMORY_IN_MB=$((${CONTAINER_MEMORY_IN_BYTES}/1024**2))
- CONTAINER_HEAP_MAX=$(echo "${CONTAINER_MEMORY_IN_MB} ${CONTAINER_HEAP_PERCENT}" | awk '{ printf "%d", $1 * $2 }')
-
- echo "${CONTAINER_HEAP_MAX}"
- fi
-}
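
get_heap_size derives the heap as a fraction of the cgroup memory limit, and only when that limit sits below the ~31 GiB ceiling where Compressed Ordinary Object Pointers remain effective; above the ceiling the JVM is left to pick its own defaults. The same arithmetic in Java (the 0.50 default matches the script):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class HeapSizeSketch {
        // Below ~32 GiB the JVM can use Compressed Ordinary Object Pointers,
        // so an explicit heap is only set under this ceiling
        static final long MEMORY_CEILING_BYTES = 31L * (1L << 30);

        /** Returns the heap size in MiB, or -1 when no explicit heap should be set. */
        static long heapSizeMb(long containerMemoryBytes, double heapPercent) {
            if (containerMemoryBytes >= MEMORY_CEILING_BYTES) {
                return -1;
            }
            long containerMemoryMb = containerMemoryBytes / (1024 * 1024);
            return (long) (containerMemoryMb * heapPercent);
        }

        public static void main(String[] args) throws IOException {
            // cgroup v1 path, as read by the script
            long limit = Long.parseLong(Files.readString(
                    Paths.get("/sys/fs/cgroup/memory/memory.limit_in_bytes")).trim());
            System.out.println(heapSizeMb(limit, 0.50) + " MiB");
        }
    }
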
diff --git a/test-client/scripts/kafka_tls_prepare_certificates.sh b/test-client/scripts/kafka_tls_prepare_certificates.sh
deleted file mode 100755
index 5354054c9fc..00000000000
--- a/test-client/scripts/kafka_tls_prepare_certificates.sh
+++ /dev/null
@@ -1,45 +0,0 @@
-#!/usr/bin/env bash
-
-USER=$1
-TRUSTSTORE_LOCATION=$(eval "echo \$$(echo TRUSTSTORE_LOCATION_${USER})")
-CA_LOCATION=$(eval "echo \$$(echo CA_LOCATION_${USER})")
-KEYSTORE_LOCATION=$(eval "echo \$$(echo KEYSTORE_LOCATION_${USER})")
-USER_LOCATION=$(eval "echo \$$(echo USER_LOCATION_${USER})")
-KAFKA_USER=$(eval "echo \$$(echo KAFKA_USER_${USER})")
-
-set -x
-
-# Parameters:
-# $1: Path to the new truststore
-# $2: Truststore password
-# $3: Public key to be imported
-# $4: Alias of the certificate
-function create_truststore {
- keytool -keystore "$1" -storepass "$2" -noprompt -alias "$4" -import -file "$3" -storetype PKCS12
-}
-
-if [ -n "${TRUSTSTORE_LOCATION}" ]; then
- rm "${TRUSTSTORE_LOCATION}"
-
- create_truststore "$TRUSTSTORE_LOCATION" "${CERTS_STORE_PASSWORD}" \
- "${CA_LOCATION}/ca.crt" "clients-ca"
-fi
-
-# Parameters:
-# $1: Path to the new keystore
- # $2: Keystore password
-# $3: Public key to be imported
-# $4: Private key to be imported
-# $5: CA public key to be imported
-# $6: Alias of the certificate
-function create_keystore {
- RANDFILE=/tmp/.rnd openssl pkcs12 -export -in "$3" -inkey "$4" -chain -CAfile "$5" -name "$6" -password pass:"$2" -out "$1"
-}
-
-if [ -n "${KEYSTORE_LOCATION}" ]; then
- rm "${KEYSTORE_LOCATION}"
-
- create_keystore "$KEYSTORE_LOCATION" "${CERTS_STORE_PASSWORD}" \
- "${USER_LOCATION}/user.crt" "${USER_LOCATION}/user.key" \
- "${USER_LOCATION}/ca.crt" "$KAFKA_USER"
-fi
\ No newline at end of file
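
The script assembles PKCS12 stores with keytool and openssl so the verifiable clients can point ssl.truststore.location / ssl.keystore.location at them. Loading such a store from Java, for reference (path and password are illustrative; the real password is generated into CERTS_STORE_PASSWORD at startup):

    import java.io.FileInputStream;
    import java.security.KeyStore;
    import java.util.Collections;

    public class Pkcs12LoadSketch {
        public static void main(String[] args) throws Exception {
            KeyStore trustStore = KeyStore.getInstance("PKCS12");
            // Hypothetical path and password
            try (FileInputStream in = new FileInputStream("/tmp/truststore.p12")) {
                trustStore.load(in, "changeit".toCharArray());
            }
            // The CA certificate lands under the alias passed to keytool -import
            Collections.list(trustStore.aliases()).forEach(System.out::println);
        }
    }
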
diff --git a/test-client/scripts/launch_java.sh b/test-client/scripts/launch_java.sh
deleted file mode 100755
index c97bffc21d6..00000000000
--- a/test-client/scripts/launch_java.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env bash
-set -x
-JAR=$1
-shift
-
-. /bin/dynamic_resources.sh
-
-# expand gc options based upon java version
-function get_gc_opts {
- if [ "${STRIMZI_GC_LOG_ENABLED}" == "true" ]; then
- # The first segment of the version number is '1' for releases < 9; then '9', '10', '11', ...
- JAVA_MAJOR_VERSION=$(java -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
- if [ "$JAVA_MAJOR_VERSION" -ge "9" ] ; then
- echo "-Xlog:gc*:stdout:time -XX:NativeMemoryTracking=summary"
- else
- echo "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:NativeMemoryTracking=summary"
- fi
- else
- # no gc options
- echo ""
- fi
-}
-
-MAX_HEAP=$(get_heap_size)
-if [ -n "$MAX_HEAP" ]; then
- JAVA_OPTS="-Xms${MAX_HEAP}m -Xmx${MAX_HEAP}m $JAVA_OPTS"
-fi
-
-export MALLOC_ARENA_MAX=2
-
-# Make sure that we use /dev/urandom
-JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp -Djava.security.egd=file:/dev/./urandom"
-
-# Enable GC logging for memory tracking
-JAVA_OPTS="${JAVA_OPTS} $(get_gc_opts)"
-
-exec java $JAVA_OPTS -jar "$JAR" "$@"
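
launch_java.sh switches GC flags on the Java major version because unified logging (-Xlog:gc*) replaced the -XX:+PrintGC* flags in JDK 9. The same check expressed in Java, handling both the pre-9 "1.8..." numbering and the newer scheme (a sketch, not the script's actual logic):

    public class JavaVersionSketch {
        /** Parses java.version, e.g. "1.8.0_212" -> 8 and "11.0.4" -> 11. */
        static int majorVersion(String version) {
            String[] parts = version.split("\\.");
            // Pre-JDK 9 versions start with "1.", so the major is the second field
            int first = Integer.parseInt(parts[0]);
            return first == 1 ? Integer.parseInt(parts[1]) : first;
        }

        public static void main(String[] args) {
            int major = majorVersion(System.getProperty("java.version"));
            System.out.println(major >= 9
                    ? "-Xlog:gc*:stdout:time"
                    : "-verbose:gc -XX:+PrintGCDetails");
        }
    }
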
diff --git a/test-client/scripts/log4j.properties b/test-client/scripts/log4j.properties
deleted file mode 100644
index 666fe0ea77d..00000000000
--- a/test-client/scripts/log4j.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-# Root logger option
-log4j.rootLogger=DEBUG, stdout
-
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d{ISO8601}] %p %m (%c{1}:%L)%n
\ No newline at end of file
diff --git a/test-client/scripts/producer.sh b/test-client/scripts/producer.sh
deleted file mode 100755
index 4e403bf0834..00000000000
--- a/test-client/scripts/producer.sh
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env bash
-set -x
-
-# Defaults
-CONFIG_REGEX="USER=([A-Za-z\_]*)"
-PRODUCER_CONFIGURATION_ENV="PRODUCER_CONFIGURATION"
-
-# Create options for producer
-if [ -z "$PRODUCER_OPTS" ]; then
- PRODUCER_OPTS_TMP=( "${@}" )
- for i in "${PRODUCER_OPTS_TMP[@]}"; do
- if [[ ${i} =~ $CONFIG_REGEX ]]
- then
- USER="${BASH_REMATCH[1]}"
- TMP_ENV="PRODUCER_CONFIGURATION_${USER}"
- PRODUCER_CONFIGURATION="${!TMP_ENV}"
- PRODUCER_TLS=$(eval "echo \$$(echo PRODUCER_TLS_${USER})")
- KEYSTORE_LOCATION=$(eval "echo \$$(echo KEYSTORE_LOCATION_${USER})")
- TRUSTSTORE_LOCATION=$(eval "echo \$$(echo TRUSTSTORE_LOCATION_${USER})")
- else
- PRODUCER_OPTS+=("${i}")
- fi
- done
-fi
-
-if [ -z "$KAFKA_LOG4J_OPTS" ]; then
- export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$KAFKA_HOME/log4j.properties"
-fi
-
-if [ -z "$PRODUCER_OPTS" ]; then
- PRODUCER_OPTS="$@"
-fi
-
-# We don't need LOG_DIR because we write no log files, but setting it to a
-# directory avoids trying to create it (and logging a permission denied error)
-export LOG_DIR="$KAFKA_HOME"
-
-if [ "$PRODUCER_TLS"="TRUE" ]; then
- if [ -z "$CERTS_STORE_PASSWORD" ]; then
- export CERTS_STORE_PASSWORD=$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c32)
- fi
- if [ -n "${KEYSTORE_LOCATION}" ]; then
- PRODUCER_CONFIGURATION="${PRODUCER_CONFIGURATION}
-ssl.keystore.password=${CERTS_STORE_PASSWORD}"
- fi
- if [ -n "${TRUSTSTORE_LOCATION}" ]; then
- PRODUCER_CONFIGURATION="${PRODUCER_CONFIGURATION}
-ssl.truststore.password=${CERTS_STORE_PASSWORD}"
- fi
- ./kafka_tls_prepare_certificates.sh "${USER}"
-fi
-
-PROPERTIES_FILE="/tmp/${USER}.properties"
-echo $PROPERTIES_FILE
-
-echo "Starting Producer with configuration:"
-echo "${PRODUCER_CONFIGURATION}" | tee ${PROPERTIES_FILE}
-
-. ./set_kafka_gc_options.sh
-
-# Start the verifiable producer with the final configuration
-$KAFKA_HOME/bin/kafka-verifiable-producer.sh --producer.config "$PROPERTIES_FILE" "${PRODUCER_OPTS[@]}"
diff --git a/test-client/src/main/java/io/strimzi/test_client/HttpClientsListener.java b/test-client/src/main/java/io/strimzi/test_client/HttpClientsListener.java
deleted file mode 100644
index 1b0c3deecad..00000000000
--- a/test-client/src/main/java/io/strimzi/test_client/HttpClientsListener.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2017-2018, Strimzi authors.
- * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
- */
-package io.strimzi.test_client;
-
-import io.strimzi.test.executor.Exec;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpServer;
-import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.http.HttpServerResponse;
-import io.vertx.core.json.JsonArray;
-import io.vertx.core.json.JsonObject;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-/**
- * HTTP listener that receives JSON requests from the system tests and runs the corresponding commands on the client pod
- */
-public class HttpClientsListener extends AbstractVerticle {
- private HttpServer httpServer = null;
- private static final Logger LOGGER = LogManager.getLogger(HttpClientsListener.class);
- private HashMap<String, Exec> executors = new HashMap<>();
-
- @Override
- public void start() {
- vertx = Vertx.vertx();
- httpServer = vertx.createHttpServer();
- httpServer.requestHandler(request -> {
- switch (request.method()) {
- case POST:
- postHandler(request);
- break;
- case GET:
- getHandler(request);
- break;
- case DELETE:
- deleteHandler(request);
- break;
- default:
- LOGGER.warn("Unexpected HTTP method ({}).", request.method());
- break;
- }
- });
- int port = 4242;
- httpServer.listen(port);
- LOGGER.info("Client listener listening on port: {}", port);
- }
-
- private void deleteHandler(HttpServerRequest request) {
- request.bodyHandler(handler -> {
- JsonObject json = handler.toJsonObject();
- LOGGER.info("Incoming DELETE request: {}", json);
- String clientUUID = json.getString("id");
-
- Exec executor = executors.get(clientUUID);
- executor.stop();
- executors.remove(clientUUID);
-
- HttpServerResponse response = successfulResponse(request);
- JsonObject responseData = new JsonObject();
- responseData.put("ecode", executor.getRetCode());
- responseData.put("stdOut", executor.out());
- responseData.put("stdErr", executor.err());
- responseData.put("isRunning", executor.isRunning());
- response.end(responseData.toString());
-
- });
- }
-
- private void getHandler(HttpServerRequest request) {
- LOGGER.info(executors);
- request.bodyHandler(handler -> {
- JsonObject json = handler.toJsonObject();
- LOGGER.info("Incoming GET request: {}", json);
- String clientUUID = json.getString("id");
-
- Exec executor = executors.get(clientUUID);
-
- HttpServerResponse response = successfulResponse(request);
- JsonObject responseData = new JsonObject();
-
- responseData.put("ecode", executor.getRetCode());
- responseData.put("stdOut", executor.out());
- responseData.put("stdErr", executor.err());
- responseData.put("isRunning", executor.isRunning());
- response.end(responseData.toString());
- });
- }
-
- @SuppressWarnings("unchecked")
- private void postHandler(HttpServerRequest request) {
- request.bodyHandler(handler -> {
- JsonObject json = handler.toJsonObject();
- LOGGER.info("Incoming POST request: {}", json);
- JsonArray command = json.getJsonArray("command");
- int count = json.getInteger("count");
-
- JsonArray clientsIDs = new JsonArray();
- for (int i = 0; i < count; i++) {
- // create a separate executor and id for each requested client
- Exec executor = new Exec(Paths.get("/tmp/"));
- UUID uuid = UUID.randomUUID();
- try {
- CompletableFuture.runAsync(() -> {
- try {
- LOGGER.info("Execute command: {}", command);
- executor.execute(null, command.getList(), 0);
- } catch (IOException | InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }, runnable -> new Thread(runnable).start());
- } catch (Exception e) {
- e.printStackTrace();
- }
- executors.put(uuid.toString(), executor);
- clientsIDs.add(uuid.toString());
- }
-
- HttpServerResponse response = successfulResponse(request);
- JsonObject responseData = new JsonObject();
- responseData.put("clients", clientsIDs);
- response.end(responseData.toString());
- });
- }
-
- private HttpServerResponse successfulResponse(HttpServerRequest request) {
- HttpServerResponse response = request.response();
- response.setStatusCode(200);
- response.headers().add("Content-Type", "application/json");
- return response;
- }
-}
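
For context on what the refactor removes: the listener accepted a JSON POST ({"command": [...], "count": N}) on port 4242, returned the generated client UUIDs, and reported each executor's exit code, stdout, stderr and running state on GET and DELETE by id. A minimal caller using the JDK HTTP client (the host name is illustrative; this mirrors the request shape, not the removed MsgCliApiClient):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ListenerCallSketch {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            // Request shape taken from postHandler: a command array plus a count
            String body = "{\"command\": [\"echo\", \"hello\"], \"count\": 1}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://test-client:4242/")) // hypothetical host
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            // The response carries {"clients": ["<uuid>", ...]}
            System.out.println(response.body());
        }
    }
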
diff --git a/test-client/src/main/java/io/strimzi/test_client/Main.java b/test-client/src/main/java/io/strimzi/test_client/Main.java
deleted file mode 100644
index e64de29740c..00000000000
--- a/test-client/src/main/java/io/strimzi/test_client/Main.java
+++ /dev/null
@@ -1,14 +0,0 @@
-/*
- * Copyright 2017-2018, Strimzi authors.
- * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
- */
-package io.strimzi.test_client;
-
-
-public class Main {
-
- public static void main(String[] args) {
- HttpClientsListener server = new HttpClientsListener();
- server.start();
- }
-}
diff --git a/test-client/src/main/resources/log4j2.properties b/test-client/src/main/resources/log4j2.properties
deleted file mode 100644
index c762b23422b..00000000000
--- a/test-client/src/main/resources/log4j2.properties
+++ /dev/null
@@ -1,15 +0,0 @@
-name = STConfig
-
-appender.console.type = Console
-appender.console.name = STDOUT
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = [%d] %p %m (%c:%L)%n
-
-rootLogger.level = ${env:STRIMZI_LOG_LEVEL:-INFO}
-rootLogger.appenderRefs = console
-rootLogger.appenderRef.console.ref = STDOUT
-rootLogger.additivity = false
-
-logger.clients.name = org.apache.kafka.clients
-logger.clients.level = info
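
The root level above is resolved through a log4j2 environment lookup, so setting STRIMZI_LOG_LEVEL=DEBUG raised verbosity without rebuilding the image. Application code is unaffected either way; a minimal log4j-api usage sketch:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    public class LoggingSketch {
        private static final Logger LOGGER = LogManager.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            // Printed only when STRIMZI_LOG_LEVEL resolves to DEBUG or lower
            LOGGER.debug("debug detail: {}", 42);
            LOGGER.info("always visible at the INFO default");
        }
    }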
-