Skip to content

Commit

Permalink
ST: Add clients with consume/produce to upgrade tests (strimzi#2287)
Browse files Browse the repository at this point in the history
* Add clients with consume/produce to upgrade tests

Signed-off-by: Jakub Stejskal <[email protected]>

* Fix clients usage for alpha version

Signed-off-by: Jakub Stejskal <[email protected]>

* fix build

Signed-off-by: Jakub Stejskal <[email protected]>

* Rebase fixes

Signed-off-by: Jakub Stejskal <[email protected]>
  • Loading branch information
Frawless authored and scholzj committed Dec 9, 2019
1 parent 4309de9 commit de5ae47
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 23 deletions.
48 changes: 42 additions & 6 deletions systemtest/src/main/resources/StrimziUpgradeST.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"logMessageVersion": "2.2"
},
"client": {
"beforeKafkaUpdate": "strimzi/test-client:0.11.4-kafka-2.1.0",
"beforeKafkaUpdate": "strimzi/test-client:0.12.1-kafka-2.1.0",
"afterKafkaUpdate": "strimzi/test-client:0.12.1-kafka-2.2.1"
},
"supportedK8sVersion": {
Expand Down Expand Up @@ -145,14 +145,50 @@
},
{
"fromVersion":"0.14.0",
"toVersion":"HEAD",
"toVersion":"0.15.0",
"fromExamples":"strimzi-0.14.0",
"toExamples":"HEAD",
"toExamples":"strimzi-0.15.0",
"urlFrom":"https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.14.0/strimzi-0.14.0.zip",
"urlTo":"https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.15.0/strimzi-0.15.0.zip",
"imagesBeforeKafkaUpdate": {
"zookeeper": "strimzi/kafka:0.15.0-kafka-2.3.1",
"kafka": "strimzi/kafka:0.15.0-kafka-2.3.0",
"topicOperator": "strimzi/operator:0.15.0",
"userOperator": "strimzi/operator:0.15.0"
},
"imagesAfterKafkaUpdate": {
"zookeeper": "strimzi/kafka:0.15.0-kafka-2.3.1",
"kafka": "strimzi/kafka:0.15.0-kafka-2.3.1",
"topicOperator": "strimzi/operator:0.15.0",
"userOperator": "strimzi/operator:0.15.0"
},
"proceduresBefore": {
"kafkaVersion": "",
"logMessageVersion": ""
},
"proceduresAfter": {
"kafkaVersion": "2.3.1",
"logMessageVersion": ""
},
"client": {
"beforeKafkaUpdate": "strimzi/test-client:0.14.0-kafka-2.3.0",
"afterKafkaUpdate": "strimzi/test-client:0.15.0-kafka-2.3.1"
},
"supportedK8sVersion": {
"version": "latest",
"reason" : "Test is working on all environment used by QE."
}
},
{
"fromVersion":"0.15.0",
"toVersion":"HEAD",
"fromExamples":"strimzi-0.15.0",
"toExamples":"HEAD",
"urlFrom":"https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.15.0/strimzi-0.15.0.zip",
"urlTo":"HEAD",
"imagesBeforeKafkaUpdate": {
"zookeeper": "strimzi/kafka:latest-kafka-2.3.0",
"kafka": "strimzi/kafka:latest-kafka-2.3.0",
"zookeeper": "strimzi/kafka:latest-kafka-2.3.1",
"kafka": "strimzi/kafka:latest-kafka-2.3.1",
"topicOperator": "strimzi/operator:latest",
"userOperator": "strimzi/operator:latest"
},
Expand All @@ -171,7 +207,7 @@
"logMessageVersion": ""
},
"client": {
"beforeKafkaUpdate": "strimzi/test-client:0.14.0-kafka-2.3.0",
"beforeKafkaUpdate": "strimzi/test-client:0.15.0-kafka-2.3.1",
"afterKafkaUpdate": "strimzi/test-client:latest-kafka-2.3.1"
},
"supportedK8sVersion": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ int receiveMessages(int messageCount, String clusterName, boolean tlsListener, S
ClientArgumentMap consumerArguments = new ClientArgumentMap();
consumerArguments.put(ClientArgument.BROKER_LIST, bootstrapServer);
consumerArguments.put(ClientArgument.GROUP_ID, "my-group" + rng.nextInt(Integer.MAX_VALUE));
if (allowParameter("2.3.0")) {

String image = kubeClient().getPod(podName).getSpec().getContainers().get(0).getImage();
String clientVersion = image.substring(image.length() - 5);

if (allowParameter("2.3.0", clientVersion)) {
consumerArguments.put(ClientArgument.GROUP_INSTANCE_ID, "instance" + rng.nextInt(Integer.MAX_VALUE));
}
consumerArguments.put(ClientArgument.VERBOSE, "");
Expand Down Expand Up @@ -174,9 +178,9 @@ void assertSentAndReceivedMessages(int sent, int received) {
sent == received);
}

private boolean allowParameter(String minimalVersion) {
private boolean allowParameter(String minimalVersion, String clientVersion) {
Pattern pattern = Pattern.compile("(?<major>[0-9]).(?<minor>[0-9]).(?<micro>[0-9])");
Matcher current = pattern.matcher(Environment.ST_KAFKA_VERSION);
Matcher current = pattern.matcher(clientVersion);
Matcher minimal = pattern.matcher(minimalVersion);
if (current.find() && minimal.find()) {
return Integer.parseInt(current.group("major")) >= Integer.parseInt(minimal.group("major"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.strimzi.api.kafka.model.KafkaResources;
import io.strimzi.systemtest.logs.LogCollector;
import io.strimzi.systemtest.utils.FileUtils;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.systemtest.resources.crd.KafkaClientsResource;
import io.strimzi.systemtest.utils.StUtils;
import io.strimzi.systemtest.utils.kubeUtils.controllers.DeploymentUtils;
import io.strimzi.systemtest.utils.kubeUtils.controllers.StatefulSetUtils;
Expand Down Expand Up @@ -43,7 +45,7 @@
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@Tag(UPGRADE)
public class StrimziUpgradeST extends AbstractST {
public class StrimziUpgradeST extends MessagingBaseST {

private static final Logger LOGGER = LogManager.getLogger(StrimziUpgradeST.class);

Expand All @@ -68,6 +70,9 @@ void upgradeStrimziVersion(JsonObject parameters) throws Exception {
File kafkaEphemeralYaml = null;
File kafkaTopicYaml = null;
File kafkaUserYaml = null;
String kafkaClusterName = "my-cluster";
String topicName = "my-topic";
String userName = "my-user";

try {
String url = parameters.getString("urlFrom");
Expand All @@ -93,15 +98,29 @@ void upgradeStrimziVersion(JsonObject parameters) throws Exception {
kafkaUserYaml = new File(dir, parameters.getString("fromExamples") + "/examples/user/kafka-user.yaml");
cmdKubeClient().create(kafkaUserYaml);

// Wait until user will be created
// We cannot use utils wait for that, because in older version there were no status field for CRs
Thread.sleep(10000);

// Deploy clients and exchange messages
KafkaUser kafkaUser = TestUtils.fromYamlString(cmdKubeClient().getResourceAsYaml("kafkauser", userName), KafkaUser.class);
deployClients(parameters.getJsonObject("client").getString("beforeKafkaUpdate"), kafkaUser);

final String defaultKafkaClientsPodName =
kubeClient().listPodsByPrefixInName(kafkaClusterName + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
int sent = sendMessages(50, kafkaClusterName, true, topicName, kafkaUser, defaultKafkaClientsPodName);
int received = receiveMessages(50, kafkaClusterName, true, topicName, kafkaUser, defaultKafkaClientsPodName);
assertSentAndReceivedMessages(sent, received);

makeSnapshots();
logPodImages();
// Execution of required procedures before upgrading CO
changeKafkaAndLogFormatVersion(parameters.getJsonObject("proceduresBefore"));

// Upgrade the CO
// Modify + apply installation files
LOGGER.info("Going to update CO from {} to {}", parameters.getString("fromVersion"), parameters.getString("toVersion"));
if ("HEAD" .equals(parameters.getString("toVersion"))) {
LOGGER.info("Updating");
coDir = new File("../install/cluster-operator");
upgradeClusterOperator(coDir, parameters.getJsonObject("imagesBeforeKafkaUpdate"));
} else {
Expand All @@ -119,18 +138,38 @@ void upgradeStrimziVersion(JsonObject parameters) throws Exception {
logPodImages();
checkAllImages(parameters.getJsonObject("imagesAfterKafkaUpdate"));

// Delete old clients
kubeClient().deleteDeployment(kafkaClusterName + "-" + Constants.KAFKA_CLIENTS);
DeploymentUtils.waitForDeploymentDeletion(kafkaClusterName + "-" + Constants.KAFKA_CLIENTS);

deployClients(parameters.getJsonObject("client").getString("afterKafkaUpdate"), kafkaUser);

final String afterUpgradeKafkaClientsPodName =
kubeClient().listPodsByPrefixInName(kafkaClusterName + "-" + Constants.KAFKA_CLIENTS).get(0).getMetadata().getName();
received = receiveMessages(50, kafkaClusterName, true, topicName, kafkaUser, afterUpgradeKafkaClientsPodName);
assertSentAndReceivedMessages(sent, received);

// Check errors in CO log
assertNoCoErrorsLogged(0);

// Tidy up
} catch (KubeClusterException e) {
if (kafkaEphemeralYaml != null) {
cmdKubeClient().delete(kafkaEphemeralYaml);
e.printStackTrace();
try {
if (kafkaEphemeralYaml != null) {
cmdKubeClient().delete(kafkaEphemeralYaml);
}
} catch (Exception ex) {
LOGGER.warn("Failed to delete resources: {}", kafkaEphemeralYaml.getName());
}
if (coDir != null) {
cmdKubeClient().delete(coDir);
try {
if (coDir != null) {
cmdKubeClient().delete(coDir);
}
} catch (Exception ex) {
LOGGER.warn("Failed to delete resources: {}", coDir.getName());
}
e.printStackTrace();

throw e;
} finally {
// Get current date to create a unique folder
Expand All @@ -148,7 +187,7 @@ void upgradeStrimziVersion(JsonObject parameters) throws Exception {

private void upgradeClusterOperator(File coInstallDir, JsonObject images) {
copyModifyApply(coInstallDir);
LOGGER.info("Waiting for CO deployment");
LOGGER.info("Waiting for CO upgrade");
DeploymentUtils.waitForDeploymentReady("strimzi-cluster-operator", 1);
waitForRollingUpdate();
checkAllImages(images);
Expand All @@ -169,10 +208,14 @@ private void copyModifyApply(File root) {
private void deleteInstalledYamls(File root) {
if (root != null) {
Arrays.stream(Objects.requireNonNull(root.listFiles())).sorted().forEach(f -> {
if (f.getName().matches(".*RoleBinding.*")) {
cmdKubeClient().deleteContent(TestUtils.changeRoleBindingSubject(f, NAMESPACE));
} else {
cmdKubeClient().delete(f);
try {
if (f.getName().matches(".*RoleBinding.*")) {
cmdKubeClient().deleteContent(TestUtils.changeRoleBindingSubject(f, NAMESPACE));
} else {
cmdKubeClient().delete(f);
}
} catch (Exception ex) {
LOGGER.warn("Failed to delete resources: {}", f.getName());
}
});
}
Expand Down Expand Up @@ -239,8 +282,8 @@ private void checkContainerImages(Map<String, String> matchLabels, int container
List<Pod> pods1 = kubeClient().listPods(matchLabels);
for (Pod pod : pods1) {
if (!image.equals(pod.getSpec().getContainers().get(container).getImage())) {
LOGGER.debug("Expected image: {} \nCurrent image: {}", image, pod.getSpec().getContainers().get(container).getImage());
assertThat("Used image is not valid!", pod.getSpec().getContainers().get(container).getImage(), is(image));
LOGGER.debug("Expected image for pod {}: {} \nCurrent image: {}", pod.getMetadata().getName(), image, pod.getSpec().getContainers().get(container).getImage());
assertThat("Used image for pod " + pod.getMetadata().getName() + " is not valid!", pod.getSpec().getContainers().get(container).getImage(), is(image));
}
}
}
Expand Down Expand Up @@ -285,6 +328,20 @@ void logPodImages() {
}
}

void deployClients(String image, KafkaUser kafkaUser) {
    // Deploy a fresh Kafka clients deployment for the given user, overriding the
    // first container's image so tests can pin a specific test-client version.
    String clientsDeploymentName = CLUSTER_NAME + "-" + Constants.KAFKA_CLIENTS;
    KafkaClientsResource.deployKafkaClients(true, clientsDeploymentName, kafkaUser)
        .editSpec()
            .editTemplate()
                .editSpec()
                    .editFirstContainer()
                        .withImage(image)
                    .endContainer()
                .endSpec()
            .endTemplate()
        .endSpec()
        .done();
}

@BeforeEach
void setupEnvironment() {
cluster.createNamespace(NAMESPACE);
Expand Down

0 comments on commit de5ae47

Please sign in to comment.