From fd1490cfdf24a0c51e8d9fb193df1463edfd49fe Mon Sep 17 00:00:00 2001 From: Zach Cox Date: Fri, 3 Sep 2021 15:01:12 -0500 Subject: [PATCH] util for relative seek --- .../main/scala/com/banno/kafka/consumer/ConsumerOps.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala b/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala index eb4a4b624..f1275a80f 100644 --- a/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala +++ b/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala @@ -21,7 +21,7 @@ import cats.implicits._ import fs2._ import java.util.ConcurrentModificationException -import cats.effect.Sync +import cats.effect.{Sync, Clock} import cats.effect.concurrent.Ref import scala.jdk.CollectionConverters._ @@ -53,6 +53,10 @@ object SeekTo { def committed(default: SeekTo): SeekTo = Committed(default) def offsets(offsets: Map[TopicPartition, Long], default: SeekTo): SeekTo = Offsets(offsets, default) + def timestampBeforeNow[F[_]: Clock: Functor](duration: FiniteDuration, default: SeekTo): F[SeekTo] = + Clock[F].realTime(MILLISECONDS).map(now => + timestamp(now - duration.toMillis, default) + ) def seek[F[_]: Monad]( consumer: ConsumerApi[F, _, _],