From 7e2014d14f00497f57014ec5b3f000186cc5f104 Mon Sep 17 00:00:00 2001 From: Jakub Scholz Date: Wed, 29 Jan 2020 17:27:45 +0100 Subject: [PATCH] Add JMX Trans deployment (#2474) * feat: Add jmxtrans deployment Added a jmxtrans deployment which will obtain JMX metrics automatically from the kafka brokers. It will also automatically roll when security or kafka brokers are updated. Signed-off-by: Julian Goh * refactor: Address PR comments Added more documentations and removed redundant variables. Signed-off-by: Julian Goh * refactor: Address PR comments Implement suggested doc changes. Signed-off-by: Julian Goh * docs: Add references Signed-off-by: Julian Goh * refactor: Address PR comments Restructure CRD, built custom Dockerfile for new JmxTrans Signed-off-by: Julian Goh * refactor: Address PR comments Renamed variables and classes and got rid of unnecessary values. Added JmxTrans image in the Makefiles Signed-off-by: Julian Goh * fix: add jmxtrans image into files Add jmxtrans image into the helm charts. Signed-off-by: Julian Goh * refactor: Moved inner classes and rewording Moved inner static classes outside of the main class and moved it into its own class. Added unit tests to test deserializing of the classes. Improved wording on classes. Also added more logging Signed-off-by: Julian Goh * Review commennts Signed-off-by: Jakub Scholz --- CHANGELOG.md | 2 + Makefile | 1 + .../api/kafka/model/JmxTransResources.java | 30 ++ .../strimzi/api/kafka/model/JmxTransSpec.java | 111 ++++++ .../api/kafka/model/KafkaJmxOptions.java | 1 - .../io/strimzi/api/kafka/model/KafkaSpec.java | 11 + .../JmxTransOutputDefinitionTemplate.java | 118 +++++++ .../model/template/JmxTransQueryTemplate.java | 81 +++++ .../strimzi/api/kafka/model/JmxTransTest.java | 74 ++++ .../strimzi/api/kafka/model/KafkaCrdIT.java | 26 ++ .../api/kafka/model/KafkaJmxOptionsTest.java | 2 +- ...nition-with-missing-required-property.yaml | 27 ++ ...ueries-with-missing-required-property.yaml | 24 ++ .../operator/cluster/model/JmxTrans.java | 333 ++++++++++++++++++ .../operator/cluster/model/KafkaCluster.java | 10 +- .../components/JmxTransOutputWriter.java | 77 ++++ .../model/components/JmxTransQueries.java | 48 +++ .../model/components/JmxTransServer.java | 74 ++++ .../model/components/JmxTransServers.java | 25 ++ .../assembly/KafkaAssemblyOperator.java | 75 +++- .../operator/cluster/model/JmxTransTest.java | 161 +++++++++ .../KafkaAssemblyOperatorMockTest.java | 6 +- .../assembly/KafkaAssemblyOperatorTest.java | 42 +++ docker-images/build.sh | 3 +- docker-images/jmxtrans/Dockerfile | 48 +++ docker-images/jmxtrans/Makefile | 8 + docker-images/jmxtrans/docker-entrypoint.sh | 25 ++ .../jmxtrans/jmxtrans_readiness_check.sh | 21 ++ .../assemblies/assembly-jmxtrans.adoc | 24 ++ .../assembly-kafka-jmx-options.adoc | 2 +- documentation/modules/appendix_crds.adoc | 65 +++- documentation/modules/con-jmxtrans.adoc | 11 + .../modules/proc-jmxtrans-deployment.adoc | 80 +++++ .../ref-configuring-container-images.adoc | 4 + documentation/modules/snip-images.adoc | 7 + documentation/snip-images.sh | 7 + examples/metrics/jmxtrans.yaml | 38 ++ helm-charts/strimzi-kafka-operator/README.md | 3 + .../templates/040-Crd-kafka.yaml | 58 +++ ...0-Deployment-strimzi-cluster-operator.yaml | 2 + .../strimzi-kafka-operator/values.yaml | 5 + install/cluster-operator/040-Crd-kafka.yaml | 58 +++ ...0-Deployment-strimzi-cluster-operator.yaml | 2 + .../install/strimzi-service-monitor.yaml | 4 +- 44 files changed, 1816 insertions(+), 18 deletions(-) create mode 100644 api/src/main/java/io/strimzi/api/kafka/model/JmxTransResources.java create mode 100644 api/src/main/java/io/strimzi/api/kafka/model/JmxTransSpec.java create mode 100644 api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransOutputDefinitionTemplate.java create mode 100644 api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransQueryTemplate.java create mode 100644 api/src/test/java/io/strimzi/api/kafka/model/JmxTransTest.java create mode 100644 api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-output-definition-with-missing-required-property.yaml create mode 100644 api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-queries-with-missing-required-property.yaml create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/JmxTrans.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransOutputWriter.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransQueries.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServer.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServers.java create mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/model/JmxTransTest.java create mode 100644 docker-images/jmxtrans/Dockerfile create mode 100644 docker-images/jmxtrans/Makefile create mode 100755 docker-images/jmxtrans/docker-entrypoint.sh create mode 100755 docker-images/jmxtrans/jmxtrans_readiness_check.sh create mode 100644 documentation/assemblies/assembly-jmxtrans.adoc create mode 100644 documentation/modules/con-jmxtrans.adoc create mode 100644 documentation/modules/proc-jmxtrans-deployment.adoc create mode 100644 examples/metrics/jmxtrans.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf4e3a2e7b..1474743ee1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # CHANGELOG +## 0.17.0 +* Add Jmxtrans deployment ## 0.17.0 diff --git a/Makefile b/Makefile index 7ddc803990c..59977791bc0 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,7 @@ release_version: $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+/strimzi\/kafka-bridge:$(BRIDGE_VERSION)/g' {} \; $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/kafka:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/kafka:[a-zA-Z0-9_.-]\+-kafka-\([0-9.]\+\)/strimzi\/kafka:$(RELEASE_VERSION)-kafka-\1/g' {} \; $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/[0-9.]\+=strimzi\/kafka[a-zA-Z0-9_.-]\?\+:[a-zA-Z0-9_.-]\+-kafka-[0-9.]\+"\?/s/:[a-zA-Z0-9_.-]\+-kafka-\([0-9.]\+\)/:$(RELEASE_VERSION)-kafka-\1/g' {} \; + $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/jmxtrans:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/jmxtrans:[a-zA-Z0-9_.-]\+/strimzi\/jmxtrans:$(RELEASE_VERSION)/g' {} \; # Set Kafka Bridge version to its own version $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+/strimzi\/kafka-bridge:$(BRIDGE_VERSION)/g' {} \; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/JmxTransResources.java b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransResources.java new file mode 100644 index 00000000000..c602e1ed7e4 --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransResources.java @@ -0,0 +1,30 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model; + +/** + * Encapsulates the naming scheme used for the resources which the Cluster Operator manages for a + * {@code JmxTrans} instance. + */ +public class JmxTransResources { + protected JmxTransResources() { } + /** + * Returns the name of the JmxTrans {@code Deployment}. + * @param kafkaClusterName The {@code metadata.name} of the {@code Kafka} resource. + * @return The name of the corresponding JmxTrans {@code Deployment}. + */ + public static String deploymentName(String kafkaClusterName) { + return kafkaClusterName + "-kafka-jmx-trans"; + } + /** + * Returns the name of the JmxTrans {@code ServiceAccount}. + * @param kafkaClusterName The {@code metadata.name} of the {@code Kafka} resource. + * @return The name of the corresponding JmxTrans {@code ServiceAccount}. + */ + public static String serviceAccountName(String kafkaClusterName) { + return deploymentName(kafkaClusterName); + } + +} \ No newline at end of file diff --git a/api/src/main/java/io/strimzi/api/kafka/model/JmxTransSpec.java b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransSpec.java new file mode 100644 index 00000000000..902b151ccd1 --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransSpec.java @@ -0,0 +1,111 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model; + + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplate; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplate; +import io.strimzi.crdgenerator.annotations.Description; +import io.sundr.builder.annotations.Buildable; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Representation for options to be passed into setting up the JmxTrans. + */ +@Buildable( + editableEnabled = false, + generateBuilderPackage = false, + builderPackage = "io.fabric8.kubernetes.api.builder" +) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +@JsonPropertyOrder({"image", "outputDefinitions", "logLevel", "kafkaQueries", "resources"}) +@EqualsAndHashCode +public class JmxTransSpec implements UnknownPropertyPreserving, Serializable { + public static final int DEFAULT_HEALTHCHECK_DELAY = 15; + public static final int DEFAULT_HEALTHCHECK_TIMEOUT = 5; + + private static final long serialVersionUID = 1L; + protected String image; + private String logLevel; + private List outputDefinitions = null; + private List kafkaQueries = null; + + private ResourceRequirements resources; + + private Map additionalProperties = new HashMap<>(0); + + @Description("The image to use for the JmxTrans") + public String getImage() { + return image; + } + + public void setImage(String image) { + this.image = image; + } + + @Description("Sets the logging level of the JmxTrans deployment." + + "For more information see, https://github.com/jmxtrans/jmxtrans-agent/wiki/Troubleshooting[JmxTrans Logging Level]") + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + @Description("Defines the output hosts that will be referenced later on. " + + "For more information on these properties see, xref:type-JmxTransOutputDefinitionTemplate-reference[`JmxTransOutputDefinitionTemplate` schema reference].") + public List getOutputDefinitions() { + return outputDefinitions; + } + + public void setOutputDefinitions(List outputDefinitions) { + this.outputDefinitions = outputDefinitions; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("Queries to send to the Kafka brokers to define what data should be read from each broker. " + + "For more information on these properties see, xref:type-JmxTransQueryTemplate-reference[`JmxTransQueryTemplate` schema reference].") + public List getKafkaQueries() { + return kafkaQueries; + } + + public void setKafkaQueries(List kafkaQueries) { + this.kafkaQueries = kafkaQueries; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("CPU and memory resources to reserve.") + public ResourceRequirements getResources() { + return resources; + } + + public void setResources(ResourceRequirements resources) { + this.resources = resources; + } + + @Override + public Map getAdditionalProperties() { + return additionalProperties; + } + + @Override + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } + +} \ No newline at end of file diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java index 4c3ff958cda..a38dfc21aaf 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java @@ -47,4 +47,3 @@ public void setAdditionalProperty(String name, Object value) { this.additionalProperties.put(name, value); } } - diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java index 6c2a771bae6..e7174b3ac5b 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java @@ -41,6 +41,7 @@ public class KafkaSpec implements UnknownPropertyPreserving, Serializable { private TopicOperatorSpec topicOperator; private EntityOperatorSpec entityOperator; private CertificateAuthority clusterCa; + private JmxTransSpec jmxTrans; private KafkaExporterSpec kafkaExporter; private CertificateAuthority clientsCa; @@ -113,6 +114,16 @@ public List getMaintenanceTimeWindows() { return maintenanceTimeWindows; } + @Description("Configuration for JmxTrans. When the property is present a JmxTrans deployment is created for gathering JMX metrics from each Kafka broker. " + + "For more information see https://github.com/jmxtrans/jmxtrans[JmxTrans GitHub]") + public JmxTransSpec getJmxTrans() { + return jmxTrans; + } + + public void setJmxTrans(JmxTransSpec jmxTrans) { + this.jmxTrans = jmxTrans; + } + public void setMaintenanceTimeWindows(List maintenanceTimeWindows) { this.maintenanceTimeWindows = maintenanceTimeWindows; } diff --git a/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransOutputDefinitionTemplate.java b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransOutputDefinitionTemplate.java new file mode 100644 index 00000000000..9b42ba61a1e --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransOutputDefinitionTemplate.java @@ -0,0 +1,118 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model.template; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.strimzi.api.kafka.model.UnknownPropertyPreserving; +import io.strimzi.crdgenerator.annotations.Description; +import io.sundr.builder.annotations.Buildable; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Representation for options to define where and how information will be pushed to remote sources of information + */ +@Buildable( + editableEnabled = false, + generateBuilderPackage = false, + builderPackage = "io.fabric8.kubernetes.api.builder" +) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +@JsonPropertyOrder({"outputType", "host", "port", "flushDelayInSeconds", "typeNames", "name"}) +@EqualsAndHashCode +public class JmxTransOutputDefinitionTemplate implements Serializable, UnknownPropertyPreserving { + + private static final long serialVersionUID = 1L; + + private String outputType; + private String host; + private Integer port; + private Integer flushDelayInSeconds; + private String name; + private List typeNames; + + private Map additionalProperties = new HashMap<>(0); + + @JsonProperty(value = "outputType", required = true) + @Description("Template for setting the format of the data that will be pushed." + + "For more information see https://github.com/jmxtrans/jmxtrans/wiki/OutputWriters[JmxTrans OutputWriters]") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getOutputType() { + return outputType; + } + + public void setOutputType(String outputType) { + this.outputType = outputType; + } + + @Description("The DNS/hostname of the remote host that the data is pushed to.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + @Description("The port of the remote host that the data is pushed to.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + @Description("How many seconds the JmxTrans waits before pushing a new set of data out.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public Integer getFlushDelayInSeconds() { + return flushDelayInSeconds; + } + + public void setFlushDelayInSeconds(Integer flushDelayInSeconds) { + this.flushDelayInSeconds = flushDelayInSeconds; + } + + @Description("Template for filtering data to be included in response to a wildcard query. " + + "For more information see https://github.com/jmxtrans/jmxtrans/wiki/Queries[JmxTrans queries]") + @JsonInclude(JsonInclude.Include.NON_NULL) + public List getTypeNames() { + return typeNames; + } + + public void setTypeNames(List typeNames) { + this.typeNames = typeNames; + } + + @Description("Template for setting the name of the output definition. This is used to identify where to send " + + "the results of queries should be sent.") + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(required = true) + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public Map getAdditionalProperties() { + return this.additionalProperties; + } + + @Override + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } +} \ No newline at end of file diff --git a/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransQueryTemplate.java b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransQueryTemplate.java new file mode 100644 index 00000000000..f0ea6e1627f --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransQueryTemplate.java @@ -0,0 +1,81 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model.template; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.strimzi.api.kafka.model.UnknownPropertyPreserving; +import io.strimzi.crdgenerator.annotations.Description; +import io.sundr.builder.annotations.Buildable; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Buildable( + editableEnabled = false, + generateBuilderPackage = false, + builderPackage = "io.fabric8.kubernetes.api.builder" +) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +@JsonPropertyOrder({ + "targetMBean", "attributes", "outputs"}) +@EqualsAndHashCode +public class JmxTransQueryTemplate implements Serializable, UnknownPropertyPreserving { + private String targetMBean; + private List attributes; + private List outputs; + + private static final long serialVersionUID = 1L; + private Map additionalProperties = new HashMap<>(0); + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("If using wildcards instead of a specific MBean then the data is gathered from multiple MBeans. " + + "Otherwise if specifying an MBean then data is gathered from that specified MBean.") + public String getTargetMBean() { + return targetMBean; + } + + public void setTargetMBean(String targetMBean) { + this.targetMBean = targetMBean; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("Determine which attributes of the targeted MBean should be included") + public List getAttributes() { + return attributes; + } + + public void setAttributes(List attributes) { + this.attributes = attributes; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("List of the names of output definitions specified in the spec.kafka.jmxTrans.outputDefinitions that have defined where JMX metrics are pushed to, and in which data format") + public List getOutputs() { + return outputs; + } + + public void setOutputs(List outputs) { + this.outputs = outputs; + } + + @Override + public Map getAdditionalProperties() { + return this.additionalProperties; + } + + @Override + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } + +} \ No newline at end of file diff --git a/api/src/test/java/io/strimzi/api/kafka/model/JmxTransTest.java b/api/src/test/java/io/strimzi/api/kafka/model/JmxTransTest.java new file mode 100644 index 00000000000..72e0cae5833 --- /dev/null +++ b/api/src/test/java/io/strimzi/api/kafka/model/JmxTransTest.java @@ -0,0 +1,74 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model; + +import io.strimzi.test.TestUtils; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +public class JmxTransTest { + @Test + public void testQueries() { + JmxTransSpec opts = TestUtils.fromJson("{" + + "\"kafkaQueries\": [{" + + "\"targetMBean\": \"targetMBean\"," + + "\"attributes\": [\"attribute0\", \"attribute1\"]," + + "\"outputs\": [\"output0\", \"output1\"]" + + "}, {" + + "\"targetMBean\": \"targetMBean\"," + + "\"attributes\": [\"attribute1\", \"attribute2\"]," + + "\"outputs\": [\"output0\", \"output1\"]" + + "}]" + + "}", JmxTransSpec.class); + + assertThat(opts, is(notNullValue())); + assertThat(opts.getKafkaQueries().size(), is(2)); + assertThat(opts.getKafkaQueries().get(0).getTargetMBean(), is("targetMBean")); + assertThat(opts.getKafkaQueries().get(0).getAttributes().size(), is(2)); + assertThat(opts.getKafkaQueries().get(0).getAttributes().get(0), is("attribute0")); + assertThat(opts.getKafkaQueries().get(0).getAttributes().get(1), is("attribute1")); + assertThat(opts.getKafkaQueries().get(0).getOutputs().get(0), is("output0")); + assertThat(opts.getKafkaQueries().get(0).getOutputs().get(1), is("output1")); + } + + @Test + public void testOutputDefinition() { + JmxTransSpec opts = TestUtils.fromJson("{" + + "\"outputDefinitions\": [{" + + "\"outputType\": \"targetOutputType\"," + + "\"host\": \"targetHost\"," + + "\"port\": 9999," + + "\"flushDelayInSeconds\": 1," + + "\"typeNames\": [\"typeName0\", \"typeName1\"]," + + "\"name\": \"targetName\"" + + "}, {" + + "\"outputType\": \"targetOutputType\"," + + "\"name\": \"name1\"" + + "}]" + + "}", JmxTransSpec.class); + + assertThat(opts, is(notNullValue())); + assertThat(opts.getOutputDefinitions().size(), is(2)); + assertThat(opts.getOutputDefinitions().get(0).getHost(), is("targetHost")); + assertThat(opts.getOutputDefinitions().get(0).getOutputType(), is("targetOutputType")); + assertThat(opts.getOutputDefinitions().get(0).getFlushDelayInSeconds(), is(1)); + assertThat(opts.getOutputDefinitions().get(0).getTypeNames().get(0), is("typeName0")); + assertThat(opts.getOutputDefinitions().get(0).getTypeNames().get(1), is("typeName1")); + assertThat(opts.getOutputDefinitions().get(0).getName(), is("targetName")); + } + + @Test + public void getCustomImage() { + JmxTransSpec opts = TestUtils.fromJson("{" + + "\"image\": \"testImage\"" + + "}", JmxTransSpec.class); + + assertThat(opts, is(notNullValue())); + assertThat(opts.getImage(), is("testImage")); + } +} diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java index 86d9a4733dd..e24ac869ee1 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java @@ -6,6 +6,7 @@ import io.strimzi.test.TestUtils; import io.strimzi.test.k8s.exceptions.KubeClusterException; +import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -154,6 +155,31 @@ public void testKafkaWithInvalidJmxAuthentication() { containsStringIgnoringCase("spec.kafka.jmxOptions.authentication.type: Unsupported value: \"not-right\": supported values: \"password\""))); } + @Test + void testJmxOptionsWithoutRequiredOutputDefinitionKeys() { + Throwable exception = assertThrows( + KubeClusterException.InvalidResource.class, + () -> { + createDelete(Kafka.class, "JmxTrans-output-definition-with-missing-required-property.yaml"); + }); + + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.outputDefinitions.outputType in body is required")); + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.outputDefinitions.name in body is required")); + } + + @Test + void testJmxOptionsWithoutRequiredQueryKeys() { + Throwable exception = assertThrows( + KubeClusterException.InvalidResource.class, + () -> { + createDelete(Kafka.class, "JmxTrans-queries-with-missing-required-property.yaml"); + }); + + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.kafkaQueries.attributes in body is required")); + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.kafkaQueries.targetMBean in body is required")); + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.kafkaQueries.outputs in body is required")); + } + @BeforeAll void setupEnvironment() { cluster.createNamespace(NAMESPACE); diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java index 2f234c1a381..0dc58fe9ed1 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java @@ -35,4 +35,4 @@ public void testNoJmxOpts() { assertThat(opts.getAuthentication(), is(nullValue())); assertThat(opts.getAdditionalProperties(), is(Collections.emptyMap())); } -} +} \ No newline at end of file diff --git a/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-output-definition-with-missing-required-property.yaml b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-output-definition-with-missing-required-property.yaml new file mode 100644 index 00000000000..171cd65549e --- /dev/null +++ b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-output-definition-with-missing-required-property.yaml @@ -0,0 +1,27 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: Kafka +metadata: + name: strimzi-jmxtrans +spec: + kafka: + version: 2.3.1 + replicas: 3 + storage: + type: persistent-claim + size: 500Gi + listeners: + plain: {} + tls: {} + jmxTrans: + outputDefinitions: + - host: "wont matter about the host" + port: 123 + flushDelayInSeconds: 321 + kafkaQueries: + - targetMBean: "MBean" + attributes: ["attribute1"] + outputs: ["outputs"] + zookeeper: + replicas: 1 + storage: + type: ephemeral \ No newline at end of file diff --git a/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-queries-with-missing-required-property.yaml b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-queries-with-missing-required-property.yaml new file mode 100644 index 00000000000..3bf366a5f62 --- /dev/null +++ b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-queries-with-missing-required-property.yaml @@ -0,0 +1,24 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: Kafka +metadata: + name: strimzi-jmxtrans +spec: + kafka: + version: 2.3.1 + replicas: 3 + storage: + type: persistent-claim + size: 500Gi + listeners: + plain: {} + tls: {} + jmxTrans: + outputDefinitions: + - outputType: "wont matter about type" + name: "won't matter" + kafkaQueries: + - empty: "object" + zookeeper: + replicas: 1 + storage: + type: ephemeral \ No newline at end of file diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/JmxTrans.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/JmxTrans.java new file mode 100644 index 00000000000..89865fd6b07 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/JmxTrans.java @@ -0,0 +1,333 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.IntOrString; +import io.fabric8.kubernetes.api.model.LocalObjectReference; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentStrategy; +import io.fabric8.kubernetes.api.model.apps.DeploymentStrategyBuilder; +import io.fabric8.kubernetes.api.model.apps.RollingUpdateDeploymentBuilder; +import io.strimzi.api.kafka.model.JmxTransResources; +import io.strimzi.api.kafka.model.JmxTransSpec; +import io.strimzi.api.kafka.model.Kafka; +import io.strimzi.api.kafka.model.KafkaJmxAuthenticationPassword; +import io.strimzi.api.kafka.model.Probe; +import io.strimzi.api.kafka.model.ProbeBuilder; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplate; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplate; +import io.strimzi.operator.cluster.model.components.JmxTransOutputWriter; +import io.strimzi.operator.cluster.model.components.JmxTransQueries; +import io.strimzi.operator.cluster.model.components.JmxTransServer; +import io.strimzi.operator.cluster.model.components.JmxTransServers; +import io.strimzi.operator.common.model.Labels; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/* + * Class for handling JmxTrans configuration passed by the user. Used to get the resources needed to create the + * JmxTrans deployment including: config map, deployment, and service accounts. + */ +public class JmxTrans extends AbstractModel { + + // Configuration defaults + private static final String STRIMZI_DEFAULT_JMXTRANS_IMAGE = "STRIMZI_DEFAULT_JMXTRANS_IMAGE"; + public static final Probe READINESS_PROBE_OPTIONS = new ProbeBuilder().withTimeoutSeconds(5).withInitialDelaySeconds(15).build(); + private static final io.strimzi.api.kafka.model.Probe DEFAULT_JMX_TRANS_PROBE = new io.strimzi.api.kafka.model.ProbeBuilder() + .withInitialDelaySeconds(JmxTransSpec.DEFAULT_HEALTHCHECK_DELAY) + .withTimeoutSeconds(JmxTransSpec.DEFAULT_HEALTHCHECK_TIMEOUT) + .build(); + + // Configuration for mounting `config.json` to be used as Config during run time of the JmxTrans + public static final String JMXTRANS_CONFIGMAP_KEY = "config.json"; + public static final String JMXTRANS_VOLUME_NAME = "jmx-config"; + public static final String CONFIG_MAP_ANNOTATION_KEY = "config-map-revision"; + protected static final String JMX_METRICS_CONFIG_SUFFIX = "-jmxtrans-config"; + public static final String JMX_FILE_PATH = "/var/lib/jmxtrans"; + + protected static final String ENV_VAR_JMXTRANS_LOGGING_LEVEL = "JMXTRANS_LOGGING_LEVEL"; + + + private boolean isDeployed; + private boolean isJmxAuthenticated; + private String configMapName; + private String clusterName; + private String loggingLevel; + + /** + * Constructor + * + * @param namespace Kubernetes/OpenShift namespace where JmxTrans resources are going to be created + * @param kafkaCluster kafkaCluster name + * @param labels labels to add to the kafkaCluster + */ + protected JmxTrans(String namespace, String kafkaCluster, Labels labels) { + super(namespace, kafkaCluster, labels); + this.name = JmxTransResources.deploymentName(kafkaCluster); + this.clusterName = kafkaCluster; + this.replicas = 1; + this.readinessPath = "/metrics"; + this.livenessPath = "/metrics"; + this.readinessProbeOptions = READINESS_PROBE_OPTIONS; + + this.mountPath = "/var/lib/kafka"; + + this.logAndMetricsConfigVolumeName = "kafka-metrics-and-logging"; + this.logAndMetricsConfigMountPath = "/usr/share/jmxtrans/conf/"; + + // Metrics must be enabled as JmxTrans is all about gathering JMX metrics from the Kafka brokers and pushing it to remote sources. + this.isMetricsEnabled = true; + } + + public static JmxTrans fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup versions) { + JmxTrans result = null; + JmxTransSpec spec = kafkaAssembly.getSpec().getJmxTrans(); + if (spec != null) { + if (kafkaAssembly.getSpec().getKafka().getJmxOptions() == null) { + String error = String.format("Can't start up JmxTrans '%s' in '%s' as Kafka spec.kafka.jmxOptions is not specified", + JmxTransResources.deploymentName(kafkaAssembly.getMetadata().getName()), + kafkaAssembly.getMetadata().getNamespace()); + log.warn(error); + throw new InvalidResourceException(error); + } + result = new JmxTrans(kafkaAssembly.getMetadata().getNamespace(), + kafkaAssembly.getMetadata().getName(), + Labels.fromResource(kafkaAssembly).withKind(kafkaAssembly.getKind())); + result.isDeployed = true; + + if (kafkaAssembly.getSpec().getKafka().getJmxOptions().getAuthentication() instanceof KafkaJmxAuthenticationPassword) { + result.isJmxAuthenticated = true; + } + + result.loggingLevel = spec.getLogLevel() == null ? "" : spec.getLogLevel(); + + result.setResources(spec.getResources()); + + String image = spec.getImage(); + if (image == null) { + image = System.getenv().getOrDefault(STRIMZI_DEFAULT_JMXTRANS_IMAGE, "strimzi/jmxtrans:latest"); + } + result.setImage(image); + + result.setOwnerReference(kafkaAssembly); + } + + return result; + } + + public Deployment generateDeployment(ImagePullPolicy imagePullPolicy, List imagePullSecrets) { + if (!isDeployed()) { + return null; + } + + DeploymentStrategy updateStrategy = new DeploymentStrategyBuilder() + .withType("RollingUpdate") + .withRollingUpdate(new RollingUpdateDeploymentBuilder() + .withMaxSurge(new IntOrString(1)) + .withMaxUnavailable(new IntOrString(0)) + .build()) + .build(); + + return createDeployment( + updateStrategy, + Collections.emptyMap(), + Collections.emptyMap(), + getMergedAffinity(), + getInitContainers(imagePullPolicy), + getContainers(imagePullPolicy), + getVolumes(), + imagePullSecrets + ); + } + + /** + * Generates the string'd config that the JmxTrans deployment needs to run. It is configured by the user in the yaml + * and this method will convert that into the config the JmxTrans understands. + * @param spec The JmxTrans that was defined by the user + * @param numOfBrokers number of kafka brokers + * @return the jmx trans config file that targets each broker + */ + private String generateJMXConfig(JmxTransSpec spec, int numOfBrokers) throws JsonProcessingException { + JmxTransServers servers = new JmxTransServers(); + servers.setServers(new ArrayList<>()); + ObjectMapper mapper = new ObjectMapper(); + String headlessService = KafkaCluster.headlessServiceName(cluster); + for (int brokerNumber = 0; brokerNumber < numOfBrokers; brokerNumber++) { + String brokerServiceName = KafkaCluster.externalServiceName(clusterName, brokerNumber) + "." + headlessService; + servers.getServers().add(convertSpecToServers(spec, brokerServiceName)); + } + try { + return mapper.writeValueAsString(servers); + } catch (JsonProcessingException e) { + log.error("Could not create JmxTrans config json because: " + e.getMessage()); + throw e; + } + } + + /** + * Generates the JmxTrans config map + * + * @param spec The JmxTransSpec that was defined by the user + * @param numOfBrokers number of kafka brokers + * @return the config map that mounts the JmxTrans config + * @throws JsonProcessingException when JmxTrans config can't be created properly + */ + public ConfigMap generateJmxTransConfigMap(JmxTransSpec spec, int numOfBrokers) throws JsonProcessingException { + Map data = new HashMap<>(); + String jmxConfig = generateJMXConfig(spec, numOfBrokers); + data.put(JMXTRANS_CONFIGMAP_KEY, jmxConfig); + configMapName = jmxTransConfigName(clusterName); + return createConfigMap(jmxTransConfigName(clusterName), data); + } + + public List getVolumes() { + List volumes = new ArrayList<>(); + volumes.add(createConfigMapVolume(JMXTRANS_VOLUME_NAME, configMapName)); + volumes.add(createConfigMapVolume(logAndMetricsConfigVolumeName, KafkaCluster.metricAndLogConfigsName(clusterName))); + return volumes; + } + + private List getVolumeMounts() { + List volumeMountList = new ArrayList<>(); + + volumeMountList.add(createVolumeMount(logAndMetricsConfigVolumeName, logAndMetricsConfigMountPath)); + volumeMountList.add(createVolumeMount(JMXTRANS_VOLUME_NAME, JMX_FILE_PATH)); + return volumeMountList; + } + + @Override + protected List getContainers(ImagePullPolicy imagePullPolicy) { + List containers = new ArrayList<>(); + Container container = new ContainerBuilder() + .withName(name) + .withImage(getImage()) + .withEnv(getEnvVars()) + .withReadinessProbe(jmxTransReadinessProbe(readinessProbeOptions, clusterName)) + .withResources(getResources()) + .withVolumeMounts(getVolumeMounts()) + .withImagePullPolicy(determineImagePullPolicy(imagePullPolicy, getImage())) + .build(); + + containers.add(container); + + return containers; + } + + @Override + protected List getEnvVars() { + List varList = new ArrayList<>(); + + if (isJmxAuthenticated()) { + varList.add(buildEnvVarFromSecret(KafkaCluster.ENV_VAR_KAFKA_JMX_USERNAME, KafkaCluster.jmxSecretName(cluster), KafkaCluster.SECRET_JMX_USERNAME_KEY)); + varList.add(buildEnvVarFromSecret(KafkaCluster.ENV_VAR_KAFKA_JMX_PASSWORD, KafkaCluster.jmxSecretName(cluster), KafkaCluster.SECRET_JMX_PASSWORD_KEY)); + } + varList.add(buildEnvVar(ENV_VAR_JMXTRANS_LOGGING_LEVEL, loggingLevel)); + + addContainerEnvsToExistingEnvs(varList, Collections.emptyList()); + + return varList; + } + + /** + * Generates the name of the JmxTrans deployment + * + * @param kafkaCluster Name of the Kafka Custom Resource + * @return Name of the JmxTrans deployment + */ + public static String jmxTransName(String kafkaCluster) { + return JmxTransResources.deploymentName(kafkaCluster); + } + + /** + * Get the name of the JmxTrans service account given the name of the {@code kafkaCluster}. + * @param kafkaCluster The cluster name + * @return The name of the JmxTrans service account. + */ + public static String containerServiceAccountName(String kafkaCluster) { + return JmxTransResources.serviceAccountName(kafkaCluster); + } + + @Override + protected String getDefaultLogConfigFileName() { + return null; + } + + @Override + protected String getServiceAccountName() { + return JmxTransResources.serviceAccountName(cluster); + } + + public static String jmxTransConfigName(String cluster) { + return cluster + JMX_METRICS_CONFIG_SUFFIX; + } + + public boolean isDeployed() { + return isDeployed; + } + + public boolean isJmxAuthenticated() { + return isJmxAuthenticated; + } + + protected static io.fabric8.kubernetes.api.model.Probe jmxTransReadinessProbe(io.strimzi.api.kafka.model.Probe kafkaJmxMetricsReadinessProbe, String clusterName) { + String internalBootstrapServiceName = KafkaCluster.headlessServiceName(clusterName); + String metricsPortValue = String.valueOf(KafkaCluster.JMX_PORT); + kafkaJmxMetricsReadinessProbe = kafkaJmxMetricsReadinessProbe == null ? DEFAULT_JMX_TRANS_PROBE : kafkaJmxMetricsReadinessProbe; + return ModelUtils.createExecProbe(Arrays.asList("/opt/jmx/jmxtrans_readiness_check.sh", internalBootstrapServiceName, metricsPortValue), kafkaJmxMetricsReadinessProbe); + } + + private JmxTransServer convertSpecToServers(JmxTransSpec spec, String brokerServiceName) { + JmxTransServer server = new JmxTransServer(); + server.setHost(brokerServiceName); + server.setPort(AbstractModel.JMX_PORT); + if (isJmxAuthenticated()) { + server.setUsername("${kafka.username}"); + server.setPassword("${kafka.password}"); + } + List queries = new ArrayList<>(); + for (JmxTransQueryTemplate queryTemplate : spec.getKafkaQueries()) { + JmxTransQueries query = new JmxTransQueries(); + query.setObj(queryTemplate.getTargetMBean()); + query.setAttr(queryTemplate.getAttributes()); + query.setOutputWriters(new ArrayList<>()); + + for (JmxTransOutputDefinitionTemplate outputDefinitionTemplate : spec.getOutputDefinitions()) { + if (queryTemplate.getOutputs().contains(outputDefinitionTemplate.getName())) { + JmxTransOutputWriter outputWriter = new JmxTransOutputWriter(); + outputWriter.setAtClass(outputDefinitionTemplate.getOutputType()); + if (outputDefinitionTemplate.getHost() != null) { + outputWriter.setHost(outputDefinitionTemplate.getHost()); + } + if (outputDefinitionTemplate.getPort() != null) { + outputWriter.setPort(outputDefinitionTemplate.getPort()); + } + if (outputDefinitionTemplate.getFlushDelayInSeconds() != null) { + outputWriter.setFlushDelayInSeconds(outputDefinitionTemplate.getFlushDelayInSeconds()); + } + outputWriter.setTypeNames(outputDefinitionTemplate.getTypeNames()); + query.getOutputWriters().add(outputWriter); + } + } + + queries.add(query); + } + server.setQueries(queries); + return server; + } + +} \ No newline at end of file diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index 48d0433924d..d41afd5d277 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -157,11 +157,11 @@ public class KafkaCluster extends AbstractModel { private static final String NAME_SUFFIX = "-kafka"; - private static final String KAFKA_JMX_SECRET_SUFFIX = NAME_SUFFIX + "-jmx"; - private static final String SECRET_JMX_USERNAME_KEY = "jmx-username"; - private static final String SECRET_JMX_PASSWORD_KEY = "jmx-password"; - private static final String ENV_VAR_KAFKA_JMX_USERNAME = "KAFKA_JMX_USERNAME"; - private static final String ENV_VAR_KAFKA_JMX_PASSWORD = "KAFKA_JMX_PASSWORD"; + protected static final String KAFKA_JMX_SECRET_SUFFIX = NAME_SUFFIX + "-jmx"; + protected static final String SECRET_JMX_USERNAME_KEY = "jmx-username"; + protected static final String SECRET_JMX_PASSWORD_KEY = "jmx-password"; + protected static final String ENV_VAR_KAFKA_JMX_USERNAME = "KAFKA_JMX_USERNAME"; + protected static final String ENV_VAR_KAFKA_JMX_PASSWORD = "KAFKA_JMX_PASSWORD"; // Suffixes for secrets with certificates private static final String SECRET_BROKERS_SUFFIX = NAME_SUFFIX + "-brokers"; diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransOutputWriter.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransOutputWriter.java new file mode 100644 index 00000000000..4adc86af787 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransOutputWriter.java @@ -0,0 +1,77 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.components; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used specify which remote host to output to and in what format to push it in + * atClass: the format of the data to push to remote host + * host: The host of the remote host to push to + * port: The port of the remote host to push to + * flushDelayInSeconds: how often to push the data in seconds + */ +public class JmxTransOutputWriter implements Serializable { + private static final long serialVersionUID = 1L; + + @JsonProperty("@class") + private String atClass; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private String host; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private int port; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private int flushDelayInSeconds; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private List typeNames; + + public String getAtClass() { + return atClass; + } + + public void setAtClass(String atClass) { + this.atClass = atClass; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getFlushDelayInSeconds() { + return flushDelayInSeconds; + } + + public void setFlushDelayInSeconds(int flushDelayInSeconds) { + this.flushDelayInSeconds = flushDelayInSeconds; + } + + public List getTypeNames() { + return typeNames; + } + + public void setTypeNames(List typeNames) { + this.typeNames = typeNames; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransQueries.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransQueries.java new file mode 100644 index 00000000000..888a6a8c40d --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransQueries.java @@ -0,0 +1,48 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.components; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used to create the JmxTrans queries and which remote hosts to output the results to + * obj: references what Kafka MBean to reference + * attr: an array of what MBean properties to target + * outputDefinitionTemplates: Which hosts and how to output to the hosts + */ +public class JmxTransQueries implements Serializable { + private static final long serialVersionUID = 1L; + + private String obj; + + private List attr; + + private List outputWriters; + + public String getObj() { + return obj; + } + + public void setObj(String obj) { + this.obj = obj; + } + + public List getAttr() { + return attr; + } + + public void setAttr(List attr) { + this.attr = attr; + } + + public List getOutputWriters() { + return outputWriters; + } + + public void setOutputWriters(List outputDefinitionTemplates) { + this.outputWriters = outputDefinitionTemplates; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServer.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServer.java new file mode 100644 index 00000000000..b0f1fadd6ae --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServer.java @@ -0,0 +1,74 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.components; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used to create the per broker JmxTrans queries and which remote hosts it will output to + * host: references which broker the JmxTrans will read from + * port: port of the host + * queries: what queries are passed in + */ +public class JmxTransServer implements Serializable { + private static final long serialVersionUID = 1L; + + private String host; + + private int port; + + @JsonProperty("queries") + private List queriesTemplate; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private String username; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public String password; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public List getQueries() { + return queriesTemplate; + } + + public void setQueries(List queriesTemplate) { + this.queriesTemplate = queriesTemplate; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServers.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServers.java new file mode 100644 index 00000000000..e8066e77382 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServers.java @@ -0,0 +1,25 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.components; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used to create the overall config file + * Servers: A list of servers and what they will query and output + */ +public class JmxTransServers implements Serializable { + private static final long serialVersionUID = 1L; + private List servers; + + public List getServers() { + return servers; + } + + public void setServers(List servers) { + this.servers = servers; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java index a64517d1900..793523af329 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java @@ -59,13 +59,14 @@ import io.strimzi.operator.cluster.model.EntityTopicOperator; import io.strimzi.operator.cluster.model.EntityUserOperator; import io.strimzi.operator.cluster.model.InvalidResourceException; +import io.strimzi.operator.cluster.model.JmxTrans; import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.KafkaExporter; import io.strimzi.operator.cluster.model.KafkaVersion; import io.strimzi.operator.cluster.model.KafkaVersionChange; import io.strimzi.operator.cluster.model.ModelUtils; -import io.strimzi.operator.cluster.model.NodeUtils; import io.strimzi.operator.cluster.model.NoSuchResourceException; +import io.strimzi.operator.cluster.model.NodeUtils; import io.strimzi.operator.cluster.model.StatusDiff; import io.strimzi.operator.cluster.model.StorageUtils; import io.strimzi.operator.cluster.model.ZookeeperCluster; @@ -105,8 +106,8 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Base64; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -339,6 +340,12 @@ Future reconcile(ReconciliationState reconcileState) { .compose(state -> state.kafkaExporterDeployment()) .compose(state -> state.kafkaExporterReady()) + .compose(state -> state.getJmxTransDescription()) + .compose(state -> state.jmxTransServiceAccount()) + .compose(state -> state.jmxTransConfigMap()) + .compose(state -> state.jmxTransDeployment()) + .compose(state -> state.jmxTransDeploymentReady()) + .map((Void) null) .setHandler(chainPromise); @@ -413,6 +420,10 @@ class ReconciliationState { // Stores node port of the external bootstrap service for use in KafkaStatus /* test */ int externalBootstrapNodePort; + private JmxTrans jmxTrans = null; + private ConfigMap jmxTransConfigMap = null; + private Deployment jmxTransDeployment = null; + ReconciliationState(Reconciliation reconciliation, Kafka kafkaAssembly) { this.reconciliation = reconciliation; this.kafkaAssembly = kafkaAssembly; @@ -3222,6 +3233,66 @@ Future kafkaExporterReady() { return withVoid(Future.succeededFuture()); } + Future getJmxTransDescription() { + try { + int numOfBrokers = kafkaCluster.getReplicas(); + this.jmxTrans = JmxTrans.fromCrd(kafkaAssembly, versions); + if (this.jmxTrans != null) { + this.jmxTransConfigMap = jmxTrans.generateJmxTransConfigMap(kafkaAssembly.getSpec().getJmxTrans(), numOfBrokers); + this.jmxTransDeployment = jmxTrans.generateDeployment(imagePullPolicy, imagePullSecrets); + } + + return Future.succeededFuture(this); + } catch (Throwable e) { + return Future.failedFuture(e); + } + } + + Future jmxTransConfigMap() { + return withVoid(configMapOperations.reconcile(namespace, + JmxTrans.jmxTransConfigName(name), + jmxTransConfigMap)); + } + + + Future jmxTransServiceAccount() { + return withVoid(serviceAccountOperations.reconcile(namespace, + JmxTrans.containerServiceAccountName(name), + jmxTrans != null ? jmxTrans.generateServiceAccount() : null)); + } + + Future jmxTransDeployment() { + if (this.jmxTrans != null && this.jmxTransDeployment != null) { + return deploymentOperations.getAsync(namespace, this.jmxTrans.getName()).compose(dep -> { + return configMapOperations.getAsync(namespace, jmxTransConfigMap.getMetadata().getName()).compose(res -> { + String resourceVersion = res.getMetadata().getResourceVersion(); + // getting the current cluster CA generation from the current deployment, if it exists + int caCertGeneration = getCaCertGeneration(this.clusterCa); + Annotations.annotations(jmxTransDeployment.getSpec().getTemplate()).put( + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(caCertGeneration)); + Annotations.annotations(jmxTransDeployment.getSpec().getTemplate()).put( + JmxTrans.CONFIG_MAP_ANNOTATION_KEY, resourceVersion); + return withVoid(deploymentOperations.reconcile(namespace, JmxTrans.jmxTransName(name), + jmxTransDeployment)); + }); + }); + } else { + return withVoid(deploymentOperations.reconcile(namespace, JmxTrans.jmxTransName(name), null)); + } + } + + Future jmxTransDeploymentReady() { + if (this.jmxTrans != null && jmxTransDeployment != null) { + Future future = deploymentOperations.getAsync(namespace, this.jmxTrans.getName()); + return future.compose(dep -> { + return withVoid(deploymentOperations.waitForObserved(namespace, this.jmxTrans.getName(), 1_000, operationTimeoutMs)); + }).compose(dep -> { + return withVoid(deploymentOperations.readiness(namespace, this.jmxTrans.getName(), 1_000, operationTimeoutMs)); + }).map(i -> this); + } + return withVoid(Future.succeededFuture()); + } + } /* test */ Date dateSupplier() { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/JmxTransTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/JmxTransTest.java new file mode 100644 index 00000000000..e8398683f29 --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/JmxTransTest.java @@ -0,0 +1,161 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.strimzi.api.kafka.model.InlineLogging; +import io.strimzi.api.kafka.model.JmxTransSpec; +import io.strimzi.api.kafka.model.JmxTransSpecBuilder; +import io.strimzi.api.kafka.model.Kafka; +import io.strimzi.api.kafka.model.KafkaBuilder; +import io.strimzi.api.kafka.model.KafkaJmxAuthenticationPasswordBuilder; +import io.strimzi.api.kafka.model.KafkaJmxOptionsBuilder; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplateBuilder; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplateBuilder; +import io.strimzi.operator.cluster.KafkaVersionTestUtils; +import io.strimzi.operator.cluster.ResourceUtils; +import io.strimzi.operator.cluster.model.components.JmxTransOutputWriter; +import io.strimzi.operator.cluster.model.components.JmxTransQueries; +import io.strimzi.operator.cluster.model.components.JmxTransServer; +import io.vertx.core.json.JsonObject; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class JmxTransTest { + private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); + private final String namespace = "test"; + private final String cluster = "foo"; + private final int replicas = 3; + private final String image = "image"; + private final int healthDelay = 120; + private final int healthTimeout = 30; + private final Map metricsCm = singletonMap("animal", "wombat"); + private final Map configuration = singletonMap("foo", "bar"); + private final InlineLogging kafkaLog = new InlineLogging(); + private final InlineLogging zooLog = new InlineLogging(); + + private final JmxTransSpec jmxTransSpec = new JmxTransSpecBuilder() + .withOutputDefinitions(new JmxTransOutputDefinitionTemplateBuilder() + .withName("Name") + .withOutputType("output") + .build()) + .withKafkaQueries(new JmxTransQueryTemplateBuilder() + .withOutputs("name") + .withAttributes("attributes") + .withNewTargetMBean("mbean") + .build()) + .build(); + + private final Kafka kafkaAssembly = new KafkaBuilder(ResourceUtils.createKafkaCluster(namespace, cluster, replicas, image, healthDelay, healthTimeout, metricsCm, configuration, kafkaLog, zooLog)) + .editSpec() + .withJmxTrans(jmxTransSpec) + .editKafka().withJmxOptions(new KafkaJmxOptionsBuilder() + .withAuthentication(new KafkaJmxAuthenticationPasswordBuilder().build()) + .build()) + .endKafka() + .endSpec() + .build(); + + private final JmxTrans jmxTrans = JmxTrans.fromCrd(kafkaAssembly, VERSIONS); + + @Test + public void testOutputDefinitionWriterDeserialization() { + JmxTransOutputWriter outputWriter = new JmxTransOutputWriter(); + + outputWriter.setAtClass("class"); + outputWriter.setHost("host"); + outputWriter.setPort(9999); + outputWriter.setFlushDelayInSeconds(1); + outputWriter.setTypeNames(Collections.singletonList("SingleType")); + + JsonObject targetJson = JsonObject.mapFrom(outputWriter); + + assertThat(targetJson.getString("host"), is("host")); + assertThat(targetJson.getString("@class"), is("class")); + assertThat(targetJson.getInteger("port"), is(9999)); + assertThat(targetJson.getInteger("flushDelayInSeconds"), is(1)); + assertThat(targetJson.getJsonArray("typeNames").size(), is(1)); + assertThat(targetJson.getJsonArray("typeNames").getList().get(0), is("SingleType")); + + } + + @Test + public void testServersDeserialization() { + JmxTransServer server = new JmxTransServer(); + + server.setHost("host"); + server.setPort(9999); + server.setUsername("username"); + server.setPassword("password"); + server.setQueries(Collections.emptyList()); + + JsonObject targetJson = JsonObject.mapFrom(server); + + assertThat(targetJson.getString("host"), is("host")); + assertThat(targetJson.getInteger("port"), is(9999)); + assertThat(targetJson.getString("username"), is("username")); + assertThat(targetJson.getString("password"), is("password")); + assertThat(targetJson.getJsonArray("queries").getList().size(), is(0)); + } + + @Test + public void testQueriesDeserialization() { + JmxTransOutputWriter outputWriter = new JmxTransOutputWriter(); + + outputWriter.setAtClass("class"); + outputWriter.setHost("host"); + outputWriter.setPort(9999); + outputWriter.setFlushDelayInSeconds(1); + outputWriter.setTypeNames(Collections.singletonList("SingleType")); + + JmxTransQueries queries = new JmxTransQueries(); + + queries.setObj("object"); + queries.setAttr(Collections.singletonList("attribute")); + + queries.setOutputWriters(Collections.singletonList(outputWriter)); + + JsonObject targetJson = JsonObject.mapFrom(queries); + JsonObject outputWriterJson = targetJson.getJsonArray("outputWriters").getJsonObject(0); + + assertThat(targetJson.getString("obj"), is("object")); + assertThat(targetJson.getJsonArray("attr").size(), is(1)); + assertThat(targetJson.getJsonArray("attr").getString(0), is("attribute")); + + assertThat(outputWriterJson.getString("host"), is("host")); + assertThat(outputWriterJson.getString("@class"), is("class")); + assertThat(outputWriterJson.getInteger("port"), is(9999)); + assertThat(outputWriterJson.getInteger("flushDelayInSeconds"), is(1)); + assertThat(outputWriterJson.getJsonArray("typeNames").size(), is(1)); + assertThat(outputWriterJson.getJsonArray("typeNames").getList().get(0), is("SingleType")); + } + + @Test + public void testConfigMapOnScaleUp() throws JsonProcessingException { + ConfigMap originalCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 1); + ConfigMap scaledCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 2); + + assertThat(originalCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length() < + scaledCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length(), + is(true)); + } + + @Test + public void testConfigMapOnScaleDown() throws JsonProcessingException { + ConfigMap originalCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 2); + ConfigMap scaledCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 1); + + assertThat(originalCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length() > + scaledCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length(), + is(true)); + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java index 844f2921188..caf973ff34a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java @@ -19,16 +19,17 @@ import io.strimzi.api.kafka.Crds; import io.strimzi.api.kafka.KafkaList; import io.strimzi.api.kafka.model.DoneableKafka; -import io.strimzi.api.kafka.model.storage.EphemeralStorage; import io.strimzi.api.kafka.model.Kafka; import io.strimzi.api.kafka.model.KafkaBuilder; +import io.strimzi.api.kafka.model.storage.EphemeralStorage; import io.strimzi.api.kafka.model.storage.PersistentClaimStorage; import io.strimzi.api.kafka.model.storage.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.storage.SingleVolumeStorage; import io.strimzi.api.kafka.model.storage.Storage; +import io.strimzi.operator.KubernetesVersion; +import io.strimzi.operator.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ClusterOperator; import io.strimzi.operator.cluster.ClusterOperatorConfig; -import io.strimzi.operator.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.ResourceUtils; import io.strimzi.operator.cluster.model.AbstractModel; @@ -37,7 +38,6 @@ import io.strimzi.operator.cluster.model.KafkaVersion; import io.strimzi.operator.cluster.model.TopicOperator; import io.strimzi.operator.cluster.model.ZookeeperCluster; -import io.strimzi.operator.KubernetesVersion; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; import io.strimzi.operator.cluster.operator.resource.StatefulSetOperator; import io.strimzi.operator.cluster.operator.resource.ZookeeperLeaderFinder; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java index 73784743c64..3577eae8d61 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java @@ -21,6 +21,7 @@ import io.strimzi.api.kafka.model.EntityTopicOperatorSpecBuilder; import io.strimzi.api.kafka.model.EntityUserOperatorSpecBuilder; import io.strimzi.api.kafka.model.InlineLogging; +import io.strimzi.api.kafka.model.JmxTransSpecBuilder; import io.strimzi.api.kafka.model.Kafka; import io.strimzi.api.kafka.model.KafkaBuilder; import io.strimzi.api.kafka.model.KafkaExporterResources; @@ -37,6 +38,8 @@ import io.strimzi.api.kafka.model.storage.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.storage.SingleVolumeStorage; import io.strimzi.api.kafka.model.storage.Storage; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplateBuilder; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplateBuilder; import io.strimzi.operator.KubernetesVersion; import io.strimzi.operator.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ClusterOperator; @@ -47,6 +50,7 @@ import io.strimzi.operator.cluster.model.ClientsCa; import io.strimzi.operator.cluster.model.ClusterCa; import io.strimzi.operator.cluster.model.EntityOperator; +import io.strimzi.operator.cluster.model.JmxTrans; import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.KafkaExporter; import io.strimzi.operator.cluster.model.KafkaVersion; @@ -352,6 +356,38 @@ public void testCreateClusterWithJmxEnabled(Params params, VertxTestContext cont )); //getInitialCertificates(getKafkaAssembly("foo").getMetadata().getName())); } + @ParameterizedTest + @MethodSource("data") + public void testCreateClusterWithJmxTrans(Params params, VertxTestContext context) { + setFields(params); + Kafka kafka = getKafkaAssembly("foo"); + kafka.getSpec() + .getKafka().setJmxOptions(new KafkaJmxOptionsBuilder() + .withAuthentication(new KafkaJmxAuthenticationPasswordBuilder().build()) + .build()); + + kafka.getSpec().setJmxTrans(new JmxTransSpecBuilder() + .withKafkaQueries(new JmxTransQueryTemplateBuilder() + .withNewTargetMBean("mbean") + .withAttributes("attribute") + .withOutputs("output") + .build()) + .withOutputDefinitions(new JmxTransOutputDefinitionTemplateBuilder() + .withOutputType("host") + .withName("output") + .build()) + .build()); + + createCluster(context, kafka, Collections.singletonList(new SecretBuilder() + .withNewMetadata() + .withName(KafkaCluster.jmxSecretName("foo")) + .withNamespace("test") + .endMetadata() + .withData(Collections.singletonMap("foo", "bar")) + .build() + )); + } + private Map createPvcs(String namespace, Storage storage, int replicas, BiFunction pvcNameFunction) { @@ -550,6 +586,12 @@ private void createCluster(VertxTestContext context, Kafka clusterCm, List routeCaptor = ArgumentCaptor.forClass(Route.class); ArgumentCaptor routeNameCaptor = ArgumentCaptor.forClass(String.class); if (openShift) { diff --git a/docker-images/build.sh b/docker-images/build.sh index fdf31847493..7b90f09c3c4 100755 --- a/docker-images/build.sh +++ b/docker-images/build.sh @@ -6,7 +6,7 @@ source $(dirname $(realpath $0))/../multi-platform-support.sh # Image directories base_images="base" -java_images="operator" +java_images="operator jmxtrans" kafka_image="kafka" kafka_images="kafka test-client" @@ -183,7 +183,6 @@ function build { THIRD_PARTY_LIBS="${lib_directory}" done done - } dependency_check diff --git a/docker-images/jmxtrans/Dockerfile b/docker-images/jmxtrans/Dockerfile new file mode 100644 index 00000000000..f4f584e283c --- /dev/null +++ b/docker-images/jmxtrans/Dockerfile @@ -0,0 +1,48 @@ +FROM strimzi/base:latest + +ENV JMXTRANS_HOME /usr/share/jmxtrans +ENV PATH $JMXTRANS_HOME/bin:$PATH +ENV JAR_FILE $JMXTRANS_HOME/lib/jmxtrans.jar +ENV JMXTRANS_VERSION 271 +ENV JMXTRANS_CHECKSUM="9c9116b628be912a723fae8ab65853908cc0872139340827b4af3dbdb7274bc88956af7f33dc927a43ea291c721701fb52368ccd414218ef33e7de3060baf849 jmxtrans-${JMXTRANS_VERSION}-all.jar" +ENV HEAP_SIZE 512 +ENV PERM_SIZE 384 +ENV MAX_PERM_SIZE 384 +ENV SECONDS_BETWEEN_RUNS 60 +ENV CONTINUE_ON_ERROR false +ENV JSON_DIR /var/lib/jmxtrans + +WORKDIR ${JMXTRANS_HOME} +RUN mkdir -p ${JMXTRANS_HOME}/conf + +##### +# Add JmxTrans Jar +##### +RUN mkdir -p /usr/share/jmxtrans/lib/ \ + && mkdir -p /var/log/jmxtrans \ + && curl -k https://repo1.maven.org/maven2/org/jmxtrans/jmxtrans/${JMXTRANS_VERSION}/jmxtrans-${JMXTRANS_VERSION}-all.jar --output jmxtrans-${JMXTRANS_VERSION}-all.jar \ + && echo $JMXTRANS_CHECKSUM > jmxtrans-${JMXTRANS_VERSION}-all.jar.sha512 \ + && sha512sum --check jmxtrans-${JMXTRANS_VERSION}-all.jar.sha512 \ + && rm jmxtrans-${JMXTRANS_VERSION}-all.jar.sha512 \ + && mv jmxtrans-${JMXTRANS_VERSION}-all.jar ${JAR_FILE} + +COPY docker-entrypoint.sh /docker-entrypoint.sh +COPY jmxtrans_readiness_check.sh /opt/jmx/ + +##### +# Add Tini +##### +ENV TINI_VERSION v0.18.0 +ENV TINI_SHA256=12d20136605531b09a2c2dac02ccee85e1b874eb322ef6baf7561cd93f93c855 +ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /usr/bin/tini +RUN echo "${TINI_SHA256} */usr/bin/tini" | sha256sum -c \ + && chmod +x /usr/bin/tini + +##### +# Add NC +##### +RUN yum install -y nc + +VOLUME ${JSON_DIR} + +ENTRYPOINT [ "/docker-entrypoint.sh", "start-without-jmx" ] \ No newline at end of file diff --git a/docker-images/jmxtrans/Makefile b/docker-images/jmxtrans/Makefile new file mode 100644 index 00000000000..b66d2bcf7df --- /dev/null +++ b/docker-images/jmxtrans/Makefile @@ -0,0 +1,8 @@ +PROJECT_NAME := jmxtrans +DOCKER_TAG ?= latest + +include ../../Makefile.docker + +.PHONY: build clean tag + +docker_build: docker_build_default \ No newline at end of file diff --git a/docker-images/jmxtrans/docker-entrypoint.sh b/docker-images/jmxtrans/docker-entrypoint.sh new file mode 100755 index 00000000000..2b1e512fb87 --- /dev/null +++ b/docker-images/jmxtrans/docker-entrypoint.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +set -e + +EXEC="-jar $JAR_FILE -e -j $JSON_DIR -s $SECONDS_BETWEEN_RUNS -c $CONTINUE_ON_ERROR $ADDITIONAL_JARS_OPTS" +GC_OPTS="-Xms${HEAP_SIZE}m -Xmx${HEAP_SIZE}m -XX:PermSize=${PERM_SIZE}m -XX:MaxPermSize=${MAX_PERM_SIZE}m" +JMXTRANS_OPTS="$JMXTRANS_OPTS -Dlog4j2.configurationFile=file:///${JMXTRANS_HOME}/conf/log4j2.properties" + +if [ -n "${KAFKA_JMX_USERNAME}" ]; then + JMXTRANS_OPTS="$JMXTRANS_OPTS -Dkafka.username=${KAFKA_JMX_USERNAME} -Dkafka.password=${KAFKA_JMX_PASSWORD}" +fi + +MONITOR_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false \ + -Dcom.sun.management.jmxremote.authenticate=false \ + -Dcom.sun.management.jmxremote.port=9999 \ + -Dcom.sun.management.jmxremote.rmi.port=9999 \ + -Djava.rmi.server.hostname=${PROXY_HOST}" + +if [ "$1" = 'start-without-jmx' ]; then + set /usr/bin/tini -- java -server $JAVA_OPTS $JMXTRANS_OPTS $GC_OPTS $EXEC +elif [ "$1" = 'start-with-jmx' ]; then + set /usr/bin/tini -- java -server $JAVA_OPTS $JMXTRANS_OPTS $GC_OPTS $MONITOR_OPTS $EXEC +fi + +exec "$@" diff --git a/docker-images/jmxtrans/jmxtrans_readiness_check.sh b/docker-images/jmxtrans/jmxtrans_readiness_check.sh new file mode 100755 index 00000000000..40552b4877e --- /dev/null +++ b/docker-images/jmxtrans/jmxtrans_readiness_check.sh @@ -0,0 +1,21 @@ +#!/bin/bash +set -e + +if [ -z "$1" ]; then + echo "No kafka bootstrap has been given" + exit 1 +else + export KAFKA_HEADLESS_SERVICE="$1" +fi + +if [ -z "$2" ]; then + echo "No kafka bootstrap port given" +else + export KAFKA_METRICS_PORT="$2" +fi + +nc -z "$KAFKA_HEADLESS_SERVICE" "$KAFKA_METRICS_PORT" +if [ "$?" -ne 0 ]; then + echo "Couldn't connect to $KAFKA_HEADLESS_SERVICE:$KAFKA_METRICS_PORT" + exit 1 +fi \ No newline at end of file diff --git a/documentation/assemblies/assembly-jmxtrans.adoc b/documentation/assemblies/assembly-jmxtrans.adoc new file mode 100644 index 00000000000..969f8e472ae --- /dev/null +++ b/documentation/assemblies/assembly-jmxtrans.adoc @@ -0,0 +1,24 @@ +// This assembly is included in the following assemblies: +// +// assembly-deployment-configuration-kafka.adoc +// assembly-deployment-configuration-kafka.adoc + +:parent-context: {context} + +[id='assembly-jmxtrans-{context}'] += Retrieving JMX metrics with JMXTrans + +JmxTrans is a way of retrieving JMX metrics data from Java processes and pushing that data in various formats to remote sinks inside or outside of the cluster. +JmxTrans can communicate with a secure JMX port. +{ProductName} supports using JmxTrans to read JMX data from Kafka brokers. + +include::../modules/con-jmxtrans.adoc[leveloffset=+1] + +include::../modules/proc-jmxtrans-deployment.adoc[leveloffset=+1] + +== Additional resources + +For more information about the Jmxtrans see link:https://github.com/jmxtrans/jmxtrans[Jmxtrans github] + +// Restore the context to what it was before this assembly. +:context: {parent-context} \ No newline at end of file diff --git a/documentation/assemblies/assembly-kafka-jmx-options.adoc b/documentation/assemblies/assembly-kafka-jmx-options.adoc index 8f30987b2f3..a89d8efd58e 100644 --- a/documentation/assemblies/assembly-kafka-jmx-options.adoc +++ b/documentation/assemblies/assembly-kafka-jmx-options.adoc @@ -9,7 +9,7 @@ [id='assembly-jmx-options-{context}'] -= JMX Remote += JMX Options {ProductName} supports obtaining JMX metrics from the Kafka brokers by opening a JMX port on 9999. You can obtain various metrics about each Kafka broker, for example, usage data such as the `BytesPerSecond` value diff --git a/documentation/modules/appendix_crds.adoc b/documentation/modules/appendix_crds.adoc index 1e545b47bd7..2aefd507887 100644 --- a/documentation/modules/appendix_crds.adoc +++ b/documentation/modules/appendix_crds.adoc @@ -35,6 +35,8 @@ Used in: xref:type-Kafka-{context}[`Kafka`] |xref:type-CertificateAuthority-{context}[`CertificateAuthority`] |clientsCa 1.2+<.<|Configuration of the clients certificate authority. |xref:type-CertificateAuthority-{context}[`CertificateAuthority`] +|jmxTrans 1.2+<.<|Configuration for JmxTrans. When the property is present a JmxTrans deployment is created for gathering JMX metrics from each Kafka broker. For more information see https://github.com/jmxtrans/jmxtrans[JmxTrans GitHub]. +|xref:type-JmxTransSpec-{context}[`JmxTransSpec`] |kafkaExporter 1.2+<.<|Configuration of the Kafka Exporter. Kafka Exporter can provide additional metrics, for example lag of consumer group at topic/partition. |xref:type-KafkaExporterSpec-{context}[`KafkaExporterSpec`] |maintenanceTimeWindows 1.2+<.<|A list of time windows for maintenance tasks (that is, certificates renewal). Each time window is defined by a cron expression. @@ -806,7 +808,7 @@ It must have the value `password` for the type `KafkaJmxAuthenticationPassword`. [id='type-ResourceRequirements-{context}'] ### `ResourceRequirements` schema reference -Used in: xref:type-EntityTopicOperatorSpec-{context}[`EntityTopicOperatorSpec`], xref:type-EntityUserOperatorSpec-{context}[`EntityUserOperatorSpec`], xref:type-KafkaBridgeSpec-{context}[`KafkaBridgeSpec`], xref:type-KafkaClusterSpec-{context}[`KafkaClusterSpec`], xref:type-KafkaConnectS2ISpec-{context}[`KafkaConnectS2ISpec`], xref:type-KafkaConnectSpec-{context}[`KafkaConnectSpec`], xref:type-KafkaExporterSpec-{context}[`KafkaExporterSpec`], xref:type-KafkaMirrorMakerSpec-{context}[`KafkaMirrorMakerSpec`], xref:type-TlsSidecar-{context}[`TlsSidecar`], xref:type-TopicOperatorSpec-{context}[`TopicOperatorSpec`], xref:type-ZookeeperClusterSpec-{context}[`ZookeeperClusterSpec`] +Used in: xref:type-EntityTopicOperatorSpec-{context}[`EntityTopicOperatorSpec`], xref:type-EntityUserOperatorSpec-{context}[`EntityUserOperatorSpec`], xref:type-JmxTransSpec-{context}[`JmxTransSpec`], xref:type-KafkaBridgeSpec-{context}[`KafkaBridgeSpec`], xref:type-KafkaClusterSpec-{context}[`KafkaClusterSpec`], xref:type-KafkaConnectS2ISpec-{context}[`KafkaConnectS2ISpec`], xref:type-KafkaConnectSpec-{context}[`KafkaConnectSpec`], xref:type-KafkaExporterSpec-{context}[`KafkaExporterSpec`], xref:type-KafkaMirrorMakerSpec-{context}[`KafkaMirrorMakerSpec`], xref:type-TlsSidecar-{context}[`TlsSidecar`], xref:type-TopicOperatorSpec-{context}[`TopicOperatorSpec`], xref:type-ZookeeperClusterSpec-{context}[`ZookeeperClusterSpec`] [options="header"] @@ -1285,6 +1287,67 @@ Configuration of how TLS certificates are used within the cluster. This applies |string (one of [replace-key, renew-certificate]) |==== +[id='type-JmxTransSpec-{context}'] +### `JmxTransSpec` schema reference + +Used in: xref:type-KafkaSpec-{context}[`KafkaSpec`] + + +[options="header"] +|==== +|Property |Description +|image 1.2+<.<|The image to use for the JmxTrans. +|string +|outputDefinitions 1.2+<.<|Defines the output hosts that will be referenced later on. For more information on these properties see, xref:type-JmxTransOutputDefinitionTemplate-reference[`JmxTransOutputDefinitionTemplate` schema reference]. +|xref:type-JmxTransOutputDefinitionTemplate-{context}[`JmxTransOutputDefinitionTemplate`] array +|logLevel 1.2+<.<|Sets the logging level of the JmxTrans deployment.For more information see, https://github.com/jmxtrans/jmxtrans-agent/wiki/Troubleshooting[JmxTrans Logging Level]. +|string +|kafkaQueries 1.2+<.<|Queries to send to the Kafka brokers to define what data should be read from each broker. For more information on these properties see, xref:type-JmxTransQueryTemplate-reference[`JmxTransQueryTemplate` schema reference]. +|xref:type-JmxTransQueryTemplate-{context}[`JmxTransQueryTemplate`] array +|resources 1.2+<.<|CPU and memory resources to reserve. +|xref:type-ResourceRequirements-{context}[`ResourceRequirements`] +|==== + +[id='type-JmxTransOutputDefinitionTemplate-{context}'] +### `JmxTransOutputDefinitionTemplate` schema reference + +Used in: xref:type-JmxTransSpec-{context}[`JmxTransSpec`] + + +[options="header"] +|==== +|Property |Description +|outputType 1.2+<.<|Template for setting the format of the data that will be pushed.For more information see https://github.com/jmxtrans/jmxtrans/wiki/OutputWriters[JmxTrans OutputWriters]. +|string +|host 1.2+<.<|The DNS/hostname of the remote host that the data is pushed to. +|string +|port 1.2+<.<|The port of the remote host that the data is pushed to. +|integer +|flushDelayInSeconds 1.2+<.<|How many seconds the JmxTrans waits before pushing a new set of data out. +|integer +|typeNames 1.2+<.<|Template for filtering data to be included in response to a wildcard query. For more information see https://github.com/jmxtrans/jmxtrans/wiki/Queries[JmxTrans queries]. +|string array +|name 1.2+<.<|Template for setting the name of the output definition. This is used to identify where to send the results of queries should be sent. +|string +|==== + +[id='type-JmxTransQueryTemplate-{context}'] +### `JmxTransQueryTemplate` schema reference + +Used in: xref:type-JmxTransSpec-{context}[`JmxTransSpec`] + + +[options="header"] +|==== +|Property |Description +|targetMBean 1.2+<.<|If using wildcards instead of a specific MBean then the data is gathered from multiple MBeans. Otherwise if specifying an MBean then data is gathered from that specified MBean. +|string +|attributes 1.2+<.<|Determine which attributes of the targeted MBean should be included. +|string array +|outputs 1.2+<.<|List of the names of output definitions specified in the spec.kafka.jmxTrans.outputDefinitions that have defined where JMX metrics are pushed to, and in which data format. +|string array +|==== + [id='type-KafkaExporterSpec-{context}'] ### `KafkaExporterSpec` schema reference diff --git a/documentation/modules/con-jmxtrans.adoc b/documentation/modules/con-jmxtrans.adoc new file mode 100644 index 00000000000..bc7bc2933b1 --- /dev/null +++ b/documentation/modules/con-jmxtrans.adoc @@ -0,0 +1,11 @@ +// Module included in the following assemblies: +// +// assembly-jmxtrans.adoc + +[id='con-jmxtrans-{context}'] += Jmxtrans + +JmxTrans reads JMX metrics data from secure or insecure Kafka brokers and pushes the data to remote sinks in various data formats. +An example use case of the Jmxtrans would be to obtain JMX metrics about the request rate of each Kafka broker's network +and push it to a Logstash database outside of the Kubernetes cluster. + diff --git a/documentation/modules/proc-jmxtrans-deployment.adoc b/documentation/modules/proc-jmxtrans-deployment.adoc new file mode 100644 index 00000000000..531380f6070 --- /dev/null +++ b/documentation/modules/proc-jmxtrans-deployment.adoc @@ -0,0 +1,80 @@ +// Module included in the following assemblies: +// +// assembly-deployment-configuration-kafka.adoc +// assembly-jmxtrans.adoc. + +[id='proc-jmxtrans-deployment-{context}'] += Configuring a JMXTrans deployment + +.Prerequisites +* A running Kubernetes cluster + +You can configure a JmxTrans deployment by using the `Kafka.spec.jmxTrans` property. +A JmxTrans deployment can read from a secure or insecure Kafka broker. +To configure a JmxTrans deployment, define the following properties: + +* `Kafka.spec.jmxTrans.outputDefinitions` +* `Kafka.spec.jmxTrans.kafkaQueries` + +For more information on these properties see, xref:type-JmxTransSpec-reference[JmxTransSpec schema reference]. + +NOTE: JmxTrans will not come up enable you specify that JmxOptions on the Kafka broker. +For more information see, xref:assembly-kafka-jmx-options[Kafka Jmx Options] +.Configuring JmxTrans output definitions + +Output definitions specify where JMX metrics are pushed to, and in which data format. +For information about supported data formats, see link:https://github.com/jmxtrans/jmxtrans/wiki/OutputWriters[Data formats^]. +How many seconds JmxTrans agent waits for before pushing new data can be configured through the `flushDelay` property. +The `host` and `port` properties define the target host address and target port the data is pushed to. +The `name` property is a required property that is referenced by the `Kafka.spec.kafka.jmxOptions.jmxTrans.queries` property. + +Here is an example configuration pushing JMX data in the Graphite format every 5 seconds to a Logstash database on \http://myLogstash:9999, and another pushing to `standardOut` (standard output): +[source,yaml,subs=attributes+] +---- +apiVersion: {KafkaApiVersion} +kind: Kafka +metadata: + name: my-cluster +spec: + jmxTrans: + outputDefinitions: + - outputType: "com.googlecode.jmxtrans.model.output.GraphiteWriter" + host: "http://myLogstash" + port: 9999 + flushDelay: 5 + name: "logstash" + - outputType: "com.googlecode.jmxtrans.model.output.StdOutWriter" + name: "standardOut" + # ... + # ... + zookeeper: + # ... +---- + +.Configuring JmxTrans queries +JmxTrans queries specify what JMX metrics are read from the Kafka brokers. +Currently JmxTrans queries can only be sent to the Kafka Brokers. +Configure the `targetMBean` property to specify which target MBean on the Kafka broker is addressed. +Configuring the `attributes` property specifies which MBean attribute is read as JMX metrics from the target MBean. +JmxTrans supports wildcards to read from target MBeans, and filter by specifying the `typenames`. +The `outputs` property defines where the metrics are pushed to by specifying the name of the output definitions. + +The following JmxTrans deployment reads from all MBeans that match the pattern `kafka.server:type=BrokerTopicMetrics,name=*` and have `name` in the target MBean's name. +From those Mbeans, it obtains JMX metrics about the `Count` attribute and pushes the metrics to standard output as defined by `outputs`. +[source,yaml,subs=attributes+] +---- +apiVersion: {KafkaApiVersion} +kind: Kafka +metadata: + name: my-cluster +spec: + # ... + jmxTrans: + kafkaQueries: + - targetMBean: "kafka.server:type=BrokerTopicMetrics,*" + typeNames: ["name"] + attributes: ["Count"] + outputs: ["standardOut"] + zookeeper: + # ... +---- \ No newline at end of file diff --git a/documentation/modules/ref-configuring-container-images.adoc b/documentation/modules/ref-configuring-container-images.adoc index 250d0b92b0c..8b10c5deec4 100644 --- a/documentation/modules/ref-configuring-container-images.adoc +++ b/documentation/modules/ref-configuring-container-images.adoc @@ -14,6 +14,7 @@ You can specify which container image to use for each component using the `image * `Kafka.spec.entityOperator.topicOperator` * `Kafka.spec.entityOperator.userOperator` * `Kafka.spec.entityOperator.tlsSidecar` +* `Kafka.spec.jmxTrans` * `KafkaConnect.spec` * `KafkaConnectS2I.spec` * `KafkaBridge.spec` @@ -80,6 +81,9 @@ If the `image` name is not defined in the Cluster Operator configuration, then t * For Kafka broker initializer: . Container image specified in the `STRIMZI_DEFAULT_KAFKA_INIT_IMAGE` environment variable from the Cluster Operator configuration. . `{DockerKafkaInit}` container image. +* For Kafka broker initializer: +. Container image specified in the `STRIMZI_DEFAULT_JMXTRANS_IMAGE` environment variable from the Cluster Operator configuration. +. `{DockerKafkaInit}` container image. WARNING: Overriding container images is recommended only in special situations, where you need to use a different container registry. For example, because your network does not allow access to the container repository used by {ProductName}. diff --git a/documentation/modules/snip-images.adoc b/documentation/modules/snip-images.adoc index d5fa082b2ec..8a8cdb5111f 100644 --- a/documentation/modules/snip-images.adoc +++ b/documentation/modules/snip-images.adoc @@ -37,4 +37,11 @@ a| a| {ProductName} image for running the {ProductName} kafka Bridge +|JmxTrans +a| +* {DockerOrg}/jmxtrans:{DockerTag} + +a| +{ProductName} image for running the {ProductName} JmxTrans + |=== diff --git a/documentation/snip-images.sh b/documentation/snip-images.sh index aa0e19a2859..fc91ab2acb7 100755 --- a/documentation/snip-images.sh +++ b/documentation/snip-images.sh @@ -55,5 +55,12 @@ a| a| {ProductName} image for running the {ProductName} kafka Bridge +|JmxTrans +a| +* {DockerOrg}/jmxtrans:{DockerTag} + +a| +{ProductName} image for running the {ProductName} JmxTrans + |=== EOF diff --git a/examples/metrics/jmxtrans.yaml b/examples/metrics/jmxtrans.yaml new file mode 100644 index 00000000000..fa971fc61c1 --- /dev/null +++ b/examples/metrics/jmxtrans.yaml @@ -0,0 +1,38 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + version: 2.4.0 + replicas: 2 + listeners: + plain: {} + tls: {} + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + log.message.format.version: "2.4" + storage: + type: ephemeral + jmxOptions: + authentication: + type: "password" + jmxTrans: + outputDefinitions: + - outputType: "com.googlecode.jmxtrans.model.output.StdOutWriter" + name: "standardOut" + - outputType: "com.googlecode.jmxtrans.model.output.GraphiteOutputWriter" + host: "mylogstash.com" + port: 31028 + flushDelayInSeconds: 5 + name: "logstash" + kafkaQueries: + - targetMBean: "kafka.server:type=BrokerTopicMetrics,name=*" + attributes: ["Count"] + outputs: ["standardOut"] + zookeeper: + replicas: 1 + storage: + type: ephemeral \ No newline at end of file diff --git a/helm-charts/strimzi-kafka-operator/README.md b/helm-charts/strimzi-kafka-operator/README.md index 2f8a75e7d06..bc55937d010 100644 --- a/helm-charts/strimzi-kafka-operator/README.md +++ b/helm-charts/strimzi-kafka-operator/README.md @@ -62,6 +62,9 @@ the documentation for more details. | `zookeeper.image.repository` | ZooKeeper image repository | `strimzi` | | `zookeeper.image.name` | ZooKeeper image name | `kafka` | | `zookeeper.image.tag` | ZooKeeper image tag prefix | `latest` | +| `jmxtrans.image.repository` | JmxTrans image repository | `strimzi` | +| `jmxtrans.image.name` | JmxTrans image name | `jmxtrans` | +| `jmxtrans.image.tag` | JmxTrans image tag prefix | `latest` | | `kafka.image.repository` | Kafka image repository | `strimzi` | | `kafka.image.name` | Kafka image name | `kafka` | | `kafka.image.tagPrefix` | Kafka image tag prefix | `latest` | diff --git a/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml b/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml index 08fa52f7723..ed90221d9bc 100644 --- a/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml @@ -3274,6 +3274,64 @@ spec: enum: - renew-certificate - replace-key + jmxTrans: + type: object + properties: + image: + type: string + outputDefinitions: + type: array + items: + type: object + properties: + outputType: + type: string + host: + type: string + port: + type: integer + flushDelayInSeconds: + type: integer + typeNames: + type: array + items: + type: string + name: + type: string + required: + - outputType + - name + logLevel: + type: string + kafkaQueries: + type: array + items: + type: object + properties: + targetMBean: + type: string + attributes: + type: array + items: + type: string + outputs: + type: array + items: + type: string + required: + - targetMBean + - attributes + - outputs + resources: + type: object + properties: + limits: + type: object + requests: + type: object + required: + - outputDefinitions + - kafkaQueries kafkaExporter: type: object properties: diff --git a/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml b/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml index af7b096fe95..ab9872092fa 100755 --- a/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml @@ -61,6 +61,8 @@ spec: value: "{{ default .Values.kafkaInit.image.repository .Values.imageRepositoryOverride }}/{{ .Values.kafkaInit.image.name }}:{{ default .Values.kafkaInit.image.tag .Values.imageTagOverride }}" - name: STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE value: "{{ default .Values.kafkaBridge.image.repository .Values.imageRepositoryOverride }}/{{ .Values.kafkaBridge.image.name }}:{{ default .Values.kafkaBridge.image.tag .Values.imageTagOverride }}" + - name: STRIMZI_DEFAULT_JMXTRANS_IMAGE + value: "{{ default .Values.jmxTrans.image.repository .Values.imageRepositoryOverride }}/{{ .Values.jmxTrans.image.name }}:{{ default .Values.jmxTrans.image.tag .Values.imageTagOverride }}" - name: STRIMZI_LOG_LEVEL value: {{ .Values.logLevel | quote }} {{- if .Values.image.imagePullSecrets }} diff --git a/helm-charts/strimzi-kafka-operator/values.yaml b/helm-charts/strimzi-kafka-operator/values.yaml index 3abab1c1fae..725356e7075 100755 --- a/helm-charts/strimzi-kafka-operator/values.yaml +++ b/helm-charts/strimzi-kafka-operator/values.yaml @@ -87,6 +87,11 @@ kafkaExporter: repository: strimzi name: kafka tagPrefix: latest +jmxTrans: + image: + repository: strimzi + name: jmxtrans + tag: latest resources: limits: memory: 256Mi diff --git a/install/cluster-operator/040-Crd-kafka.yaml b/install/cluster-operator/040-Crd-kafka.yaml index d89ac3414c6..5c7ab6e81a4 100644 --- a/install/cluster-operator/040-Crd-kafka.yaml +++ b/install/cluster-operator/040-Crd-kafka.yaml @@ -3269,6 +3269,64 @@ spec: enum: - renew-certificate - replace-key + jmxTrans: + type: object + properties: + image: + type: string + outputDefinitions: + type: array + items: + type: object + properties: + outputType: + type: string + host: + type: string + port: + type: integer + flushDelayInSeconds: + type: integer + typeNames: + type: array + items: + type: string + name: + type: string + required: + - outputType + - name + logLevel: + type: string + kafkaQueries: + type: array + items: + type: object + properties: + targetMBean: + type: string + attributes: + type: array + items: + type: string + outputs: + type: array + items: + type: string + required: + - targetMBean + - attributes + - outputs + resources: + type: object + properties: + limits: + type: object + requests: + type: object + required: + - outputDefinitions + - kafkaQueries kafkaExporter: type: object properties: diff --git a/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml b/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml index 7772bcfcee7..8c7ece4e8ff 100644 --- a/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml +++ b/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml @@ -63,6 +63,8 @@ spec: value: strimzi/operator:latest - name: STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE value: strimzi/kafka-bridge:0.15.0 + - name: STRIMZI_DEFAULT_JMXTRANS_IMAGE + value: strimzi/jmxtrans:latest - name: STRIMZI_LOG_LEVEL value: INFO livenessProbe: diff --git a/metrics/examples/prometheus/install/strimzi-service-monitor.yaml b/metrics/examples/prometheus/install/strimzi-service-monitor.yaml index 00a7c2692a9..b6861cf0b4a 100644 --- a/metrics/examples/prometheus/install/strimzi-service-monitor.yaml +++ b/metrics/examples/prometheus/install/strimzi-service-monitor.yaml @@ -16,7 +16,7 @@ spec: # configured in additional secret #### job_name: kube-state-metrics - - port: metrics + - port: prometheus honorLabels: true interval: 10s scrapeTimeout: 10s @@ -64,7 +64,7 @@ spec: action: replace #### job_name: node-exporter - - port: metrics + - port: prometheus honorLabels: true interval: 10s scrapeTimeout: 10s