Merge pull request #846 from zcox/record-stream-pac
KAYAK-3391 RecordStream.processingAndCommitting
zcox authored Jan 18, 2024
2 parents 0b8e7eb + 6758e43 commit 97c4f1f
Showing 3 changed files with 207 additions and 32 deletions.
53 changes: 50 additions & 3 deletions core/src/main/scala/com/banno/kafka/RecordStream.scala
@@ -45,6 +45,23 @@ sealed trait RecordStream[F[_], A] {
* reprocessing after a failure.
*/
def readProcessCommit[B](process: A => F[B]): Stream[F, B]

/** Returns a stream that processes records using the specified function,
* committing offsets for successfully processed records, either after
* processing the specified number of records, or after the specified time
* has elapsed since the last offset commit. On any stream finalization,
* whether success, error, or cancelation, offsets will be committed. This is
* at-least-once processing: after a restart, the record that failed will be
* reprocessed. In some use cases this pattern is more appropriate than just
* using auto-offset-commits, since it will not commit offsets for failed
* records when the consumer is closed.
*/
def processingAndCommitting[B](
maxRecordCount: Long = 1000L,
maxElapsedTime: FiniteDuration = 60.seconds,
)(
process: A => F[B]
): Stream[F, B]
}
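
For context, a minimal caller-side sketch of the new method (not part of this commit; `runPipeline` and `handle` are hypothetical names, and the underlying consumer is assumed to be configured with enable.auto.commit=false):

import cats.effect.IO
import scala.concurrent.duration._

// Hypothetical usage sketch, assuming `stream: RecordStream[IO, A]` was
// obtained from this library's subscriber API.
def runPipeline[A, B](stream: RecordStream[IO, A])(handle: A => IO[B]): IO[Unit] =
  stream
    .processingAndCommitting(
      maxRecordCount = 500L,       // commit after 500 processed records...
      maxElapsedTime = 30.seconds, // ...or 30s after the last offset commit
    )(handle)
    .compile
    .drain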

sealed trait HistoryAndUnbounded[F[_], P[_[_], _], A] {
@@ -149,7 +166,7 @@ object RecordStream {
private sealed trait WhetherCommits[P[_[_], _]] {
def extrude[F[_], A](x: RecordStream[F, A]): P[F, A]

def chunk[F[_]: Applicative, A](
def chunk[F[_]: Clock: Concurrent, A](
topical: Topical[A, ?],
stream: P[F, IncomingRecords[A]],
): P[F, A]
@@ -176,7 +193,7 @@ object RecordStream {
override def extrude[F[_], A](x: RecordStream[F, A]): Stream[F, A] =
x.records

override def chunk[F[_]: Applicative, A](
override def chunk[F[_]: Clock: Concurrent, A](
topical: Topical[A, ?],
stream: Stream[F, IncomingRecords[A]],
): Stream[F, A] =
@@ -196,7 +213,7 @@ object RecordStream {
override def extrude[F[_], A](x: RecordStream[F, A]): RecordStream[F, A] =
x

override def chunk[F[_]: Applicative, A](
override def chunk[F[_]: Clock: Concurrent, A](
topical: Topical[A, ?],
stream: RecordStream[F, IncomingRecords[A]],
): RecordStream[F, A] =
@@ -205,6 +222,21 @@ object RecordStream {
) {
override protected def nextOffsets(x: A) = topical.nextOffset(x)
override def records: Stream[F, A] = stream.records.flatMap(chunked)
override def processingAndCommitting[B](
maxRecordCount: Long,
maxElapsedTime: FiniteDuration,
)(
process: A => F[B]
): Stream[F, B] =
consumer.processingAndCommitting[A, B](
maxRecordCount,
maxElapsedTime,
)(
records,
process,
a => nextOffsets(a).view.mapValues(_.offset - 1).toMap,
_ => 1,
)
}

override def historyAndUnbounded[F[_]: Async, A](
@@ -521,6 +553,21 @@ object RecordStream {
.recordsStream(pollTimeout)
.prefetch
.evalMap(parseBatch(topical))
override def processingAndCommitting[C](
maxRecordCount: Long,
maxElapsedTime: FiniteDuration,
)(
process: IncomingRecords[A] => F[C]
): Stream[F, C] =
consumer.processingAndCommitting[IncomingRecords[A], C](
maxRecordCount,
maxElapsedTime,
)(
records,
process,
_.nextOffsets.view.mapValues(_.offset - 1).toMap,
_.toList.size.toLong,
)
}

private[RecordStream] case class SubscriberImpl[F[_]: Async](
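A worked example of the `_.offset - 1` adjustment in the two call sites above (an inference from the visible code, not text from this commit): `Topical.nextOffset` reports the next offset to read, while the generic ConsumerOps helper tracks last-processed offsets and re-adds 1 at commit time.

// Hypothetical walk-through for a record read at offset 41:
val nextOffset = 42L               // nextOffsets(a): next offset to read
val lastProcessed = nextOffset - 1 // 41: what the commit state stores
val committed = lastProcessed + 1  // 42: what is eventually committed to Kafka
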
75 changes: 59 additions & 16 deletions core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala
@@ -353,10 +353,10 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
.recordStream(pollTimeout)
.evalMap(r => process(r) <* consumer.commitSync(r.nextOffset))

/** Returns a stream that processes records using the specified function,
* committing offsets for successfully processed records, either after
* processing the specified number of records, or after the specified time
* has elapsed since the last offset commit. On any stream finalization,
/** Returns a stream that processes records one-at-a-time using the specified
* function, committing offsets for successfully processed records, either
* after processing the specified number of records, or after the specified
* time has elapsed since the last offset commit. On any stream finalization,
* whether success, error, or cancelation, offsets will be committed. This is
* at-least-once processing: after a restart, the record that failed will be
* reprocessed. In some use cases this pattern is more appropriate than just
@@ -371,6 +371,53 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
)(
process: ConsumerRecord[K, V] => F[A]
)(implicit C: Clock[F], S: Concurrent[F]): Stream[F, A] =
processingAndCommitting[ConsumerRecord[K, V], A](
maxRecordCount,
maxElapsedTime,
)(
consumer.recordStream(pollTimeout),
process,
r => Map(new TopicPartition(r.topic, r.partition) -> r.offset),
_ => 1,
)
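
Both variants require offset auto-commits to be disabled on the consumer. A hedged configuration sketch, using the same config helpers the tests below use (the broker address and group id here are hypothetical):

import cats.effect.IO

// Hypothetical consumer setup; EnableAutoCommit(false) is the required part.
val consumerR = ConsumerApi.resource[IO, Int, Int](
  BootstrapServers("localhost:9092"), // assumption: local broker address
  GroupId("my-group"),                // hypothetical group id
  AutoOffsetReset.earliest,
  EnableAutoCommit(false),            // required: no offset auto-commits
)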

/** Returns a stream that processes batches of records using the specified
* function, committing offsets for successfully processed batches, either
* after processing the specified number of records, or after the specified
* time has elapsed since the last offset commit. On any stream finalization,
* whether success, error, or cancelation, offsets will be committed. This is
* at-least-once processing: after a restart, the batch that failed will be
* reprocessed. In some use cases this pattern is more appropriate than just
* using auto-offset-commits, since it will not commit offsets for failed
* records when the consumer is closed. The consumer must be configured to
* disable offset auto-commits, i.e. `enable.auto.commit=false`.
*/
def processingAndCommittingBatched[A](
pollTimeout: FiniteDuration,
maxRecordCount: Long = 1000L,
maxElapsedTime: FiniteDuration = 60.seconds,
)(
process: ConsumerRecords[K, V] => F[A]
)(implicit C: Clock[F], S: Concurrent[F]): Stream[F, A] =
processingAndCommitting[ConsumerRecords[K, V], A](
maxRecordCount,
maxElapsedTime,
)(
consumer.recordsStream(pollTimeout),
process,
_.lastOffsets,
_.count().toLong,
)
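
A hedged usage sketch of the batched variant (hypothetical names, mirroring the tests below; assumes a subscribed `consumer: ConsumerApi[IO, Int, Int]` with enable.auto.commit=false, and the library's `recordList` syntax as used in the tests):

import cats.effect.IO
import scala.concurrent.duration._

// Hypothetical sketch: each polled batch is processed as a unit, and offsets
// advance a whole batch at a time, so a failed batch is redelivered in full.
def consumeBatches(consumer: ConsumerApi[IO, Int, Int], topic: String): IO[Unit] =
  consumer
    .processingAndCommittingBatched(
      pollTimeout = 100.millis,
      maxRecordCount = 1000L,
      maxElapsedTime = 60.seconds,
    )(records => IO(records.recordList(topic).map(_.value).sum))
    .compile
    .drain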

def processingAndCommitting[A, B](
maxRecordCount: Long,
maxElapsedTime: FiniteDuration,
)(
stream: Stream[F, A],
process: A => F[B],
offsets: A => Map[TopicPartition, Long],
count: A => Long,
)(implicit C: Clock[F], S: Concurrent[F]): Stream[F, B] =
Stream
.eval(
C.monotonic
@@ -383,20 +430,19 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
consumer.commitSync(offsets)
val commitNextOffsets: F[Unit] =
state.get.map(_.nextOffsets).flatMap(commit)
consumer
.recordStream(pollTimeout)
.evalMap { record =>
stream
.evalMap { a =>
for {
a <- process(record)
s <- state.updateAndGet(_.update(record))
b <- process(a)
s <- state.updateAndGet(_.update(offsets(a), count(a)))
now <- C.monotonic
() <- s
.needToCommit(maxRecordCount, now, maxElapsedTime)
.traverse_(os =>
commit(os) *>
state.update(_.reset(now))
)
} yield a
} yield b
}
.onFinalize(commitNextOffsets)
}
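
The loop above checks for a commit after every processed element, and the finalizer commits whatever remains. A brief trace for maxRecordCount = 2 with a long maxElapsedTime (hypothetical element names):

// a1 processed -> recordCount = 1 -> needToCommit returns None
// a2 processed -> recordCount = 2 -> commit nextOffsets, state reset(now)
// a3 processed -> recordCount = 1 -> needToCommit returns None
// stream ends  -> onFinalize(commitNextOffsets) commits a3's offset + 1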
@@ -406,13 +452,10 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
recordCount: Long,
lastCommitTime: FiniteDuration,
) {
def update(record: ConsumerRecord[_, _]): OffsetCommitState =
def update(os: Map[TopicPartition, Long], count: Long): OffsetCommitState =
copy(
offsets = offsets + (new TopicPartition(
record.topic,
record.partition,
) -> record.offset),
recordCount = recordCount + 1,
offsets = offsets ++ os,
recordCount = recordCount + count,
)

def needToCommit(
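The `needToCommit` body is collapsed in this diff; below is a hedged sketch of its contract, inferred from the call site above and the committed-offset assertions in the tests that follow. It is not the committed implementation:

import scala.concurrent.duration.FiniteDuration
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Hypothetical stand-in for OffsetCommitState's collapsed members: commit
// when either threshold is crossed, returning the next offsets to commit.
case class OffsetCommitStateSketch(
    offsets: Map[TopicPartition, Long], // last processed offset per partition
    recordCount: Long,
    lastCommitTime: FiniteDuration,
) {
  def nextOffsets: Map[TopicPartition, OffsetAndMetadata] =
    offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap

  def needToCommit(
      maxRecordCount: Long,
      now: FiniteDuration,
      maxElapsedTime: FiniteDuration,
  ): Option[Map[TopicPartition, OffsetAndMetadata]] =
    if (recordCount >= maxRecordCount || (now - lastCommitTime) >= maxElapsedTime)
      Some(nextOffsets)
    else None
}
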
111 changes: 98 additions & 13 deletions core/src/test/scala/com/banno/kafka/ProcessingAndCommittingSpec.scala
@@ -44,18 +44,19 @@ class ProcessingAndCommittingSpec extends CatsEffectSuite with KafkaSpec {
BootstrapServers(bootstrapServer)
)

def consumerResource =
ConsumerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer),
GroupId(genGroupId),
AutoOffsetReset.earliest,
EnableAutoCommit(false),
)
def consumerResource(configs: (String, AnyRef)*) = {
val configs2 = List[(String, AnyRef)](
BootstrapServers(bootstrapServer),
GroupId(genGroupId),
AutoOffsetReset.earliest,
EnableAutoCommit(false),
) ++ configs.toList
ConsumerApi.resource[IO, Int, Int](configs2: _*)
}

test("processingAndCommitting commits after number of records") {
producerResource.use { producer =>
consumerResource.use { consumer =>
consumerResource().use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
@@ -108,10 +109,55 @@ class ProcessingAndCommittingSpec extends CatsEffectSuite with KafkaSpec {
}
}

test("processingAndCommittingBatched commits after number of records") {
producerResource.use { producer =>
consumerResource("max.poll.records" -> "2").use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
ps = Set(p)
values = (0 to 9).toList
_ <- producer.sendAsyncBatch(
values.map(v => new ProducerRecord(topic, v, v))
)
() <- consumer.subscribe(topic)
c0 <- consumer.partitionQueries.committed(ps)
pac = consumer.processingAndCommittingBatched(
pollTimeout = 100.millis,
maxRecordCount = 2,
maxElapsedTime = Long.MaxValue.nanos,
)(_.recordList(topic).map(_.value).pure[IO])
committed = Stream.repeatEval(
consumer.partitionQueries.committed(ps)
)
results <- pac
.take(values.size.toLong / 2)
.interleave(committed)
.compile
.toList
} yield {
assertEquals(c0, empty)
assertEquals(results.size, values.size)
// TODO rewrite this to use values, not so hard-coded
assertEquals(results(0), List(0, 1))
assertEquals(results(1), offsets(p, 2))
assertEquals(results(2), List(2, 3))
assertEquals(results(3), offsets(p, 4))
assertEquals(results(4), List(4, 5))
assertEquals(results(5), offsets(p, 6))
assertEquals(results(6), List(6, 7))
assertEquals(results(7), offsets(p, 8))
assertEquals(results(8), List(8, 9))
assertEquals(results(9), offsets(p, 10))
}
}
}
}

// this has a real danger of becoming a "flaky test" due to its timing assumptions
test("processingAndCommitting commits after elapsed time") {
producerResource.use { producer =>
consumerResource.use { consumer =>
consumerResource().use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
@@ -169,7 +215,7 @@ class ProcessingAndCommittingSpec extends CatsEffectSuite with KafkaSpec {

test("on failure, commits successful offsets, but not the failed offset") {
producerResource.use { producer =>
consumerResource.use { consumer =>
consumerResource().use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
Expand Down Expand Up @@ -204,9 +250,48 @@ class ProcessingAndCommittingSpec extends CatsEffectSuite with KafkaSpec {
}
}

test(
"on failure, batched commits successful offsets, but not the failed batch offsets"
) {
producerResource.use { producer =>
consumerResource("max.poll.records" -> "2").use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
ps = Set(p)
values = (0 to 9).toList
throwOn = 7
_ <- producer.sendAsyncBatch(
values.map(v => new ProducerRecord(topic, v, v))
)
() <- consumer.subscribe(topic)
c0 <- consumer.partitionQueries.committed(ps)
pac = consumer.processingAndCommittingBatched(
pollTimeout = 100.millis,
maxRecordCount = Long.MaxValue,
maxElapsedTime = Long.MaxValue.nanos,
) { rs =>
val vs = rs.recordList(topic).map(_.value)
if (vs contains throwOn)
IO.raiseError(CommitOnFailureException())
else
vs.pure[IO]
}
results <- pac.compile.toList.attempt
c1 <- consumer.partitionQueries.committed(ps)
} yield {
assertEquals(c0, empty)
assertEquals(results, Left(CommitOnFailureException()))
// on failure, the committed offset should be the beginning of the batch that failed, so processing will resume there next time and try again
assertEquals(c1, offsets(p, 6L))
}
}
}
}

test("commits offsets on successful stream finalization") {
producerResource.use { producer =>
consumerResource.use { consumer =>
consumerResource().use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
Expand Down Expand Up @@ -235,7 +320,7 @@ class ProcessingAndCommittingSpec extends CatsEffectSuite with KafkaSpec {

test("commits offsets on stream cancel") {
producerResource.use { producer =>
consumerResource.use { consumer =>
consumerResource().use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
