Skip to content

Commit 9f2951e

Browse files
committed
KAFKA-19144 Move DelayedProduce to server module
1 parent e88c10d commit 9f2951e

File tree

13 files changed

+318
-200
lines changed

13 files changed

+318
-200
lines changed

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._
3838
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
3939
import org.apache.kafka.common.utils.Time
4040
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
41+
import org.apache.kafka.server.DelayedProduce
4142
import org.apache.kafka.server.common.RequestLocal
4243
import org.apache.kafka.server.log.remote.TopicPartitionLog
4344
import org.apache.kafka.server.log.remote.storage.RemoteLogManager

core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ class TransactionStateManager(brokerId: Int,
256256
expiredForPartition: Iterable[TransactionalIdCoordinatorEpochAndMetadata],
257257
tombstoneRecords: MemoryRecords
258258
): Unit = {
259-
def removeFromCacheCallback(responses: collection.Map[TopicIdPartition, PartitionResponse]): Unit = {
260-
responses.foreachEntry { (topicPartition, response) =>
259+
def removeFromCacheCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = {
260+
responses.forEach { (topicPartition, response) =>
261261
inReadLock(stateLock) {
262262
transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
263263
expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
@@ -667,13 +667,13 @@ class TransactionStateManager(brokerId: Int,
667667
val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
668668

669669
// set the callback function to update transaction status in cache after log append completed
670-
def updateCacheCallback(responseStatus: collection.Map[TopicIdPartition, PartitionResponse]): Unit = {
670+
def updateCacheCallback(responseStatus: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = {
671671
// the append response should only contain the topics partition
672-
if (responseStatus.size != 1 || !responseStatus.contains(transactionStateTopicIdPartition))
672+
if (responseStatus.size != 1 || !responseStatus.containsKey(transactionStateTopicIdPartition))
673673
throw new IllegalStateException("Append status %s should only have one partition %s"
674674
.format(responseStatus, transactionStateTopicPartition))
675675

676-
val status = responseStatus(transactionStateTopicIdPartition)
676+
val status = responseStatus.get(transactionStateTopicIdPartition)
677677

678678
var responseError = if (status.error == Errors.NONE) {
679679
Errors.NONE

core/src/main/scala/kafka/server/DelayedProduce.scala

Lines changed: 0 additions & 154 deletions
This file was deleted.

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1818,7 +1818,7 @@ class KafkaApis(val requestChannel: RequestChannel,
18181818
entriesPerPartition = controlRecords,
18191819
requestLocal = requestLocal,
18201820
responseCallback = errors => {
1821-
errors.foreachEntry { (topicIdPartition, partitionResponse) =>
1821+
errors.forEach { (topicIdPartition, partitionResponse) =>
18221822
addResultAndMaybeComplete(topicIdPartition.topicPartition(), partitionResponse.error)
18231823
}
18241824
}

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, Trans
5050
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
5151
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5252
import org.apache.kafka.metadata.MetadataCache
53+
import org.apache.kafka.server.DelayedProduce.{ProduceMetadata, ProducePartitionStatus}
5354
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition}
5455
import org.apache.kafka.server.log.remote.TopicPartitionLog
5556
import org.apache.kafka.server.config.ReplicationConfigs
@@ -61,7 +62,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
6162
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
6263
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
6364
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
64-
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
65+
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, DelayedProduce, common}
6566
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
6667
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
6768
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -77,6 +78,7 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong}
7778
import java.util.function.Consumer
7879
import scala.collection.{Map, Seq, Set, immutable, mutable}
7980
import scala.jdk.CollectionConverters._
81+
import scala.jdk.FunctionConverters.enrichAsJavaConsumer
8082
import scala.jdk.OptionConverters.{RichOption, RichOptional}
8183

8284
/*
@@ -734,7 +736,7 @@ class ReplicaManager(val config: KafkaConfig,
734736
internalTopicsAllowed: Boolean,
735737
origin: AppendOrigin,
736738
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
737-
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
739+
responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
738740
delayedProduceLock: Option[Lock] = None,
739741
recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
740742
requestLocal: RequestLocal = RequestLocal.noCaching,
@@ -850,8 +852,8 @@ class ReplicaManager(val config: KafkaConfig,
850852

851853
val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus }
852854

853-
def newResponseCallback(responses: Map[TopicIdPartition, PartitionResponse]): Unit = {
854-
responseCallback(preAppendPartitionResponses ++ responses)
855+
def newResponseCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = {
856+
responseCallback(preAppendPartitionResponses ++ responses.asScala)
855857
}
856858

857859
appendRecords(
@@ -926,7 +928,7 @@ class ReplicaManager(val config: KafkaConfig,
926928
results: Map[TopicIdPartition, LogAppendResult]
927929
): Map[TopicIdPartition, ProducePartitionStatus] = {
928930
results.map { case (topicIdPartition, result) =>
929-
topicIdPartition -> ProducePartitionStatus(
931+
topicIdPartition -> new ProducePartitionStatus(
930932
result.info.lastOffset + 1, // required offset
931933
new PartitionResponse(
932934
result.error,
@@ -972,12 +974,30 @@ class ReplicaManager(val config: KafkaConfig,
972974
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
973975
initialAppendResults: Map[TopicIdPartition, LogAppendResult],
974976
initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
975-
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
977+
responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
976978
): Unit = {
977979
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
978980
// create delayed produce operation
979-
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
980-
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
981+
val produceMetadata = new ProduceMetadata(requiredAcks, initialProduceStatus.asJava)
982+
983+
def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit = {
984+
val (hasEnough, error) = getPartitionOrError(tp) match {
985+
case Left(err) =>
986+
// Case A
987+
(false, err)
988+
989+
case Right(partition) =>
990+
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
991+
}
992+
993+
// Case B || C.1 || C.2
994+
if (error != Errors.NONE || hasEnough) {
995+
status.setAcksPending(false)
996+
status.responseStatus.error = error
997+
}
998+
}
999+
1000+
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, delegate, responseCallback.asJava, delayedProduceLock.toJava)
9811001

9821002
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
9831003
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
@@ -989,13 +1009,13 @@ class ReplicaManager(val config: KafkaConfig,
9891009
} else {
9901010
// we can respond immediately
9911011
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
992-
responseCallback(produceResponseStatus)
1012+
responseCallback(produceResponseStatus.asJava)
9931013
}
9941014
}
9951015

9961016
private def sendInvalidRequiredAcksResponse(
9971017
entries: Map[TopicIdPartition, MemoryRecords],
998-
responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit): Unit = {
1018+
responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit): Unit = {
9991019
// If required.acks is outside accepted range, something is wrong with the client
10001020
// Just return an error and don't handle the request at all
10011021
val responseStatus = entries.map { case (topicIdPartition, _) =>
@@ -1006,7 +1026,7 @@ class ReplicaManager(val config: KafkaConfig,
10061026
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
10071027
)
10081028
}
1009-
responseCallback(responseStatus)
1029+
responseCallback(responseStatus.asJava)
10101030
}
10111031

10121032
/**

core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.junit.jupiter.api.Assertions._
4040
import org.mockito.Mockito.mock
4141

4242
import java.io.File
43-
import java.util.{Map => JMap}
43+
import java.util.{Optional, Map => JMap}
4444
import scala.collection.Map
4545
import scala.jdk.CollectionConverters._
4646

@@ -271,9 +271,10 @@ class LocalLeaderEndPointTest extends Logging {
271271
origin: AppendOrigin = AppendOrigin.CLIENT,
272272
requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
273273
val result = new CallbackResult[PartitionResponse]()
274-
def appendCallback(responses: scala.collection.Map[TopicIdPartition, PartitionResponse]): Unit = {
275-
val response = responses.get(partition)
276-
assertTrue(response.isDefined)
274+
def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = {
275+
val response = Optional.ofNullable(responses.get(partition))
276+
277+
assertTrue(response.isPresent)
277278
result.fire(response.get)
278279
}
279280

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import org.apache.kafka.common.replica.ClientMetadata
5454
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
5555
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
5656
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
57+
import org.apache.kafka.server.DelayedProduce
5758
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager, RequestLocal}
5859
import org.apache.kafka.server.metrics.KafkaYammerMetrics
5960
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}

0 commit comments

Comments (0)