From 22c77af366b8cd092fe8ab843a1c0781b5fbe726 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Mon, 8 Jul 2024 17:29:55 +0530 Subject: [PATCH 1/2] MINOR: Commit the acknowledgements for fetched records in ShareConsumerPerformance (#1411) --- .../java/org/apache/kafka/tools/ShareConsumerPerformance.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index ad74234f05615..b2d7ccdbb2c65 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -73,6 +73,7 @@ public static void main(String[] args) { Map metrics = null; if (options.printMetrics()) metrics = shareConsumer.metrics(); + shareConsumer.commitAsync(); shareConsumer.close(); // print final stats From dfcf22c3369c627e942edf1844a7b6c29f649382 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 9 Jul 2024 09:33:08 +0100 Subject: [PATCH 2/2] AKCORE-243: Tidy up sending of acknowledgements when consumer is closed (#1412) * AKCORE-243: Tidy up sending of acknowledgements when consumer is closed --- .../internals/ConsumerNetworkThread.java | 4 +- .../internals/NetworkClientDelegate.java | 7 + .../internals/ShareConsumeRequestManager.java | 369 +++++++++++------- .../consumer/internals/ShareConsumerImpl.java | 51 ++- .../internals/events/ApplicationEvent.java | 3 +- .../events/ApplicationEventProcessor.java | 38 +- ...java => ShareAcknowledgeOnCloseEvent.java} | 6 +- ...java => ShareSubscriptionChangeEvent.java} | 4 +- ...nEvent.java => ShareUnsubscribeEvent.java} | 6 +- .../ShareConsumeRequestManagerTest.java | 182 +++++---- .../internals/ShareConsumerImplTest.java | 42 +- .../kafka/server/SharePartitionManager.java | 4 +- .../kafka/test/api/ShareConsumerTest.java | 167 +++----- .../group/share/PersisterStateManager.java | 4 +- 14 files changed, 491 
insertions(+), 396 deletions(-) rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{ShareLeaveOnCloseEvent.java => ShareAcknowledgeOnCloseEvent.java} (83%) rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{ShareSubscriptionChangeApplicationEvent.java => ShareSubscriptionChangeEvent.java} (89%) rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{ShareUnsubscribeApplicationEvent.java => ShareUnsubscribeEvent.java} (87%) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index adee6594603bb..65906e43d0eec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -293,12 +293,12 @@ private void closeInternal(final Duration timeout) { * Check the unsent queue one last time and poll until all requests are sent or the timer runs out. 
*/ private void sendUnsentRequests(final Timer timer) { - if (networkClientDelegate.unsentRequests().isEmpty()) + if (!networkClientDelegate.hasAnyPendingRequests()) return; do { networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); timer.update(); - } while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()); + } while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()); } void cleanup() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 6c2f0fac921d4..e43f79995adc4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -131,6 +131,13 @@ public void poll(final long timeoutMs, final long currentTimeMs) { checkDisconnects(currentTimeMs); } + /** + * Return true if there is at least one in-flight request or unsent request. + */ + public boolean hasAnyPendingRequests() { + return client.hasInFlightRequests() || !unsentRequests.isEmpty(); + } + /** * Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will * find the leastLoadedOne, and will be retried in the next {@code poll()}. 
If the request is expired, a diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 4f64a4e75c384..880e38a90037a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -85,6 +85,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private final Queue acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; + private boolean closing = false; ShareConsumeRequestManager(final Time time, final LogContext logContext, @@ -122,7 +123,7 @@ public PollResult poll(long currentTimeMs) { } // Send any pending acknowledgements before fetching more records. - PollResult pollResult = processAcknowledgements(currentTimeMs, false); + PollResult pollResult = processAcknowledgements(currentTimeMs); if (pollResult != null) { return pollResult; } @@ -191,63 +192,6 @@ public PollResult poll(long currentTimeMs) { return new PollResult(requests); } - @Override - public PollResult pollOnClose() { - if (memberId == null) { - return PollResult.EMPTY; - } - long currentTimeMs = Time.SYSTEM.milliseconds(); - - PollResult pollResult = processAcknowledgements(currentTimeMs, true); - if (pollResult != null) { - return pollResult; - } - - final Cluster cluster = metadata.fetch(); - List requests = new LinkedList<>(); - - sessionHandlers.forEach((nodeId, sessionHandler) -> { - sessionHandler.notifyClose(); - Node node = cluster.nodeById(nodeId); - if (node != null) { - Map acknowledgementsMapForNode = new HashMap<>(); - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - Acknowledgements acknowledgements = fetchAcknowledgementsMap.get(tip); - if (acknowledgements != null) { - 
acknowledgementsMapForNode.put(tip, acknowledgements); - sessionHandler.addPartitionToFetch(tip, acknowledgements); - - metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node); - } - } - - ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig); - if (requestBuilder != null) { - AcknowledgeRequestState requestState = new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName(), - 0L, - retryBackoffMs, - retryBackoffMaxMs, - node.id(), - acknowledgementsMapForNode, - Optional.empty()); - - BiConsumer responseHandler = (clientResponse, error) -> { - if (error != null) { - handleShareAcknowledgeCloseFailure(node, requestBuilder.data(), requestState, error, currentTimeMs); - } else { - handleShareAcknowledgeCloseSuccess(node, requestBuilder.data(), requestState, clientResponse, currentTimeMs); - } - }; - requests.add(new UnsentRequest(requestBuilder, Optional.of(node)).whenComplete(responseHandler)); - } - } - }); - - return new PollResult(requests); - } - public void fetch(Map acknowledgementsMap) { if (!fetchMoreRecords) { log.debug("Fetch more data"); @@ -256,20 +200,14 @@ public void fetch(Map acknowledgementsMap) { acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge)); } - public void acknowledgeOnClose(Map acknowledgementsMap) { - acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge)); - } - /** - * Process acknowledgeRequestStates and prepares a - * list of acknowledgements to be sent in the poll(). + * Process acknowledgeRequestStates and prepares a list of acknowledgements to be sent in the poll(). * * @param currentTimeMs the current time in ms. 
- * @param onClose True if called during close * * @return the PollResult containing zero or more acknowledgements. */ - private PollResult processAcknowledgements(long currentTimeMs, boolean onClose) { + private PollResult processAcknowledgements(long currentTimeMs) { List unsentRequests = new ArrayList<>(); Iterator iterator = acknowledgeRequestStates.iterator(); while (iterator.hasNext()) { @@ -282,20 +220,7 @@ private PollResult processAcknowledgements(long currentTimeMs, boolean onClose) } else { if (acknowledgeRequestState.canSendRequest(currentTimeMs)) { acknowledgeRequestState.onSendAttempt(currentTimeMs); - UnsentRequest request; - if (onClose) { - request = acknowledgeRequestState.buildRequest( - this::handleShareAcknowledgeCloseSuccess, - this::handleShareAcknowledgeCloseFailure, - currentTimeMs, - true); - } else { - request = acknowledgeRequestState.buildRequest( - this::handleShareAcknowledgeSuccess, - this::handleShareAcknowledgeFailure, - currentTimeMs, - false); - } + UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs); if (request != null) { unsentRequests.add(request); } @@ -350,21 +275,25 @@ public CompletableFuture> commitSync( metricsManager.recordAcknowledgementSent(acknowledgements.size()); log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); + resultCount.incrementAndGet(); } - resultCount.incrementAndGet(); } acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName(), + ShareConsumeRequestManager.class.getSimpleName() + ":1", deadlineMs, retryBackoffMs, retryBackoffMaxMs, + sessionHandler, nodeId, acknowledgementsMapForNode, - Optional.of(resultHandler) + this::handleShareAcknowledgeSuccess, + this::handleShareAcknowledgeFailure, + resultHandler )); } }); + resultHandler.completeIfEmpty(); return future; } @@ -389,20 +318,78 @@ public void commitAsync(final Map acknowledg 
metricsManager.recordAcknowledgementSent(acknowledgements.size()); log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); + resultCount.incrementAndGet(); } - resultCount.incrementAndGet(); } acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName(), + ShareConsumeRequestManager.class.getSimpleName() + ":2", Long.MAX_VALUE, retryBackoffMs, retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + this::handleShareAcknowledgeSuccess, + this::handleShareAcknowledgeFailure, + resultHandler + )); + } + }); + + resultHandler.completeIfEmpty(); + } + + /** + * Enqueue the final AcknowledgeRequestState used to commit the final acknowledgements and + * close the share sessions. + * + * @param acknowledgementsMap The acknowledgements to commit + * @param deadlineMs Time until which the request will be retried if it fails with + * an expected retriable error. + * + * @return The future which completes when the acknowledgements finished + */ + public CompletableFuture acknowledgeOnClose(final Map acknowledgementsMap, + final long deadlineMs) { + final Cluster cluster = metadata.fetch(); + final AtomicInteger resultCount = new AtomicInteger(); + final CompletableFuture future = new CompletableFuture<>(); + final CloseResultHandler resultHandler = new CloseResultHandler(resultCount, future); + + closing = true; + + sessionHandlers.forEach((nodeId, sessionHandler) -> { + Node node = cluster.nodeById(nodeId); + if (node != null) { + Map acknowledgementsMapForNode = new HashMap<>(); + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = acknowledgementsMap.get(tip); + if (acknowledgements != null) { + acknowledgementsMapForNode.put(tip, acknowledgements); + + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added closing acknowledge request for partition {} to node {}", 
tip.topicPartition(), node); + resultCount.incrementAndGet(); + } + } + acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":3", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, nodeId, acknowledgementsMapForNode, - Optional.of(resultHandler) + this::handleShareAcknowledgeCloseSuccess, + this::handleShareAcknowledgeCloseFailure, + resultHandler, + true )); } }); + + resultHandler.completeIfEmpty(); + return future; } private void handleShareFetchSuccess(Node fetchTarget, @@ -513,40 +500,40 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget, long currentTimeMs) { try { final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody(); - final ShareSessionHandler handler = sessionHandler(fetchTarget.id()); - - if (handler == null) { - log.error("Unable to find ShareSessionHandler for node {}. Ignoring ShareAcknowledge response.", - fetchTarget.id()); - acknowledgeRequestState.onFailedAttempt(currentTimeMs); - return; - } + final ShareSessionHandler handler = acknowledgeRequestState.sessionHandler(); final short requestVersion = resp.requestHeader().apiVersion(); if (!handler.handleResponse(response, requestVersion)) { acknowledgeRequestState.onFailedAttempt(currentTimeMs); - if (response.error().exception() instanceof RetriableException) { - // For commitSync, we retry the request until the timer expires. - // For commitAsync, we do not retry irrespective of the error. - if (acknowledgeRequestState.isExpired()) { - return; - } + if (response.error().exception() instanceof RetriableException && !closing) { + // We retry the request until the timer expires, unless we are closing. 
+ } else { + requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error()); + })); + + acknowledgeRequestState.processingComplete(); } - } + } else { + response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + if (partition.errorCode() != Errors.NONE.code()) { + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + } + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode())); + })); - response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { - TopicIdPartition tip = new TopicIdPartition(topic.topicId(), - partition.partitionIndex(), - metadata.topicNames().get(topic.topicId())); - if (partition.errorCode() != Errors.NONE.code()) { - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); - } - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode())); - })); + acknowledgeRequestState.processingComplete(); + } metricsManager.recordLatency(resp.requestLatencyMs()); - acknowledgeRequestState.isProcessed = true; } finally { log.debug("Removing pending request for node {} - success", fetchTarget); nodesWithPendingRequests.remove(fetchTarget.id()); @@ -559,10 +546,7 @@ private void handleShareAcknowledgeFailure(Node fetchTarget, Throwable error, long currentTimeMs) { try { - final ShareSessionHandler handler = sessionHandler(fetchTarget.id()); - if (handler != null) { - 
handler.handleError(error); - } + acknowledgeRequestState.sessionHandler().handleError(error); requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { TopicIdPartition tip = new TopicIdPartition(topic.topicId(), @@ -596,6 +580,7 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget, })); metricsManager.recordLatency(resp.requestLatencyMs()); + acknowledgeRequestState.processingComplete(); } finally { log.debug("Removing pending request for node {} - success", fetchTarget); nodesWithPendingRequests.remove(fetchTarget.id()); @@ -609,10 +594,7 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget, Throwable error, long currentTimeMs) { try { - final ShareSessionHandler handler = sessionHandler(fetchTarget.id()); - if (handler != null) { - handler.handleError(error); - } + acknowledgeRequestState.sessionHandler().handleError(error); requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { TopicIdPartition tip = new TopicIdPartition(topic.topicId(), @@ -658,6 +640,11 @@ public void onMemberEpochUpdated(Optional memberEpochOpt, Optional acknowledgementsMap; + /** + * The handler to call on a successful response from ShareAcknowledge. + */ + private ResponseHandler successHandler; + + /** + * The handler to call on a failed response from ShareAcknowledge. + */ + private ResponseHandler errorHandler; + /** * Whether the request has been processed and will not be retried. */ private boolean isProcessed = false; /** - * For a synchronous request, this handles completing a future when all results are known. + * This handles completing a future when all results are known. */ - private final Optional resultHandler; + private final ResultHandler resultHandler; + + /** + * Whether this is the final acknowledge request state before the consumer closes. 
+ */ + private final boolean onClose; + + AcknowledgeRequestState(LogContext logContext, + String owner, + long deadlineMs, + long retryBackoffMs, + long retryBackoffMaxMs, + ShareSessionHandler sessionHandler, + int nodeId, + Map acknowledgementsMap, + ResponseHandler successHandler, + ResponseHandler errorHandler, + ResultHandler resultHandler) { + this(logContext, owner, deadlineMs, retryBackoffMs, retryBackoffMaxMs, sessionHandler, nodeId, + acknowledgementsMap, successHandler, errorHandler, resultHandler, false); + } AcknowledgeRequestState(LogContext logContext, String owner, long deadlineMs, long retryBackoffMs, long retryBackoffMaxMs, + ShareSessionHandler sessionHandler, int nodeId, Map acknowledgementsMap, - Optional resultHandler) { + ResponseHandler successHandler, + ResponseHandler errorHandler, + ResultHandler resultHandler, + boolean onClose) { super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, deadlineTimer(time, deadlineMs)); + this.sessionHandler = sessionHandler; this.nodeId = nodeId; this.acknowledgementsMap = acknowledgementsMap; + this.successHandler = successHandler; + this.errorHandler = errorHandler; this.resultHandler = resultHandler; + this.onClose = onClose; } - UnsentRequest buildRequest(ResponseHandler successHandler, - ResponseHandler errorHandler, - long currentTimeMs, - boolean onClose) { - ShareSessionHandler handler = sessionHandlers.get(nodeId); - if (handler == null) { - return null; - } - + UnsentRequest buildRequest(long currentTimeMs) { + // If this is the closing request, close the share session by setting the final epoch if (onClose) { - handler.notifyClose(); + sessionHandler.notifyClose(); } for (Map.Entry entry : acknowledgementsMap.entrySet()) { - handler.addPartitionToFetch(entry.getKey(), entry.getValue()); + sessionHandler.addPartitionToFetch(entry.getKey(), entry.getValue()); } - ShareAcknowledgeRequest.Builder requestBuilder = handler.newShareAcknowledgeBuilder(groupId, fetchConfig); + + 
ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig); Node nodeToSend = metadata.fetch().nodeById(nodeId); nodesWithPendingRequests.add(nodeId); @@ -717,7 +736,7 @@ UnsentRequest buildRequest(ResponseHandler successHandler, if (error != null) { onFailedAttempt(currentTimeMs); errorHandler.handle(nodeToSend, requestBuilder.data(), this, error, currentTimeMs); - isProcessed = true; + processingComplete(); } else { successHandler.handle(nodeToSend, requestBuilder.data(), this, clientResponse, currentTimeMs); } @@ -747,14 +766,17 @@ void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCod Acknowledgements acks = acknowledgementsMap.get(tip); if (acks != null) { acks.setAcknowledgeErrorCode(acknowledgeErrorCode); - if (resultHandler.isPresent()) { - resultHandler.get().complete(tip, acks); - } else { - Map acksMap = Collections.singletonMap(tip, acks); - ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acksMap); - backgroundEventHandler.add(event); - } } + resultHandler.complete(tip, acks); + } + + ShareSessionHandler sessionHandler() { + return sessionHandler; + } + + void processingComplete() { + isProcessed = true; + resultHandler.completeIfEmpty(); } boolean isProcessed() { @@ -778,13 +800,26 @@ private interface ResponseHandler { void handle(Node target, ShareAcknowledgeRequestData request, AcknowledgeRequestState requestState, T response, long currentTimeMs); } + private interface ResultHandler { + /** + * Handle the result of a ShareAcknowledge request sent to one or more nodes and + * signal the completion when all results are known. + */ + void complete(TopicIdPartition partition, Acknowledgements acknowledgements); + + /** + * Handles the case where there are no results pending after initialization. 
+ */ + void completeIfEmpty(); + } + /** * Sends a ShareAcknowledgeCommitCallback event to the application when it is done * processing all the remaining acknowledgement request states. * Also manages completing the future for synchronous acknowledgement commit by counting - * down the result as they are known and completing the future at the end. + * down the results as they are known and completing the future at the end. */ - class CommitResultHandler { + class CommitResultHandler implements ResultHandler { private final Map result; private final AtomicInteger remainingResults; private final Optional>> future; @@ -797,12 +832,56 @@ class CommitResultHandler { } public void complete(TopicIdPartition partition, Acknowledgements acknowledgements) { - result.put(partition, acknowledgements); + if (acknowledgements != null) { + result.put(partition, acknowledgements); + } if (remainingResults.decrementAndGet() == 0) { ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(result); backgroundEventHandler.add(event); future.ifPresent(future -> future.complete(result)); } } + + public void completeIfEmpty() { + if (remainingResults.get() == 0) { + future.ifPresent(future -> future.complete(result)); + } + } + } + + /** + * Sends a ShareAcknowledgeCommitCallback event to the application when it is done + * processing all the remaining acknowledgement request states. + * Also manages completing the future for acknowledgement commit by counting + * down the results as they are known and completing the future at the end. 
+ */ + class CloseResultHandler implements ResultHandler { + private final Map result; + private final AtomicInteger remainingResults; + private final CompletableFuture future; + + CloseResultHandler(final AtomicInteger remainingResults, + final CompletableFuture future) { + result = new HashMap<>(); + this.remainingResults = remainingResults; + this.future = future; + } + + public void complete(TopicIdPartition partition, Acknowledgements acknowledgements) { + if (acknowledgements != null) { + result.put(partition, acknowledgements); + } + if (remainingResults.decrementAndGet() == 0) { + ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(result); + backgroundEventHandler.add(event); + future.complete(null); + } + } + + public void completeIfEmpty() { + if (remainingResults.get() == 0) { + future.complete(null); + } + } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 8864506f75017..9cd7d837fda27 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -40,12 +40,12 @@ import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent; -import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; 
-import org.apache.kafka.clients.consumer.internals.events.ShareLeaveOnCloseEvent; -import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -527,7 +527,7 @@ public void subscribe(final Collection topics) { // Trigger subscribe event to effectively join the group if not already part of it, // or just send the new subscription to the broker. - applicationEventHandler.add(new ShareSubscriptionChangeApplicationEvent()); + applicationEventHandler.add(new ShareSubscriptionChangeEvent()); } } finally { release(); @@ -541,7 +541,8 @@ public void subscribe(final Collection topics) { public void unsubscribe() { acquireAndEnsureOpen(); try { - ShareUnsubscribeApplicationEvent unsubscribeApplicationEvent = new ShareUnsubscribeApplicationEvent(); + Timer timer = time.timer(Long.MAX_VALUE); + ShareUnsubscribeEvent unsubscribeApplicationEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer)); applicationEventHandler.add(unsubscribeApplicationEvent); log.info("Unsubscribing all topics"); @@ -836,14 +837,19 @@ private void close(final Duration timeout, final boolean swallowException) { closeTimer.update(); // Prepare shutting down the network thread - prepareShutdown(closeTimer, firstException); - closeTimer.update(); + swallow(log, Level.ERROR, "Failed to release assignment before closing consumer", + () -> sendAcknowledgementsAndLeaveGroup(closeTimer, firstException), firstException); + swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback", + 
this::handleCompletedAcknowledgements, firstException); if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); - swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback.", this::handleCompletedAcknowledgements, - firstException); closeTimer.update(); + // close() can be called from inside one of the constructors. In that case, it's possible that neither + // the reaper nor the background event queue were constructed, so check them first to avoid NPE. + if (backgroundEventReaper != null && backgroundEventQueue != null) + backgroundEventReaper.reap(backgroundEventQueue); + closeQuietly(kafkaShareConsumerMetrics, "kafka share consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); closeQuietly(deserializers, "consumer deserializers", firstException); @@ -863,14 +869,27 @@ private void close(final Duration timeout, final boolean swallowException) { /** * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: * 1. commit pending acknowledgements and close any share sessions - * 2. send leave group + * 2. 
leave the group */ - void prepareShutdown(final Timer timer, final AtomicReference firstException) { + private void sendAcknowledgementsAndLeaveGroup(final Timer timer, final AtomicReference firstException) { completeQuietly( - () -> { - applicationEventHandler.addAndGet(new ShareLeaveOnCloseEvent(acknowledgementsToSend(), calculateDeadlineMs(timer))); - }, - "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); + () -> { + applicationEventHandler.addAndGet(new ShareAcknowledgeOnCloseEvent(acknowledgementsToSend(), calculateDeadlineMs(timer))); + }, + "Failed to send pending acknowledgements with a timeout(ms)=" + timer.timeoutMs(), firstException); + timer.update(); + + ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer)); + applicationEventHandler.add(unsubscribeEvent); + try { + processBackgroundEvents(unsubscribeEvent.future(), timer); + log.info("Completed releasing assignment and leaving group to close consumer."); + } catch (TimeoutException e) { + log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " + + "complete it within {} ms. 
It will proceed to close.", timer.timeoutMs()); + } finally { + timer.update(); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index cd14f2823a52b..9f1695269b68e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -33,7 +33,8 @@ public enum Type { UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, LEAVE_ON_CLOSE, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, - SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, SHARE_LEAVE_ON_CLOSE + SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, + SHARE_ACKNOWLEDGE_ON_CLOSE } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 6558535711785..41b795184a884 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -136,15 +135,15 @@ public void process(ApplicationEvent event) { return; case SHARE_SUBSCRIPTION_CHANGE: - process((ShareSubscriptionChangeApplicationEvent) event); + process((ShareSubscriptionChangeEvent) event); return; case SHARE_UNSUBSCRIBE: - process((ShareUnsubscribeApplicationEvent) event); + process((ShareUnsubscribeEvent) event); return; - case SHARE_LEAVE_ON_CLOSE: - 
process((ShareLeaveOnCloseEvent) event); + case SHARE_ACKNOWLEDGE_ON_CLOSE: + process((ShareAcknowledgeOnCloseEvent) event); return; default: @@ -328,7 +327,7 @@ private void process(final ShareAcknowledgeAsyncEvent event) { * consumer join the share group if it is not part of it yet, or send the updated subscription if * it is already a member. */ - private void process(final ShareSubscriptionChangeApplicationEvent ignored) { + private void process(final ShareSubscriptionChangeEvent ignored) { if (!requestManagers.shareHeartbeatRequestManager.isPresent()) { log.warn("Group membership manager not present when processing a subscribe event"); return; @@ -345,7 +344,7 @@ private void process(final ShareSubscriptionChangeApplicationEvent ignored) { * execution for releasing the assignment completes, and the request to leave * the group is sent out. */ - private void process(final ShareUnsubscribeApplicationEvent event) { + private void process(final ShareUnsubscribeEvent event) { if (!requestManagers.shareHeartbeatRequestManager.isPresent()) { KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event"); event.future().completeExceptionally(error); @@ -357,21 +356,22 @@ private void process(final ShareUnsubscribeApplicationEvent event) { future.whenComplete(complete(event.future())); } - private void process(final ShareLeaveOnCloseEvent event) { - if (!requestManagers.shareHeartbeatRequestManager.isPresent()) { - event.future().complete(null); + /** + * Process event indicating that the consumer is closing. This will make the consumer + * complete pending acknowledgements. + * + * @param event Acknowledge-on-close event containing a future that will complete when + * the acknowledgements have responses. 
+ */ + private void process(final ShareAcknowledgeOnCloseEvent event) { + if (!requestManagers.shareConsumeRequestManager.isPresent()) { + KafkaException error = new KafkaException("Share consume request manager not present when processing an acknowledge-on-close event"); + event.future().completeExceptionally(error); return; } - requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.acknowledgeOnClose(event.acknowledgementsMap())); - - ShareMembershipManager membershipManager = - Objects.requireNonNull(requestManagers.shareHeartbeatRequestManager.get().membershipManager(), - "Expecting membership manager to be non-null"); - - log.debug("Leaving group before closing"); - CompletableFuture future = membershipManager.leaveGroup(); - // The future will be completed on heartbeat sent + ShareConsumeRequestManager manager = requestManagers.shareConsumeRequestManager.get(); + CompletableFuture future = manager.acknowledgeOnClose(event.acknowledgementsMap(), event.deadlineMs()); future.whenComplete(complete(event.future())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareLeaveOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java similarity index 83% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareLeaveOnCloseEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java index c8f6c6da6dc90..23d4410d07bc2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareLeaveOnCloseEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java @@ -21,12 +21,12 @@ import java.util.Map; -public class ShareLeaveOnCloseEvent extends CompletableApplicationEvent { +public class ShareAcknowledgeOnCloseEvent extends CompletableApplicationEvent { private Map
acknowledgementsMap; - public ShareLeaveOnCloseEvent(final Map acknowledgementsMap, final long deadlineMs) { - super(Type.SHARE_LEAVE_ON_CLOSE, deadlineMs); + public ShareAcknowledgeOnCloseEvent(final Map acknowledgementsMap, final long deadlineMs) { + super(Type.SHARE_ACKNOWLEDGE_ON_CLOSE, deadlineMs); this.acknowledgementsMap = acknowledgementsMap; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareSubscriptionChangeApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareSubscriptionChangeEvent.java similarity index 89% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareSubscriptionChangeApplicationEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareSubscriptionChangeEvent.java index acc5b0bc97219..a6e00bf1059b9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareSubscriptionChangeApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareSubscriptionChangeEvent.java @@ -22,9 +22,9 @@ * calls the subscribe API. This will make the consumer join a share group if not part of it * yet, or just send the updated subscription to the broker if it's already a member of the group. 
*/ -public class ShareSubscriptionChangeApplicationEvent extends ApplicationEvent { +public class ShareSubscriptionChangeEvent extends ApplicationEvent { - public ShareSubscriptionChangeApplicationEvent() { + public ShareSubscriptionChangeEvent() { super(Type.SHARE_SUBSCRIPTION_CHANGE); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeEvent.java similarity index 87% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeApplicationEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeEvent.java index 7378f5ea0bde3..c06d8f3d56e45 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeEvent.java @@ -24,8 +24,8 @@ * complete and the heartbeat to leave the group is sent out (minimal effort to send the * leave group heartbeat, without waiting for any response or considering timeouts). 
*/ -public class ShareUnsubscribeApplicationEvent extends CompletableApplicationEvent { - public ShareUnsubscribeApplicationEvent() { - super(Type.SHARE_UNSUBSCRIBE, Integer.MAX_VALUE); +public class ShareUnsubscribeEvent extends CompletableApplicationEvent { + public ShareUnsubscribeEvent(final long deadlineMs) { + super(Type.SHARE_UNSUBSCRIBE, deadlineMs); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index d313e9dc24642..4e75652e376fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -98,6 +98,7 @@ import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -136,7 +137,7 @@ public class ShareConsumeRequestManagerTest { private ShareFetchMetricsManager metricsManager; private MockClient client; private Metrics metrics; - private TestableShareConsumeRequestManager fetcher; + private TestableShareConsumeRequestManager shareConsumeRequestManager; private TestableNetworkClientDelegate networkClientDelegate; private MemoryRecords records; private List acquiredRecords; @@ -170,27 +171,27 @@ private void assignFromSubscribed(Set partitions) { public void teardown() throws Exception { if (metrics != null) metrics.close(); - if (fetcher != null) - 
fetcher.close(); + if (shareConsumeRequestManager != null) + shareConsumeRequestManager.close(); } private int sendFetches() { - return fetcher.sendFetches(); + return shareConsumeRequestManager.sendFetches(); } @Test public void testFetchNormal() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(Collections.singleton(tp0)); // normal fetch assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); @@ -201,18 +202,18 @@ public void testFetchNormal() { @Test public void testFetchWithAcquiredRecords() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(Collections.singleton(tp0)); // normal fetch assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); @@ -224,17 +225,17 @@ public void testFetchWithAcquiredRecords() { @Test public void testMultipleFetches() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(Collections.singleton(tp0)); assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), 
Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); @@ -245,15 +246,15 @@ public void testMultipleFetches() { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - fetcher.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tip0, records, ShareCompletedFetchTest.acquiredRecords(2L, 1), Errors.NONE, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); assertEquals(1.0, metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue()); @@ -264,16 +265,16 @@ public void testMultipleFetches() { Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(2L, AcknowledgeType.REJECT); - fetcher.fetch(Collections.singletonMap(tip0, acknowledgements2)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements2)); assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); // Preparing a response with an acknowledgement error. 
client.prepareResponse(fullFetchResponse(tip0, records, Collections.emptyList(), Errors.NONE, Errors.INVALID_RECORD_STATE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); assertEquals(2.0, metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue()); assertEquals(1.0, @@ -286,63 +287,92 @@ public void testMultipleFetches() { @Test public void testCommitSync() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(Collections.singleton(tp0)); // normal fetch assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - fetcher.commitSync(Collections.singletonMap(tip0, acknowledgements), time.milliseconds() + 2000); + shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), time.milliseconds() + 2000); - assertEquals(1, fetcher.sendAcknowledgements()); + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); } @Test public void testCommitAsync() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(Collections.singleton(tp0)); // normal fetch assertEquals(1, sendFetches()); - 
assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - fetcher.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); - assertEquals(1, fetcher.sendAcknowledgements()); + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + } + + @Test + public void testAcknowledgeOnClose() { + buildRequestManager(); + + assignFromSubscribed(Collections.singleton(tp0)); + + // normal fetch + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + acknowledgements.add(3L, AcknowledgeType.REJECT); + + shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, acknowledgements), + calculateDeadlineMs(time.timer(100))); + + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + 
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); } @Test public void testMultipleTopicsFetch() { - buildFetcher(); + buildRequestManager(); Set partitions = new HashSet<>(); partitions.add(tp0); partitions.add(tp1); @@ -350,7 +380,7 @@ public void testMultipleTopicsFetch() { assignFromSubscribed(partitions); assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); LinkedHashMap partitionDataMap = new LinkedHashMap<>(); partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, acquiredRecords, Errors.NONE, Errors.NONE)); @@ -358,7 +388,7 @@ public void testMultipleTopicsFetch() { client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList())); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); ShareFetch shareFetch = collectFetch(); assertEquals(1, shareFetch.records().size()); @@ -371,7 +401,7 @@ public void testMultipleTopicsFetch() { @Test public void testMultipleTopicsFetchError() { - buildFetcher(); + buildRequestManager(); Set partitions = new HashSet<>(); partitions.add(tp0); partitions.add(tp1); @@ -379,7 +409,7 @@ public void testMultipleTopicsFetchError() { assignFromSubscribed(partitions); assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); LinkedHashMap partitionDataMap = new LinkedHashMap<>(); partitionDataMap.put(tip1, partitionDataForFetch(tip1, records, emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE)); @@ -387,7 +417,7 @@ public void testMultipleTopicsFetchError() { client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList())); 
networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); // The first call throws TopicAuthorizationException because there are no records ready to return when the // exception is noticed. @@ -403,27 +433,27 @@ public void testMultipleTopicsFetchError() { @Test public void testCloseShouldBeIdempotent() { - buildFetcher(); + buildRequestManager(); - fetcher.close(); - fetcher.close(); - fetcher.close(); + shareConsumeRequestManager.close(); + shareConsumeRequestManager.close(); + shareConsumeRequestManager.close(); - verify(fetcher, times(1)).closeInternal(); + verify(shareConsumeRequestManager, times(1)).closeInternal(); } @Test public void testFetchError() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(singleton(tp0)); assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NOT_LEADER_OR_FOLLOWER)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); assertFalse(partitionRecords.containsKey(tp0)); @@ -431,7 +461,7 @@ public void testFetchError() { @Test public void testInvalidDefaultRecordBatch() { - buildFetcher(); + buildRequestManager(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteBufferOutputStream out = new ByteBufferOutputStream(buffer); @@ -470,7 +500,7 @@ public void testInvalidDefaultRecordBatch() { @Test public void testParseInvalidRecordBatch() { - buildFetcher(); + buildRequestManager(); MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), @@ -496,7 +526,7 @@ public void 
testParseInvalidRecordBatch() { @Test public void testHeaders() { - buildFetcher(); + buildRequestManager(); MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L); builder.append(0L, "key".getBytes(), "value-1".getBytes()); @@ -543,7 +573,7 @@ record = recordIterator.next(); @Test public void testUnauthorizedTopic() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(singleton(tp0)); @@ -560,7 +590,7 @@ public void testUnauthorizedTopic() { @Test public void testUnknownTopicIdError() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(singleton(tp0)); assertEquals(1, sendFetches()); @@ -575,7 +605,7 @@ public void testUnknownTopicIdError() { public void testHandleFetchResponseError(Errors error, boolean hasTopLevelError, boolean shouldRequestMetadataUpdate) { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(singleton(tp0)); assertEquals(1, sendFetches()); @@ -616,7 +646,7 @@ private static Stream handleFetchResponseErrorSupplier() { @Test public void testFetchDisconnected() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(singleton(tp0)); @@ -628,7 +658,7 @@ public void testFetchDisconnected() { @Test public void testFetchWithLastRecordMissingFromBatch() { - buildFetcher(); + buildRequestManager(); MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("0".getBytes(), "v".getBytes()), @@ -658,7 +688,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { ShareCompletedFetchTest.acquiredRecords(0L, 3), Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> allFetchedRecords = fetchRecords(); assertTrue(allFetchedRecords.containsKey(tp0)); @@ -679,11 +709,11 @@ private MemoryRecords buildRecords(long baseOffset, int count, long firstMessage @Test public 
void testCorruptMessageError() { - buildFetcher(); + buildRequestManager(); assignFromSubscribed(singleton(tp0)); assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); // Prepare a response with the CORRUPT_MESSAGE error. client.prepareResponse(fullFetchResponse( @@ -692,7 +722,7 @@ public void testCorruptMessageError() { ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.CORRUPT_MESSAGE)); networkClientDelegate.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); // Trigger the exception. assertThrows(KafkaException.class, this::fetchRecords); @@ -749,9 +779,9 @@ private ShareAcknowledgeResponseData.PartitionData partitionDataForAcknowledge(T } /** - * Assert that the {@link Fetcher#collectFetch() latest fetch} does not contain any - * {@link Fetch#records() user-visible records}, - * and is {@link Fetch#isEmpty() empty}. + * Assert that the {@link ShareFetchCollector#collect(ShareFetchBuffer) latest fetch} does not contain any + * {@link ShareFetch#records() user-visible records}, and is {@link ShareFetch#isEmpty() empty}.
+ * * @param reason the reason to include for assertion methods such as {@link org.junit.jupiter.api.Assertions#assertTrue(boolean, String)} */ private void assertEmptyFetch(String reason) { @@ -770,32 +800,32 @@ private Map>> fetchRecords() { @SuppressWarnings("unchecked") private ShareFetch collectFetch() { - return (ShareFetch) fetcher.collectFetch(); + return (ShareFetch) shareConsumeRequestManager.collectFetch(); } - private void buildFetcher() { - buildFetcher(new ByteArrayDeserializer(), new ByteArrayDeserializer()); + private void buildRequestManager() { + buildRequestManager(new ByteArrayDeserializer(), new ByteArrayDeserializer()); } - private void buildFetcher(Deserializer keyDeserializer, - Deserializer valueDeserializer) { - buildFetcher(new MetricConfig(), keyDeserializer, valueDeserializer); + private void buildRequestManager(Deserializer keyDeserializer, + Deserializer valueDeserializer) { + buildRequestManager(new MetricConfig(), keyDeserializer, valueDeserializer); } - private void buildFetcher(MetricConfig metricConfig, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { + private void buildRequestManager(MetricConfig metricConfig, + Deserializer keyDeserializer, + Deserializer valueDeserializer) { LogContext logContext = new LogContext(); SubscriptionState subscriptionState = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST); - buildFetcher(metricConfig, keyDeserializer, valueDeserializer, + buildRequestManager(metricConfig, keyDeserializer, valueDeserializer, subscriptionState, logContext); } - private void buildFetcher(MetricConfig metricConfig, - Deserializer keyDeserializer, - Deserializer valueDeserializer, - SubscriptionState subscriptionState, - LogContext logContext) { + private void buildRequestManager(MetricConfig metricConfig, + Deserializer keyDeserializer, + Deserializer valueDeserializer, + SubscriptionState subscriptionState, + LogContext logContext) { buildDependencies(metricConfig, 
subscriptionState, logContext); Deserializers deserializers = new Deserializers<>(keyDeserializer, valueDeserializer); int maxWaitMs = 0; @@ -817,7 +847,7 @@ private void buildFetcher(MetricConfig metricConfig, fetchConfig, deserializers); BackgroundEventHandler backgroundEventHandler = new TestableBackgroundEventHandler(logContext, completedAcknowledgements); - fetcher = spy(new TestableShareConsumeRequestManager<>( + shareConsumeRequestManager = spy(new TestableShareConsumeRequestManager<>( logContext, groupId, metadata, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index f0e718b693f0b..16bc09d960a37 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -24,9 +24,9 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; -import org.apache.kafka.clients.consumer.internals.events.ShareLeaveOnCloseEvent; -import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -153,7 +153,8 @@ private ShareConsumerImpl newConsumer( @Test public void testSuccessfulStartupShutdown() { 
consumer = newConsumer(); - completeShareLeaveOnCloseApplicationEventSuccessfully(); + completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); + completeShareUnsubscribeApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.close()); } @@ -179,7 +180,8 @@ public void testWakeupBeforeCallingPoll() { @Test public void testFailOnClosedConsumer() { consumer = newConsumer(); - completeShareLeaveOnCloseApplicationEventSuccessfully(); + completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); + completeShareUnsubscribeApplicationEventSuccessfully(); consumer.close(); final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::subscription); assertEquals("This consumer has already been closed.", res.getMessage()); @@ -188,10 +190,11 @@ public void testFailOnClosedConsumer() { @Test public void testVerifyApplicationEventOnShutdown() { consumer = newConsumer(); - completeShareLeaveOnCloseApplicationEventSuccessfully(); - doReturn(null).when(applicationEventHandler).addAndGet(any()); + completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); + completeShareUnsubscribeApplicationEventSuccessfully(); consumer.close(); - verify(applicationEventHandler).addAndGet(any(ShareLeaveOnCloseEvent.class)); + verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class)); + verify(applicationEventHandler).add(any(ShareUnsubscribeEvent.class)); } @Test @@ -215,7 +218,7 @@ public void testSubscribeGeneratesEvent() { String topic = "topic1"; consumer.subscribe(singletonList(topic)); assertEquals(singleton(topic), consumer.subscription()); - verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareSubscriptionChangeApplicationEvent.class)); + verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareSubscriptionChangeEvent.class)); } @Test @@ -226,7 +229,7 @@ public void testUnsubscribeGeneratesUnsubscribeEvent() { consumer.unsubscribe(); assertTrue(consumer.subscription().isEmpty()); - 
verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeApplicationEvent.class)); + verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeEvent.class)); } @Test @@ -236,7 +239,7 @@ public void testSubscribeToEmptyListActsAsUnsubscribe() { consumer.subscribe(Collections.emptyList()); assertTrue(consumer.subscription().isEmpty()); - verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeApplicationEvent.class)); + verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeEvent.class)); } @Test @@ -348,11 +351,12 @@ public void testEnsurePollEventSentOnConsumerPoll() { consumer.subscribe(singletonList("topic1")); consumer.poll(Duration.ofMillis(100)); verify(applicationEventHandler).add(any(PollEvent.class)); - verify(applicationEventHandler).add(any(ShareSubscriptionChangeApplicationEvent.class)); + verify(applicationEventHandler).add(any(ShareSubscriptionChangeEvent.class)); - completeShareLeaveOnCloseApplicationEventSuccessfully(); + completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); + completeShareUnsubscribeApplicationEventSuccessfully(); consumer.close(); - verify(applicationEventHandler).addAndGet(any(ShareLeaveOnCloseEvent.class)); + verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class)); } private Properties requiredConsumerPropertiesAndGroupId(final String groupId) { @@ -448,17 +452,17 @@ public void testProcessBackgroundEventsTimesOut() throws Exception { private void completeShareUnsubscribeApplicationEventSuccessfully() { doAnswer(invocation -> { - ShareUnsubscribeApplicationEvent event = invocation.getArgument(0); + ShareUnsubscribeEvent event = invocation.getArgument(0); event.future().complete(null); return null; - }).when(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeApplicationEvent.class)); + }).when(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeEvent.class)); } - private void 
completeShareLeaveOnCloseApplicationEventSuccessfully() { + private void completeShareAcknowledgeOnCloseApplicationEventSuccessfully() { doAnswer(invocation -> { - ShareLeaveOnCloseEvent event = invocation.getArgument(0); + ShareAcknowledgeOnCloseEvent event = invocation.getArgument(0); event.future().complete(null); return null; - }).when(applicationEventHandler).add(ArgumentMatchers.isA(ShareLeaveOnCloseEvent.class)); + }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ShareAcknowledgeOnCloseEvent.class)); } } diff --git a/core/src/main/java/kafka/server/SharePartitionManager.java b/core/src/main/java/kafka/server/SharePartitionManager.java index c3edd725024b5..94ef46f301adc 100644 --- a/core/src/main/java/kafka/server/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/SharePartitionManager.java @@ -510,7 +510,7 @@ public void acknowledgeSessionUpdate(String groupId, ShareFetchMetadata reqMetad if (shareSession.epoch != reqMetadata.epoch()) { log.debug("Share session error for {}: expected epoch {}, but got {} instead", key, shareSession.epoch, reqMetadata.epoch()); - throw Errors.INVALID_SHARE_SESSION_EPOCH.exception(); + throw Errors.INVALID_SHARE_SESSION_EPOCH.exception(); } else { cache.touch(shareSession, time.milliseconds()); shareSession.epoch = ShareFetchMetadata.nextEpoch(shareSession.epoch); @@ -742,8 +742,6 @@ void shareAcknowledgement() { void recordAcknowledgement(byte ackType) { if (recordAcksSensorMap.containsKey(ackType)) { recordAcksSensorMap.get(ackType).record(1.0); - } else { - log.error("Unknown ack type {}", ackType); } } diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index ce5f08a5ee968..1e860dcdee32b 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -52,7 +52,6 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; 
import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -109,6 +108,7 @@ public void createCluster() throws Exception { .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share") .setConfigProp("group.share.enable", "true") .setConfigProp("group.share.partition.max.record.locks", "10000") + .setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.DefaultStatePersister") .setConfigProp("group.share.record.lock.duration.ms", "10000") .setConfigProp("offsets.topic.replication.factor", "1") .setConfigProp("share.coordinator.state.topic.min.isr", "1") @@ -116,7 +116,6 @@ public void createCluster() throws Exception { .setConfigProp("transaction.state.log.min.isr", "1") .setConfigProp("transaction.state.log.replication.factor", "1") .setConfigProp("unstable.api.versions.enable", "true") - .setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.DefaultStatePersister") .build(); cluster.format(); cluster.startup(); @@ -136,7 +135,7 @@ public void testPollNoSubscribeFails() { KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); assertEquals(Collections.emptySet(), shareConsumer.subscription()); // "Consumer is not subscribed to any topics." 
- assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(5000))); + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); shareConsumer.close(); } @@ -146,7 +145,7 @@ public void testSubscribeAndPollNoRecords() { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.close(); assertEquals(0, records.count()); } @@ -157,7 +156,7 @@ public void testSubscribePollUnsubscribe() { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); shareConsumer.close(); @@ -170,11 +169,11 @@ public void testSubscribePollSubscribe() { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); - records = shareConsumer.poll(Duration.ofMillis(5000)); + records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.close(); assertEquals(0, records.count()); } @@ -185,11 +184,11 @@ public void testSubscribeUnsubscribePollFails() { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, 
shareConsumer.subscription()); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); // "Consumer is not subscribed to any topics." - assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(5000))); + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); shareConsumer.close(); assertEquals(0, records.count()); } @@ -200,11 +199,11 @@ public void testSubscribeSubscribeEmptyPollFails() { Set subscription = Collections.singleton(tp.topic()); shareConsumer.subscribe(subscription); assertEquals(subscription, shareConsumer.subscription()); - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.subscribe(Collections.emptySet()); assertEquals(Collections.emptySet(), shareConsumer.subscription()); // "Consumer is not subscribed to any topics." - assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(5000))); + assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); shareConsumer.close(); assertEquals(0, records.count()); } @@ -282,7 +281,7 @@ public void testAcknowledgementCommitCallbackOnClose() { assertEquals(1, records.count()); // Now in the second poll, we implicitly acknowledge the record received in the first poll. - // We get back the acknowledgement error code after the second poll. + // We get back the acknowledgement error code asynchronously after the second poll. // The acknowledgement commit callback is invoked in close. 
shareConsumer.poll(Duration.ofMillis(5000)); shareConsumer.close(); @@ -547,15 +546,15 @@ public void testExplicitAcknowledgementCommitAsync() throws InterruptedException assertEquals(2L, records2.iterator().next().offset()); assertFalse(partitionExceptionMap1.containsKey(tp)); - // The callback will receive the acknowledgement responses after the next poll. - shareConsumer1.poll(Duration.ofMillis(5000)); - - assertTrue(partitionExceptionMap1.containsKey(tp)); - assertNull(partitionExceptionMap1.get(tp)); + // The callback will receive the acknowledgement responses asynchronously after the next poll. + shareConsumer1.poll(Duration.ofMillis(500)); shareConsumer1.close(); shareConsumer2.close(); producer.close(); + + assertTrue(partitionExceptionMap1.containsKey(tp)); + assertNull(partitionExceptionMap1.get(tp)); } @Test @@ -606,16 +605,15 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch() throws Interrup shareConsumer1.acknowledge(firstRecord); // The callback will receive the acknowledgement responses after polling. The callback is - // called on entry to the poll method, and the commit is being performed asynchronously so - // call a couple of times in case the response takes a little time to arrive - shareConsumer1.poll(Duration.ofMillis(5000)); + // called on entry to the poll method or during close. The commit is being performed asynchronously, so + // we can only rely on the completion once the consumer has closed because that waits for the response. 
shareConsumer1.poll(Duration.ofMillis(5000)); - assertTrue(partitionExceptionMap.containsKey(tp)); - assertNull(partitionExceptionMap.get(tp)); - shareConsumer1.close(); producer.close(); + + assertTrue(partitionExceptionMap.containsKey(tp)); + assertNull(partitionExceptionMap.get(tp)); } @Test @@ -648,7 +646,7 @@ public void testExplicitAcknowledgeReleaseAccept() { assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); - records = shareConsumer.poll(Duration.ofMillis(5000)); + records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); shareConsumer.close(); producer.close(); @@ -704,11 +702,6 @@ public void testImplicitAcknowledgeFailsExplicit() { producer.close(); } - /** - * Currently calling commitSync immediately after poll for IMPLICIT acknowledgements will not work - * as the acknowledgement mode will be PENDING. - * Enable test after fixing the above issue. - */ @Test public void testImplicitAcknowledgeCommitSync() { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); @@ -728,11 +721,6 @@ public void testImplicitAcknowledgeCommitSync() { producer.close(); } - /** - * Currently calling commitAsync immediately after poll for implicit acknowledgements will not work - * as the acknowledgement mode will be PENDING. - * Enable test after fixing the above issue. 
- */ @Test public void testImplicitAcknowledgementCommitAsync() throws InterruptedException { ProducerRecord record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); @@ -807,8 +795,8 @@ public void testMultipleConsumersWithDifferentGroupIds() throws InterruptedExcep AtomicInteger shareConsumer1Records = new AtomicInteger(); AtomicInteger shareConsumer2Records = new AtomicInteger(); TestUtils.waitForCondition(() -> { - int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(5000)).count()); - int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(5000)).count()); + int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()); + int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); return records1 == 3 && records2 == 3; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers"); @@ -817,7 +805,7 @@ public void testMultipleConsumersWithDifferentGroupIds() throws InterruptedExcep shareConsumer1Records.set(0); TestUtils.waitForCondition(() -> { - int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(5000)).count()); + int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()); return records1 == 2; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer 1"); @@ -828,8 +816,8 @@ public void testMultipleConsumersWithDifferentGroupIds() throws InterruptedExcep shareConsumer1Records.set(0); shareConsumer2Records.set(0); TestUtils.waitForCondition(() -> { - int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(5000)).count()); - int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(5000)).count()); + int records1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000)).count()); + 
int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); return records1 == 3 && records2 == 5; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers for the last batch"); @@ -916,6 +904,7 @@ public void testMultipleConsumersInGroupConcurrentConsumption() { } } + // THIS ONE @Test public void testMultipleConsumersInMultipleGroupsConcurrentConsumption() { AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0); @@ -923,7 +912,7 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption() { AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0); int producerCount = 4; - int consumerCount = 4; + int consumerCount = 2; int messagesPerProducer = 2000; final int totalMessagesSent = producerCount * messagesPerProducer; @@ -932,35 +921,30 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption() { ExecutorService shareGroupExecutorService2 = Executors.newFixedThreadPool(consumerCount); ExecutorService shareGroupExecutorService3 = Executors.newFixedThreadPool(consumerCount); + CountDownLatch startSignal = new CountDownLatch(producerCount); + ConcurrentLinkedQueue> producerFutures = new ConcurrentLinkedQueue<>(); - // While we could run the producers and consumers concurrently, it seems that contention for resources on the - // test infrastructure causes trouble. Run the producers first, check that the set of messages was produced - // successfully, and then run the consumers next. 
for (int i = 0; i < producerCount; i++) { - Runnable task = () -> { + producerExecutorService.submit(() -> { CompletableFuture future = produceMessages(messagesPerProducer); producerFutures.add(future); - }; - producerExecutorService.submit(task); - } - producerExecutorService.shutdown(); - int actualMessagesSent = 0; - try { - producerExecutorService.awaitTermination(60, TimeUnit.SECONDS); // Wait for all producer threads to complete - - for (CompletableFuture future : producerFutures) { - actualMessagesSent += future.get(); - } - } catch (Exception e) { - fail("Exception occurred : " + e.getMessage()); + startSignal.countDown(); + }); } - assertEquals(totalMessagesSent, actualMessagesSent); ConcurrentLinkedQueue> futures1 = new ConcurrentLinkedQueue<>(); ConcurrentLinkedQueue> futures2 = new ConcurrentLinkedQueue<>(); ConcurrentLinkedQueue> futures3 = new ConcurrentLinkedQueue<>(); + // Wait for the producers to run + try { + boolean signalled = startSignal.await(15, TimeUnit.SECONDS); + assertTrue(signalled); + } catch (InterruptedException e) { + fail("Exception awaiting start signal"); + } + for (int i = 0; i < consumerCount; i++) { final int consumerNumber = i + 1; shareGroupExecutorService1.submit(() -> { @@ -979,6 +963,7 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption() { consumeMessages(totalMessagesConsumedGroup3, totalMessagesSent, "group3", consumerNumber, 100, true, future); }); } + producerExecutorService.shutdown(); shareGroupExecutorService1.shutdown(); shareGroupExecutorService2.shutdown(); shareGroupExecutorService3.shutdown(); @@ -1008,6 +993,18 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption() { assertEquals(totalMessagesSent, totalResult1); assertEquals(totalMessagesSent, totalResult2); assertEquals(totalMessagesSent, totalResult3); + + int actualMessagesSent = 0; + try { + producerExecutorService.awaitTermination(60, TimeUnit.SECONDS); // Wait for all producer threads to complete + + for 
(CompletableFuture future : producerFutures) { + actualMessagesSent += future.get(); + } + } catch (Exception e) { + fail("Exception occurred : " + e.getMessage()); + } + assertEquals(totalMessagesSent, actualMessagesSent); } catch (Exception e) { fail("Exception occurred : " + e.getMessage()); } @@ -1180,7 +1177,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed() { // The second poll sends the acknowledgments implicitly. // The acknowledgement commit callback will be called and the exception is thrown. // This is verified inside the onComplete() method implementation. - shareConsumer.poll(Duration.ofMillis(5000)); + shareConsumer.poll(Duration.ofMillis(500)); shareConsumer.close(); producer.close(); } @@ -1221,7 +1218,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup() { shareConsumer.poll(Duration.ofMillis(5000)); // Till now acknowledgement commit callback has not been called, so no exception thrown yet. // On 3rd poll, the acknowledgement commit callback will be called and the exception is thrown. - assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ofMillis(5000))); + assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); shareConsumer.close(); producer.close(); } @@ -1259,7 +1256,7 @@ public void testAcknowledgeCommitCallbackThrowsException() { shareConsumer.poll(Duration.ofMillis(5000)); // On the third poll, the acknowledgement commit callback will be called and the exception is thrown. 
- assertThrows(org.apache.kafka.common.errors.OutOfOrderSequenceException.class, () -> shareConsumer.poll(Duration.ofMillis(5000))); + assertThrows(org.apache.kafka.common.errors.OutOfOrderSequenceException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); shareConsumer.close(); producer.close(); @@ -1345,7 +1342,7 @@ public void testSubscriptionFollowedByTopicCreation() throws InterruptedExceptio producer.send(record); TestUtils.waitForCondition(() -> { - int records = shareConsumer.poll(Duration.ofMillis(5000)).count(); + int records = shareConsumer.poll(Duration.ofMillis(2000)).count(); return records == 1; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer, metadata sync failed"); @@ -1467,45 +1464,6 @@ public void testLsoMovementByRecordsDeletion() { producer.close(); } - @Test - @Disabled("redundant due to warmup") - public void testShareGroupStateTopicCreation() throws InterruptedException { - Set topics = null; - try { - topics = listTopics().names().get(); - } catch (Exception e) { - fail("Failed to list topics: " + e); - } - // The __share_group_state topic is created on the first FIND_COORDINATOR RPC call for any share group topic partition. - // Since that does not happen automatically on cluster creation, we expect that the cluster will not have - // __share_group_state topic until yet. 
- assertFalse(topics.contains(SHARE_GROUP_STATE_TOPIC_NAME)); - - KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); - shareConsumer.subscribe(Collections.singleton(tp.topic())); - - shareConsumer.poll(Duration.ofMillis(5000)); - TestUtils.waitForCondition( - () -> { - boolean ans = false; - try { - ans = listTopics().names().get().contains(SHARE_GROUP_STATE_TOPIC_NAME); - } catch (Exception e) { - fail("Failed to list topics: " + e); - } - return ans; - }, - 45000L, - 3000L, - () -> "Failed to create share group topic"); - - // The above condition only checks for the existence of the __share_group_state topic, but that happens when - // FIND_COORDINATOR is called internally. We need to wait for the ReadShareGroupState to finish completely - Thread.sleep(5000L); - - shareConsumer.close(); - } - @Test public void testWriteShareGroupState() throws Exception { KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); @@ -1731,14 +1689,14 @@ private void consumeMessages(AtomicInteger totalMessagesConsumed, try { if (totalMessages > 0) { while (totalMessagesConsumed.get() < totalMessages && retries < maxPolls) { - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(2000)); messagesConsumed += records.count(); totalMessagesConsumed.addAndGet(records.count()); retries++; } } else { while (retries < maxPolls) { - ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(2000)); messagesConsumed += records.count(); totalMessagesConsumed.addAndGet(records.count()); retries++; @@ -1868,6 +1826,5 @@ private void warmup() throws InterruptedException { () -> listTopics().names().get().contains(SHARE_GROUP_STATE_TOPIC_NAME), DEFAULT_MAX_WAIT_MS, 200L, () -> "SGS not up yet"); producer.close(); 
shareConsumer.close(); - System.err.println("warmup complete"); } -} \ No newline at end of file +} diff --git a/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java index 6df986374bf73..2c0567e8b78c3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java @@ -276,9 +276,9 @@ protected void handleFindCoordinatorResponse(ClientResponse response) { enqueue(this); break; - case COORDINATOR_NOT_AVAILABLE: // retryable error codes + case COORDINATOR_NOT_AVAILABLE: // retriable error codes case COORDINATOR_LOAD_IN_PROGRESS: - log.warn("Received retryable error in find coordinator {}", error.message()); + log.warn("Received retriable error in find coordinator: {}", error.message()); if (findCoordattempts >= this.maxFindCoordAttempts) { log.error("Exhausted max retries to find coordinator without success."); findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success."));