Skip to content

Commit

Permalink
Merge pull request #492 from zcox/timestamp-before-now
Browse files Browse the repository at this point in the history
util for relative seek
  • Loading branch information
zcox authored Sep 3, 2021
2 parents b6b1613 + fd1490c commit b6e5e93
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.implicits._
import fs2._
import java.util.ConcurrentModificationException

import cats.effect.Sync
import cats.effect.{Sync, Clock}
import cats.effect.concurrent.Ref

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -53,6 +53,10 @@ object SeekTo {
def committed(default: SeekTo): SeekTo = Committed(default)
def offsets(offsets: Map[TopicPartition, Long], default: SeekTo): SeekTo =
Offsets(offsets, default)
def timestampBeforeNow[F[_]: Clock: Functor](duration: FiniteDuration, default: SeekTo): F[SeekTo] =
Clock[F].realTime(MILLISECONDS).map(now =>
timestamp(now - duration.toMillis, default)
)

def seek[F[_]: Monad](
consumer: ConsumerApi[F, _, _],
Expand Down

0 comments on commit b6e5e93

Please sign in to comment.