diff --git a/Sources/Kafka/Configuration/KafkaTransactionalProducerConfiguration.swift b/Sources/Kafka/Configuration/KafkaTransactionalProducerConfiguration.swift
new file mode 100644
index 00000000..312462fd
--- /dev/null
+++ b/Sources/Kafka/Configuration/KafkaTransactionalProducerConfiguration.swift
@@ -0,0 +1,249 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-client open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+public struct KafkaTransactionalProducerConfiguration {
+    // MARK: - Kafka-specific Config properties
+
+    /// If the ``isAutoCreateTopicsEnabled`` option is set to `true`,
+    /// the broker will automatically generate topics when producing data to non-existent topics.
+    /// The configuration specified in this ``KafkaTopicConfiguration`` will be applied to the newly created topic.
+    /// Default: See default values of ``KafkaTopicConfiguration``
+    public var topicConfiguration: KafkaTopicConfiguration = .init()
+
+    /// The time between two consecutive polls.
+    /// Effectively controls the rate at which incoming events are consumed.
+    /// Default: `.milliseconds(100)`
+    public var pollInterval: Duration = .milliseconds(100)
+
+    /// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
+    /// Default: `10000`
+    public var flushTimeoutMilliseconds: Int = 10000 {
+        didSet {
+            precondition(
+                0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
+                "Flush timeout outside of valid range \(0...Int32.max)"
+            )
+        }
+    }
+
+    // MARK: - Producer-specific Config Properties
+
+    /// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order.
+    /// The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled:
+    /// ``KafkaProducerConfiguration/maximumInFlightRequestsPerConnection`` = `5` (must be less than or equal to 5),
+    /// ``KafkaProducerConfiguration/maximumMessageSendRetries`` = `UInt32.max` (must be greater than 0),
+    /// ``KafkaTopicConfiguration/requiredAcknowledgements`` = ``KafkaTopicConfiguration/RequiredAcknowledgments/all``,
+    /// queuing strategy = FIFO.
+    /// Producer instantiation will fail if the user-supplied configuration is incompatible.
+    /// Default: `false`
+    public var isIdempotenceEnabled: Bool = false
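+
+    // Note: regardless of this flag, the `dictionary` mapping below always sets
+    // `enable.idempotence=true`, because transactional delivery requires an
+    // idempotent producer.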
+
+    /// Producer queue options.
+    public struct QueueConfiguration: Sendable, Hashable {
+        /// Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions.
+        public struct MessageLimit: Sendable, Hashable {
+            internal let rawValue: Int
+
+            private init(rawValue: Int) {
+                self.rawValue = rawValue
+            }
+
+            public static func maximumLimit(_ value: Int) -> MessageLimit {
+                return .init(rawValue: value)
+            }
+
+            /// No limit for the maximum number of messages allowed on the producer queue.
+            public static let unlimited: MessageLimit = .init(rawValue: 0)
+        }
+
+        /// Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions.
+        /// Default: `.maximumLimit(100_000)`
+        public var messageLimit: MessageLimit = .maximumLimit(100_000)
+
+        /// Maximum total message size sum allowed on the producer queue. This queue is shared by all topics and partitions.
+        /// This property has higher priority than ``messageLimit``.
+        /// Default: `1_048_576 * 1024`
+        public var maximumMessageBytes: Int = 1_048_576 * 1024
+
+        /// How long to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers.
+        /// A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
+        /// (Lowest granularity is milliseconds)
+        /// Default: `.milliseconds(5)`
+        public var maximumMessageQueueTime: Duration = .milliseconds(5) {
+            didSet {
+                precondition(
+                    self.maximumMessageQueueTime.canBeRepresentedAsMilliseconds,
+                    "Lowest granularity is milliseconds"
+                )
+            }
+        }
+
+        public init() {}
+    }
+
+    /// Producer queue options.
+    public var queue: QueueConfiguration = .init()
+
+    /// How many times to retry sending a failing message.
+    ///
+    /// - Note: Retrying may cause reordering unless ``KafkaProducerConfiguration/isIdempotenceEnabled`` is set to `true`.
+    /// Default: `2_147_483_647`
+    public var maximumMessageSendRetries: Int = 2_147_483_647
+
+    /// Allow automatic topic creation on the broker when producing to non-existent topics.
+    /// The broker must also be configured with ``isAutoCreateTopicsEnabled`` = `true` for this configuration to take effect.
+    /// Default: `true`
+    public var isAutoCreateTopicsEnabled: Bool = true
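+
+    // Illustrative configuration sketch (example values, not recommendations;
+    // `BrokerAddress(host:port:)` as used in the integration tests):
+    //
+    //     var config = KafkaTransactionalProducerConfiguration(
+    //         transactionalId: "example-transactional-id",
+    //         bootstrapBrokerAddresses: [.init(host: "localhost", port: 9092)]
+    //     )
+    //     config.queue.messageLimit = .maximumLimit(50_000)
+    //     config.queue.maximumMessageQueueTime = .milliseconds(10)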
+
+    // MARK: - Common Client Config Properties
+
+    /// Client identifier.
+    /// Default: `"rdkafka"`
+    public var identifier: String = "rdkafka"
+
+    /// Initial list of brokers.
+    /// Default: `[]`
+    public var bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress] = []
+
+    /// Message options.
+    public var message: KafkaConfiguration.MessageOptions = .init()
+
+    /// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups.
+    /// Default: `100_000_000`
+    public var maximumReceiveMessageBytes: Int = 100_000_000
+
+    /// Maximum number of in-flight requests per broker connection.
+    /// This is a generic property applied to all broker communication; however, it is primarily relevant to produce requests.
+    /// In particular, note that other mechanisms limit the number of outstanding consumer fetch requests per broker to one.
+    /// Default: `5`
+    public var maximumInFlightRequestsPerConnection: Int = 5 {
+        didSet {
+            precondition(
+                self.maximumInFlightRequestsPerConnection <= 5,
+                "Max in flight requests is 5 for TransactionalProducer"
+            )
+        }
+    }
+
+    /// Metadata cache max age.
+    /// (Lowest granularity is milliseconds)
+    /// Default: `.milliseconds(900_000)`
+    public var maximumMetadataAge: Duration = .milliseconds(900_000) {
+        didSet {
+            precondition(
+                self.maximumMetadataAge.canBeRepresentedAsMilliseconds,
+                "Lowest granularity is milliseconds"
+            )
+        }
+    }
+
+    /// Topic metadata options.
+    public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init()
+
+    /// Topic denylist.
+    /// Default: `[]`
+    public var topicDenylist: [String] = []
+
+    /// Debug options.
+    /// Default: `[]`
+    public var debugOptions: [KafkaConfiguration.DebugOption] = []
+
+    /// Socket options.
+    public var socket: KafkaConfiguration.SocketOptions = .init()
+
+    /// Broker options.
+    public var broker: KafkaConfiguration.BrokerOptions = .init()
+
+    /// Reconnect options.
+    public var reconnect: KafkaConfiguration.ReconnectOptions = .init()
+
+    /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
+    /// Default: `.plaintext`
+    public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext
+
+    /// The transactional ID used to identify and fence this producer's transactions across restarts.
+    var transactionalId: String
+
+    /// Maximum time a transaction may remain open before the broker proactively aborts it.
+    /// (Lowest granularity is milliseconds)
+    /// Default: `.seconds(60)`
+    var transactionsTimeout: Duration = .seconds(60) {
+        didSet {
+            precondition(
+                self.transactionsTimeout.canBeRepresentedAsMilliseconds,
+                "Lowest granularity is milliseconds"
+            )
+        }
+    }
+
+    public init(
+        transactionalId: String,
+        bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress]
+    ) {
+        self.transactionalId = transactionalId
+        self.bootstrapBrokerAddresses = bootstrapBrokerAddresses
+    }
+}
+ resultDict["broker.address.ttl"] = String(self.broker.addressTimeToLive.inMilliseconds) + resultDict["broker.address.family"] = self.broker.addressFamily.description + resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoff.rawValue) + resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.maximumBackoff.inMilliseconds) + + // Merge with SecurityProtocol configuration dictionary + resultDict.merge(self.securityProtocol.dictionary) { _, _ in + fatalError("securityProtocol and \(#file) should not have duplicate keys") + } + + return resultDict + } +} + +// MARK: - KafkaProducerConfiguration + Sendable + +extension KafkaTransactionalProducerConfiguration: Sendable {} diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 12fe0dca..738cac84 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -314,7 +314,7 @@ public final class KafkaConsumer: Sendable, Service { switch action { case .setUpConnection(let client): let assignment = RDKafkaTopicPartitionList() - assignment.setOffset(topic: topic, partition: partition, offset: Int64(offset.rawValue)) + assignment.setOffset(topic: topic, partition: partition, offset: offset) try client.assign(topicPartitionList: assignment) } } @@ -418,6 +418,10 @@ public final class KafkaConsumer: Sendable, Service { } } } + + func client() throws -> RDKafkaClient { + return try self.stateMachine.withLockedValue { try $0.client() } + } } // MARK: - KafkaConsumer + StateMachine @@ -668,5 +672,22 @@ extension KafkaConsumer { return nil } } + + func client() throws -> RDKafkaClient { + switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing(let client, _): + return client + case .consuming(let client, _): + return client + case .consumptionStopped(let client): + return client + case .finishing(let client): + return client + case .finished: + throw KafkaError.client(reason: "Client is stopped") + } + } } } diff --git a/Sources/Kafka/KafkaError.swift b/Sources/Kafka/KafkaError.swift index ddaf119a..bbb7c2b8 100644 --- a/Sources/Kafka/KafkaError.swift +++ b/Sources/Kafka/KafkaError.swift @@ -56,7 +56,7 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable { } static func rdKafkaError( - wrapping error: rd_kafka_resp_err_t, file: String = #fileID, line: UInt = #line + wrapping error: rd_kafka_resp_err_t, isFatal: Bool = false, file: String = #fileID, line: UInt = #line ) -> KafkaError { let errorMessage = String(cString: rd_kafka_err2str(error)) return KafkaError( @@ -135,6 +135,36 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable { ) ) } + + static func transactionAborted( + reason: String, file: String = #fileID, line: UInt = #line + ) -> KafkaError { + return KafkaError( + backing: .init( + code: .transactionAborted, reason: reason, file: file, line: line + ) + ) + } + + static func transactionIncomplete( + reason: String, file: String = #fileID, line: UInt = #line + ) -> KafkaError { + return KafkaError( + backing: .init( + code: .transactionIncomplete, reason: reason, file: file, line: line + ) + ) + } + + static func transactionOutOfAttempts( + numOfAttempts: UInt64, file: String = #fileID, line: UInt = #line + ) -> KafkaError { + return KafkaError( + backing: .init( + code: .transactionOutOfAttempts, reason: "Out of \(numOfAttempts) attempts", file: file, line: line + ) + ) + } } extension KafkaError { @@ -153,6 +183,10 @@ extension 
 }
 
 extension KafkaError {
@@ -153,6 +183,10 @@ extension KafkaError {
         case messageConsumption
         case topicCreation
         case topicDeletion
+        case transactionAborted
+        case transactionIncomplete
+        case notInTransaction // FIXME: maybe add a subcode?
+        case transactionOutOfAttempts
     }
 
     fileprivate var backingCode: BackingCode
@@ -177,6 +211,12 @@ extension KafkaError {
         public static let topicCreationFailed = ErrorCode(.topicCreation)
         /// Deleting a topic failed.
         public static let topicDeletionFailed = ErrorCode(.topicDeletion)
+        /// The transaction was aborted (it can be retried from scratch).
+        public static let transactionAborted = ErrorCode(.transactionAborted)
+        /// The transaction could not be completed.
+        public static let transactionIncomplete = ErrorCode(.transactionIncomplete)
+        /// Ran out of the provided number of attempts.
+        public static let transactionOutOfAttempts = ErrorCode(.transactionOutOfAttempts)
 
         public var description: String {
             return String(describing: self.backingCode)
@@ -196,16 +236,20 @@ extension KafkaError {
 
         let line: UInt
 
+        let isFatal: Bool
+
         fileprivate init(
             code: KafkaError.ErrorCode,
             reason: String,
             file: String,
-            line: UInt
+            line: UInt,
+            isFatal: Bool = false
         ) {
             self.code = code
             self.reason = reason
             self.file = file
             self.line = line
+            self.isFatal = isFatal
         }
 
         // Only the error code matters for equality.
diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift
index b3cfcbdc..58f58290 100644
--- a/Sources/Kafka/KafkaProducer.swift
+++ b/Sources/Kafka/KafkaProducer.swift
@@ -41,6 +41,30 @@ extension KafkaProducerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
     }
 }
 
+// MARK: - KafkaProducerSharedProperties
+
+internal protocol KafkaProducerSharedProperties: Sendable {
+    /// If the ``isAutoCreateTopicsEnabled`` option is set to `true`,
+    /// the broker will automatically generate topics when producing data to non-existent topics.
+    /// The configuration specified in this ``KafkaTopicConfiguration`` will be applied to the newly created topic.
+    /// Default: See default values of ``KafkaTopicConfiguration``
+    var topicConfiguration: KafkaTopicConfiguration { get }
+
+    /// The time between two consecutive polls.
+    /// Effectively controls the rate at which incoming events are consumed.
+    /// Default: `.milliseconds(100)`
+    var pollInterval: Duration { get }
+
+    /// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
+    /// Default: `10000`
+    var flushTimeoutMilliseconds: Int { get }
+
+    var dictionary: [String: String] { get }
+}
+
+extension KafkaProducerConfiguration: KafkaProducerSharedProperties {}
+extension KafkaTransactionalProducerConfiguration: KafkaProducerSharedProperties {}
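+
+// Design note: `KafkaProducer` depends only on this shared surface, which is what
+// allows the transactional configuration to reuse the existing producer
+// implementation unchanged.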
 
 // MARK: - KafkaProducerEvents
 
 /// `AsyncSequence` implementation for handling ``KafkaProducerEvent``s emitted by Kafka.
@@ -80,7 +104,7 @@ public final class KafkaProducer: Service, Sendable {
     private let stateMachine: NIOLockedValueBox<StateMachine>
 
     /// The configuration object of the producer client.
-    private let configuration: KafkaProducerConfiguration
+    private let configuration: KafkaProducerSharedProperties
 
     /// Topic configuration that is used when a new topic has to be created by the producer.
     private let topicConfiguration: KafkaTopicConfiguration
@@ -93,7 +117,7 @@
     /// - Throws: A ``KafkaError`` if initializing the producer failed.
     private init(
         stateMachine: NIOLockedValueBox<StateMachine>,
-        configuration: KafkaProducerConfiguration,
+        configuration: KafkaProducerSharedProperties,
         topicConfiguration: KafkaTopicConfiguration
     ) {
         self.stateMachine = stateMachine
@@ -113,6 +137,13 @@
     public convenience init(
         configuration: KafkaProducerConfiguration,
         logger: Logger
+    ) throws {
+        try self.init(configuration: configuration as KafkaProducerSharedProperties, logger: logger)
+    }
+
+    internal convenience init(
+        configuration: KafkaProducerSharedProperties,
+        logger: Logger
     ) throws {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
@@ -153,6 +184,14 @@
     public static func makeProducerWithEvents(
         configuration: KafkaProducerConfiguration,
         logger: Logger
+    ) throws -> (KafkaProducer, KafkaProducerEvents) {
+        return try self.makeProducerWithEvents(configuration: configuration as (any KafkaProducerSharedProperties), logger: logger)
+    }
+
+    internal static func makeProducerWithEvents(
+        configuration: KafkaProducerSharedProperties,
+        topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
+        logger: Logger
     ) throws -> (KafkaProducer, KafkaProducerEvents) {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
@@ -263,6 +302,10 @@ public final class KafkaProducer: Service, Sendable {
             return KafkaProducerMessageID(rawValue: newMessageID)
         }
     }
+
+    func client() throws -> RDKafkaClient {
+        try self.stateMachine.withLockedValue { try $0.client() }
+    }
 }
 
 // MARK: - KafkaProducer + StateMachine
@@ -453,5 +496,20 @@ extension KafkaProducer {
                 break
             }
         }
+
+        func client() throws -> RDKafkaClient {
+            switch self.state {
+            case .uninitialized:
+                fatalError("\(#function) invoked while still in state \(self.state)")
+            case .started(let client, _, _, _):
+                return client
+            case .consumptionStopped(let client):
+                return client
+            case .finishing(let client, _):
+                return client
+            case .finished:
+                throw KafkaError.connectionClosed(reason: "Client stopped")
+            }
+        }
     }
 }
diff --git a/Sources/Kafka/KafkaTransaction.swift b/Sources/Kafka/KafkaTransaction.swift
new file mode 100644
index 00000000..890cc8e2
--- /dev/null
+++ b/Sources/Kafka/KafkaTransaction.swift
@@ -0,0 +1,40 @@
+
+
+public final class KafkaTransaction {
+    let client: RDKafkaClient
+    let producer: KafkaProducer
+
+    init(client: RDKafkaClient, producer: KafkaProducer) throws {
+        self.client = client
+        self.producer = producer
+
+        try client.beginTransaction()
+    }
+
+    deinit {}
+
+    public func send(
+        offsets: RDKafkaTopicPartitionList,
+        forConsumer consumer: KafkaConsumer,
+        timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
+        attempts: UInt64 = .max
+    ) async throws {
+        let consumerClient = try consumer.client()
+        try await consumerClient.withKafkaHandlePointer {
+            try await self.client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
+        }
+    }
+
+    @discardableResult
+    public func send<Key, Value>(_ message: KafkaProducerMessage<Key, Value>) throws -> KafkaProducerMessageID {
+        try self.producer.send(message)
+    }
+
+    func commit() async throws {
+        try await self.client.commitTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+    }
+
+    func abort() async throws {
+        try await self.client.abortTransaction(attempts: .max, timeout: .kafkaUntilEndOfTransactionTimeout)
+    }
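+
+    // Lifecycle sketch: `KafkaTransactionalProducer.withTransaction` constructs this
+    // object (the initializer calls `beginTransaction()`), runs the user closure, and
+    // then calls `commit()` on success or `abort()` on failure.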
+}
diff --git a/Sources/Kafka/KafkaTransactionalProducer.swift b/Sources/Kafka/KafkaTransactionalProducer.swift
new file mode 100644
index 00000000..4b792962
--- /dev/null
+++ b/Sources/Kafka/KafkaTransactionalProducer.swift
@@ -0,0 +1,82 @@
+import Logging
+import ServiceLifecycle
+
+public final class KafkaTransactionalProducer: Service, Sendable {
+    let producer: KafkaProducer
+
+    private init(producer: KafkaProducer, config: KafkaTransactionalProducerConfiguration) async throws {
+        self.producer = producer
+        let client = try producer.client()
+        try await client.initTransactions(timeout: config.transactionsTimeout)
+    }
+
+    /// Initialize a new ``KafkaTransactionalProducer``.
+    ///
+    /// This creates a producer without listening for events.
+    ///
+    /// - Parameter config: The ``KafkaTransactionalProducerConfiguration`` for configuring the producer.
+    /// - Parameter logger: A logger.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public convenience init(
+        config: KafkaTransactionalProducerConfiguration,
+        logger: Logger
+    ) async throws {
+        let producer = try KafkaProducer(configuration: config, logger: logger)
+        try await self.init(producer: producer, config: config)
+    }
+
+    /// Initialize a new ``KafkaTransactionalProducer`` and a ``KafkaProducerEvents`` asynchronous sequence.
+    ///
+    /// Use the asynchronous sequence to consume events.
+    ///
+    /// - Important: When the asynchronous sequence is deinitialized, the producer will be shut down and disallow sending more messages.
+    /// Additionally, make sure to consume the asynchronous sequence; otherwise, the events will be buffered in memory indefinitely.
+    ///
+    /// - Parameter config: The ``KafkaTransactionalProducerConfiguration`` for configuring the producer.
+    /// - Parameter logger: A logger.
+    /// - Returns: A tuple containing the created ``KafkaTransactionalProducer`` and the ``KafkaProducerEvents``
+    /// `AsyncSequence` used for receiving message events.
+    /// - Throws: A ``KafkaError`` if initializing the producer failed.
+    public static func makeTransactionalProducerWithEvents(
+        config: KafkaTransactionalProducerConfiguration,
+        logger: Logger
+    ) async throws -> (KafkaTransactionalProducer, KafkaProducerEvents) {
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(
+            configuration: config,
+            logger: logger
+        )
+
+        let transactionalProducer = try await KafkaTransactionalProducer(producer: producer, config: config)
+
+        return (transactionalProducer, events)
+    }
+
+    public func withTransaction(_ body: @Sendable (KafkaTransaction) async throws -> Void) async throws {
+        let transaction = try KafkaTransaction(
+            client: try producer.client(),
+            producer: self.producer
+        )
+
+        do { // TODO: reconsider how the transaction should be aborted
+            try await body(transaction)
+            try await transaction.commit()
+        } catch { // FIXME: maybe catch AbortTransaction?
+            do {
+                try await transaction.abort()
+            } catch {
+                // FIXME: this leaves the transaction in an inconsistent state;
+                // should we emit fatalError(...) or propagate the error with the isFatal flag set?
+            }
+            throw error
+        }
+    }
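+
+    // Usage sketch (illustrative names; commit/abort is driven by whether `body` throws):
+    //
+    //     try await transactionalProducer.withTransaction { transaction in
+    //         try transaction.send(outgoingMessage)
+    //         try await transaction.send(offsets: offsets, forConsumer: consumer)
+    //     }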
+
+    public func run() async throws {
+        try await self.producer.run()
+    }
+}
diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index 9d69e08d..df0c3aac 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -36,6 +36,12 @@ final class RDKafkaClient: Sendable {
     /// `librdkafka`'s `rd_kafka_queue_t` that events are received on.
     private let queue: OpaquePointer
 
+    /// Queue for blocking calls outside of the cooperative thread pool.
+    private var gcdQueue: DispatchQueue {
+        // Global concurrent queue
+        .global(qos: .default) // FIXME: maybe DispatchQueue(label: "com.swift.kafka.queue")
+    }
+
     // Use factory method to initialize
     private init(
         type: ClientType,
@@ -509,7 +515,7 @@ final class RDKafkaClient: Sendable {
             changesList.setOffset(
                 topic: message.topic,
                 partition: message.partition,
-                offset: Int64(message.offset.rawValue + 1)
+                offset: .init(rawValue: message.offset.rawValue + 1)
             )
 
             let error = changesList.withListPointer { listPointer in
@@ -546,7 +552,7 @@ final class RDKafkaClient: Sendable {
         changesList.setOffset(
             topic: message.topic,
             partition: message.partition,
-            offset: Int64(message.offset.rawValue + 1)
+            offset: .init(rawValue: message.offset.rawValue + 1)
         )
 
         // Unretained pass because the reference that librdkafka holds to capturedClosure
@@ -610,4 +616,150 @@ final class RDKafkaClient: Sendable {
     func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
         return try body(self.kafkaHandle)
     }
+
+    /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle with an async closure.
+    /// - Warning: Do not escape the pointer from the closure for later use.
+    /// - Parameter body: The closure will use the Kafka handle pointer.
+    @discardableResult
+    func withKafkaHandlePointer<T>(_ body: (OpaquePointer) async throws -> T) async rethrows -> T {
+        return try await body(self.kafkaHandle)
+    }
+
+    func initTransactions(timeout: Duration) async throws {
+        let result = await performBlockingCall(queue: gcdQueue) {
+            rd_kafka_init_transactions(self.kafkaHandle, timeout.totalMilliseconds)
+        }
+
+        if result != nil {
+            let code = rd_kafka_error_code(result)
+            rd_kafka_error_destroy(result)
+            throw KafkaError.rdKafkaError(wrapping: code)
+        }
+    }
+
+    func beginTransaction() throws {
+        let result = rd_kafka_begin_transaction(kafkaHandle)
+        if result != nil {
+            let code = rd_kafka_error_code(result)
+            rd_kafka_error_destroy(result)
+            throw KafkaError.rdKafkaError(wrapping: code)
+        }
+    }
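+
+    // Transactional call sequence (sketch): `initTransactions(timeout:)` once per producer
+    // handle, `beginTransaction()` per transaction, `send(attempts:offsets:...)` to attach
+    // the consumer offsets, then commit/abort on the same handle. Blocking librdkafka calls
+    // are dispatched onto `gcdQueue` to keep them off the cooperative thread pool.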
+
+    func send(
+        attempts: UInt64,
+        offsets: RDKafkaTopicPartitionList,
+        forConsumerKafkaHandle consumer: OpaquePointer,
+        timeout: Duration
+    ) async throws {
+        try await offsets.withListPointer { topicPartitionList in
+
+            let consumerMetadata = rd_kafka_consumer_group_metadata(consumer)
+            defer { rd_kafka_consumer_group_metadata_destroy(consumerMetadata) }
+
+            // TODO: actually it should be within some timeout (like transaction timeout or session timeout)
+            for idx in 0..<attempts {
+                _ = idx
+                let error = await performBlockingCall(queue: self.gcdQueue) {
+                    rd_kafka_send_offsets_to_transaction(
+                        self.kafkaHandle,
+                        topicPartitionList,
+                        consumerMetadata,
+                        timeout.totalMilliseconds
+                    )
+                }
+
+                if error == nil {
+                    return // offsets attached successfully
+                }
+                defer { rd_kafka_error_destroy(error) }
+
+                if rd_kafka_error_is_retriable(error) == 1 {
+                    continue // retriable: spend another attempt
+                }
+                if rd_kafka_error_txn_requires_abort(error) == 1 {
+                    throw KafkaError.transactionAborted(reason: String(cString: rd_kafka_error_string(error)))
+                }
+                throw KafkaError.rdKafkaError(
+                    wrapping: rd_kafka_error_code(error),
+                    isFatal: rd_kafka_error_is_fatal(error) == 1
+                )
+            }
+            throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
+        }
+    }
+
+    func commitTransaction(attempts: UInt64, timeout: Duration) async throws {
+        for _ in 0..<attempts {
+            let error = await performBlockingCall(queue: self.gcdQueue) {
+                rd_kafka_commit_transaction(self.kafkaHandle, timeout.totalMilliseconds)
+            }
+
+            if error == nil {
+                return
+            }
+            defer { rd_kafka_error_destroy(error) }
+
+            if rd_kafka_error_is_retriable(error) == 1 {
+                continue
+            }
+            if rd_kafka_error_txn_requires_abort(error) == 1 {
+                throw KafkaError.transactionAborted(reason: String(cString: rd_kafka_error_string(error)))
+            }
+            throw KafkaError.rdKafkaError(
+                wrapping: rd_kafka_error_code(error),
+                isFatal: rd_kafka_error_is_fatal(error) == 1
+            )
+        }
+        throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
+    }
+
+    func abortTransaction(attempts: UInt64, timeout: Duration) async throws {
+        for _ in 0..<attempts {
+            let error = await performBlockingCall(queue: self.gcdQueue) {
+                rd_kafka_abort_transaction(self.kafkaHandle, timeout.totalMilliseconds)
+            }
+
+            if error == nil {
+                return
+            }
+            defer { rd_kafka_error_destroy(error) }
+
+            if rd_kafka_error_is_retriable(error) == 1 {
+                continue
+            }
+            throw KafkaError.rdKafkaError(
+                wrapping: rd_kafka_error_code(error),
+                isFatal: rd_kafka_error_is_fatal(error) == 1
+            )
+        }
+        throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
+    }
 }
+        var size = RDKafkaClient.stringSize
+        let configValue = UnsafeMutablePointer<CChar>.allocate(capacity: size)
+        defer { configValue.deallocate() }
+
+        if RD_KAFKA_CONF_OK == rd_kafka_topic_conf_get(configPointer, key, configValue, &size) {
+            let sizeNoNullTerm = size - 1
+            let wasVal = String(unsafeUninitializedCapacity: sizeNoNullTerm) {
+                let buf = UnsafeRawBufferPointer(
+                    UnsafeMutableRawBufferPointer(
+                        start: configValue,
+                        count: sizeNoNullTerm
+                    ))
+                _ = $0.initialize(from: buf)
+                return sizeNoNullTerm
+            }
+            if wasVal == value {
+                return // Values are equal, avoid changing (not mark config as modified)
+            }
+        }
         let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
         defer { errorChars.deallocate() }
diff --git a/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift b/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift
index fc663599..94a240c6 100644
--- a/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift
@@ -15,7 +15,7 @@
 import Crdkafka
 
 /// Swift wrapper type for `rd_kafka_topic_partition_list_t`.
-final class RDKafkaTopicPartitionList {
+public final class RDKafkaTopicPartitionList {
     private let _internal: UnsafeMutablePointer<rd_kafka_topic_partition_list_t>
 
     /// Create a new topic+partition list.
@@ -44,7 +44,7 @@ final class RDKafkaTopicPartitionList {
     }
 
     /// Manually set read offset for a given topic+partition pair.
-    func setOffset(topic: String, partition: KafkaPartition, offset: Int64) {
+    func setOffset(topic: String, partition: KafkaPartition, offset: KafkaOffset) {
         precondition(
             0...Int(Int32.max) ~= partition.rawValue || partition == .unassigned,
             "Partition ID outside of valid range \(0...Int32.max)"
         )
@@ -57,7 +57,7 @@ final class RDKafkaTopicPartitionList {
     ) else {
         fatalError("rd_kafka_topic_partition_list_add returned invalid pointer")
     }
-        partitionPointer.pointee.offset = offset
+        partitionPointer.pointee.offset = Int64(offset.rawValue)
     }
 
     /// Scoped accessor that enables safe access to the pointer of the underlying `rd_kafka_topic_partition_list_t`.
@@ -67,4 +67,12 @@ final class RDKafkaTopicPartitionList {
     func withListPointer<T>(_ body: (UnsafeMutablePointer<rd_kafka_topic_partition_list_t>) throws -> T) rethrows -> T {
         return try body(self._internal)
     }
+
+    /// Scoped accessor that enables safe access to the pointer of the underlying `rd_kafka_topic_partition_list_t` with an async closure.
+    /// - Warning: Do not escape the pointer from the closure for later use.
+    /// - Parameter body: The closure will use the pointer.
+    @discardableResult
+    func withListPointer<T>(_ body: (UnsafeMutablePointer<rd_kafka_topic_partition_list_t>) async throws -> T) async rethrows -> T {
+        return try await body(self._internal)
+    }
 }
diff --git a/Sources/Kafka/Utilities/BlockingCall.swift b/Sources/Kafka/Utilities/BlockingCall.swift
new file mode 100644
index 00000000..da0cd69b
--- /dev/null
+++ b/Sources/Kafka/Utilities/BlockingCall.swift
@@ -0,0 +1,10 @@
+import Dispatch
+
+/// Performs a blocking call on the given `DispatchQueue`, outside of the cooperative thread pool.
+internal func performBlockingCall<T>(queue: DispatchQueue, body: @escaping () -> T) async -> T {
+    await withCheckedContinuation { continuation in
+        queue.async {
+            continuation.resume(returning: body())
+        }
+    }
+}
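+
+// Usage sketch (illustrative `handle`/`timeoutMs`): wraps a blocking librdkafka call
+// so that it runs on a Dispatch queue rather than on a cooperative-pool thread:
+//
+//     let error = await performBlockingCall(queue: queue) {
+//         rd_kafka_commit_transaction(handle, timeoutMs)
+//     }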
diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index 5bc233bd..0198716c 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -39,6 +39,7 @@ final class KafkaTests: XCTestCase {
     var bootstrapBrokerAddress: KafkaConfiguration.BrokerAddress!
     var producerConfig: KafkaProducerConfiguration!
     var uniqueTestTopic: String!
+    var uniqueTestTopic2: String!
 
     override func setUpWithError() throws {
         self.bootstrapBrokerAddress = KafkaConfiguration.BrokerAddress(
@@ -63,6 +64,7 @@ final class KafkaTests: XCTestCase {
             logger: .kafkaTest
         )
         self.uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000)
+        self.uniqueTestTopic2 = try client._createUniqueTopic(timeout: 10 * 1000)
     }
 
     override func tearDownWithError() throws {
@@ -80,6 +82,7 @@ final class KafkaTests: XCTestCase {
             logger: .kafkaTest
         )
         try client._deleteTopic(self.uniqueTestTopic, timeout: 10 * 1000)
+        try client._deleteTopic(self.uniqueTestTopic2, timeout: 10 * 1000)
 
         self.bootstrapBrokerAddress = nil
         self.producerConfig = nil
@@ -578,7 +581,7 @@ final class KafkaTests: XCTestCase {
                         receivedDeliveryReports.insert(deliveryReport)
                     }
                 default:
-                    break // Ignore any other events
+                    continue // Ignore any other events
                 }
 
                 if receivedDeliveryReports.count >= messages.count {
@@ -602,4 +606,111 @@ final class KafkaTests: XCTestCase {
             XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }))
         }
     }
+
+    func testProduceAndConsumeWithTransaction() async throws {
+        let testMessages = Self.createTestMessages(topic: uniqueTestTopic, count: 10)
+
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
+
+        let transactionConfigProducer = KafkaTransactionalProducerConfiguration(
+            transactionalId: "1234",
+            bootstrapBrokerAddresses: [self.bootstrapBrokerAddress])
+
+        let transactionalProducer = try await KafkaTransactionalProducer(config: transactionConfigProducer, logger: .kafkaTest)
+
+        let makeConsumerConfig = { (topic: String) -> KafkaConsumerConfiguration in
+            var consumerConfig = KafkaConsumerConfiguration(
+                consumptionStrategy: .group(id: "subscription-test-group-id", topics: [topic]),
+                bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
+            )
+            consumerConfig.autoOffsetReset = .beginning // Always read topics from the beginning
+            consumerConfig.broker.addressFamily = .v4
+            consumerConfig.isAutoCommitEnabled = false
+            return consumerConfig
+        }
+
+        let consumer = try KafkaConsumer(
+            configuration: makeConsumerConfig(uniqueTestTopic),
+            logger: .kafkaTest
+        )
+
+        let consumerAfterTransaction = try KafkaConsumer(
+            configuration: makeConsumerConfig(uniqueTestTopic2),
+            logger: .kafkaTest
+        )
+
+        let serviceGroup = ServiceGroup(
+            services: [
+                producer,
+                consumer,
+                transactionalProducer,
+                consumerAfterTransaction,
+            ],
+            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
+            logger: .kafkaTest
+        )
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            // Run Task
+            group.addTask {
+                try await serviceGroup.run()
+            }
+
+            // Producer Task
+            group.addTask {
+                try await Self.sendAndAcknowledgeMessages(
+                    producer: producer,
+                    events: events,
+                    messages: testMessages
+                )
+            }
+
+            // Consumer Task
+            group.addTask {
+                var count = 0
+                for try await message in consumer.messages {
+                    count += 1
+                    try await transactionalProducer.withTransaction { transaction in
+                        let newMessage = KafkaProducerMessage(
+                            topic: self.uniqueTestTopic2,
+                            value: message.value.description + "_updated"
+                        )
+                        try transaction.send(newMessage)
+
+                        let partitionlist = RDKafkaTopicPartitionList()
+                        partitionlist.setOffset(topic: self.uniqueTestTopic, partition: message.partition, offset: message.offset)
+                        try await transaction.send(offsets: partitionlist, forConsumer: consumer)
+                    }
+
+                    if count >= testMessages.count {
+                        break
+                    }
+                }
+                print("Changed all messages \(count)")
+            }
+
+            // Consumer Task for the transformed messages
+            group.addTask {
+                var count = 0
+                for try await messageAfterTransaction in consumerAfterTransaction.messages {
+                    let value = messageAfterTransaction.value.getString(at: 0, length: messageAfterTransaction.value.readableBytes)
+                    XCTAssert(value?.contains("_updated") ?? false)
+                    count += 1
+                    if count >= testMessages.count || Task.isCancelled {
+                        break
+                    }
+                }
+                XCTAssertEqual(count, testMessages.count)
+            }
+
+            // Wait for the producer task and the two consumer tasks to complete
+            try await group.next()
+            try await group.next()
+            try await group.next()
+
+            // Shutdown the serviceGroup
+            await serviceGroup.triggerGracefulShutdown()
+        }
+    }
 }
diff --git a/Tests/IntegrationTests/Utilities.swift b/Tests/IntegrationTests/Utilities.swift
index db86c0a0..8b914cae 100644
--- a/Tests/IntegrationTests/Utilities.swift
+++ b/Tests/IntegrationTests/Utilities.swift
@@ -20,7 +20,7 @@ import Logging
 extension Logger {
     static var kafkaTest: Logger {
         var logger = Logger(label: "kafka.test")
-        logger.logLevel = .info
+        logger.logLevel = .debug
         return logger
     }
 }