diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index df0404ca..094db301 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -188,6 +188,20 @@ public struct KafkaConsumerConfiguration { } } + /// The maximum amount of time the server will block before answering the fetch request + /// there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes. + /// Default: `.milliseconds(500)` + public var maximumFetchWaitTime: Duration = .milliseconds(500) { + didSet { + if maximumFetchWaitTime != .zero { + precondition( + maximumFetchWaitTime.canBeRepresentedAsMilliseconds, + "Lowest granularity is milliseconds" + ) + } + } + } + /// Topic metadata options. public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init() @@ -260,6 +274,7 @@ extension KafkaConsumerConfiguration { resultDict["receive.message.max.bytes"] = String(maximumReceiveMessageBytes) resultDict["max.in.flight.requests.per.connection"] = String(maximumInFlightRequestsPerConnection) resultDict["metadata.max.age.ms"] = String(maximumMetadataAge.inMilliseconds) + resultDict["fetch.wait.max.ms"] = String(maximumFetchWaitTime.inMilliseconds) resultDict["topic.metadata.refresh.interval.ms"] = String(topicMetadata.refreshInterval.rawValue) resultDict["topic.metadata.refresh.fast.interval.ms"] = String(topicMetadata.refreshFastInterval.inMilliseconds) resultDict["topic.metadata.refresh.sparse"] = String(topicMetadata.isSparseRefreshingEnabled)