KAFKA-19144 Move DelayedProduce to server module #19793
base: trunk
Changes from all commits
9f2951e
30fb277
458807b
c86ffc4
b19a431
f376a7f
c1409d0
@@ -50,13 +50,14 @@ import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, Trans
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.purgatory.DelayedProduce.{ProduceMetadata, ProducePartitionStatus}
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedProduce, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
@@ -76,6 +77,7 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import java.util.function.Consumer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
+import scala.jdk.FunctionConverters.enrichAsJavaConsumer
 import scala.jdk.OptionConverters.{RichOption, RichOptional}

 /*
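The new scala.jdk.FunctionConverters import is what later allows a Scala callback to be handed to the Java-side DelayedProduce constructor as a java.util.function.Consumer. A minimal, self-contained sketch of that conversion, with illustrative names that are not from this PR:

import java.util.function.Consumer
import scala.jdk.FunctionConverters.enrichAsJavaConsumer

object AsJavaConsumerSketch {
  def main(args: Array[String]): Unit = {
    // A plain Scala function of type String => Unit ...
    val scalaCallback: String => Unit = s => println(s"callback got: $s")

    // ... viewed as a java.util.function.Consumer[String] via asJava,
    // the same mechanism behind responseCallback.asJava further down in this diff.
    val javaConsumer: Consumer[String] = scalaCallback.asJava

    javaConsumer.accept("hello")
  }
}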
@@ -732,7 +734,7 @@ class ReplicaManager(val config: KafkaConfig,
                     internalTopicsAllowed: Boolean,
                     origin: AppendOrigin,
                     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
-                    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+                    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
                     recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.noCaching,
                     verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
@@ -846,8 +848,8 @@ class ReplicaManager(val config: KafkaConfig,

     val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus }

-    def newResponseCallback(responses: Map[TopicIdPartition, PartitionResponse]): Unit = {
-      responseCallback(preAppendPartitionResponses ++ responses)
+    def newResponseCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = {
+      responseCallback(preAppendPartitionResponses ++ responses.asScala)
     }

     appendRecords(
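The reworked newResponseCallback takes the java.util.Map produced on the purgatory side and folds it back into the Scala-side pre-append responses via asScala. A small standalone sketch of that merge pattern, using simplified stand-in types rather than this PR's types:

import java.util
import scala.jdk.CollectionConverters._

object MergeResponsesSketch {
  def main(args: Array[String]): Unit = {
    // Scala-side responses computed before the append (stand-in types).
    val preAppend: Map[String, Int] = Map("tp-0" -> 1)

    // Java-side responses handed back later by the delayed-produce machinery.
    val fromJava: util.Map[String, Int] = new util.HashMap[String, Int]
    fromJava.put("tp-1", 2)

    // asScala wraps the Java map in place; ++ then builds one Scala Map.
    val merged: Map[String, Int] = preAppend ++ fromJava.asScala
    println(merged) // Map(tp-0 -> 1, tp-1 -> 2)
  }
}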
@@ -922,7 +924,7 @@ class ReplicaManager(val config: KafkaConfig,
     results: Map[TopicIdPartition, LogAppendResult]
   ): Map[TopicIdPartition, ProducePartitionStatus] = {
     results.map { case (topicIdPartition, result) =>
-      topicIdPartition -> ProducePartitionStatus(
+      topicIdPartition -> new ProducePartitionStatus(
         result.info.lastOffset + 1, // required offset
         new PartitionResponse(
           result.error,
@@ -967,12 +969,33 @@ class ReplicaManager(val config: KafkaConfig,
     entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
     initialAppendResults: Map[TopicIdPartition, LogAppendResult],
     initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
-    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
   ): Unit = {
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
       // create delayed produce operation
-      val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
-      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback)
+      val produceMetadata = new ProduceMetadata(requiredAcks, initialProduceStatus.asJava)
+
+      // Updates the status of a produce partition based on the current state.
+      // Please refer to the documentation in `DelayedProduce#tryComplete` for
+      // a comprehensive description of Case A, Case B and Case C.
+      def delegate(tp: TopicPartition, status: ProducePartitionStatus): Unit = {
+        val (hasEnough, error) = getPartitionOrError(tp) match {
+          case Left(err) =>
+            // Case A
+            (false, err)
+
+          case Right(partition) =>
+            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
+        }
+
+        // Case B || C.1 || C.2
+        if (error != Errors.NONE || hasEnough) {
+          status.setAcksPending(false)
+          status.responseStatus.error = error
+        }
+      }
+
+      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, delegate, responseCallback.asJava)

       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList

Review comment on the "Case B || C.1 || C.2" line: ditto
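Two notes on the new delegate, since the reviewers below ask about it. First, the cases: per the scaladoc of DelayedProduce#tryComplete, Case A is the replica no longer being assigned the partition, Case B is the replica no longer being the leader, and Case C is the leader path, where C.1 is a local error raised while checking whether requiredAcks replicas have caught up and C.2 is the success path. Second, the shape: now that DelayedProduce lives in the server module it can no longer call back into ReplicaManager directly, so the per-partition status check is injected as a function. A standalone sketch of that inversion, with hypothetical simplified types (this is not the server module's actual code):

import java.util.function.BiConsumer

object DelayedProduceSketch {
  // Hypothetical stand-in for ProducePartitionStatus.
  final class Status(var acksPending: Boolean, var error: String)

  def main(args: Array[String]): Unit = {
    // The delegate injected by the broker; here every check succeeds (Case C.2).
    val updater: BiConsumer[String, Status] = (tp, status) => {
      val (hasEnough, error) = (true, "NONE") // pretend checkEnoughReplicasReachOffset passed
      if (error != "NONE" || hasEnough) {
        status.acksPending = false
        status.error = error
      }
    }

    // tryComplete-style pass: re-check every partition still pending,
    // then the operation completes iff none remains pending.
    val statuses = Map("tp-0" -> new Status(true, "NONE"), "tp-1" -> new Status(true, "NONE"))
    statuses.foreach { case (tp, st) => if (st.acksPending) updater.accept(tp, st) }
    println(s"operation complete: ${statuses.values.forall(!_.acksPending)}")
  }
}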
@@ -983,23 +1006,25 @@ class ReplicaManager(val config: KafkaConfig,
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.asJava)
     } else {
       // we can respond immediately
-      val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
+      val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
+      initialProduceStatus.foreach { case (k, status) => k -> produceResponseStatus.put(k, status.responseStatus) }

       responseCallback(produceResponseStatus)
     }
   }

   private def sendInvalidRequiredAcksResponse(
     entries: Map[TopicIdPartition, MemoryRecords],
-    responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit): Unit = {
+    responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit): Unit = {
     // If required.acks is outside accepted range, something is wrong with the client
     // Just return an error and don't handle the request at all
-    val responseStatus = entries.map { case (topicIdPartition, _) =>
-      topicIdPartition -> new PartitionResponse(
-        Errors.INVALID_REQUIRED_ACKS,
-        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
-        RecordBatch.NO_TIMESTAMP,
-        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
-      )
+    val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
+    entries.foreach { case (topicIdPartition, _) =>
+      responseStatus.put(topicIdPartition, new PartitionResponse(
+        Errors.INVALID_REQUIRED_ACKS,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+        RecordBatch.NO_TIMESTAMP,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset)
+      )
     }
     responseCallback(responseStatus)
   }

Review comment on lines +1009 to +1010 (a suggested change was attached): I think it's ok to use
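The suggestion thread above appears to concern the manual HashMap construction; note also that the `k ->` inside the foreach on line +1010 builds a tuple that is immediately discarded. The body of the suggestion is not preserved here, but an equivalent construction via CollectionConverters' asJava would look like this sketch (stand-in types, not this PR's code):

import java.util
import scala.jdk.CollectionConverters._

object BuildJavaMapSketch {
  def main(args: Array[String]): Unit = {
    val initialStatus: Map[String, Int] = Map("tp-0" -> 0, "tp-1" -> 1)

    // Manual construction, mirroring the diff above.
    val viaHashMap = new util.HashMap[String, Int]
    initialStatus.foreach { case (k, v) => viaHashMap.put(k, v) }

    // Equivalent: transform on the Scala side, then convert once with asJava.
    val viaAsJava: util.Map[String, Int] = initialStatus.map { case (k, v) => k -> v }.asJava

    // Both hold the same entries; asJava returns a wrapper view rather than a copy.
    println(viaHashMap == viaAsJava) // true
  }
}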
Review comment: What does this comment mean?

Reply: I see this is relevant to the comment of DelayedProduce, but it's confusing that these comments are standalone here. Could you write the whole meaning of these cases, or link these comments to tryComplete?