From 7845be50effe311891a72243556cd3d11cad7cc9 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 17 Apr 2024 17:29:58 -0400 Subject: [PATCH] GH-2932: Sanitize sensitive data on bindings endpoint Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2932 Spring Boot provides `SanitizingFunction` to allow the applicaitons to clear out sensitive data when using certain actuator endpoints. This feature can be extended to custom endpoints as well. Enable the bindings actuator endpoint to sanitze sensitive data based on user-provided logic in `SantizingFuction` beans in the application. --- .../integration/KafkaBinderActuatorTests.java | 53 ++++++++++++++- .../stream/endpoint/ActuatorBindingsTest.java | 8 +-- .../binding/BindingsLifecycleController.java | 14 +++- .../BindingsEndpointAutoConfiguration.java | 11 ++- .../stream/endpoint/BindingsEndpoint.java | 68 +++++++++++++++++-- .../binding_visualization_control.adoc | 21 ++++++ 6 files changed, 161 insertions(+), 14 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java index f0b49f08e4..c5589652c6 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Consumer; import io.micrometer.core.instrument.MeterRegistry; @@ -27,6 +28,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.actuate.endpoint.SanitizingFunction; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.SpringBootTest; @@ -37,6 +39,8 @@ import org.springframework.cloud.stream.config.ListenerContainerCustomizer; import org.springframework.cloud.stream.config.MessageSourceCustomizer; import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer; +import org.springframework.cloud.stream.endpoint.BindingsEndpoint; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; @@ -63,6 +67,8 @@ properties = { "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP, "spring.cloud.stream.function.bindings.process-in-0=input", + "management.endpoints.web.exposure.include=bindings", + "spring.cloud.stream.kafka.bindings.input.consumer.configuration.sasl.jaas.config=secret", "spring.cloud.stream.pollable-source=input"} ) @DirtiesContext @@ -77,6 +83,9 @@ class KafkaBinderActuatorTests { @Autowired private KafkaTemplate kafkaTemplate; + @Autowired + private ApplicationContext context; + @Test void kafkaBinderMetricsExposed() { this.kafkaTemplate.send("input", null, "foo".getBytes()); @@ -87,6 +96,36 @@ void kafkaBinderMetricsExposed() { .value()).isGreaterThan(0); } + @Test + @SuppressWarnings("unchecked") + void bindingsActuatorEndpointInKafkaBinderBasedApp() { + + BindingsEndpoint controller = context.getBean(BindingsEndpoint.class); + List> bindings = controller.queryStates(); + Optional> first = bindings.stream().filter(m -> m.get("bindingName").equals("input")).findFirst(); + assertThat(first.isPresent()).isTrue(); + Map inputBindingMap = first.get(); + + Map extendedInfo = (Map) inputBindingMap.get("extendedInfo"); + Map extendedConsumerProperties = (Map) extendedInfo.get("ExtendedConsumerProperties"); + Map extension = (Map) extendedConsumerProperties.get("extension"); + Map configuration = (Map) extension.get("configuration"); + String saslJaasConfig = (String) configuration.get("sasl.jaas.config"); + + assertThat(saslJaasConfig).isEqualTo("data-scrambled!!"); + + List> input = controller.queryState("input"); + // Since the above call goes through JSON serialization, we receive the type as a map of bindings. + // The above call goes through this serialization because we provide a sanitization function. + Map extendedInfo1 = (Map) ((Map) input.get(0)).get("extendedInfo"); + Map extendedConsumerProperties1 = (Map) extendedInfo1.get("ExtendedConsumerProperties"); + Map extension1 = (Map) extendedConsumerProperties1.get("extension"); + Map configuration1 = (Map) extension1.get("configuration"); + String saslJaasConfig1 = (String) configuration1.get("sasl.jaas.config"); + + assertThat(saslJaasConfig1).isEqualTo("data-scrambled!!"); + } + @Test @Disabled void kafkaBinderMetricsWhenNoMicrometer() { @@ -169,5 +208,17 @@ public Consumer process() { }; } + @Bean + public SanitizingFunction sanitizingFunction() { + return sd -> { + if (sd.getKey().equals("sasl.jaas.config")) { + return sd.withValue("data-scrambled!!"); + } + else { + return sd; + } + }; + } + } } diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/endpoint/ActuatorBindingsTest.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/endpoint/ActuatorBindingsTest.java index 1cd6e43daf..11758fb641 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/endpoint/ActuatorBindingsTest.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/endpoint/ActuatorBindingsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ void actuatorDoesNotCauseInfiniteRecursion() { BindingsLifecycleController controller = context .getBean(BindingsLifecycleController.class); - List> bindings = controller.queryStates(); + List> bindings = controller.queryStates(); assertThat(bindings.size()).isEqualTo(1); assertThat(bindings.get(0).get("bindingName")).isEqualTo("consume-in-0"); } @@ -115,7 +115,7 @@ void whenTwoBindersFoundNoErrorIfBinderProvidedThroughBinding() throws Exception BindingsLifecycleController controller = context .getBean(BindingsLifecycleController.class); - List> bindings = controller.queryStates(); + List> bindings = controller.queryStates(); assertThat(bindings.size()).isEqualTo(1); assertThat(bindings.get(0).get("bindingName")).isEqualTo("consume-in-0"); assertThat(bindings.get(0).get("binderName")).isEqualTo("integration"); @@ -136,7 +136,7 @@ void whenTwoBindersFoundNoErrorWhenDefaultBinderIsProvided() throws Exception { BindingsLifecycleController controller = context .getBean(BindingsLifecycleController.class); - List> bindings = controller.queryStates(); + List> bindings = controller.queryStates(); assertThat(bindings.size()).isEqualTo(1); assertThat(bindings.get(0).get("bindingName")).isEqualTo("consume-in-0"); assertThat(bindings.get(0).get("binderName")).isEqualTo("integration1"); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java index cc85c92f22..9530f746ec 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2021 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ * It is registered as a bean and once injected could be used to control the lifecycle f the bindings. * * @author Oleg Zhurakousky + * @author Soby Chacko * @since 3.x */ public class BindingsLifecycleController { @@ -71,6 +72,15 @@ public BindingsLifecycleController(List inputBindingLifec } } + /** + * Provide an accessor for the custom ObjectMapper created by this controller. + * @return {@link ObjectMapper} + * @since 4.1.2 + */ + public ObjectMapper getObjectMapper() { + return objectMapper; + } + /** * Convenience method to stop the binding with provided `bindingName`. * @param bindingName the name of the binding. @@ -129,7 +139,7 @@ public void changeState(String bindingName, State state) { * @return the list of {@link Binding}s */ @SuppressWarnings("unchecked") - public List> queryStates() { + public List> queryStates() { List> bindings = new ArrayList<>(gatherInputBindings()); bindings.addAll(gatherOutputBindings()); return this.objectMapper.convertValue(bindings, List.class); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingsEndpointAutoConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingsEndpointAutoConfiguration.java index ea759db584..9ab42c6740 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingsEndpointAutoConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingsEndpointAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,10 @@ package org.springframework.cloud.stream.config; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration; import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint; +import org.springframework.boot.actuate.endpoint.SanitizingFunction; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; @@ -29,6 +31,7 @@ /** * @author Oleg Zhurakousky + * @author Soby Chacko * @since 2.0 */ @AutoConfiguration @@ -40,8 +43,10 @@ public class BindingsEndpointAutoConfiguration { @Bean @ConditionalOnAvailableEndpoint - public BindingsEndpoint bindingsEndpoint(BindingsLifecycleController bindingsLifecycleController) { - return new BindingsEndpoint(bindingsLifecycleController); + public BindingsEndpoint bindingsEndpoint(BindingsLifecycleController bindingsLifecycleController, + ObjectProvider sanitizingFunctions) { + return new BindingsEndpoint(bindingsLifecycleController, sanitizingFunctions.orderedStream().toList(), + bindingsLifecycleController.getObjectMapper()); } } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/endpoint/BindingsEndpoint.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/endpoint/BindingsEndpoint.java index 5f4238d001..a2455f2dd1 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/endpoint/BindingsEndpoint.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/endpoint/BindingsEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,15 @@ package org.springframework.cloud.stream.endpoint; +import java.util.Collection; import java.util.List; +import java.util.Map; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.springframework.boot.actuate.endpoint.SanitizableData; +import org.springframework.boot.actuate.endpoint.Sanitizer; +import org.springframework.boot.actuate.endpoint.SanitizingFunction; import org.springframework.boot.actuate.endpoint.annotation.Endpoint; import org.springframework.boot.actuate.endpoint.annotation.ReadOperation; import org.springframework.boot.actuate.endpoint.annotation.Selector; @@ -25,12 +32,15 @@ import org.springframework.cloud.stream.binder.Binding; import org.springframework.cloud.stream.binding.BindingsLifecycleController; import org.springframework.cloud.stream.binding.BindingsLifecycleController.State; +import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; /** * * Actuator endpoint for binding control. * * @author Oleg Zhurakousky + * @author Soby Chacko * @since 2.0 */ @Endpoint(id = "bindings") @@ -38,8 +48,26 @@ public class BindingsEndpoint { private final BindingsLifecycleController lifecycleController; + private final Sanitizer sanitizer; + + private final ObjectMapper objectMapper; + public BindingsEndpoint(BindingsLifecycleController lifecycleController) { + this(lifecycleController, null, null); + } + + /** + * @param lifecycleController {@link BindingsLifecycleController} + * @param sanitizingFunctions list of user provided {@link SanitizingFunction} beans + * @param objectMapper from {@link BindingsLifecycleController} + * @since 4.1.2 + */ + public BindingsEndpoint(BindingsLifecycleController lifecycleController, + @Nullable Iterable sanitizingFunctions, @Nullable ObjectMapper objectMapper) { this.lifecycleController = lifecycleController; + this.sanitizer = CollectionUtils.isEmpty((Collection) sanitizingFunctions) ? null : + new Sanitizer(sanitizingFunctions); + this.objectMapper = objectMapper; } @WriteOperation @@ -48,13 +76,45 @@ public void changeState(@Selector String name, State state) { } @ReadOperation - public List queryStates() { - return this.lifecycleController.queryStates(); + public List> queryStates() { + List> bindings = this.lifecycleController.queryStates(); + if (this.sanitizer != null) { + for (Map binding : bindings) { + sanitizeSensitiveData(binding); + } + } + return bindings; } @ReadOperation + @SuppressWarnings("unchecked") public List> queryState(@Selector String name) { - return this.lifecycleController.queryState(name); + List> bindings = this.lifecycleController.queryState(name); + if (this.sanitizer != null) { + List> bindingsAsMap = this.objectMapper.convertValue(bindings, List.class); + for (Map binding : bindingsAsMap) { + sanitizeSensitiveData(binding); + } + // End users will get a list of map that contains information from the underlying Binding. + return this.objectMapper.convertValue(bindingsAsMap, List.class); + } + return bindings; + } + + @SuppressWarnings("unchecked") + public void sanitizeSensitiveData(Map binding) { + for (String key : binding.keySet()) { + Object value = binding.get(key); + if (value != null && Map.class.isAssignableFrom(value.getClass())) { + // Recursive call since we encountered an inner map + sanitizeSensitiveData((Map) value); + } + else { + SanitizableData sanitizableData = new SanitizableData(null, key, value); + Object sanitized = this.sanitizer.sanitize(sanitizableData, true); + binding.put(key, sanitized); + } + } } } diff --git a/docs/modules/ROOT/pages/spring-cloud-stream/binding_visualization_control.adoc b/docs/modules/ROOT/pages/spring-cloud-stream/binding_visualization_control.adoc index 451b2ae5e5..89307d900b 100644 --- a/docs/modules/ROOT/pages/spring-cloud-stream/binding_visualization_control.adoc +++ b/docs/modules/ROOT/pages/spring-cloud-stream/binding_visualization_control.adoc @@ -82,3 +82,24 @@ You can also stop, start, pause, and resume individual bindings by posting to th NOTE: `PAUSED` and `RESUMED` work only when the corresponding binder and its underlying technology supports it. Otherwise, you see the warning message in the logs. Currently, only Kafka and [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) binders supports the `PAUSED` and `RESUMED` states. +[[sanitize-sensitive-data]] +=== Sanitize Sensitive Data + +When using the binding actuator endpoint, it is sometimes critical to sanitize any sensitive data such as user credentials, information about SSL keys, etc. +To achieve this, end user applications can provide a `SanitizingFunction` from Spring Boot as a bean in the application. +Here is an example to scramble the data when providing a value for Apache Kafka's `sasl.jaas.config` property. + +``` +@Bean +public SanitizingFunction sanitizingFunction() { + return sanitizableData -> { + if (sanitizableData.getKey().equals("sasl.jaas.config")) { + return sanitizableData.withValue("data-scrambled!!"); + } + else { + return sanitizableData; + } + }; +} +``` +