diff --git a/build.sbt b/build.sbt
index 4ff7526d7..75916f935 100644
--- a/build.sbt
+++ b/build.sbt
@@ -65,7 +65,17 @@ lazy val core = project
     mimaBinaryIssueFilters ++= {
       import com.typesafe.tools.mima.core._
       import com.typesafe.tools.mima.core.ProblemFilters._
-      Seq()
+      Seq(
+        ProblemFilters.exclude[DirectMissingMethodProblem](
+          "com.banno.kafka.producer.ProducerApi.mapK"
+        ),
+        ProblemFilters.exclude[ReversedMissingMethodProblem](
+          "com.banno.kafka.producer.ProducerApi.send"
+        ),
+        ProblemFilters.exclude[DirectMissingMethodProblem](
+          "com.banno.kafka.producer.ProducerImpl.mapK"
+        ),
+      )
     },
   )
   .settings(
diff --git a/core/src/main/scala/com/banno/kafka/producer/ProducerApi.scala b/core/src/main/scala/com/banno/kafka/producer/ProducerApi.scala
index ee1450f95..59394c333 100644
--- a/core/src/main/scala/com/banno/kafka/producer/ProducerApi.scala
+++ b/core/src/main/scala/com/banno/kafka/producer/ProducerApi.scala
@@ -50,6 +50,8 @@ trait ProducerApi[F[_], K, V] {
   def sendSync(record: ProducerRecord[K, V]): F[RecordMetadata]
   def sendAsync(record: ProducerRecord[K, V]): F[RecordMetadata]
 
+  def send(record: ProducerRecord[K, V]): F[F[RecordMetadata]]
+
   // Cats doesn't have `Bicontravariant`
   final def contrabimap[A, B](f: A => K, g: B => V): ProducerApi[F, A, B] = {
     val self = this
@@ -84,6 +86,8 @@ trait ProducerApi[F[_], K, V] {
         self.sendSync(record.bimap(f, g))
       override def sendAsync(record: ProducerRecord[A, B]): F[RecordMetadata] =
         self.sendAsync(record.bimap(f, g))
+      override def send(record: ProducerRecord[A, B]): F[F[RecordMetadata]] =
+        self.send(record.bimap(f, g))
     }
   }
 
@@ -123,10 +127,12 @@ trait ProducerApi[F[_], K, V] {
         record.bitraverse(f, g) >>= self.sendSync
       override def sendAsync(record: ProducerRecord[A, B]): F[RecordMetadata] =
         record.bitraverse(f, g) >>= self.sendAsync
+      override def send(record: ProducerRecord[A, B]): F[F[RecordMetadata]] =
+        record.bitraverse(f, g) >>= self.send
     }
   }
 
-  final def mapK[G[_]](f: F ~> G): ProducerApi[G, K, V] = {
+  final def mapK[G[_]: Functor](f: F ~> G): ProducerApi[G, K, V] = {
     val self = this
     new ProducerApi[G, K, V] {
       override def abortTransaction: G[Unit] = f(self.abortTransaction)
@@ -153,6 +159,8 @@ trait ProducerApi[F[_], K, V] {
         f(self.sendSync(record))
       override def sendAsync(record: ProducerRecord[K, V]): G[RecordMetadata] =
         f(self.sendAsync(record))
+      override def send(record: ProducerRecord[K, V]): G[G[RecordMetadata]] =
+        f(self.send(record)).map(f(_))
     }
   }
 }
diff --git a/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala b/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala
index 56a61de1b..880f228da 100644
--- a/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala
+++ b/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import org.apache.kafka.common._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer._
+import scala.concurrent.Promise
+import scala.util.Try
 
 case class ProducerImpl[F[_], K, V](p: Producer[K, V])(implicit F: Async[F])
     extends ProducerApi[F, K, V] {
@@ -70,6 +72,21 @@ case class ProducerImpl[F[_], K, V](p: Producer[K, V])(implicit F: Async[F])
       Some(F.delay(jFuture.cancel(false)).void)
     }
 
+  /** The outer effect sends the record on a blocking context and is cancelable.
+    * The inner effect cancels the underlying send.
+    */
+  private def sendRaw2(
+      record: ProducerRecord[K, V],
+      callback: Try[RecordMetadata] => Unit,
+  ): F[F[Unit]] =
+    // KafkaProducer.send should be interruptible via InterruptedException, so use F.interruptible instead of F.blocking or F.delay
+    F.interruptible(
+      sendRaw(
+        record,
+        { (rm, e) => callback(Option(e).toLeft(rm).toTry) },
+      )
+    ).map(jf => F.delay(jf.cancel(true)).void)
+
   /** The returned F[_] completes as soon as the underlying
     * Producer.send(record) call returns. This is immediately after the producer
     * enqueues the record, not after Kafka accepts the write. If the producer's
@@ -95,17 +112,44 @@ case class ProducerImpl[F[_], K, V](p: Producer[K, V])(implicit F: Async[F])
     * future.get() call throws an exception. You should use this method if your
     * program should not proceed until Kafka accepts the write, or you need to
     * use the RecordMetadata, or you need to explicitly handle any possible
-    * error.
+    * error. Note that traversing many records with this operation prevents the
+    * underlying producer from batching multiple records.
     */
   def sendSync(record: ProducerRecord[K, V]): F[RecordMetadata] =
     F.delay(sendRaw(record)).map(_.get())
 
   /** Similar to sendSync, except the returned F[_] is completed asynchronously,
-    * usually on the producer's I/O thread. TODO does this have different
-    * blocking semantics than sendSync?
+    * usually on the producer's I/O thread. Note that traversing many records
+    * with this operation prevents the underlying producer from batching
+    * multiple records.
     */
   def sendAsync(record: ProducerRecord[K, V]): F[RecordMetadata] =
     F.async(sendRaw(record, _))
+
+  /** The outer effect completes synchronously when the underlying Producer.send
+    * call returns. This is immediately after the producer enqueues the record,
+    * not after Kafka accepts the write. If the producer's internal queue is
+    * full, it will block until the record can be enqueued (i.e. backpressure).
+    * The outer effect is executed on a blocking context and is cancelable. The
+    * outer effect will only contain an error if the Producer.send call throws
+    * an exception. The inner effect completes asynchronously after Kafka
+    * acknowledges the write, at which point the RecordMetadata is available.
+    * The inner effect will only contain an error if the write failed. The
+    * inner effect is also cancelable. With this operation, user code can react
+    * to both the producer's initial buffering of the record to be sent, and
+    * the final result of the write (either success or failure).
+    */
+  def send(record: ProducerRecord[K, V]): F[F[RecordMetadata]] =
+    // inspired by https://github.com/fd4s/fs2-kafka/blob/series/3.x/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
+    F.delay(Promise[RecordMetadata]())
+      .flatMap { promise =>
+        sendRaw2(record, promise.complete)
+          .map(cancel =>
+            F.fromFutureCancelable(
+              F.delay((promise.future, cancel))
+            )
+          )
+      }
 }
 
 object ProducerImpl {
diff --git a/core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala b/core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala
new file mode 100644
index 000000000..3f50bb0ef
--- /dev/null
+++ b/core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2019 Jack Henry & Associates, Inc.®
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.banno.kafka
+
+import cats.syntax.all.*
+import cats.effect.{Sync, IO}
+import munit.CatsEffectSuite
+import org.scalacheck.Gen
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.producer.*
+import com.banno.kafka.admin.AdminApi
+import com.banno.kafka.producer.*
+import com.banno.kafka.consumer.*
+import java.util.concurrent.{
+  Future => JFuture,
+  TimeUnit,
+  Executors,
+  CompletableFuture,
+}
+import scala.concurrent.duration.*
+
+class ProducerSendSpec extends CatsEffectSuite {
+
+  val bootstrapServer = "localhost:9092"
+  val schemaRegistryUrl = "http://localhost:8091"
+
+  def randomId: String =
+    Gen.listOfN(10, Gen.alphaChar).map(_.mkString).sample.get
+  def genGroupId: String = randomId
+  def genTopic: String = randomId
+
+  def createTestTopic[F[_]: Sync](partitionCount: Int = 1): F[String] = {
+    val topicName = genTopic
+    AdminApi
+      .createTopicsIdempotent[F](
+        bootstrapServer,
+        List(new NewTopic(topicName, partitionCount, 1.toShort)),
+      )
+      .as(topicName)
+  }
+
+  test("send one record") {
+    ProducerApi
+      .resource[IO, String, String](
+        BootstrapServers(bootstrapServer)
+      )
+      .use { producer =>
+        for {
+          topic <- createTestTopic[IO]()
+          ack <- producer.send(new ProducerRecord(topic, "a", "a"))
+          rm <- ack
+        } yield {
+          assertEquals(rm.topic, topic)
+          assertEquals(rm.offset, 0L)
+        }
+      }
+  }
+
+  test("send many records") {
+    ProducerApi
+      .resource[IO, Int, Int](
+        BootstrapServers(bootstrapServer)
+      )
+      .use { producer =>
+        ConsumerApi
+          .resource[IO, Int, Int](
+            BootstrapServers(bootstrapServer),
+            GroupId(genGroupId),
+            AutoOffsetReset.earliest,
+          )
+          .use { consumer =>
+            for {
+              topic <- createTestTopic[IO]()
+              values = (0 to 9).toList
+              sends = values
+                .map(v => producer.send(new ProducerRecord(topic, v, v)))
+              acks <- sends.sequence
+              rms <- acks.sequence
+              () <- consumer.subscribe(topic)
+              records <- consumer
+                .recordStream(100.millis)
+                .take(values.size.toLong)
+                .compile
+                .toList
+            } yield {
+              assertEquals(rms.size, values.size)
+              for ((rm, i) <- rms.zipWithIndex) {
+                assertEquals(rm.topic, topic)
+                assertEquals(rm.offset, i.toLong)
+              }
+              assertEquals(values, records.map(_.value))
+            }
+          }
+      }
+  }
+
+  test("outer effect fails on send throw") {
+    val producer =
+      ProducerImpl[IO, String, String](ThrowOnSendProducer[String, String]())
+    for {
+      topic <- createTestTopic[IO]()
+      result <- producer.send(new ProducerRecord(topic, "a", "a")).attempt
+    } yield {
+      assertEquals(result, Left(SendThrowTestException()))
+    }
+  }
+
+  test("inner effect fails on callback with exception") {
+    val producer =
+      ProducerImpl[IO, String, String](FailedCallbackProducer[String, String]())
+    for {
+      topic <- createTestTopic[IO]()
+      ack <- producer.send(new ProducerRecord(topic, "a", "a"))
+      result <- ack.attempt
+    } yield {
+      assertEquals(result, Left(CallbackFailureTestException()))
+    }
+  }
+
+}
+
+case class SendThrowTestException() extends RuntimeException("Send throw test")
+
+case class ThrowOnSendProducer[K, V]() extends Producer[K, V] {
+  def send(r: ProducerRecord[K, V], cb: Callback): JFuture[RecordMetadata] =
+    throw SendThrowTestException()
+
+  def abortTransaction(): Unit = ???
+  def beginTransaction(): Unit = ???
+  def close(x$1: java.time.Duration): Unit = ???
+  def close(): Unit = ???
+  def commitTransaction(): Unit = ???
+  def flush(): Unit = ???
+  def initTransactions(): Unit = ???
+  def metrics(): java.util.Map[
+    org.apache.kafka.common.MetricName,
+    _ <: org.apache.kafka.common.Metric,
+  ] = ???
+  def partitionsFor(
+      x$1: String
+  ): java.util.List[org.apache.kafka.common.PartitionInfo] = ???
+  def send(
+      x$1: org.apache.kafka.clients.producer.ProducerRecord[K, V]
+  ): java.util.concurrent.Future[
+    org.apache.kafka.clients.producer.RecordMetadata
+  ] = ???
+  def sendOffsetsToTransaction(
+      x$1: java.util.Map[
+        org.apache.kafka.common.TopicPartition,
+        org.apache.kafka.clients.consumer.OffsetAndMetadata,
+      ],
+      x$2: org.apache.kafka.clients.consumer.ConsumerGroupMetadata,
+  ): Unit = ???
+  def sendOffsetsToTransaction(
+      x$1: java.util.Map[
+        org.apache.kafka.common.TopicPartition,
+        org.apache.kafka.clients.consumer.OffsetAndMetadata,
+      ],
+      x$2: String,
+  ): Unit = ???
+}
+
+case class CallbackFailureTestException()
+    extends RuntimeException("Callback throw test")
+
+case class FailedCallbackProducer[K, V]() extends Producer[K, V] {
+  val scheduler = Executors.newSingleThreadScheduledExecutor()
+  def send(r: ProducerRecord[K, V], cb: Callback): JFuture[RecordMetadata] = {
+    scheduler.schedule(
+      new Runnable() {
+        override def run(): Unit =
+          cb.onCompletion(null, CallbackFailureTestException())
+      },
+      100L,
+      TimeUnit.MILLISECONDS,
+    )
+    new CompletableFuture()
+  }
+
+  def abortTransaction(): Unit = ???
+  def beginTransaction(): Unit = ???
+  def close(x$1: java.time.Duration): Unit = ???
+  def close(): Unit = ???
+  def commitTransaction(): Unit = ???
+  def flush(): Unit = ???
+  def initTransactions(): Unit = ???
+  def metrics(): java.util.Map[
+    org.apache.kafka.common.MetricName,
+    _ <: org.apache.kafka.common.Metric,
+  ] = ???
+  def partitionsFor(
+      x$1: String
+  ): java.util.List[org.apache.kafka.common.PartitionInfo] = ???
+  def send(
+      x$1: org.apache.kafka.clients.producer.ProducerRecord[K, V]
+  ): java.util.concurrent.Future[
+    org.apache.kafka.clients.producer.RecordMetadata
+  ] = ???
+  def sendOffsetsToTransaction(
+      x$1: java.util.Map[
+        org.apache.kafka.common.TopicPartition,
+        org.apache.kafka.clients.consumer.OffsetAndMetadata,
+      ],
+      x$2: org.apache.kafka.clients.consumer.ConsumerGroupMetadata,
+  ): Unit = ???
+  def sendOffsetsToTransaction(
+      x$1: java.util.Map[
+        org.apache.kafka.common.TopicPartition,
+        org.apache.kafka.clients.consumer.OffsetAndMetadata,
+      ],
+      x$2: String,
+  ): Unit = ???
+}
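
Usage sketch (not part of the patch): the "send many records" spec above relies on the two-phase semantics of the new send method, and the same pattern applies in application code. Sequencing the outer effects first enqueues every record, letting the underlying producer batch them into fewer requests; sequencing the inner effects then awaits each acknowledgement. The names SendBatchExample, sendBatch, topic, and values are illustrative only.

    import cats.effect.IO
    import cats.syntax.all.*
    import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
    import com.banno.kafka.producer.ProducerApi

    object SendBatchExample {
      def sendBatch(
          producer: ProducerApi[IO, String, String],
          topic: String,
          values: List[String],
      ): IO[List[RecordMetadata]] =
        for {
          // Outer effects: each completes once its record is buffered,
          // blocking only if the producer's internal queue is full.
          acks <- values.traverse(v => producer.send(new ProducerRecord(topic, v, v)))
          // Inner effects: each completes when Kafka acknowledges the write.
          rms <- acks.sequence
        } yield rms
    }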
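
A note on the mapK change: mapK now requires Functor[G] because send's inner effect must be translated as well. Applying the natural transformation to self.send(record) yields G[F[RecordMetadata]], and mapping f over that inner F[RecordMetadata] needs Functor[G]. A minimal sketch, assuming cats' OptionT.liftK as the natural transformation; OptIO and lifted are hypothetical names.

    import cats.data.OptionT
    import cats.effect.IO
    import com.banno.kafka.producer.ProducerApi

    object MapKExample {
      type OptIO[A] = OptionT[IO, A]

      // Functor[OptIO] is resolved implicitly, allowing mapK to translate
      // both the outer and inner effects of send.
      def lifted(p: ProducerApi[IO, String, String]): ProducerApi[OptIO, String, String] =
        p.mapK(OptionT.liftK[IO])
    }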