diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf4e3a2e7b..1474743ee1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # CHANGELOG ## 0.17.0 +* Add JmxTrans deployment diff --git a/Makefile b/Makefile index 7ddc803990c..59977791bc0 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,7 @@ release_version: $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+/strimzi\/kafka-bridge:$(BRIDGE_VERSION)/g' {} \; $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/kafka:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/kafka:[a-zA-Z0-9_.-]\+-kafka-\([0-9.]\+\)/strimzi\/kafka:$(RELEASE_VERSION)-kafka-\1/g' {} \; $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/[0-9.]\+=strimzi\/kafka[a-zA-Z0-9_.-]\?\+:[a-zA-Z0-9_.-]\+-kafka-[0-9.]\+"\?/s/:[a-zA-Z0-9_.-]\+-kafka-\([0-9.]\+\)/:$(RELEASE_VERSION)-kafka-\1/g' {} \; + $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/jmxtrans:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/jmxtrans:[a-zA-Z0-9_.-]\+/strimzi\/jmxtrans:$(RELEASE_VERSION)/g' {} \; # Set Kafka Bridge version to its own version $(FIND) ./install -name '*.yaml' -type f -exec $(SED) -i '/value: "\?strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+"\?/s/strimzi\/kafka-bridge:[a-zA-Z0-9_.-]\+/strimzi\/kafka-bridge:$(BRIDGE_VERSION)/g' {} \; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/JmxTransResources.java b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransResources.java new file mode 100644 index 00000000000..c602e1ed7e4 --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransResources.java @@ -0,0 +1,30 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model; + +/** + * Encapsulates the naming scheme used for the resources which the Cluster Operator manages for a + * {@code JmxTrans} instance. + */ +public class JmxTransResources { + protected JmxTransResources() { } + /** + * Returns the name of the JmxTrans {@code Deployment}. + * @param kafkaClusterName The {@code metadata.name} of the {@code Kafka} resource. + * @return The name of the corresponding JmxTrans {@code Deployment}. + */ + public static String deploymentName(String kafkaClusterName) { + return kafkaClusterName + "-kafka-jmx-trans"; + } + /** + * Returns the name of the JmxTrans {@code ServiceAccount}. + * @param kafkaClusterName The {@code metadata.name} of the {@code Kafka} resource. + * @return The name of the corresponding JmxTrans {@code ServiceAccount}. + */ + public static String serviceAccountName(String kafkaClusterName) { + return deploymentName(kafkaClusterName); + } + +} \ No newline at end of file diff --git a/api/src/main/java/io/strimzi/api/kafka/model/JmxTransSpec.java b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransSpec.java new file mode 100644 index 00000000000..902b151ccd1 --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/JmxTransSpec.java @@ -0,0 +1,111 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
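+ *
+ * A minimal sketch of how this spec is used from a Kafka custom resource, based on the fields
+ * defined below and on the test resources in this change; the writer class and the MBean pattern
+ * are illustrative assumptions, not values taken from this change:
+ *
+ *   spec:
+ *     jmxTrans:
+ *       outputDefinitions:
+ *         - outputType: "com.googlecode.jmxtrans.model.output.StdOutWriter"   # assumed writer class
+ *           name: "stdout"
+ *       kafkaQueries:
+ *         - targetMBean: "kafka.server:type=BrokerTopicMetrics,name=*"        # assumed MBean pattern
+ *           attributes: ["Count"]
+ *           outputs: ["stdout"]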
+ */ +package io.strimzi.api.kafka.model; + + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplate; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplate; +import io.strimzi.crdgenerator.annotations.Description; +import io.sundr.builder.annotations.Buildable; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Representation of the options used to set up JmxTrans. + */ +@Buildable( + editableEnabled = false, + generateBuilderPackage = false, + builderPackage = "io.fabric8.kubernetes.api.builder" +) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +@JsonPropertyOrder({"image", "outputDefinitions", "logLevel", "kafkaQueries", "resources"}) +@EqualsAndHashCode +public class JmxTransSpec implements UnknownPropertyPreserving, Serializable { + public static final int DEFAULT_HEALTHCHECK_DELAY = 15; + public static final int DEFAULT_HEALTHCHECK_TIMEOUT = 5; + + private static final long serialVersionUID = 1L; + protected String image; + private String logLevel; + private List<JmxTransOutputDefinitionTemplate> outputDefinitions = null; + private List<JmxTransQueryTemplate> kafkaQueries = null; + + private ResourceRequirements resources; + + private Map<String, Object> additionalProperties = new HashMap<>(0); + + @Description("The image to use for the JmxTrans") + public String getImage() { + return image; + } + + public void setImage(String image) { + this.image = image; + } + + @Description("Sets the logging level of the JmxTrans deployment. " + + "For more information see https://github.com/jmxtrans/jmxtrans-agent/wiki/Troubleshooting[JmxTrans Logging Level]") + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + @Description("Defines the output hosts that will be referenced later on. " + + "For more information on these properties see xref:type-JmxTransOutputDefinitionTemplate-reference[`JmxTransOutputDefinitionTemplate` schema reference].") + public List<JmxTransOutputDefinitionTemplate> getOutputDefinitions() { + return outputDefinitions; + } + + public void setOutputDefinitions(List<JmxTransOutputDefinitionTemplate> outputDefinitions) { + this.outputDefinitions = outputDefinitions; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("Queries to send to the Kafka brokers to define what data should be read from each broker.
" + + "For more information on these properties see, xref:type-JmxTransQueryTemplate-reference[`JmxTransQueryTemplate` schema reference].") + public List getKafkaQueries() { + return kafkaQueries; + } + + public void setKafkaQueries(List kafkaQueries) { + this.kafkaQueries = kafkaQueries; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("CPU and memory resources to reserve.") + public ResourceRequirements getResources() { + return resources; + } + + public void setResources(ResourceRequirements resources) { + this.resources = resources; + } + + @Override + public Map getAdditionalProperties() { + return additionalProperties; + } + + @Override + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } + +} \ No newline at end of file diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java index 4c3ff958cda..a38dfc21aaf 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaJmxOptions.java @@ -47,4 +47,3 @@ public void setAdditionalProperty(String name, Object value) { this.additionalProperties.put(name, value); } } - diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java index 6c2a771bae6..e7174b3ac5b 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaSpec.java @@ -41,6 +41,7 @@ public class KafkaSpec implements UnknownPropertyPreserving, Serializable { private TopicOperatorSpec topicOperator; private EntityOperatorSpec entityOperator; private CertificateAuthority clusterCa; + private JmxTransSpec jmxTrans; private KafkaExporterSpec kafkaExporter; private CertificateAuthority clientsCa; @@ -113,6 +114,16 @@ public List getMaintenanceTimeWindows() { return maintenanceTimeWindows; } + @Description("Configuration for JmxTrans. When the property is present a JmxTrans deployment is created for gathering JMX metrics from each Kafka broker. " + + "For more information see https://github.com/jmxtrans/jmxtrans[JmxTrans GitHub]") + public JmxTransSpec getJmxTrans() { + return jmxTrans; + } + + public void setJmxTrans(JmxTransSpec jmxTrans) { + this.jmxTrans = jmxTrans; + } + public void setMaintenanceTimeWindows(List maintenanceTimeWindows) { this.maintenanceTimeWindows = maintenanceTimeWindows; } diff --git a/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransOutputDefinitionTemplate.java b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransOutputDefinitionTemplate.java new file mode 100644 index 00000000000..9b42ba61a1e --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransOutputDefinitionTemplate.java @@ -0,0 +1,118 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.api.kafka.model.template; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.strimzi.api.kafka.model.UnknownPropertyPreserving; +import io.strimzi.crdgenerator.annotations.Description; +import io.sundr.builder.annotations.Buildable; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Representation of the options defining where and how data is pushed to remote endpoints + */ +@Buildable( + editableEnabled = false, + generateBuilderPackage = false, + builderPackage = "io.fabric8.kubernetes.api.builder" +) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +@JsonPropertyOrder({"outputType", "host", "port", "flushDelayInSeconds", "typeNames", "name"}) +@EqualsAndHashCode +public class JmxTransOutputDefinitionTemplate implements Serializable, UnknownPropertyPreserving { + + private static final long serialVersionUID = 1L; + + private String outputType; + private String host; + private Integer port; + private Integer flushDelayInSeconds; + private String name; + private List<String> typeNames; + + private Map<String, Object> additionalProperties = new HashMap<>(0); + + @JsonProperty(value = "outputType", required = true) + @Description("Template for setting the format of the data that will be pushed. " + + "For more information see https://github.com/jmxtrans/jmxtrans/wiki/OutputWriters[JmxTrans OutputWriters]") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getOutputType() { + return outputType; + } + + public void setOutputType(String outputType) { + this.outputType = outputType; + } + + @Description("The DNS/hostname of the remote host that the data is pushed to.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + @Description("The port of the remote host that the data is pushed to.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + @Description("How many seconds the JmxTrans waits before pushing a new set of data out.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public Integer getFlushDelayInSeconds() { + return flushDelayInSeconds; + } + + public void setFlushDelayInSeconds(Integer flushDelayInSeconds) { + this.flushDelayInSeconds = flushDelayInSeconds; + } + + @Description("Template for filtering data to be included in response to a wildcard query. " + + "For more information see https://github.com/jmxtrans/jmxtrans/wiki/Queries[JmxTrans queries]") + @JsonInclude(JsonInclude.Include.NON_NULL) + public List<String> getTypeNames() { + return typeNames; + } + + public void setTypeNames(List<String> typeNames) { + this.typeNames = typeNames; + } + + @Description("Template for setting the name of the output definition.
This is used to identify where " + "the results of queries should be sent.") + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(required = true) + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public Map<String, Object> getAdditionalProperties() { + return this.additionalProperties; + } + + @Override + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } +} \ No newline at end of file diff --git a/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransQueryTemplate.java b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransQueryTemplate.java new file mode 100644 index 00000000000..f0ea6e1627f --- /dev/null +++ b/api/src/main/java/io/strimzi/api/kafka/model/template/JmxTransQueryTemplate.java @@ -0,0 +1,81 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model.template; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.strimzi.api.kafka.model.UnknownPropertyPreserving; +import io.strimzi.crdgenerator.annotations.Description; +import io.sundr.builder.annotations.Buildable; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Buildable( + editableEnabled = false, + generateBuilderPackage = false, + builderPackage = "io.fabric8.kubernetes.api.builder" +) +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +@JsonPropertyOrder({ + "targetMBean", "attributes", "outputs"}) +@EqualsAndHashCode +public class JmxTransQueryTemplate implements Serializable, UnknownPropertyPreserving { + private String targetMBean; + private List<String> attributes; + private List<String> outputs; + + private static final long serialVersionUID = 1L; + private Map<String, Object> additionalProperties = new HashMap<>(0); + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("If using wildcards instead of a specific MBean, then the data is gathered from multiple MBeans.
" + + "Otherwise if specifying an MBean then data is gathered from that specified MBean.") + public String getTargetMBean() { + return targetMBean; + } + + public void setTargetMBean(String targetMBean) { + this.targetMBean = targetMBean; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("Determine which attributes of the targeted MBean should be included") + public List getAttributes() { + return attributes; + } + + public void setAttributes(List attributes) { + this.attributes = attributes; + } + + @JsonProperty(required = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("List of the names of output definitions specified in the spec.kafka.jmxTrans.outputDefinitions that have defined where JMX metrics are pushed to, and in which data format") + public List getOutputs() { + return outputs; + } + + public void setOutputs(List outputs) { + this.outputs = outputs; + } + + @Override + public Map getAdditionalProperties() { + return this.additionalProperties; + } + + @Override + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } + +} \ No newline at end of file diff --git a/api/src/test/java/io/strimzi/api/kafka/model/JmxTransTest.java b/api/src/test/java/io/strimzi/api/kafka/model/JmxTransTest.java new file mode 100644 index 00000000000..72e0cae5833 --- /dev/null +++ b/api/src/test/java/io/strimzi/api/kafka/model/JmxTransTest.java @@ -0,0 +1,74 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model; + +import io.strimzi.test.TestUtils; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +public class JmxTransTest { + @Test + public void testQueries() { + JmxTransSpec opts = TestUtils.fromJson("{" + + "\"kafkaQueries\": [{" + + "\"targetMBean\": \"targetMBean\"," + + "\"attributes\": [\"attribute0\", \"attribute1\"]," + + "\"outputs\": [\"output0\", \"output1\"]" + + "}, {" + + "\"targetMBean\": \"targetMBean\"," + + "\"attributes\": [\"attribute1\", \"attribute2\"]," + + "\"outputs\": [\"output0\", \"output1\"]" + + "}]" + + "}", JmxTransSpec.class); + + assertThat(opts, is(notNullValue())); + assertThat(opts.getKafkaQueries().size(), is(2)); + assertThat(opts.getKafkaQueries().get(0).getTargetMBean(), is("targetMBean")); + assertThat(opts.getKafkaQueries().get(0).getAttributes().size(), is(2)); + assertThat(opts.getKafkaQueries().get(0).getAttributes().get(0), is("attribute0")); + assertThat(opts.getKafkaQueries().get(0).getAttributes().get(1), is("attribute1")); + assertThat(opts.getKafkaQueries().get(0).getOutputs().get(0), is("output0")); + assertThat(opts.getKafkaQueries().get(0).getOutputs().get(1), is("output1")); + } + + @Test + public void testOutputDefinition() { + JmxTransSpec opts = TestUtils.fromJson("{" + + "\"outputDefinitions\": [{" + + "\"outputType\": \"targetOutputType\"," + + "\"host\": \"targetHost\"," + + "\"port\": 9999," + + "\"flushDelayInSeconds\": 1," + + "\"typeNames\": [\"typeName0\", \"typeName1\"]," + + "\"name\": \"targetName\"" + + "}, {" + + "\"outputType\": \"targetOutputType\"," + + "\"name\": \"name1\"" + + "}]" + + "}", JmxTransSpec.class); + + assertThat(opts, is(notNullValue())); + assertThat(opts.getOutputDefinitions().size(), is(2)); + 
assertThat(opts.getOutputDefinitions().get(0).getHost(), is("targetHost")); + assertThat(opts.getOutputDefinitions().get(0).getOutputType(), is("targetOutputType")); + assertThat(opts.getOutputDefinitions().get(0).getFlushDelayInSeconds(), is(1)); + assertThat(opts.getOutputDefinitions().get(0).getTypeNames().get(0), is("typeName0")); + assertThat(opts.getOutputDefinitions().get(0).getTypeNames().get(1), is("typeName1")); + assertThat(opts.getOutputDefinitions().get(0).getName(), is("targetName")); + } + + @Test + public void getCustomImage() { + JmxTransSpec opts = TestUtils.fromJson("{" + + "\"image\": \"testImage\"" + + "}", JmxTransSpec.class); + + assertThat(opts, is(notNullValue())); + assertThat(opts.getImage(), is("testImage")); + } +} diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java index 86d9a4733dd..e24ac869ee1 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java @@ -6,6 +6,7 @@ import io.strimzi.test.TestUtils; import io.strimzi.test.k8s.exceptions.KubeClusterException; +import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -154,6 +155,31 @@ public void testKafkaWithInvalidJmxAuthentication() { containsStringIgnoringCase("spec.kafka.jmxOptions.authentication.type: Unsupported value: \"not-right\": supported values: \"password\""))); } + @Test + void testJmxOptionsWithoutRequiredOutputDefinitionKeys() { + Throwable exception = assertThrows( + KubeClusterException.InvalidResource.class, + () -> { + createDelete(Kafka.class, "JmxTrans-output-definition-with-missing-required-property.yaml"); + }); + + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.outputDefinitions.outputType in body is required")); + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.outputDefinitions.name in body is required")); + } + + @Test + void testJmxOptionsWithoutRequiredQueryKeys() { + Throwable exception = assertThrows( + KubeClusterException.InvalidResource.class, + () -> { + createDelete(Kafka.class, "JmxTrans-queries-with-missing-required-property.yaml"); + }); + + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.kafkaQueries.attributes in body is required")); + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.kafkaQueries.targetMBean in body is required")); + assertThat(exception.getMessage(), CoreMatchers.containsStringIgnoringCase("spec.jmxTrans.kafkaQueries.outputs in body is required")); + } + @BeforeAll void setupEnvironment() { cluster.createNamespace(NAMESPACE); diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java index 2f234c1a381..0dc58fe9ed1 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaJmxOptionsTest.java @@ -35,4 +35,4 @@ public void testNoJmxOpts() { assertThat(opts.getAuthentication(), is(nullValue())); assertThat(opts.getAdditionalProperties(), is(Collections.emptyMap())); } -} +} \ No newline at end of file diff --git a/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-output-definition-with-missing-required-property.yaml 
b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-output-definition-with-missing-required-property.yaml new file mode 100644 index 00000000000..171cd65549e --- /dev/null +++ b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-output-definition-with-missing-required-property.yaml @@ -0,0 +1,27 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: Kafka +metadata: + name: strimzi-jmxtrans +spec: + kafka: + version: 2.3.1 + replicas: 3 + storage: + type: persistent-claim + size: 500Gi + listeners: + plain: {} + tls: {} + jmxTrans: + outputDefinitions: + - host: "won't matter about the host" + port: 123 + flushDelayInSeconds: 321 + kafkaQueries: + - targetMBean: "MBean" + attributes: ["attribute1"] + outputs: ["outputs"] + zookeeper: + replicas: 1 + storage: + type: ephemeral \ No newline at end of file diff --git a/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-queries-with-missing-required-property.yaml b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-queries-with-missing-required-property.yaml new file mode 100644 index 00000000000..3bf366a5f62 --- /dev/null +++ b/api/src/test/resources/io/strimzi/api/kafka/model/JmxTrans-queries-with-missing-required-property.yaml @@ -0,0 +1,24 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: Kafka +metadata: + name: strimzi-jmxtrans +spec: + kafka: + version: 2.3.1 + replicas: 3 + storage: + type: persistent-claim + size: 500Gi + listeners: + plain: {} + tls: {} + jmxTrans: + outputDefinitions: + - outputType: "won't matter about type" + name: "won't matter" + kafkaQueries: + - empty: "object" + zookeeper: + replicas: 1 + storage: + type: ephemeral \ No newline at end of file diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/JmxTrans.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/JmxTrans.java new file mode 100644 index 00000000000..89865fd6b07 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/JmxTrans.java @@ -0,0 +1,333 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */ +package io.strimzi.operator.cluster.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.IntOrString; +import io.fabric8.kubernetes.api.model.LocalObjectReference; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentStrategy; +import io.fabric8.kubernetes.api.model.apps.DeploymentStrategyBuilder; +import io.fabric8.kubernetes.api.model.apps.RollingUpdateDeploymentBuilder; +import io.strimzi.api.kafka.model.JmxTransResources; +import io.strimzi.api.kafka.model.JmxTransSpec; +import io.strimzi.api.kafka.model.Kafka; +import io.strimzi.api.kafka.model.KafkaJmxAuthenticationPassword; +import io.strimzi.api.kafka.model.Probe; +import io.strimzi.api.kafka.model.ProbeBuilder; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplate; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplate; +import io.strimzi.operator.cluster.model.components.JmxTransOutputWriter; +import io.strimzi.operator.cluster.model.components.JmxTransQueries; +import io.strimzi.operator.cluster.model.components.JmxTransServer; +import io.strimzi.operator.cluster.model.components.JmxTransServers; +import io.strimzi.operator.common.model.Labels; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/* + * Class for handling the JmxTrans configuration passed by the user. Used to generate the resources needed for the + * JmxTrans deployment: the config map, the deployment, and the service account.
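+ *
+ * For orientation, a sketch of the generated config.json (assembled from the JmxTransServers,
+ * JmxTransServer, JmxTransQueries and JmxTransOutputWriter wrappers in this change; values and
+ * placeholders are illustrative): one server entry per broker, of the form
+ *   { "servers": [ { "host": "<broker-external-service>.<headless-service>", "port": <jmx-port>,
+ *       "queries": [ { "obj": "<targetMBean>", "attr": [ "<attribute>" ],
+ *         "outputWriters": [ { "@class": "<outputType>", "host": "<host>", "port": <port> } ] } ] } ] }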
+ */ +public class JmxTrans extends AbstractModel { + + // Configuration defaults + private static final String STRIMZI_DEFAULT_JMXTRANS_IMAGE = "STRIMZI_DEFAULT_JMXTRANS_IMAGE"; + public static final Probe READINESS_PROBE_OPTIONS = new ProbeBuilder().withTimeoutSeconds(5).withInitialDelaySeconds(15).build(); + private static final io.strimzi.api.kafka.model.Probe DEFAULT_JMX_TRANS_PROBE = new io.strimzi.api.kafka.model.ProbeBuilder() + .withInitialDelaySeconds(JmxTransSpec.DEFAULT_HEALTHCHECK_DELAY) + .withTimeoutSeconds(JmxTransSpec.DEFAULT_HEALTHCHECK_TIMEOUT) + .build(); + + // Configuration for mounting `config.json`, used as the JmxTrans configuration at run time + public static final String JMXTRANS_CONFIGMAP_KEY = "config.json"; + public static final String JMXTRANS_VOLUME_NAME = "jmx-config"; + public static final String CONFIG_MAP_ANNOTATION_KEY = "config-map-revision"; + protected static final String JMX_METRICS_CONFIG_SUFFIX = "-jmxtrans-config"; + public static final String JMX_FILE_PATH = "/var/lib/jmxtrans"; + + protected static final String ENV_VAR_JMXTRANS_LOGGING_LEVEL = "JMXTRANS_LOGGING_LEVEL"; + + + private boolean isDeployed; + private boolean isJmxAuthenticated; + private String configMapName; + private String clusterName; + private String loggingLevel; + + /** + * Constructor + * + * @param namespace Kubernetes/OpenShift namespace where JmxTrans resources are going to be created + * @param kafkaCluster name of the Kafka cluster + * @param labels labels to add to the created resources + */ + protected JmxTrans(String namespace, String kafkaCluster, Labels labels) { + super(namespace, kafkaCluster, labels); + this.name = JmxTransResources.deploymentName(kafkaCluster); + this.clusterName = kafkaCluster; + this.replicas = 1; + this.readinessPath = "/metrics"; + this.livenessPath = "/metrics"; + this.readinessProbeOptions = READINESS_PROBE_OPTIONS; + + this.mountPath = "/var/lib/kafka"; + + this.logAndMetricsConfigVolumeName = "kafka-metrics-and-logging"; + this.logAndMetricsConfigMountPath = "/usr/share/jmxtrans/conf/"; + + // Metrics must be enabled as JmxTrans is all about gathering JMX metrics from the Kafka brokers and pushing them to remote endpoints. + this.isMetricsEnabled = true; + } + + public static JmxTrans fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup versions) { + JmxTrans result = null; + JmxTransSpec spec = kafkaAssembly.getSpec().getJmxTrans(); + if (spec != null) { + if (kafkaAssembly.getSpec().getKafka().getJmxOptions() == null) { + String error = String.format("Can't start up JmxTrans '%s' in '%s' as Kafka spec.kafka.jmxOptions is not specified", + JmxTransResources.deploymentName(kafkaAssembly.getMetadata().getName()), + kafkaAssembly.getMetadata().getNamespace()); + log.warn(error); + throw new InvalidResourceException(error); + } + result = new JmxTrans(kafkaAssembly.getMetadata().getNamespace(), + kafkaAssembly.getMetadata().getName(), + Labels.fromResource(kafkaAssembly).withKind(kafkaAssembly.getKind())); + result.isDeployed = true; + + if (kafkaAssembly.getSpec().getKafka().getJmxOptions().getAuthentication() instanceof KafkaJmxAuthenticationPassword) { + result.isJmxAuthenticated = true; + } + + result.loggingLevel = spec.getLogLevel() == null ?
"" : spec.getLogLevel(); + + result.setResources(spec.getResources()); + + String image = spec.getImage(); + if (image == null) { + image = System.getenv().getOrDefault(STRIMZI_DEFAULT_JMXTRANS_IMAGE, "strimzi/jmxtrans:latest"); + } + result.setImage(image); + + result.setOwnerReference(kafkaAssembly); + } + + return result; + } + + public Deployment generateDeployment(ImagePullPolicy imagePullPolicy, List imagePullSecrets) { + if (!isDeployed()) { + return null; + } + + DeploymentStrategy updateStrategy = new DeploymentStrategyBuilder() + .withType("RollingUpdate") + .withRollingUpdate(new RollingUpdateDeploymentBuilder() + .withMaxSurge(new IntOrString(1)) + .withMaxUnavailable(new IntOrString(0)) + .build()) + .build(); + + return createDeployment( + updateStrategy, + Collections.emptyMap(), + Collections.emptyMap(), + getMergedAffinity(), + getInitContainers(imagePullPolicy), + getContainers(imagePullPolicy), + getVolumes(), + imagePullSecrets + ); + } + + /** + * Generates the string'd config that the JmxTrans deployment needs to run. It is configured by the user in the yaml + * and this method will convert that into the config the JmxTrans understands. + * @param spec The JmxTrans that was defined by the user + * @param numOfBrokers number of kafka brokers + * @return the jmx trans config file that targets each broker + */ + private String generateJMXConfig(JmxTransSpec spec, int numOfBrokers) throws JsonProcessingException { + JmxTransServers servers = new JmxTransServers(); + servers.setServers(new ArrayList<>()); + ObjectMapper mapper = new ObjectMapper(); + String headlessService = KafkaCluster.headlessServiceName(cluster); + for (int brokerNumber = 0; brokerNumber < numOfBrokers; brokerNumber++) { + String brokerServiceName = KafkaCluster.externalServiceName(clusterName, brokerNumber) + "." 
+ headlessService; + servers.getServers().add(convertSpecToServers(spec, brokerServiceName)); + } + try { + return mapper.writeValueAsString(servers); + } catch (JsonProcessingException e) { + log.error("Could not create JmxTrans config json because: " + e.getMessage()); + throw e; + } + } + + /** + * Generates the JmxTrans config map + * + * @param spec The JmxTransSpec that was defined by the user + * @param numOfBrokers number of Kafka brokers + * @return the config map that mounts the JmxTrans config + * @throws JsonProcessingException when JmxTrans config can't be created properly + */ + public ConfigMap generateJmxTransConfigMap(JmxTransSpec spec, int numOfBrokers) throws JsonProcessingException { + Map<String, String> data = new HashMap<>(); + String jmxConfig = generateJMXConfig(spec, numOfBrokers); + data.put(JMXTRANS_CONFIGMAP_KEY, jmxConfig); + configMapName = jmxTransConfigName(clusterName); + return createConfigMap(jmxTransConfigName(clusterName), data); + } + + public List<Volume> getVolumes() { + List<Volume> volumes = new ArrayList<>(); + volumes.add(createConfigMapVolume(JMXTRANS_VOLUME_NAME, configMapName)); + volumes.add(createConfigMapVolume(logAndMetricsConfigVolumeName, KafkaCluster.metricAndLogConfigsName(clusterName))); + return volumes; + } + + private List<VolumeMount> getVolumeMounts() { + List<VolumeMount> volumeMountList = new ArrayList<>(); + + volumeMountList.add(createVolumeMount(logAndMetricsConfigVolumeName, logAndMetricsConfigMountPath)); + volumeMountList.add(createVolumeMount(JMXTRANS_VOLUME_NAME, JMX_FILE_PATH)); + return volumeMountList; + } + + @Override + protected List<Container> getContainers(ImagePullPolicy imagePullPolicy) { + List<Container> containers = new ArrayList<>(); + Container container = new ContainerBuilder() + .withName(name) + .withImage(getImage()) + .withEnv(getEnvVars()) + .withReadinessProbe(jmxTransReadinessProbe(readinessProbeOptions, clusterName)) + .withResources(getResources()) + .withVolumeMounts(getVolumeMounts()) + .withImagePullPolicy(determineImagePullPolicy(imagePullPolicy, getImage())) + .build(); + + containers.add(container); + + return containers; + } + + @Override + protected List<EnvVar> getEnvVars() { + List<EnvVar> varList = new ArrayList<>(); + + if (isJmxAuthenticated()) { + varList.add(buildEnvVarFromSecret(KafkaCluster.ENV_VAR_KAFKA_JMX_USERNAME, KafkaCluster.jmxSecretName(cluster), KafkaCluster.SECRET_JMX_USERNAME_KEY)); + varList.add(buildEnvVarFromSecret(KafkaCluster.ENV_VAR_KAFKA_JMX_PASSWORD, KafkaCluster.jmxSecretName(cluster), KafkaCluster.SECRET_JMX_PASSWORD_KEY)); + } + varList.add(buildEnvVar(ENV_VAR_JMXTRANS_LOGGING_LEVEL, loggingLevel)); + + addContainerEnvsToExistingEnvs(varList, Collections.emptyList()); + + return varList; + } + + /** + * Generates the name of the JmxTrans deployment + * + * @param kafkaCluster Name of the Kafka Custom Resource + * @return Name of the JmxTrans deployment + */ + public static String jmxTransName(String kafkaCluster) { + return JmxTransResources.deploymentName(kafkaCluster); + } + + /** + * Get the name of the JmxTrans service account given the name of the {@code kafkaCluster}. + * @param kafkaCluster The cluster name + * @return The name of the JmxTrans service account.
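+ * For example (derived from JmxTransResources#deploymentName above): for a Kafka resource named
+ * {@code my-cluster}, this is {@code my-cluster-kafka-jmx-trans}.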
+ */ + public static String containerServiceAccountName(String kafkaCluster) { + return JmxTransResources.serviceAccountName(kafkaCluster); + } + + @Override + protected String getDefaultLogConfigFileName() { + return null; + } + + @Override + protected String getServiceAccountName() { + return JmxTransResources.serviceAccountName(cluster); + } + + public static String jmxTransConfigName(String cluster) { + return cluster + JMX_METRICS_CONFIG_SUFFIX; + } + + public boolean isDeployed() { + return isDeployed; + } + + public boolean isJmxAuthenticated() { + return isJmxAuthenticated; + } + + protected static io.fabric8.kubernetes.api.model.Probe jmxTransReadinessProbe(io.strimzi.api.kafka.model.Probe kafkaJmxMetricsReadinessProbe, String clusterName) { + String internalBootstrapServiceName = KafkaCluster.headlessServiceName(clusterName); + String metricsPortValue = String.valueOf(KafkaCluster.JMX_PORT); + kafkaJmxMetricsReadinessProbe = kafkaJmxMetricsReadinessProbe == null ? DEFAULT_JMX_TRANS_PROBE : kafkaJmxMetricsReadinessProbe; + return ModelUtils.createExecProbe(Arrays.asList("/opt/jmx/jmxtrans_readiness_check.sh", internalBootstrapServiceName, metricsPortValue), kafkaJmxMetricsReadinessProbe); + } + + private JmxTransServer convertSpecToServers(JmxTransSpec spec, String brokerServiceName) { + JmxTransServer server = new JmxTransServer(); + server.setHost(brokerServiceName); + server.setPort(AbstractModel.JMX_PORT); + if (isJmxAuthenticated()) { + server.setUsername("${kafka.username}"); + server.setPassword("${kafka.password}"); + } + List<JmxTransQueries> queries = new ArrayList<>(); + for (JmxTransQueryTemplate queryTemplate : spec.getKafkaQueries()) { + JmxTransQueries query = new JmxTransQueries(); + query.setObj(queryTemplate.getTargetMBean()); + query.setAttr(queryTemplate.getAttributes()); + query.setOutputWriters(new ArrayList<>()); + + for (JmxTransOutputDefinitionTemplate outputDefinitionTemplate : spec.getOutputDefinitions()) { + if (queryTemplate.getOutputs().contains(outputDefinitionTemplate.getName())) { + JmxTransOutputWriter outputWriter = new JmxTransOutputWriter(); + outputWriter.setAtClass(outputDefinitionTemplate.getOutputType()); + if (outputDefinitionTemplate.getHost() != null) { + outputWriter.setHost(outputDefinitionTemplate.getHost()); + } + if (outputDefinitionTemplate.getPort() != null) { + outputWriter.setPort(outputDefinitionTemplate.getPort()); + } + if (outputDefinitionTemplate.getFlushDelayInSeconds() != null) { + outputWriter.setFlushDelayInSeconds(outputDefinitionTemplate.getFlushDelayInSeconds()); + } + outputWriter.setTypeNames(outputDefinitionTemplate.getTypeNames()); + query.getOutputWriters().add(outputWriter); + } + } + + queries.add(query); + } + server.setQueries(queries); + return server; + } + +} \ No newline at end of file diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index 48d0433924d..d41afd5d277 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -157,11 +157,11 @@ public class KafkaCluster extends AbstractModel { private static final String NAME_SUFFIX = "-kafka"; - private static final String KAFKA_JMX_SECRET_SUFFIX = NAME_SUFFIX + "-jmx"; - private static final String SECRET_JMX_USERNAME_KEY = "jmx-username"; - private static final String SECRET_JMX_PASSWORD_KEY = "jmx-password"; -
private static final String ENV_VAR_KAFKA_JMX_USERNAME = "KAFKA_JMX_USERNAME"; - private static final String ENV_VAR_KAFKA_JMX_PASSWORD = "KAFKA_JMX_PASSWORD"; + protected static final String KAFKA_JMX_SECRET_SUFFIX = NAME_SUFFIX + "-jmx"; + protected static final String SECRET_JMX_USERNAME_KEY = "jmx-username"; + protected static final String SECRET_JMX_PASSWORD_KEY = "jmx-password"; + protected static final String ENV_VAR_KAFKA_JMX_USERNAME = "KAFKA_JMX_USERNAME"; + protected static final String ENV_VAR_KAFKA_JMX_PASSWORD = "KAFKA_JMX_PASSWORD"; // Suffixes for secrets with certificates private static final String SECRET_BROKERS_SUFFIX = NAME_SUFFIX + "-brokers"; diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransOutputWriter.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransOutputWriter.java new file mode 100644 index 00000000000..4adc86af787 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransOutputWriter.java @@ -0,0 +1,77 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.components; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used to specify which remote host to output to and in what format to push the data + * atClass: the format of the data pushed to the remote host + * host: the address of the remote host to push to + * port: the port of the remote host to push to + * flushDelayInSeconds: how often to push the data, in seconds + */ +public class JmxTransOutputWriter implements Serializable { + private static final long serialVersionUID = 1L; + + @JsonProperty("@class") + private String atClass; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private String host; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private int port; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private int flushDelayInSeconds; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private List<String> typeNames; + + public String getAtClass() { + return atClass; + } + + public void setAtClass(String atClass) { + this.atClass = atClass; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getFlushDelayInSeconds() { + return flushDelayInSeconds; + } + + public void setFlushDelayInSeconds(int flushDelayInSeconds) { + this.flushDelayInSeconds = flushDelayInSeconds; + } + + public List<String> getTypeNames() { + return typeNames; + } + + public void setTypeNames(List<String> typeNames) { + this.typeNames = typeNames; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransQueries.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransQueries.java new file mode 100644 index 00000000000..888a6a8c40d --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransQueries.java @@ -0,0 +1,48 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */ +package io.strimzi.operator.cluster.model.components; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used to create the JmxTrans queries and define which remote hosts to output the results to + * obj: the Kafka MBean to query + * attr: an array of the MBean attributes to target + * outputWriters: which hosts to output to, and how + */ +public class JmxTransQueries implements Serializable { + private static final long serialVersionUID = 1L; + + private String obj; + + private List<String> attr; + + private List<JmxTransOutputWriter> outputWriters; + + public String getObj() { + return obj; + } + + public void setObj(String obj) { + this.obj = obj; + } + + public List<String> getAttr() { + return attr; + } + + public void setAttr(List<String> attr) { + this.attr = attr; + } + + public List<JmxTransOutputWriter> getOutputWriters() { + return outputWriters; + } + + public void setOutputWriters(List<JmxTransOutputWriter> outputWriters) { + this.outputWriters = outputWriters; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServer.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServer.java new file mode 100644 index 00000000000..b0f1fadd6ae --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServer.java @@ -0,0 +1,74 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.components; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used to create the per-broker JmxTrans queries and define which remote hosts it will output to + * host: references which broker JmxTrans will read from + * port: port of the host + * queries: what queries are passed in + */ +public class JmxTransServer implements Serializable { + private static final long serialVersionUID = 1L; + + private String host; + + private int port; + + @JsonProperty("queries") + private List<JmxTransQueries> queriesTemplate; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private String username; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private String password; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public List<JmxTransQueries> getQueries() { + return queriesTemplate; + } + + public void setQueries(List<JmxTransQueries> queriesTemplate) { + this.queriesTemplate = queriesTemplate; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServers.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServers.java new file mode 100644 index 00000000000..e8066e77382 --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/components/JmxTransServers.java @@ -0,0 +1,25 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
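+ *
+ * Sketch of the serialized top-level shape (produced by JmxTrans#generateJMXConfig in this change;
+ * values illustrative): { "servers": [ { "host": "...", "port": ..., "username": "${kafka.username}",
+ * "password": "${kafka.password}", "queries": [ ... ] } ] }, where the credential placeholders are
+ * only set when JMX authentication is enabled.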
+ */ +package io.strimzi.operator.cluster.model.components; + +import java.io.Serializable; +import java.util.List; + +/** + * Wrapper class used to create the overall config file + * Servers: A list of servers and what they will query and output + */ +public class JmxTransServers implements Serializable { + private static final long serialVersionUID = 1L; + private List<JmxTransServer> servers; + + public List<JmxTransServer> getServers() { + return servers; + } + + public void setServers(List<JmxTransServer> servers) { + this.servers = servers; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java index a64517d1900..793523af329 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java @@ -59,13 +59,14 @@ import io.strimzi.operator.cluster.model.EntityTopicOperator; import io.strimzi.operator.cluster.model.EntityUserOperator; import io.strimzi.operator.cluster.model.InvalidResourceException; +import io.strimzi.operator.cluster.model.JmxTrans; import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.KafkaExporter; import io.strimzi.operator.cluster.model.KafkaVersion; import io.strimzi.operator.cluster.model.KafkaVersionChange; import io.strimzi.operator.cluster.model.ModelUtils; -import io.strimzi.operator.cluster.model.NodeUtils; import io.strimzi.operator.cluster.model.NoSuchResourceException; +import io.strimzi.operator.cluster.model.NodeUtils; import io.strimzi.operator.cluster.model.StatusDiff; import io.strimzi.operator.cluster.model.StorageUtils; import io.strimzi.operator.cluster.model.ZookeeperCluster; @@ -105,8 +106,8 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Base64; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -339,6 +340,12 @@ Future<Void> reconcile(ReconciliationState reconcileState) { .compose(state -> state.kafkaExporterDeployment()) .compose(state -> state.kafkaExporterReady()) + .compose(state -> state.getJmxTransDescription()) + .compose(state -> state.jmxTransServiceAccount()) + .compose(state -> state.jmxTransConfigMap()) + .compose(state -> state.jmxTransDeployment()) + .compose(state -> state.jmxTransDeploymentReady()) + .map((Void) null) .setHandler(chainPromise); @@ -413,6 +420,10 @@ class ReconciliationState { // Stores node port of the external bootstrap service for use in KafkaStatus /* test */ int externalBootstrapNodePort; + private JmxTrans jmxTrans = null; + private ConfigMap jmxTransConfigMap = null; + private Deployment jmxTransDeployment = null; + ReconciliationState(Reconciliation reconciliation, Kafka kafkaAssembly) { this.reconciliation = reconciliation; this.kafkaAssembly = kafkaAssembly; @@ -3222,6 +3233,66 @@ Future<ReconciliationState> kafkaExporterReady() { return withVoid(Future.succeededFuture()); } + Future<ReconciliationState> getJmxTransDescription() { + try { + int numOfBrokers = kafkaCluster.getReplicas(); + this.jmxTrans = JmxTrans.fromCrd(kafkaAssembly, versions); + if (this.jmxTrans != null) { + this.jmxTransConfigMap = jmxTrans.generateJmxTransConfigMap(kafkaAssembly.getSpec().getJmxTrans(), numOfBrokers); + this.jmxTransDeployment = jmxTrans.generateDeployment(imagePullPolicy,
imagePullSecrets); + } + + return Future.succeededFuture(this); + } catch (Throwable e) { + return Future.failedFuture(e); + } + } + + Future<ReconciliationState> jmxTransConfigMap() { + return withVoid(configMapOperations.reconcile(namespace, + JmxTrans.jmxTransConfigName(name), + jmxTransConfigMap)); + } + + + Future<ReconciliationState> jmxTransServiceAccount() { + return withVoid(serviceAccountOperations.reconcile(namespace, + JmxTrans.containerServiceAccountName(name), + jmxTrans != null ? jmxTrans.generateServiceAccount() : null)); + } + + Future<ReconciliationState> jmxTransDeployment() { + if (this.jmxTrans != null && this.jmxTransDeployment != null) { + return deploymentOperations.getAsync(namespace, this.jmxTrans.getName()).compose(dep -> { + return configMapOperations.getAsync(namespace, jmxTransConfigMap.getMetadata().getName()).compose(res -> { + String resourceVersion = res.getMetadata().getResourceVersion(); + // getting the current cluster CA generation from the current deployment, if it exists + int caCertGeneration = getCaCertGeneration(this.clusterCa); + Annotations.annotations(jmxTransDeployment.getSpec().getTemplate()).put( + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(caCertGeneration)); + Annotations.annotations(jmxTransDeployment.getSpec().getTemplate()).put( + JmxTrans.CONFIG_MAP_ANNOTATION_KEY, resourceVersion); + return withVoid(deploymentOperations.reconcile(namespace, JmxTrans.jmxTransName(name), + jmxTransDeployment)); + }); + }); + } else { + return withVoid(deploymentOperations.reconcile(namespace, JmxTrans.jmxTransName(name), null)); + } + } + + Future<ReconciliationState> jmxTransDeploymentReady() { + if (this.jmxTrans != null && jmxTransDeployment != null) { + Future<Deployment> future = deploymentOperations.getAsync(namespace, this.jmxTrans.getName()); + return future.compose(dep -> { + return withVoid(deploymentOperations.waitForObserved(namespace, this.jmxTrans.getName(), 1_000, operationTimeoutMs)); + }).compose(dep -> { + return withVoid(deploymentOperations.readiness(namespace, this.jmxTrans.getName(), 1_000, operationTimeoutMs)); + }).map(i -> this); + } + return withVoid(Future.succeededFuture()); + } + } /* test */ Date dateSupplier() { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/JmxTransTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/JmxTransTest.java new file mode 100644 index 00000000000..e8398683f29 --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/JmxTransTest.java @@ -0,0 +1,161 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */ +package io.strimzi.operator.cluster.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.strimzi.api.kafka.model.InlineLogging; +import io.strimzi.api.kafka.model.JmxTransSpec; +import io.strimzi.api.kafka.model.JmxTransSpecBuilder; +import io.strimzi.api.kafka.model.Kafka; +import io.strimzi.api.kafka.model.KafkaBuilder; +import io.strimzi.api.kafka.model.KafkaJmxAuthenticationPasswordBuilder; +import io.strimzi.api.kafka.model.KafkaJmxOptionsBuilder; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplateBuilder; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplateBuilder; +import io.strimzi.operator.cluster.KafkaVersionTestUtils; +import io.strimzi.operator.cluster.ResourceUtils; +import io.strimzi.operator.cluster.model.components.JmxTransOutputWriter; +import io.strimzi.operator.cluster.model.components.JmxTransQueries; +import io.strimzi.operator.cluster.model.components.JmxTransServer; +import io.vertx.core.json.JsonObject; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class JmxTransTest { + private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); + private final String namespace = "test"; + private final String cluster = "foo"; + private final int replicas = 3; + private final String image = "image"; + private final int healthDelay = 120; + private final int healthTimeout = 30; + private final Map<String, Object> metricsCm = singletonMap("animal", "wombat"); + private final Map<String, Object> configuration = singletonMap("foo", "bar"); + private final InlineLogging kafkaLog = new InlineLogging(); + private final InlineLogging zooLog = new InlineLogging(); + + private final JmxTransSpec jmxTransSpec = new JmxTransSpecBuilder() + .withOutputDefinitions(new JmxTransOutputDefinitionTemplateBuilder() + .withName("Name") + .withOutputType("output") + .build()) + .withKafkaQueries(new JmxTransQueryTemplateBuilder() + .withOutputs("name") + .withAttributes("attributes") + .withNewTargetMBean("mbean") + .build()) + .build(); + + private final Kafka kafkaAssembly = new KafkaBuilder(ResourceUtils.createKafkaCluster(namespace, cluster, replicas, image, healthDelay, healthTimeout, metricsCm, configuration, kafkaLog, zooLog)) + .editSpec() + .withJmxTrans(jmxTransSpec) + .editKafka().withJmxOptions(new KafkaJmxOptionsBuilder() + .withAuthentication(new KafkaJmxAuthenticationPasswordBuilder().build()) + .build()) + .endKafka() + .endSpec() + .build(); + + private final JmxTrans jmxTrans = JmxTrans.fromCrd(kafkaAssembly, VERSIONS); + + @Test + public void testOutputDefinitionWriterSerialization() { + JmxTransOutputWriter outputWriter = new JmxTransOutputWriter(); + + outputWriter.setAtClass("class"); + outputWriter.setHost("host"); + outputWriter.setPort(9999); + outputWriter.setFlushDelayInSeconds(1); + outputWriter.setTypeNames(Collections.singletonList("SingleType")); + + JsonObject targetJson = JsonObject.mapFrom(outputWriter); + + assertThat(targetJson.getString("host"), is("host")); + assertThat(targetJson.getString("@class"), is("class")); + assertThat(targetJson.getInteger("port"), is(9999)); + assertThat(targetJson.getInteger("flushDelayInSeconds"), is(1)); + assertThat(targetJson.getJsonArray("typeNames").size(), is(1)); +
assertThat(targetJson.getJsonArray("typeNames").getList().get(0), is("SingleType")); + + } + + @Test + public void testServersDeserialization() { + JmxTransServer server = new JmxTransServer(); + + server.setHost("host"); + server.setPort(9999); + server.setUsername("username"); + server.setPassword("password"); + server.setQueries(Collections.emptyList()); + + JsonObject targetJson = JsonObject.mapFrom(server); + + assertThat(targetJson.getString("host"), is("host")); + assertThat(targetJson.getInteger("port"), is(9999)); + assertThat(targetJson.getString("username"), is("username")); + assertThat(targetJson.getString("password"), is("password")); + assertThat(targetJson.getJsonArray("queries").getList().size(), is(0)); + } + + @Test + public void testQueriesDeserialization() { + JmxTransOutputWriter outputWriter = new JmxTransOutputWriter(); + + outputWriter.setAtClass("class"); + outputWriter.setHost("host"); + outputWriter.setPort(9999); + outputWriter.setFlushDelayInSeconds(1); + outputWriter.setTypeNames(Collections.singletonList("SingleType")); + + JmxTransQueries queries = new JmxTransQueries(); + + queries.setObj("object"); + queries.setAttr(Collections.singletonList("attribute")); + + queries.setOutputWriters(Collections.singletonList(outputWriter)); + + JsonObject targetJson = JsonObject.mapFrom(queries); + JsonObject outputWriterJson = targetJson.getJsonArray("outputWriters").getJsonObject(0); + + assertThat(targetJson.getString("obj"), is("object")); + assertThat(targetJson.getJsonArray("attr").size(), is(1)); + assertThat(targetJson.getJsonArray("attr").getString(0), is("attribute")); + + assertThat(outputWriterJson.getString("host"), is("host")); + assertThat(outputWriterJson.getString("@class"), is("class")); + assertThat(outputWriterJson.getInteger("port"), is(9999)); + assertThat(outputWriterJson.getInteger("flushDelayInSeconds"), is(1)); + assertThat(outputWriterJson.getJsonArray("typeNames").size(), is(1)); + assertThat(outputWriterJson.getJsonArray("typeNames").getList().get(0), is("SingleType")); + } + + @Test + public void testConfigMapOnScaleUp() throws JsonProcessingException { + ConfigMap originalCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 1); + ConfigMap scaledCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 2); + + assertThat(originalCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length() < + scaledCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length(), + is(true)); + } + + @Test + public void testConfigMapOnScaleDown() throws JsonProcessingException { + ConfigMap originalCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 2); + ConfigMap scaledCM = jmxTrans.generateJmxTransConfigMap(jmxTransSpec, 1); + + assertThat(originalCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length() > + scaledCM.getData().get(JmxTrans.JMXTRANS_CONFIGMAP_KEY).length(), + is(true)); + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java index 844f2921188..caf973ff34a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMockTest.java @@ -19,16 +19,17 @@ import io.strimzi.api.kafka.Crds; import io.strimzi.api.kafka.KafkaList; import io.strimzi.api.kafka.model.DoneableKafka; -import 
io.strimzi.api.kafka.model.storage.EphemeralStorage; import io.strimzi.api.kafka.model.Kafka; import io.strimzi.api.kafka.model.KafkaBuilder; +import io.strimzi.api.kafka.model.storage.EphemeralStorage; import io.strimzi.api.kafka.model.storage.PersistentClaimStorage; import io.strimzi.api.kafka.model.storage.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.storage.SingleVolumeStorage; import io.strimzi.api.kafka.model.storage.Storage; +import io.strimzi.operator.KubernetesVersion; +import io.strimzi.operator.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ClusterOperator; import io.strimzi.operator.cluster.ClusterOperatorConfig; -import io.strimzi.operator.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.ResourceUtils; import io.strimzi.operator.cluster.model.AbstractModel; @@ -37,7 +38,6 @@ import io.strimzi.operator.cluster.model.KafkaVersion; import io.strimzi.operator.cluster.model.TopicOperator; import io.strimzi.operator.cluster.model.ZookeeperCluster; -import io.strimzi.operator.KubernetesVersion; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; import io.strimzi.operator.cluster.operator.resource.StatefulSetOperator; import io.strimzi.operator.cluster.operator.resource.ZookeeperLeaderFinder; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java index 73784743c64..3577eae8d61 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorTest.java @@ -21,6 +21,7 @@ import io.strimzi.api.kafka.model.EntityTopicOperatorSpecBuilder; import io.strimzi.api.kafka.model.EntityUserOperatorSpecBuilder; import io.strimzi.api.kafka.model.InlineLogging; +import io.strimzi.api.kafka.model.JmxTransSpecBuilder; import io.strimzi.api.kafka.model.Kafka; import io.strimzi.api.kafka.model.KafkaBuilder; import io.strimzi.api.kafka.model.KafkaExporterResources; @@ -37,6 +38,8 @@ import io.strimzi.api.kafka.model.storage.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.storage.SingleVolumeStorage; import io.strimzi.api.kafka.model.storage.Storage; +import io.strimzi.api.kafka.model.template.JmxTransOutputDefinitionTemplateBuilder; +import io.strimzi.api.kafka.model.template.JmxTransQueryTemplateBuilder; import io.strimzi.operator.KubernetesVersion; import io.strimzi.operator.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ClusterOperator; @@ -47,6 +50,7 @@ import io.strimzi.operator.cluster.model.ClientsCa; import io.strimzi.operator.cluster.model.ClusterCa; import io.strimzi.operator.cluster.model.EntityOperator; +import io.strimzi.operator.cluster.model.JmxTrans; import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.KafkaExporter; import io.strimzi.operator.cluster.model.KafkaVersion; @@ -352,6 +356,38 @@ public void testCreateClusterWithJmxEnabled(Params params, VertxTestContext cont )); //getInitialCertificates(getKafkaAssembly("foo").getMetadata().getName())); } + @ParameterizedTest + @MethodSource("data") + public void testCreateClusterWithJmxTrans(Params params, VertxTestContext context) { + setFields(params); + Kafka kafka = getKafkaAssembly("foo"); + 
kafka.getSpec() + .getKafka().setJmxOptions(new KafkaJmxOptionsBuilder() + .withAuthentication(new KafkaJmxAuthenticationPasswordBuilder().build()) + .build()); + + kafka.getSpec().setJmxTrans(new JmxTransSpecBuilder() + .withKafkaQueries(new JmxTransQueryTemplateBuilder() + .withNewTargetMBean("mbean") + .withAttributes("attribute") + .withOutputs("output") + .build()) + .withOutputDefinitions(new JmxTransOutputDefinitionTemplateBuilder() + .withOutputType("host") + .withName("output") + .build()) + .build()); + + createCluster(context, kafka, Collections.singletonList(new SecretBuilder() + .withNewMetadata() + .withName(KafkaCluster.jmxSecretName("foo")) + .withNamespace("test") + .endMetadata() + .withData(Collections.singletonMap("foo", "bar")) + .build() + )); + } + private Map createPvcs(String namespace, Storage storage, int replicas, BiFunction pvcNameFunction) { @@ -550,6 +586,12 @@ private void createCluster(VertxTestContext context, Kafka clusterCm, List routeCaptor = ArgumentCaptor.forClass(Route.class); ArgumentCaptor routeNameCaptor = ArgumentCaptor.forClass(String.class); if (openShift) { diff --git a/docker-images/build.sh b/docker-images/build.sh index fdf31847493..7b90f09c3c4 100755 --- a/docker-images/build.sh +++ b/docker-images/build.sh @@ -6,7 +6,7 @@ source $(dirname $(realpath $0))/../multi-platform-support.sh # Image directories base_images="base" -java_images="operator" +java_images="operator jmxtrans" kafka_image="kafka" kafka_images="kafka test-client" @@ -183,7 +183,6 @@ function build { THIRD_PARTY_LIBS="${lib_directory}" done done - } dependency_check diff --git a/docker-images/jmxtrans/Dockerfile b/docker-images/jmxtrans/Dockerfile new file mode 100644 index 00000000000..f4f584e283c --- /dev/null +++ b/docker-images/jmxtrans/Dockerfile @@ -0,0 +1,48 @@ +FROM strimzi/base:latest + +ENV JMXTRANS_HOME /usr/share/jmxtrans +ENV PATH $JMXTRANS_HOME/bin:$PATH +ENV JAR_FILE $JMXTRANS_HOME/lib/jmxtrans.jar +ENV JMXTRANS_VERSION 271 +ENV JMXTRANS_CHECKSUM="9c9116b628be912a723fae8ab65853908cc0872139340827b4af3dbdb7274bc88956af7f33dc927a43ea291c721701fb52368ccd414218ef33e7de3060baf849 jmxtrans-${JMXTRANS_VERSION}-all.jar" +ENV HEAP_SIZE 512 +ENV PERM_SIZE 384 +ENV MAX_PERM_SIZE 384 +ENV SECONDS_BETWEEN_RUNS 60 +ENV CONTINUE_ON_ERROR false +ENV JSON_DIR /var/lib/jmxtrans + +WORKDIR ${JMXTRANS_HOME} +RUN mkdir -p ${JMXTRANS_HOME}/conf + +##### +# Add JmxTrans Jar +##### +RUN mkdir -p /usr/share/jmxtrans/lib/ \ + && mkdir -p /var/log/jmxtrans \ + && curl -k https://repo1.maven.org/maven2/org/jmxtrans/jmxtrans/${JMXTRANS_VERSION}/jmxtrans-${JMXTRANS_VERSION}-all.jar --output jmxtrans-${JMXTRANS_VERSION}-all.jar \ + && echo $JMXTRANS_CHECKSUM > jmxtrans-${JMXTRANS_VERSION}-all.jar.sha512 \ + && sha512sum --check jmxtrans-${JMXTRANS_VERSION}-all.jar.sha512 \ + && rm jmxtrans-${JMXTRANS_VERSION}-all.jar.sha512 \ + && mv jmxtrans-${JMXTRANS_VERSION}-all.jar ${JAR_FILE} + +COPY docker-entrypoint.sh /docker-entrypoint.sh +COPY jmxtrans_readiness_check.sh /opt/jmx/ + +##### +# Add Tini +##### +ENV TINI_VERSION v0.18.0 +ENV TINI_SHA256=12d20136605531b09a2c2dac02ccee85e1b874eb322ef6baf7561cd93f93c855 +ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /usr/bin/tini +RUN echo "${TINI_SHA256} */usr/bin/tini" | sha256sum -c \ + && chmod +x /usr/bin/tini + +##### +# Add NC +##### +RUN yum install -y nc + +VOLUME ${JSON_DIR} + +ENTRYPOINT [ "/docker-entrypoint.sh", "start-without-jmx" ] \ No newline at end of file diff --git a/docker-images/jmxtrans/Makefile 
b/docker-images/jmxtrans/Makefile new file mode 100644 index 00000000000..b66d2bcf7df
--- /dev/null
+++ b/docker-images/jmxtrans/Makefile
@@ -0,0 +1,8 @@
+PROJECT_NAME := jmxtrans
+DOCKER_TAG ?= latest
+
+include ../../Makefile.docker
+
+.PHONY: build clean tag
+
+docker_build: docker_build_default
\ No newline at end of file
diff --git a/docker-images/jmxtrans/docker-entrypoint.sh b/docker-images/jmxtrans/docker-entrypoint.sh
new file mode 100755 index 00000000000..2b1e512fb87
--- /dev/null
+++ b/docker-images/jmxtrans/docker-entrypoint.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+set -e
+
+EXEC="-jar $JAR_FILE -e -j $JSON_DIR -s $SECONDS_BETWEEN_RUNS -c $CONTINUE_ON_ERROR $ADDITIONAL_JARS_OPTS"
+GC_OPTS="-Xms${HEAP_SIZE}m -Xmx${HEAP_SIZE}m -XX:PermSize=${PERM_SIZE}m -XX:MaxPermSize=${MAX_PERM_SIZE}m"
+JMXTRANS_OPTS="$JMXTRANS_OPTS -Dlog4j2.configurationFile=file:///${JMXTRANS_HOME}/conf/log4j2.properties"
+
+if [ -n "${KAFKA_JMX_USERNAME}" ]; then
+    JMXTRANS_OPTS="$JMXTRANS_OPTS -Dkafka.username=${KAFKA_JMX_USERNAME} -Dkafka.password=${KAFKA_JMX_PASSWORD}"
+fi
+
+MONITOR_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false \
+    -Dcom.sun.management.jmxremote.authenticate=false \
+    -Dcom.sun.management.jmxremote.port=9999 \
+    -Dcom.sun.management.jmxremote.rmi.port=9999 \
+    -Djava.rmi.server.hostname=${PROXY_HOST}"
+
+if [ "$1" = 'start-without-jmx' ]; then
+    set /usr/bin/tini -- java -server $JAVA_OPTS $JMXTRANS_OPTS $GC_OPTS $EXEC
+elif [ "$1" = 'start-with-jmx' ]; then
+    set /usr/bin/tini -- java -server $JAVA_OPTS $JMXTRANS_OPTS $GC_OPTS $MONITOR_OPTS $EXEC
+fi
+
+exec "$@"
diff --git a/docker-images/jmxtrans/jmxtrans_readiness_check.sh b/docker-images/jmxtrans/jmxtrans_readiness_check.sh
new file mode 100755 index 00000000000..40552b4877e
--- /dev/null
+++ b/docker-images/jmxtrans/jmxtrans_readiness_check.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+set -e
+
+if [ -z "$1" ]; then
+    echo "No kafka bootstrap has been given"
+    exit 1
+else
+    export KAFKA_HEADLESS_SERVICE="$1"
+fi
+
+if [ -z "$2" ]; then
+    echo "No kafka bootstrap port given"
+    exit 1
+else
+    export KAFKA_METRICS_PORT="$2"
+fi
+
+if ! nc -z "$KAFKA_HEADLESS_SERVICE" "$KAFKA_METRICS_PORT"; then
+    echo "Couldn't connect to $KAFKA_HEADLESS_SERVICE:$KAFKA_METRICS_PORT"
+    exit 1
+fi
\ No newline at end of file
diff --git a/documentation/assemblies/assembly-jmxtrans.adoc b/documentation/assemblies/assembly-jmxtrans.adoc
new file mode 100644 index 00000000000..969f8e472ae
--- /dev/null
+++ b/documentation/assemblies/assembly-jmxtrans.adoc
@@ -0,0 +1,23 @@
+// This assembly is included in the following assemblies:
+//
+// assembly-deployment-configuration-kafka.adoc
+
+:parent-context: {context}
+
+[id='assembly-jmxtrans-{context}']
+= Retrieving JMX metrics with JmxTrans
+
+JmxTrans retrieves JMX metrics data from Java processes and pushes that data, in various formats, to remote sinks inside or outside of the cluster.
+JmxTrans can communicate with a secure JMX port.
+{ProductName} supports using JmxTrans to read JMX data from Kafka brokers.
+
+include::../modules/con-jmxtrans.adoc[leveloffset=+1]
+
+include::../modules/proc-jmxtrans-deployment.adoc[leveloffset=+1]
+
+== Additional resources
+
+For more information about JmxTrans, see the link:https://github.com/jmxtrans/jmxtrans[JmxTrans GitHub repository].
+
+// Restore the context to what it was before this assembly.
+:context: {parent-context}
\ No newline at end of file
diff --git a/documentation/assemblies/assembly-kafka-jmx-options.adoc b/documentation/assemblies/assembly-kafka-jmx-options.adoc
index 8f30987b2f3..a89d8efd58e 100644
--- a/documentation/assemblies/assembly-kafka-jmx-options.adoc
+++ b/documentation/assemblies/assembly-kafka-jmx-options.adoc
@@ -9,7 +9,7 @@
 [id='assembly-jmx-options-{context}']
-= JMX Remote
+= JMX Options
 {ProductName} supports obtaining JMX metrics from the Kafka brokers by opening a JMX port on 9999. You can obtain various metrics about each Kafka broker, for example, usage data such as the `BytesPerSecond` value
diff --git a/documentation/modules/appendix_crds.adoc b/documentation/modules/appendix_crds.adoc
index 1e545b47bd7..2aefd507887 100644
--- a/documentation/modules/appendix_crds.adoc
+++ b/documentation/modules/appendix_crds.adoc
@@ -35,6 +35,8 @@ Used in: xref:type-Kafka-{context}[`Kafka`]
 |xref:type-CertificateAuthority-{context}[`CertificateAuthority`]
 |clientsCa 1.2+<.<|Configuration of the clients certificate authority.
 |xref:type-CertificateAuthority-{context}[`CertificateAuthority`]
+|jmxTrans 1.2+<.<|Configuration for JmxTrans. When the property is present, a JmxTrans deployment is created for gathering JMX metrics from each Kafka broker. For more information, see https://github.com/jmxtrans/jmxtrans[JmxTrans GitHub].
+|xref:type-JmxTransSpec-{context}[`JmxTransSpec`]
 |kafkaExporter 1.2+<.<|Configuration of the Kafka Exporter. Kafka Exporter can provide additional metrics, for example lag of consumer group at topic/partition.
 |xref:type-KafkaExporterSpec-{context}[`KafkaExporterSpec`]
 |maintenanceTimeWindows 1.2+<.<|A list of time windows for maintenance tasks (that is, certificates renewal). Each time window is defined by a cron expression.
@@ -806,7 +808,7 @@ It must have the value `password` for the type `KafkaJmxAuthenticationPassword`.
 [id='type-ResourceRequirements-{context}']
 ### `ResourceRequirements` schema reference
 
-Used in: xref:type-EntityTopicOperatorSpec-{context}[`EntityTopicOperatorSpec`], xref:type-EntityUserOperatorSpec-{context}[`EntityUserOperatorSpec`], xref:type-KafkaBridgeSpec-{context}[`KafkaBridgeSpec`], xref:type-KafkaClusterSpec-{context}[`KafkaClusterSpec`], xref:type-KafkaConnectS2ISpec-{context}[`KafkaConnectS2ISpec`], xref:type-KafkaConnectSpec-{context}[`KafkaConnectSpec`], xref:type-KafkaExporterSpec-{context}[`KafkaExporterSpec`], xref:type-KafkaMirrorMakerSpec-{context}[`KafkaMirrorMakerSpec`], xref:type-TlsSidecar-{context}[`TlsSidecar`], xref:type-TopicOperatorSpec-{context}[`TopicOperatorSpec`], xref:type-ZookeeperClusterSpec-{context}[`ZookeeperClusterSpec`]
+Used in: xref:type-EntityTopicOperatorSpec-{context}[`EntityTopicOperatorSpec`], xref:type-EntityUserOperatorSpec-{context}[`EntityUserOperatorSpec`], xref:type-JmxTransSpec-{context}[`JmxTransSpec`], xref:type-KafkaBridgeSpec-{context}[`KafkaBridgeSpec`], xref:type-KafkaClusterSpec-{context}[`KafkaClusterSpec`], xref:type-KafkaConnectS2ISpec-{context}[`KafkaConnectS2ISpec`], xref:type-KafkaConnectSpec-{context}[`KafkaConnectSpec`], xref:type-KafkaExporterSpec-{context}[`KafkaExporterSpec`], xref:type-KafkaMirrorMakerSpec-{context}[`KafkaMirrorMakerSpec`], xref:type-TlsSidecar-{context}[`TlsSidecar`], xref:type-TopicOperatorSpec-{context}[`TopicOperatorSpec`], xref:type-ZookeeperClusterSpec-{context}[`ZookeeperClusterSpec`]
 
 [options="header"]
@@ -1285,6 +1287,67 @@ Configuration of how TLS certificates are used within the cluster.
This applies
 |string (one of [replace-key, renew-certificate])
 |====
+[id='type-JmxTransSpec-{context}']
+### `JmxTransSpec` schema reference
+
+Used in: xref:type-KafkaSpec-{context}[`KafkaSpec`]
+
+
+[options="header"]
+|====
+|Property |Description
+|image 1.2+<.<|The image to use for the JmxTrans deployment.
+|string
+|outputDefinitions 1.2+<.<|Defines the output hosts that will be referenced later on. For more information on these properties, see the xref:type-JmxTransOutputDefinitionTemplate-reference[`JmxTransOutputDefinitionTemplate` schema reference].
+|xref:type-JmxTransOutputDefinitionTemplate-{context}[`JmxTransOutputDefinitionTemplate`] array
+|logLevel 1.2+<.<|Sets the logging level of the JmxTrans deployment. For more information, see https://github.com/jmxtrans/jmxtrans-agent/wiki/Troubleshooting[JmxTrans Logging Level].
+|string
+|kafkaQueries 1.2+<.<|Queries to send to the Kafka brokers to define what data should be read from each broker. For more information on these properties, see the xref:type-JmxTransQueryTemplate-reference[`JmxTransQueryTemplate` schema reference].
+|xref:type-JmxTransQueryTemplate-{context}[`JmxTransQueryTemplate`] array
+|resources 1.2+<.<|CPU and memory resources to reserve.
+|xref:type-ResourceRequirements-{context}[`ResourceRequirements`]
+|====
+
+[id='type-JmxTransOutputDefinitionTemplate-{context}']
+### `JmxTransOutputDefinitionTemplate` schema reference
+
+Used in: xref:type-JmxTransSpec-{context}[`JmxTransSpec`]
+
+
+[options="header"]
+|====
+|Property |Description
+|outputType 1.2+<.<|Template for setting the format of the data that will be pushed. For more information, see https://github.com/jmxtrans/jmxtrans/wiki/OutputWriters[JmxTrans OutputWriters].
+|string
+|host 1.2+<.<|The DNS/hostname of the remote host that the data is pushed to.
+|string
+|port 1.2+<.<|The port of the remote host that the data is pushed to.
+|integer
+|flushDelayInSeconds 1.2+<.<|How many seconds JmxTrans waits before pushing a new set of data out.
+|integer
+|typeNames 1.2+<.<|Template for filtering data to be included in response to a wildcard query. For more information, see https://github.com/jmxtrans/jmxtrans/wiki/Queries[JmxTrans queries].
+|string array
+|name 1.2+<.<|Template for setting the name of the output definition. This is used to identify where the results of queries should be sent.
+|string
+|====
+
+[id='type-JmxTransQueryTemplate-{context}']
+### `JmxTransQueryTemplate` schema reference
+
+Used in: xref:type-JmxTransSpec-{context}[`JmxTransSpec`]
+
+
+[options="header"]
+|====
+|Property |Description
+|targetMBean 1.2+<.<|If a wildcard is used instead of a specific MBean, data is gathered from all MBeans that match the pattern. Otherwise, data is gathered from the single specified MBean.
+|string
+|attributes 1.2+<.<|Determines which attributes of the targeted MBean should be included.
+|string array
+|outputs 1.2+<.<|A list of the names of output definitions specified in `spec.jmxTrans.outputDefinitions`, which define where the JMX metrics are pushed to, and in which data format.
+|string array
+|====
+
 [id='type-KafkaExporterSpec-{context}']
 ### `KafkaExporterSpec` schema reference
diff --git a/documentation/modules/con-jmxtrans.adoc b/documentation/modules/con-jmxtrans.adoc
new file mode 100644 index 00000000000..bc7bc2933b1
--- /dev/null
+++ b/documentation/modules/con-jmxtrans.adoc
@@ -0,0 +1,11 @@
+// Module included in the following assemblies:
+//
+// assembly-jmxtrans.adoc
+
+[id='con-jmxtrans-{context}']
+= JmxTrans
+
+JmxTrans reads JMX metrics data from secure or insecure Kafka brokers and pushes the data to remote sinks in various data formats.
+For example, you can use JmxTrans to obtain JMX metrics about the request rate of each Kafka broker's network
+and push them to a Logstash database outside of the Kubernetes cluster.
+
diff --git a/documentation/modules/proc-jmxtrans-deployment.adoc b/documentation/modules/proc-jmxtrans-deployment.adoc
new file mode 100644 index 00000000000..531380f6070
--- /dev/null
+++ b/documentation/modules/proc-jmxtrans-deployment.adoc
@@ -0,0 +1,81 @@
+// Module included in the following assemblies:
+//
+// assembly-deployment-configuration-kafka.adoc
+// assembly-jmxtrans.adoc
+
+[id='proc-jmxtrans-deployment-{context}']
+= Configuring a JmxTrans deployment
+
+.Prerequisites
+* A running Kubernetes cluster
+
+You can configure a JmxTrans deployment by using the `Kafka.spec.jmxTrans` property.
+A JmxTrans deployment can read from a secure or insecure Kafka broker.
+To configure a JmxTrans deployment, define the following properties:
+
+* `Kafka.spec.jmxTrans.outputDefinitions`
+* `Kafka.spec.jmxTrans.kafkaQueries`
+
+For more information on these properties, see the xref:type-JmxTransSpec-reference[`JmxTransSpec` schema reference].
+
+NOTE: JmxTrans will not start up unless JMX options are specified on the Kafka brokers.
+For more information, see xref:assembly-kafka-jmx-options[Kafka JMX Options].
+
+.Configuring JmxTrans output definitions
+
+Output definitions specify where JMX metrics are pushed to, and in which data format.
+For information about supported data formats, see link:https://github.com/jmxtrans/jmxtrans/wiki/OutputWriters[Data formats^].
+The `flushDelayInSeconds` property configures how many seconds JmxTrans waits before pushing a new set of data.
+The `host` and `port` properties define the target host address and target port the data is pushed to.
+The `name` property is required, and is referenced by the `outputs` property of `Kafka.spec.jmxTrans.kafkaQueries`.
+
+Here is an example configuration pushing JMX data in the Graphite format every 5 seconds to a Logstash database on `myLogstash:9999`, and another pushing to `standardOut` (standard output):
+[source,yaml,subs=attributes+]
+----
+apiVersion: {KafkaApiVersion}
+kind: Kafka
+metadata:
+  name: my-cluster
+spec:
+  jmxTrans:
+    outputDefinitions:
+      - outputType: "com.googlecode.jmxtrans.model.output.GraphiteWriter"
+        host: "myLogstash"
+        port: 9999
+        flushDelayInSeconds: 5
+        name: "logstash"
+      - outputType: "com.googlecode.jmxtrans.model.output.StdOutWriter"
+        name: "standardOut"
+      # ...
+  # ...
+  zookeeper:
+    # ...
+----
+
+.Configuring JmxTrans queries
+JmxTrans queries specify what JMX metrics are read from the Kafka brokers.
+Currently, JmxTrans queries can only be sent to the Kafka brokers.
+Configure the `targetMBean` property to specify which target MBean on the Kafka broker is addressed.
+The `attributes` property specifies which MBean attributes are read as JMX metrics from the target MBean.
+JmxTrans supports wildcards to read from multiple target MBeans; results can be filtered by specifying the `typeNames` property.
+The `outputs` property defines where the metrics are pushed to by specifying the names of the output definitions.
+
+The following JmxTrans deployment reads from all MBeans that match the pattern `kafka.server:type=BrokerTopicMetrics,*` and have `name` as a key in the MBean's name.
+From those MBeans, it obtains JMX metrics about the `Count` attribute and pushes the metrics to standard output as defined by `outputs`.
+[source,yaml,subs=attributes+]
+----
+apiVersion: {KafkaApiVersion}
+kind: Kafka
+metadata:
+  name: my-cluster
+spec:
+  # ...
+  jmxTrans:
+    kafkaQueries:
+      - targetMBean: "kafka.server:type=BrokerTopicMetrics,*"
+        typeNames: ["name"]
+        attributes: ["Count"]
+        outputs: ["standardOut"]
+  zookeeper:
+    # ...
+----
\ No newline at end of file
diff --git a/documentation/modules/ref-configuring-container-images.adoc b/documentation/modules/ref-configuring-container-images.adoc
index 250d0b92b0c..8b10c5deec4 100644
--- a/documentation/modules/ref-configuring-container-images.adoc
+++ b/documentation/modules/ref-configuring-container-images.adoc
@@ -14,6 +14,7 @@ You can specify which container image to use for each component using the `image`
 * `Kafka.spec.entityOperator.topicOperator`
 * `Kafka.spec.entityOperator.userOperator`
 * `Kafka.spec.entityOperator.tlsSidecar`
+* `Kafka.spec.jmxTrans`
 * `KafkaConnect.spec`
 * `KafkaConnectS2I.spec`
 * `KafkaBridge.spec`
@@ -80,6 +81,9 @@ If the `image` name is not defined in the Cluster Operator configuration, then t
 * For Kafka broker initializer:
 . Container image specified in the `STRIMZI_DEFAULT_KAFKA_INIT_IMAGE` environment variable from the Cluster Operator configuration.
 . `{DockerKafkaInit}` container image.
+* For JmxTrans:
+. Container image specified in the `STRIMZI_DEFAULT_JMXTRANS_IMAGE` environment variable from the Cluster Operator configuration.
+. `{DockerOrg}/jmxtrans:{DockerTag}` container image.
 
 WARNING: Overriding container images is recommended only in special situations, where you need to use a different container registry. For example, because your network does not allow access to the container repository used by {ProductName}.
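Editor's note: to make the image-override precedence above concrete, here is a minimal sketch of a `Kafka` custom resource that pins the JmxTrans container image through `Kafka.spec.jmxTrans.image`. The registry and tag (`my-registry.example/jmxtrans:0.17.0`) are hypothetical placeholders; the `outputDefinitions` and `kafkaQueries` entries are required by the CRD and mirror `examples/metrics/jmxtrans.yaml`.

[source,yaml]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    jmxOptions:                # JmxTrans requires JMX options on the brokers
      authentication:
        type: "password"
  jmxTrans:
    # Hypothetical image override; when omitted, the Cluster Operator falls back
    # to the STRIMZI_DEFAULT_JMXTRANS_IMAGE environment variable.
    image: my-registry.example/jmxtrans:0.17.0
    outputDefinitions:
      - outputType: "com.googlecode.jmxtrans.model.output.StdOutWriter"
        name: "standardOut"
    kafkaQueries:
      - targetMBean: "kafka.server:type=BrokerTopicMetrics,name=*"
        attributes: ["Count"]
        outputs: ["standardOut"]
  zookeeper:
    # ...
----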
diff --git a/documentation/modules/snip-images.adoc b/documentation/modules/snip-images.adoc index d5fa082b2ec..8a8cdb5111f 100644 --- a/documentation/modules/snip-images.adoc +++ b/documentation/modules/snip-images.adoc @@ -37,4 +37,11 @@ a| a| {ProductName} image for running the {ProductName} kafka Bridge +|JmxTrans +a| +* {DockerOrg}/jmxtrans:{DockerTag} + +a| +{ProductName} image for running the {ProductName} JmxTrans + |=== diff --git a/documentation/snip-images.sh b/documentation/snip-images.sh index aa0e19a2859..fc91ab2acb7 100755 --- a/documentation/snip-images.sh +++ b/documentation/snip-images.sh @@ -55,5 +55,12 @@ a| a| {ProductName} image for running the {ProductName} kafka Bridge +|JmxTrans +a| +* {DockerOrg}/jmxtrans:{DockerTag} + +a| +{ProductName} image for running the {ProductName} JmxTrans + |=== EOF diff --git a/examples/metrics/jmxtrans.yaml b/examples/metrics/jmxtrans.yaml new file mode 100644 index 00000000000..fa971fc61c1 --- /dev/null +++ b/examples/metrics/jmxtrans.yaml @@ -0,0 +1,38 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + version: 2.4.0 + replicas: 2 + listeners: + plain: {} + tls: {} + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + log.message.format.version: "2.4" + storage: + type: ephemeral + jmxOptions: + authentication: + type: "password" + jmxTrans: + outputDefinitions: + - outputType: "com.googlecode.jmxtrans.model.output.StdOutWriter" + name: "standardOut" + - outputType: "com.googlecode.jmxtrans.model.output.GraphiteOutputWriter" + host: "mylogstash.com" + port: 31028 + flushDelayInSeconds: 5 + name: "logstash" + kafkaQueries: + - targetMBean: "kafka.server:type=BrokerTopicMetrics,name=*" + attributes: ["Count"] + outputs: ["standardOut"] + zookeeper: + replicas: 1 + storage: + type: ephemeral \ No newline at end of file diff --git a/helm-charts/strimzi-kafka-operator/README.md b/helm-charts/strimzi-kafka-operator/README.md index 2f8a75e7d06..bc55937d010 100644 --- a/helm-charts/strimzi-kafka-operator/README.md +++ b/helm-charts/strimzi-kafka-operator/README.md @@ -62,6 +62,9 @@ the documentation for more details. 
| `zookeeper.image.repository` | ZooKeeper image repository | `strimzi` |
| `zookeeper.image.name` | ZooKeeper image name | `kafka` |
| `zookeeper.image.tag` | ZooKeeper image tag prefix | `latest` |
+| `jmxTrans.image.repository` | JmxTrans image repository | `strimzi` |
+| `jmxTrans.image.name` | JmxTrans image name | `jmxtrans` |
+| `jmxTrans.image.tag` | JmxTrans image tag | `latest` |
| `kafka.image.repository` | Kafka image repository | `strimzi` |
| `kafka.image.name` | Kafka image name | `kafka` |
| `kafka.image.tagPrefix` | Kafka image tag prefix | `latest` |
diff --git a/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml b/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml
index 08fa52f7723..ed90221d9bc 100644
--- a/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml
+++ b/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml
@@ -3274,6 +3274,64 @@ spec:
               enum:
                 - renew-certificate
                 - replace-key
+          jmxTrans:
+            type: object
+            properties:
+              image:
+                type: string
+              outputDefinitions:
+                type: array
+                items:
+                  type: object
+                  properties:
+                    outputType:
+                      type: string
+                    host:
+                      type: string
+                    port:
+                      type: integer
+                    flushDelayInSeconds:
+                      type: integer
+                    typeNames:
+                      type: array
+                      items:
+                        type: string
+                    name:
+                      type: string
+                  required:
+                    - outputType
+                    - name
+              logLevel:
+                type: string
+              kafkaQueries:
+                type: array
+                items:
+                  type: object
+                  properties:
+                    targetMBean:
+                      type: string
+                    attributes:
+                      type: array
+                      items:
+                        type: string
+                    outputs:
+                      type: array
+                      items:
+                        type: string
+                  required:
+                    - targetMBean
+                    - attributes
+                    - outputs
+              resources:
+                type: object
+                properties:
+                  limits:
+                    type: object
+                  requests:
+                    type: object
+            required:
+              - outputDefinitions
+              - kafkaQueries
           kafkaExporter:
             type: object
             properties:
diff --git a/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml b/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml
index af7b096fe95..ab9872092fa 100755
--- a/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml
+++ b/helm-charts/strimzi-kafka-operator/templates/050-Deployment-strimzi-cluster-operator.yaml
@@ -61,6 +61,8 @@ spec:
             value: "{{ default .Values.kafkaInit.image.repository .Values.imageRepositoryOverride }}/{{ .Values.kafkaInit.image.name }}:{{ default .Values.kafkaInit.image.tag .Values.imageTagOverride }}"
           - name: STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE
             value: "{{ default .Values.kafkaBridge.image.repository .Values.imageRepositoryOverride }}/{{ .Values.kafkaBridge.image.name }}:{{ default .Values.kafkaBridge.image.tag .Values.imageTagOverride }}"
+          - name: STRIMZI_DEFAULT_JMXTRANS_IMAGE
+            value: "{{ default .Values.jmxTrans.image.repository .Values.imageRepositoryOverride }}/{{ .Values.jmxTrans.image.name }}:{{ default .Values.jmxTrans.image.tag .Values.imageTagOverride }}"
           - name: STRIMZI_LOG_LEVEL
             value: {{ .Values.logLevel | quote }}
 {{- if .Values.image.imagePullSecrets }}
diff --git a/helm-charts/strimzi-kafka-operator/values.yaml b/helm-charts/strimzi-kafka-operator/values.yaml
index 3abab1c1fae..725356e7075 100755
--- a/helm-charts/strimzi-kafka-operator/values.yaml
+++ b/helm-charts/strimzi-kafka-operator/values.yaml
@@ -87,6 +87,11 @@ kafkaExporter:
     repository: strimzi
     name: kafka
     tagPrefix: latest
+jmxTrans:
+  image:
+    repository: strimzi
+    name: jmxtrans
+    tag: latest
 resources:
   limits:
     memory: 256Mi
diff --git a/install/cluster-operator/040-Crd-kafka.yaml
b/install/cluster-operator/040-Crd-kafka.yaml index d89ac3414c6..5c7ab6e81a4 100644 --- a/install/cluster-operator/040-Crd-kafka.yaml +++ b/install/cluster-operator/040-Crd-kafka.yaml @@ -3269,6 +3269,64 @@ spec: enum: - renew-certificate - replace-key + jmxTrans: + type: object + properties: + image: + type: string + outputDefinitions: + type: array + items: + type: object + properties: + outputType: + type: string + host: + type: string + port: + type: integer + flushDelayInSeconds: + type: integer + typeNames: + type: array + items: + type: string + name: + type: string + required: + - outputType + - name + logLevel: + type: string + kafkaQueries: + type: array + items: + type: object + properties: + targetMBean: + type: string + attributes: + type: array + items: + type: string + outputs: + type: array + items: + type: string + required: + - targetMBean + - attributes + - outputs + resources: + type: object + properties: + limits: + type: object + requests: + type: object + required: + - outputDefinitions + - kafkaQueries kafkaExporter: type: object properties: diff --git a/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml b/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml index 7772bcfcee7..8c7ece4e8ff 100644 --- a/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml +++ b/install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml @@ -63,6 +63,8 @@ spec: value: strimzi/operator:latest - name: STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE value: strimzi/kafka-bridge:0.15.0 + - name: STRIMZI_DEFAULT_JMXTRANS_IMAGE + value: strimzi/jmxtrans:latest - name: STRIMZI_LOG_LEVEL value: INFO livenessProbe: diff --git a/metrics/examples/prometheus/install/strimzi-service-monitor.yaml b/metrics/examples/prometheus/install/strimzi-service-monitor.yaml index 00a7c2692a9..b6861cf0b4a 100644 --- a/metrics/examples/prometheus/install/strimzi-service-monitor.yaml +++ b/metrics/examples/prometheus/install/strimzi-service-monitor.yaml @@ -16,7 +16,7 @@ spec: # configured in additional secret #### job_name: kube-state-metrics - - port: metrics + - port: prometheus honorLabels: true interval: 10s scrapeTimeout: 10s @@ -64,7 +64,7 @@ spec: action: replace #### job_name: node-exporter - - port: metrics + - port: prometheus honorLabels: true interval: 10s scrapeTimeout: 10s
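Editor's note: the serialization assertions in `JmxTransTest` imply that the ConfigMap generated by `JmxTrans.generateJmxTransConfigMap` (stored under the `JmxTrans.JMXTRANS_CONFIGMAP_KEY` entry) holds jmxtrans-style JSON along the following lines. This is a sketch assembled only from the field names the tests assert (`host`, `port`, `username`, `password`, `queries`, `obj`, `attr`, `outputWriters`, `@class`, `flushDelayInSeconds`, `typeNames`); the top-level `servers` wrapper, the per-broker host name, and the credential values are assumptions, not asserted by the tests.

[source,json]
----
{
  "servers": [
    {
      "host": "foo-kafka-0.foo-kafka-brokers",
      "port": 9999,
      "username": "placeholder-username",
      "password": "placeholder-password",
      "queries": [
        {
          "obj": "kafka.server:type=BrokerTopicMetrics,name=*",
          "attr": ["Count"],
          "outputWriters": [
            {
              "@class": "com.googlecode.jmxtrans.model.output.StdOutWriter",
              "flushDelayInSeconds": 5,
              "typeNames": ["name"]
            }
          ]
        }
      ]
    }
  ]
}
----

One server entry per broker replica would also explain the ConfigMap size comparisons in `testConfigMapOnScaleUp` and `testConfigMapOnScaleDown`, where the data grows with the replica count passed to `generateJmxTransConfigMap`.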