From 04e27d1bbe0fe2cd14be9bd1f3be773feec3d772 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 13 Oct 2023 17:32:24 -0400 Subject: [PATCH] AOT required changes in Kafka Streams binder --- .../KafkaStreamsBindableProxyFactory.java | 4 + ...KafkaStreamsFunctionBeanPostProcessor.java | 77 +++++++++---------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java index 55dfcf1ba..1abe70ee2 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java @@ -285,5 +285,9 @@ public String getFunctionName() { public Map getOutputHolders() { return outputHolders; } + + public void setStreamFunctionProperties(StreamFunctionProperties streamFunctionProperties) { + this.streamFunctionProperties = streamFunctionProperties; + } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java index 1c4b7c963..50537f21e 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java @@ -51,8 +51,9 @@ import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils; import org.springframework.cloud.stream.function.StreamFunctionProperties; import org.springframework.core.ResolvableType; +import org.springframework.core.type.StandardClassMetadata; +import org.springframework.core.type.StandardMethodMetadata; import org.springframework.lang.NonNull; -import org.springframework.util.ClassUtils; /** * @author Soby Chacko @@ -122,6 +123,7 @@ public void afterPropertiesSet() { ResolvableType[] resolvableTypes = new ResolvableType[]{getResolvableTypes().get(s)}; RootBeanDefinition rootBeanDefinition = new RootBeanDefinition( KafkaStreamsBindableProxyFactory.class); + rootBeanDefinition.getPropertyValues().add("streamFunctionProperties", this.streamFunctionProperties); registerKakaStreamsProxyFactory(registry, s, resolvableTypes, rootBeanDefinition); } } @@ -146,6 +148,7 @@ public void afterPropertiesSet() { if (!nonKafkaStreamsFunctionsFound) { RootBeanDefinition rootBeanDefinition = new RootBeanDefinition( KafkaStreamsBindableProxyFactory.class); + rootBeanDefinition.getPropertyValues().add("streamFunctionProperties", this.streamFunctionProperties); registerKakaStreamsProxyFactory(registry, derivedNameFromComposed, resolvableTypes, rootBeanDefinition); } } @@ -155,6 +158,7 @@ public void afterPropertiesSet() { ResolvableType[] resolvableTypes = new ResolvableType[]{getResolvableTypes().get(functionUnit)}; RootBeanDefinition rootBeanDefinition = new RootBeanDefinition( KafkaStreamsBindableProxyFactory.class); + rootBeanDefinition.getPropertyValues().add("streamFunctionProperties", this.streamFunctionProperties); registerKakaStreamsProxyFactory(registry, functionUnit, resolvableTypes, rootBeanDefinition); } } @@ -173,55 +177,50 @@ private void registerKakaStreamsProxyFactory(BeanDefinitionRegistry registry, St } private void extractResolvableTypes(String key) { - final Class classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition) - this.beanFactory.getBeanDefinition(key)) - .getMetadata().getClassName(), - ClassUtils.getDefaultClassLoader()); + BeanDefinition beanDefinition = this.beanFactory.getBeanDefinition(key); + ResolvableType resolvableType = null; + Class rawClass = null; try { - Method[] methods = classObj.getDeclaredMethods(); - Optional functionalBeanMethods = KafkaStreamsBinderUtils.findMethodWithName(key, methods); - if (functionalBeanMethods.isEmpty()) { - methods = classObj.getMethods(); // check the inherited methods - functionalBeanMethods = KafkaStreamsBinderUtils.findMethodWithName(key, methods); - } - if (functionalBeanMethods.isEmpty()) { - final BeanDefinition beanDefinition = this.beanFactory.getBeanDefinition(key); - final String factoryMethodName = beanDefinition.getFactoryMethodName(); - functionalBeanMethods = KafkaStreamsBinderUtils.findMethodWithName(factoryMethodName, methods); - } - - if (functionalBeanMethods.isPresent()) { - Method method = functionalBeanMethods.get(); - ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj); - final Class rawClass = resolvableType.getGeneric(0).getRawClass(); - if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) { - if (onlySingleFunction) { - resolvableTypeMap.put(key, resolvableType); - } - else { - discoverOnlyKafkaStreamsResolvableTypes(key, resolvableType); - } + if (beanDefinition instanceof AnnotatedBeanDefinition annotatedBeanDefinition) { + StandardMethodMetadata factoryMethodMetadata = (StandardMethodMetadata) annotatedBeanDefinition.getFactoryMethodMetadata(); + if (factoryMethodMetadata != null) { + Method introspectedMethod = factoryMethodMetadata.getIntrospectedMethod(); + resolvableType = ResolvableType.forMethodReturnType(introspectedMethod); + rawClass = resolvableType.getGeneric(0).getRawClass(); } - } - else { - Optional componentBeanMethods = Arrays.stream(methods) + else { + Class introspectedClass = ((StandardClassMetadata) annotatedBeanDefinition.getMetadata()).getIntrospectedClass(); + Method[] methods = introspectedClass.getDeclaredMethods(); + + Optional componentBeanMethods = Arrays.stream(methods) .filter(m -> m.getName().equals("apply") && isKafkaStreamsTypeFound(m) || - m.getName().equals("accept") && isKafkaStreamsTypeFound(m)).findFirst(); - if (componentBeanMethods.isPresent()) { - Method method = componentBeanMethods.get(); - final ResolvableType resolvableType = ResolvableType.forMethodParameter(method, 0); - final Class rawClass = resolvableType.getRawClass(); - if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) { + m.getName().equals("accept") && isKafkaStreamsTypeFound(m)).findFirst(); + if (componentBeanMethods.isPresent()) { + Method method = componentBeanMethods.get(); + resolvableType = ResolvableType.forMethodParameter(method, 0); + rawClass = resolvableType.getRawClass(); if (onlySingleFunction) { - resolvableTypeMap.put(key, resolvableType); this.methods.put(key, method); } else { - discoverOnlyKafkaStreamsResolvableTypesAndMethods(key, resolvableType, method); + kafakStreamsOnlyMethods.put(key, method); } } } } + else { + resolvableType = beanDefinition.getResolvableType(); + rawClass = resolvableType.getGeneric(0).getRawClass(); + } + + if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) { + if (onlySingleFunction) { + resolvableTypeMap.put(key, resolvableType); + } + else { + discoverOnlyKafkaStreamsResolvableTypes(key, resolvableType); + } + } } catch (Exception e) { LOG.error("Function activation issues while mapping the function: " + key, e);