From 24147566e201b7e068fec4e3a0a5c0f12bbad98c Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Mon, 25 Dec 2023 13:50:41 +0800 Subject: [PATCH] minor improvement `DelegatingInvocableHandler` and AKLE relate --- .../config/AbstractKafkaListenerEndpoint.java | 11 +++-- .../config/MethodKafkaListenerEndpoint.java | 12 ++--- .../MultiMethodKafkaListenerEndpoint.java | 4 +- .../kafka/listener/adapter/AdapterUtils.java | 5 +-- .../adapter/DelegatingInvocableHandler.java | 45 +++++++++---------- 5 files changed, 37 insertions(+), 40 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 826335f79d..cdc8ca944a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -123,9 +123,9 @@ public abstract class AbstractKafkaListenerEndpoint @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; - if (beanFactory instanceof ConfigurableListableBeanFactory) { - this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver(); - this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null); + if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) { + this.resolver = configurableListableBeanFactory.getBeanExpressionResolver(); + this.expressionContext = new BeanExpressionContext(configurableListableBeanFactory, null); } this.beanResolver = new BeanFactoryResolver(beanFactory); } @@ -275,7 +275,7 @@ public void setGroup(String group) { * @since 1.1 */ public boolean isBatchListener() { - return this.batchListener == null ? false : this.batchListener; + return this.batchListener != null && this.batchListener; } /** @@ -530,11 +530,10 @@ private void setupMessageListener(MessageListenerContainer container, .acceptIfNotNull(this.correlationHeaderName, adapter::setCorrelationHeaderName); adapter.setSplitIterables(this.splitIterables); Object messageListener = adapter; - boolean isBatchListener = isBatchListener(); Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener"); if (this.recordFilterStrategy != null) { - if (isBatchListener) { + if (isBatchListener()) { if (((MessagingMessageListenerAdapter) messageListener).isConsumerRecords()) { this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords' instead of a List" + (this.id != null ? " id: " + this.id : "")); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 66875add5b..28e190674e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -207,22 +207,22 @@ protected MessagingMessageListenerAdapter createMessageListenerInstance( MessagingMessageListenerAdapter listener; if (isBatchListener()) { - BatchMessagingMessageListenerAdapter messageListener = new BatchMessagingMessageListenerAdapter( + BatchMessagingMessageListenerAdapter messageListener = new BatchMessagingMessageListenerAdapter<>( this.bean, this.method, this.errorHandler); BatchToRecordAdapter batchToRecordAdapter = getBatchToRecordAdapter(); if (batchToRecordAdapter != null) { messageListener.setBatchToRecordAdapter(batchToRecordAdapter); } - if (messageConverter instanceof BatchMessageConverter) { - messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter); + if (messageConverter instanceof BatchMessageConverter batchMessageConverter) { + messageListener.setBatchMessageConverter(batchMessageConverter); } listener = messageListener; } else { - RecordMessagingMessageListenerAdapter messageListener = new RecordMessagingMessageListenerAdapter( + RecordMessagingMessageListenerAdapter messageListener = new RecordMessagingMessageListenerAdapter<>( this.bean, this.method, this.errorHandler); - if (messageConverter instanceof RecordMessageConverter) { - messageListener.setMessageConverter((RecordMessageConverter) messageConverter); + if (messageConverter instanceof RecordMessageConverter recordMessageConverter) { + messageListener.setMessageConverter(recordMessageConverter); } listener = messageListener; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java index 80fc064f17..577a90eb49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,7 +71,7 @@ public void setValidator(Validator validator) { @Override protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) { - List invocableHandlerMethods = new ArrayList(); + List invocableHandlerMethods = new ArrayList<>(); InvocableHandlerMethod defaultHandler = null; for (Method method : this.methods) { InvocableHandlerMethod handler = getMessageHandlerMethodFactory() diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java index b51909fa52..010ae1a863 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,10 +67,9 @@ public static Object buildConsumerRecordMetadataFromArray(Object... data) { */ @Nullable public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) { - if (!(data instanceof ConsumerRecord)) { + if (!(data instanceof ConsumerRecord record)) { return null; } - ConsumerRecord record = (ConsumerRecord) data; return new ConsumerRecordMetadata(new RecordMetadata(new TopicPartition(record.topic(), record.partition()), record.offset(), 0, record.timestamp(), record.serializedKeySize(), record.serializedValueSize()), record.timestampType()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index 44f0784b51..d89f83dae4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -112,8 +112,8 @@ public DelegatingInvocableHandler(List handlers, this.bean = bean; this.resolver = beanExpressionResolver; this.beanExpressionContext = beanExpressionContext; - this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory - ? (ConfigurableListableBeanFactory) beanFactory + this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory + ? configurableListableBeanFactory : null; this.validator = validator == null ? null : new PayloadValidator(validator); } @@ -124,7 +124,7 @@ private void checkSpecial(@Nullable InvocableHandlerMethod handler) { } Parameter[] parameters = handler.getMethod().getParameters(); for (Parameter parameter : parameters) { - if (parameter.getType().equals(ConsumerRecordMetadata.class)) { + if (ConsumerRecordMetadata.class.equals(parameter.getType())) { this.handlerMetadataAware.put(handler, true); return; } @@ -148,7 +148,7 @@ public Object getBean() { * or the method raised an exception. */ public Object invoke(Message message, Object... providedArgs) throws Exception { //NOSONAR - Class payloadClass = message.getPayload().getClass(); + Class payloadClass = message.getPayload().getClass(); InvocableHandlerMethod handler = getHandlerForPayload(payloadClass); if (this.validator != null && this.defaultHandler != null) { MethodParameter parameter = this.payloadMethodParameters.get(handler); @@ -175,7 +175,7 @@ public Object invoke(Message message, Object... providedArgs) throws Exceptio * @param payloadClass the payload class. * @return the handler. */ - protected InvocableHandlerMethod getHandlerForPayload(Class payloadClass) { + protected InvocableHandlerMethod getHandlerForPayload(Class payloadClass) { InvocableHandlerMethod handler = this.cachedHandlers.get(payloadClass); if (handler == null) { handler = findHandlerForPayload(payloadClass); @@ -246,15 +246,12 @@ protected InvocableHandlerMethod findHandlerForPayload(Class p InvocableHandlerMethod result = null; for (InvocableHandlerMethod handler : this.handlers) { if (matchHandlerMethod(payloadClass, handler)) { - if (result != null) { - boolean resultIsDefault = result.equals(this.defaultHandler); - if (!handler.equals(this.defaultHandler) && !resultIsDefault) { + if (result != null && !result.equals(this.defaultHandler)) { + if (!handler.equals(this.defaultHandler)) { throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " + result.getMethod().getName() + " and " + handler.getMethod().getName()); } - if (!resultIsDefault) { - continue; // otherwise replace the result with the actual match - } + continue; // otherwise replace the result with the actual match } result = handler; } @@ -262,20 +259,19 @@ protected InvocableHandlerMethod findHandlerForPayload(Class p return result != null ? result : this.defaultHandler; } - protected boolean matchHandlerMethod(Class payloadClass, InvocableHandlerMethod handler) { + protected boolean matchHandlerMethod(Class payloadClass, InvocableHandlerMethod handler) { Method method = handler.getMethod(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); // Single param; no annotation or not @Header if (parameterAnnotations.length == 1) { MethodParameter methodParameter = new MethodParameter(method, 0); - if ((methodParameter.getParameterAnnotations().length == 0 - || !methodParameter.hasParameterAnnotation(Header.class)) - && methodParameter.getParameterType().isAssignableFrom(payloadClass)) { + boolean isPayload = assignPayload(methodParameter, payloadClass); + if (isPayload) { if (this.validator != null) { this.payloadMethodParameters.put(handler, methodParameter); } - return true; } + return isPayload; } MethodParameter foundCandidate = findCandidate(payloadClass, method, parameterAnnotations); @@ -285,14 +281,12 @@ protected boolean matchHandlerMethod(Class payloadClass, Invoc return foundCandidate != null; } - private MethodParameter findCandidate(Class payloadClass, Method method, - Annotation[][] parameterAnnotations) { + @Nullable + private MethodParameter findCandidate(Class payloadClass, Method method, Annotation[][] parameterAnnotations) { MethodParameter foundCandidate = null; for (int i = 0; i < parameterAnnotations.length; i++) { MethodParameter methodParameter = new MethodParameter(method, i); - if ((methodParameter.getParameterAnnotations().length == 0 - || !methodParameter.hasParameterAnnotation(Header.class)) - && methodParameter.getParameterType().isAssignableFrom(payloadClass)) { + if (assignPayload(methodParameter, payloadClass)) { if (foundCandidate != null) { throw new KafkaException("Ambiguous payload parameter for " + method.toGenericString()); } @@ -316,6 +310,12 @@ public boolean hasDefaultHandler() { return this.defaultHandler != null; } + private boolean assignPayload(MethodParameter methodParameter, Class payloadClass) { + return (methodParameter.getParameterAnnotations().length == 0 + || !methodParameter.hasParameterAnnotation(Header.class)) + && methodParameter.getParameterType().isAssignableFrom(payloadClass); + } + private static final class PayloadValidator extends PayloadMethodArgumentResolver { PayloadValidator(Validator validator) { @@ -323,8 +323,7 @@ private static final class PayloadValidator extends PayloadMethodArgumentResolve @Override @Nullable - public Message toMessage(Object payload, @Nullable - MessageHeaders headers) { + public Message toMessage(Object payload, @Nullable MessageHeaders headers) { return null; }