Skip to content

Commit

Permalink
Addressing PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
sobychacko committed Oct 15, 2024
1 parent 9c5ee3e commit d232029
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
Expand Down Expand Up @@ -2720,7 +2719,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (!(this.listener instanceof MessagingMessageListenerAdapter<?, ?>)) {
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
observation.error(e);
}
if (this.commonErrorHandler == null) {
Expand Down Expand Up @@ -2748,7 +2747,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
}
}
finally {
if (!(this.listener instanceof MessagingMessageListenerAdapter<?, ?>)) {
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
observation.stop();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,15 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) {
this.handlerMethod = handlerMethod;
}

/**
* Set the {@link ObservationRegistry} to handle observability.
* @param observationRegistry {@link ObservationRegistry} instance.
* @since 3.3.0
*/
public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

public boolean isAsyncReplies() {
return this.handlerMethod != null && this.handlerMethod.isAsyncReplies();
}
Expand Down Expand Up @@ -499,24 +508,27 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
invocationResult.messageReturnType() :
this.messageReturnType;

CompletableFuture<?> completableFutureResult;

if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
}
result = mono.toFuture();
completableFutureResult = mono.toFuture();
}
else if (!(result instanceof CompletableFuture<?>)) {
result = CompletableFuture.completedFuture(result);
completableFutureResult = CompletableFuture.completedFuture(result);
}
else {
completableFutureResult = (CompletableFuture<?>) result;
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
+ "otherwise the container will ack the message immediately");
}
}

((CompletableFuture<?>) result).whenComplete((r, t) -> {
completableFutureResult.whenComplete((r, t) -> {
try {
if (t == null) {
asyncSuccess(r, replyTopic, source, messageReturnType);
Expand Down Expand Up @@ -899,10 +911,6 @@ private boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

/**
* Root object for reply expression evaluation.
* @param request the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ void observationErrorExceptionWhenCompletableFutureReturned(@Autowired Exception
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {

errorTemplate.send(OBSERVATION_ERROR_COMPLETABLE_FUTURE, "testError").get(10, TimeUnit.SECONDS);
Deque<SimpleSpan> spans = tracer.getSpans();
await().untilAsserted(() -> assertThat(spans).hasSize(2));
Expand Down

0 comments on commit d232029

Please sign in to comment.