Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvements in MessagingMessageListenerAdapter and KafkaUtils #2962

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
* @author Artem Bilan
* @author Venil Noronha
* @author Nathan Xu
* @author Wang ZhiYang
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware {

Expand Down Expand Up @@ -367,14 +368,12 @@ protected final Object invokeHandler(Object data, @Nullable Acknowledgment ackno
if (data instanceof List && !this.isConsumerRecordList) {
return this.handlerMethod.invoke(message, ack, consumer);
}
else if (this.hasMetadataParameter) {
return this.handlerMethod.invoke(message, data, ack, consumer,
AdapterUtils.buildConsumerRecordMetadata(data));
}
else {
if (this.hasMetadataParameter) {
return this.handlerMethod.invoke(message, data, ack, consumer,
AdapterUtils.buildConsumerRecordMetadata(data));
}
else {
return this.handlerMethod.invoke(message, data, ack, consumer);
}
return this.handlerMethod.invoke(message, data, ack, consumer);
}
}
catch (org.springframework.messaging.converter.MessageConversionException ex) {
Expand Down Expand Up @@ -475,34 +474,27 @@ else if (result instanceof Message<?> mResult) {
Message<?> reply = checkHeaders(mResult, topic, source);
this.replyTemplate.send(reply);
}
else {
if (result instanceof Iterable) {
Iterator<?> iterator = ((Iterable<?>) result).iterator();
boolean iterableOfMessages = false;
if (iterator.hasNext()) {
iterableOfMessages = iterator.next() instanceof Message;
}
if (iterableOfMessages || this.splitIterables) {
((Iterable<V>) result).forEach(v -> {
if (v instanceof Message<?> mv) {
Message<?> aReply = checkHeaders(mv, topic, source);
this.replyTemplate.send(aReply);
}
else {
this.replyTemplate.send(topic, v);
}
});
else if (result instanceof Iterable<?> iterable && (iterableOfMessages(iterable) || this.splitIterables)) {
iterable.forEach(v -> {
if (v instanceof Message<?> mv) {
Message<?> aReply = checkHeaders(mv, topic, source);
this.replyTemplate.send(aReply);
}
else {
sendSingleResult(result, topic, source);
this.replyTemplate.send(topic, v);
}
}
else {
sendSingleResult(result, topic, source);
}
});
}
else {
sendSingleResult(result, topic, source);
}
}

private boolean iterableOfMessages(Iterable<?> iterable) {
Iterator<?> iterator = iterable.iterator();
return iterator.hasNext() && iterator.next() instanceof Message;
}

private Message<?> checkHeaders(Message<?> reply, @Nullable String topic, @Nullable Object source) { // NOSONAR (complexity)
MessageHeaders headers = reply.getHeaders();
boolean needsTopic = topic != null && headers.get(KafkaHeaders.TOPIC) == null;
Expand Down Expand Up @@ -647,33 +639,15 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
break;
}
}
else if (isAck) {
else if (isAck || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
allowedBatchParameters++;
}
else if (methodParameter.hasParameterAnnotation(Header.class)) {
Header header = methodParameter.getParameterAnnotation(Header.class);
if (header != null && KafkaHeaders.GROUP_ID.equals(header.value())) {
allowedBatchParameters++;
}
}
else {
if (isConsumer) {
allowedBatchParameters++;
}
else {
if (parameterType instanceof ParameterizedType paramType
&& paramType.getRawType().equals(Consumer.class)) {
allowedBatchParameters++;
}
}
}
}

if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(void.class)) {
this.conversionNeeded = false;
}
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;

if (!validParametersForBatch) {
String stateMessage = "A parameter of type '%s' must be the only parameter "
+ "(except for an optional 'Acknowledgment' and/or 'Consumer' "
Expand All @@ -693,19 +667,17 @@ private Type extractGenericParameterTypFromMethodParameter(MethodParameter metho
Type genericParameterType = methodParameter.getGenericParameterType();
if (genericParameterType instanceof ParameterizedType parameterizedType) {
if (parameterizedType.getRawType().equals(Message.class)) {
genericParameterType = ((ParameterizedType) genericParameterType).getActualTypeArguments()[0];
genericParameterType = parameterizedType.getActualTypeArguments()[0];
}
else if (parameterizedType.getRawType().equals(List.class)
&& parameterizedType.getActualTypeArguments().length == 1) {

Type paramType = parameterizedType.getActualTypeArguments()[0];
this.isConsumerRecordList = paramType.equals(ConsumerRecord.class)
|| (isSimpleListOfConsumerRecord(paramType)
|| isListOfConsumerRecordUpperBounded(paramType));
boolean messageWithGeneric = isMessageWithGeneric(paramType);
this.isMessageList = paramType.equals(Message.class) || messageWithGeneric;
Type paramType = getTypeFromWildCardWithUpperBound(parameterizedType.getActualTypeArguments()[0]);
this.isConsumerRecordList = parameterIsType(paramType, ConsumerRecord.class);
boolean messageWithGeneric = rawByParameterIsType(paramType, Message.class);
this.isMessageList = Message.class.equals(paramType) || messageWithGeneric;
if (messageWithGeneric) {
genericParameterType = messagePayloadType(paramType);
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
}
}
else {
Expand All @@ -715,57 +687,33 @@ else if (parameterizedType.getRawType().equals(List.class)
return genericParameterType;
}

private Type messagePayloadType(Type paramType) {
if (paramType instanceof ParameterizedType pType) {
return pType.getActualTypeArguments()[0];
}
else {
return ((ParameterizedType) ((WildcardType) paramType).getUpperBounds()[0]).getActualTypeArguments()[0];
}
}

private boolean isMessageWithGeneric(Type paramType) {
return (paramType instanceof ParameterizedType pType
&& pType.getRawType().equals(Message.class))
|| (isWildCardWithUpperBound(paramType)
&& ((WildcardType) paramType).getUpperBounds()[0] instanceof ParameterizedType wildCardZero
&& wildCardZero.getRawType().equals(Message.class));
}

private boolean isSimpleListOfConsumerRecord(Type paramType) {
return paramType instanceof ParameterizedType pType && pType.getRawType().equals(ConsumerRecord.class);
}

private boolean isListOfConsumerRecordUpperBounded(Type paramType) {
return isWildCardWithUpperBound(paramType)
&& ((WildcardType) paramType).getUpperBounds()[0] instanceof ParameterizedType wildCardZero
&& wildCardZero.getRawType().equals(ConsumerRecord.class);
private boolean annotationHeaderIsGroupId(MethodParameter methodParameter) {
Header header = methodParameter.getParameterAnnotation(Header.class);
return header != null && KafkaHeaders.GROUP_ID.equals(header.value());
}

private boolean isWildCardWithUpperBound(Type paramType) {
return paramType instanceof WildcardType wcType
&& wcType.getUpperBounds() != null
&& wcType.getUpperBounds().length > 0;
private Type getTypeFromWildCardWithUpperBound(Type paramType) {
if (paramType instanceof WildcardType wcType
&& wcType.getUpperBounds() != null
&& wcType.getUpperBounds().length > 0) {
paramType = wcType.getUpperBounds()[0];
}
return paramType;
}

private boolean isMessageWithNoTypeInfo(Type parameterType) {
if (parameterType instanceof ParameterizedType parameterizedType) {
Type rawType = parameterizedType.getRawType();
if (rawType.equals(Message.class)) {
return parameterizedType.getActualTypeArguments()[0] instanceof WildcardType;
}
if (parameterType instanceof ParameterizedType pType && pType.getRawType().equals(Message.class)) {
return pType.getActualTypeArguments()[0] instanceof WildcardType;
}
return parameterType.equals(Message.class); // could be Message without a generic type
return Message.class.equals(parameterType); // could be Message without a generic type
}

private boolean parameterIsType(Type parameterType, Type type) {
if (parameterType instanceof ParameterizedType parameterizedType) {
Type rawType = parameterizedType.getRawType();
if (rawType.equals(type)) {
return true;
}
}
return parameterType.equals(type);
return parameterType.equals(type) || rawByParameterIsType(parameterType, type);
}

private boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* Utility methods.
*
* @author Gary Russell
* @author Wang ZhiYang
*
* @since 2.2
*
Expand Down Expand Up @@ -64,26 +65,16 @@ public final class KafkaUtils {
*/
public static boolean returnTypeMessageOrCollectionOf(Method method) {
Type returnType = method.getGenericReturnType();
if (returnType.equals(Message.class)) {
return true;
}
if (returnType instanceof ParameterizedType) {
ParameterizedType prt = (ParameterizedType) returnType;
Type rawType = prt.getRawType();
if (rawType.equals(Message.class)) {
return true;
if (returnType instanceof ParameterizedType prt) {
returnType = prt.getRawType();
if (Collection.class.equals(returnType)) {
returnType = prt.getActualTypeArguments()[0];
}
if (rawType.equals(Collection.class)) {
Type collectionType = prt.getActualTypeArguments()[0];
if (collectionType.equals(Message.class)) {
return true;
}
return collectionType instanceof ParameterizedType
&& ((ParameterizedType) collectionType).getRawType().equals(Message.class);
if (returnType instanceof ParameterizedType pType) {
returnType = pType.getRawType();
}
}
return false;

return Message.class.equals(returnType);
}

/**
Expand Down Expand Up @@ -130,12 +121,12 @@ public static void clearConsumerGroupId() {
*/
public static Duration determineSendTimeout(Map<String, Object> producerProps, long buffer, long min) {
Object dt = producerProps.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
if (dt instanceof Number) {
return Duration.ofMillis(Math.max(((Number) dt).longValue() + buffer, min));
if (dt instanceof Number number) {
return Duration.ofMillis(Math.max(number.longValue() + buffer, min));
}
else if (dt instanceof String) {
else if (dt instanceof String str) {
try {
return Duration.ofMillis(Math.max(Long.parseLong((String) dt) + buffer, min));
return Duration.ofMillis(Math.max(Long.parseLong(str) + buffer, min));
}
catch (@SuppressWarnings("unused") NumberFormatException ex) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,6 @@ public static class Config implements KafkaListenerConfigurer {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@SuppressWarnings("unchecked")
@Bean
public MeterRegistry meterRegistry() {
return new SimpleMeterRegistry();
Expand Down