diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 1385ad33ad78a..516d518525eca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -575,7 +575,6 @@ private void handleShareAcknowledgeFailure(Node fetchTarget, metadata.topicNames().get(topic.topicId())); metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); - acknowledgeRequestState.inFlightAcknowledgements.remove(tip); })); } finally { log.debug("Removing pending request for node {} - failed", fetchTarget); @@ -599,7 +598,6 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget, metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); } acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode())); - acknowledgeRequestState.inFlightAcknowledgements.remove(tip); })); metricsManager.recordLatency(resp.requestLatencyMs()); @@ -625,7 +623,6 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget, metadata.topicNames().get(topic.topicId())); metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); - acknowledgeRequestState.inFlightAcknowledgements.remove(tip); })); } finally { log.debug("Removing pending request for node {} - failed", fetchTarget); @@ -839,10 +836,6 @@ void retryRequest() { inFlightAcknowledgements.clear(); } - boolean isIncompleteMapEmpty() { - return incompleteAcknowledgements.isEmpty(); - } - boolean maybeExpire() { return numAttempts > 0 && isExpired(); } diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index ad74234f05615..b2d7ccdbb2c65 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -73,6 +73,7 @@ public static void main(String[] args) { Map metrics = null; if (options.printMetrics()) metrics = shareConsumer.metrics(); + shareConsumer.commitAsync(); shareConsumer.close(); // print final stats