diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 3c43335c45..0b9b7097fc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -81,6 +81,7 @@ * @author Artem Bilan * @author Venil Noronha * @author Nathan Xu + * @author Wang ZhiYang */ public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware { @@ -367,14 +368,12 @@ protected final Object invokeHandler(Object data, @Nullable Acknowledgment ackno if (data instanceof List && !this.isConsumerRecordList) { return this.handlerMethod.invoke(message, ack, consumer); } + else if (this.hasMetadataParameter) { + return this.handlerMethod.invoke(message, data, ack, consumer, + AdapterUtils.buildConsumerRecordMetadata(data)); + } else { - if (this.hasMetadataParameter) { - return this.handlerMethod.invoke(message, data, ack, consumer, - AdapterUtils.buildConsumerRecordMetadata(data)); - } - else { - return this.handlerMethod.invoke(message, data, ack, consumer); - } + return this.handlerMethod.invoke(message, data, ack, consumer); } } catch (org.springframework.messaging.converter.MessageConversionException ex) { @@ -475,34 +474,27 @@ else if (result instanceof Message mResult) { Message reply = checkHeaders(mResult, topic, source); this.replyTemplate.send(reply); } - else { - if (result instanceof Iterable) { - Iterator iterator = ((Iterable) result).iterator(); - boolean iterableOfMessages = false; - if (iterator.hasNext()) { - iterableOfMessages = iterator.next() instanceof Message; - } - if (iterableOfMessages || this.splitIterables) { - ((Iterable) result).forEach(v -> { - if (v instanceof Message mv) { - Message aReply = checkHeaders(mv, topic, source); - this.replyTemplate.send(aReply); - } - else { - this.replyTemplate.send(topic, v); - } - }); + else if (result instanceof Iterable iterable && (iterableOfMessages(iterable) || this.splitIterables)) { + iterable.forEach(v -> { + if (v instanceof Message mv) { + Message aReply = checkHeaders(mv, topic, source); + this.replyTemplate.send(aReply); } else { - sendSingleResult(result, topic, source); + this.replyTemplate.send(topic, v); } - } - else { - sendSingleResult(result, topic, source); - } + }); + } + else { + sendSingleResult(result, topic, source); } } + private boolean iterableOfMessages(Iterable iterable) { + Iterator iterator = iterable.iterator(); + return iterator.hasNext() && iterator.next() instanceof Message; + } + private Message checkHeaders(Message reply, @Nullable String topic, @Nullable Object source) { // NOSONAR (complexity) MessageHeaders headers = reply.getHeaders(); boolean needsTopic = topic != null && headers.get(KafkaHeaders.TOPIC) == null; @@ -647,33 +639,15 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity break; } } - else if (isAck) { + else if (isAck || isConsumer || annotationHeaderIsGroupId(methodParameter)) { allowedBatchParameters++; } - else if (methodParameter.hasParameterAnnotation(Header.class)) { - Header header = methodParameter.getParameterAnnotation(Header.class); - if (header != null && KafkaHeaders.GROUP_ID.equals(header.value())) { - allowedBatchParameters++; - } - } - else { - if (isConsumer) { - allowedBatchParameters++; - } - else { - if (parameterType instanceof ParameterizedType paramType - && paramType.getRawType().equals(Consumer.class)) { - allowedBatchParameters++; - } - } - } } if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(void.class)) { this.conversionNeeded = false; } boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters; - if (!validParametersForBatch) { String stateMessage = "A parameter of type '%s' must be the only parameter " + "(except for an optional 'Acknowledgment' and/or 'Consumer' " @@ -693,19 +667,17 @@ private Type extractGenericParameterTypFromMethodParameter(MethodParameter metho Type genericParameterType = methodParameter.getGenericParameterType(); if (genericParameterType instanceof ParameterizedType parameterizedType) { if (parameterizedType.getRawType().equals(Message.class)) { - genericParameterType = ((ParameterizedType) genericParameterType).getActualTypeArguments()[0]; + genericParameterType = parameterizedType.getActualTypeArguments()[0]; } else if (parameterizedType.getRawType().equals(List.class) && parameterizedType.getActualTypeArguments().length == 1) { - Type paramType = parameterizedType.getActualTypeArguments()[0]; - this.isConsumerRecordList = paramType.equals(ConsumerRecord.class) - || (isSimpleListOfConsumerRecord(paramType) - || isListOfConsumerRecordUpperBounded(paramType)); - boolean messageWithGeneric = isMessageWithGeneric(paramType); - this.isMessageList = paramType.equals(Message.class) || messageWithGeneric; + Type paramType = getTypeFromWildCardWithUpperBound(parameterizedType.getActualTypeArguments()[0]); + this.isConsumerRecordList = parameterIsType(paramType, ConsumerRecord.class); + boolean messageWithGeneric = rawByParameterIsType(paramType, Message.class); + this.isMessageList = Message.class.equals(paramType) || messageWithGeneric; if (messageWithGeneric) { - genericParameterType = messagePayloadType(paramType); + genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0]; } } else { @@ -715,57 +687,33 @@ else if (parameterizedType.getRawType().equals(List.class) return genericParameterType; } - private Type messagePayloadType(Type paramType) { - if (paramType instanceof ParameterizedType pType) { - return pType.getActualTypeArguments()[0]; - } - else { - return ((ParameterizedType) ((WildcardType) paramType).getUpperBounds()[0]).getActualTypeArguments()[0]; - } - } - - private boolean isMessageWithGeneric(Type paramType) { - return (paramType instanceof ParameterizedType pType - && pType.getRawType().equals(Message.class)) - || (isWildCardWithUpperBound(paramType) - && ((WildcardType) paramType).getUpperBounds()[0] instanceof ParameterizedType wildCardZero - && wildCardZero.getRawType().equals(Message.class)); - } - - private boolean isSimpleListOfConsumerRecord(Type paramType) { - return paramType instanceof ParameterizedType pType && pType.getRawType().equals(ConsumerRecord.class); - } - - private boolean isListOfConsumerRecordUpperBounded(Type paramType) { - return isWildCardWithUpperBound(paramType) - && ((WildcardType) paramType).getUpperBounds()[0] instanceof ParameterizedType wildCardZero - && wildCardZero.getRawType().equals(ConsumerRecord.class); + private boolean annotationHeaderIsGroupId(MethodParameter methodParameter) { + Header header = methodParameter.getParameterAnnotation(Header.class); + return header != null && KafkaHeaders.GROUP_ID.equals(header.value()); } - private boolean isWildCardWithUpperBound(Type paramType) { - return paramType instanceof WildcardType wcType - && wcType.getUpperBounds() != null - && wcType.getUpperBounds().length > 0; + private Type getTypeFromWildCardWithUpperBound(Type paramType) { + if (paramType instanceof WildcardType wcType + && wcType.getUpperBounds() != null + && wcType.getUpperBounds().length > 0) { + paramType = wcType.getUpperBounds()[0]; + } + return paramType; } private boolean isMessageWithNoTypeInfo(Type parameterType) { - if (parameterType instanceof ParameterizedType parameterizedType) { - Type rawType = parameterizedType.getRawType(); - if (rawType.equals(Message.class)) { - return parameterizedType.getActualTypeArguments()[0] instanceof WildcardType; - } + if (parameterType instanceof ParameterizedType pType && pType.getRawType().equals(Message.class)) { + return pType.getActualTypeArguments()[0] instanceof WildcardType; } - return parameterType.equals(Message.class); // could be Message without a generic type + return Message.class.equals(parameterType); // could be Message without a generic type } private boolean parameterIsType(Type parameterType, Type type) { - if (parameterType instanceof ParameterizedType parameterizedType) { - Type rawType = parameterizedType.getRawType(); - if (rawType.equals(type)) { - return true; - } - } - return parameterType.equals(type); + return parameterType.equals(type) || rawByParameterIsType(parameterType, type); + } + + private boolean rawByParameterIsType(Type parameterType, Type type) { + return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java index 79dd2e83b2..616e6b224d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java @@ -37,6 +37,7 @@ * Utility methods. * * @author Gary Russell + * @author Wang ZhiYang * * @since 2.2 * @@ -64,26 +65,16 @@ public final class KafkaUtils { */ public static boolean returnTypeMessageOrCollectionOf(Method method) { Type returnType = method.getGenericReturnType(); - if (returnType.equals(Message.class)) { - return true; - } - if (returnType instanceof ParameterizedType) { - ParameterizedType prt = (ParameterizedType) returnType; - Type rawType = prt.getRawType(); - if (rawType.equals(Message.class)) { - return true; + if (returnType instanceof ParameterizedType prt) { + returnType = prt.getRawType(); + if (Collection.class.equals(returnType)) { + returnType = prt.getActualTypeArguments()[0]; } - if (rawType.equals(Collection.class)) { - Type collectionType = prt.getActualTypeArguments()[0]; - if (collectionType.equals(Message.class)) { - return true; - } - return collectionType instanceof ParameterizedType - && ((ParameterizedType) collectionType).getRawType().equals(Message.class); + if (returnType instanceof ParameterizedType pType) { + returnType = pType.getRawType(); } } - return false; - + return Message.class.equals(returnType); } /** @@ -130,12 +121,12 @@ public static void clearConsumerGroupId() { */ public static Duration determineSendTimeout(Map producerProps, long buffer, long min) { Object dt = producerProps.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); - if (dt instanceof Number) { - return Duration.ofMillis(Math.max(((Number) dt).longValue() + buffer, min)); + if (dt instanceof Number number) { + return Duration.ofMillis(Math.max(number.longValue() + buffer, min)); } - else if (dt instanceof String) { + else if (dt instanceof String str) { try { - return Duration.ofMillis(Math.max(Long.parseLong((String) dt) + buffer, min)); + return Duration.ofMillis(Math.max(Long.parseLong(str) + buffer, min)); } catch (@SuppressWarnings("unused") NumberFormatException ex) { } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 76713f8e24..29ae413970 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -1083,7 +1083,6 @@ public static class Config implements KafkaListenerConfigurer { @Autowired private EmbeddedKafkaBroker embeddedKafka; - @SuppressWarnings("unchecked") @Bean public MeterRegistry meterRegistry() { return new SimpleMeterRegistry();