Merge pull request #483 from zcox/seek-ops-3

Seek operations

zcox authored Aug 31, 2021
2 parents 2b1b584 + 3b5b4c2 commit e3b98cf
Showing 12 changed files with 198 additions and 95 deletions.
core/src/main/scala/com/banno/kafka/Publish.scala (2 additions & 1 deletion)
@@ -45,7 +45,8 @@ object Publish {
   }
 
   object Builder {
-    implicit def buildNPublishers[F[_]: Functor, K, V, X <: Coproduct, Y <: Coproduct](implicit
+    implicit def buildNPublishers[F[_]: Functor, K, V, X <: Coproduct, Y <: Coproduct](
+        implicit
         buildTail: Builder[F, X, Y]
     ) =
       new Builder[F, IncomingRecord[K, V] :+: X, (K, V) :+: Y] {
core/src/main/scala/com/banno/kafka/RecordStream.scala (10 additions & 10 deletions)
@@ -324,16 +324,16 @@ object RecordStream {
   ): Resource[F, ConsumerApi[F, GenericRecord, GenericRecord]] = {
     val configs: List[(String, AnyRef)] =
       whetherCommits.configs ++
-      extraConfigs ++
-      List(
-        kafkaBootstrapServers,
-        schemaRegistryUri,
-        EnableAutoCommit(false),
-        reset,
-        IsolationLevel.ReadCommitted,
-        ClientId(clientId),
-        MetricReporters[ConsumerPrometheusReporter],
-      )
+        extraConfigs ++
+        List(
+          kafkaBootstrapServers,
+          schemaRegistryUri,
+          EnableAutoCommit(false),
+          reset,
+          IsolationLevel.ReadCommitted,
+          ClientId(clientId),
+          MetricReporters[ConsumerPrometheusReporter],
+        )
     ConsumerApi.Avro.Generic.resource[F](configs: _*)
   }
 }
core/src/main/scala/com/banno/kafka/Topic.scala (11 additions & 9 deletions)
@@ -118,14 +118,16 @@ object Topic {
     ).configs(purpose.configs.toMap.asJava)
 
   override def registerSchemas[F[_]: Sync](
-    schemaRegistryUri: SchemaRegistryUrl,
-    configs: Map[String, Object] = Map.empty,
+      schemaRegistryUri: SchemaRegistryUrl,
+      configs: Map[String, Object] = Map.empty,
   ): F[Unit] =
-    SchemaRegistryApi.register[F, K, V](
-      schemaRegistryUri.url,
-      topic,
-      configs,
-    ).void
+    SchemaRegistryApi
+      .register[F, K, V](
+        schemaRegistryUri.url,
+        topic,
+        configs,
+      )
+      .void
 
   override def setUp[F[_]: Sync](
       bootstrapServers: BootstrapServers,
@@ -161,8 +163,8 @@
     ) = cr.metadata.nextOffset
 
   override def registerSchemas[F[_]: Sync](
-    schemaRegistryUri: SchemaRegistryUrl,
-    configs: Map[String, Object] = Map.empty,
+      schemaRegistryUri: SchemaRegistryUrl,
+      configs: Map[String, Object] = Map.empty,
   ): F[Unit] = fa.registerSchemas(schemaRegistryUri, configs)
 
   override def setUp[F[_]: Sync](
core/src/main/scala/com/banno/kafka/TopicPurpose.scala (6 additions & 6 deletions)
@@ -78,9 +78,9 @@ object TopicPurpose {
   val smallState: TopicPurpose =
     lowScale(
       cleanupPolicy(compact) |+|
-      minCleanableDirtyRatio(0.01) |+|
-      segmentMegabytes(1) |+|
-      segmentDuration(10.minutes),
+        minCleanableDirtyRatio(0.01) |+|
+        segmentMegabytes(1) |+|
+        segmentDuration(10.minutes),
       TopicContentType.State
     )
 
@@ -89,9 +89,9 @@
   val mediumState: TopicPurpose =
     lowScale(
       cleanupPolicy(compact) |+|
-      minCleanableDirtyRatio(0.10) |+|
-      segmentMegabytes(100) |+|
-      segmentDuration(1.day),
+        minCleanableDirtyRatio(0.10) |+|
+        segmentMegabytes(100) |+|
+        segmentDuration(1.day),
       TopicContentType.State
     )
core/src/main/scala/com/banno/kafka/Topical.scala (2 additions & 2 deletions)
@@ -53,7 +53,7 @@ trait Topical[A, B] {
   ): F[Unit]
 
   def registerSchemas[F[_]: Sync](
-    schemaRegistryUri: SchemaRegistryUrl,
-    configs: Map[String, Object] = Map.empty,
+      schemaRegistryUri: SchemaRegistryUrl,
+      configs: Map[String, Object] = Map.empty,
   ): F[Unit]
 }
core/src/main/scala/com/banno/kafka/Topics.scala (10 additions & 10 deletions)
@@ -54,8 +54,8 @@ object Topics {
     ): F[Unit]
 
     def tailRegisterSchemas[F[_]: Sync](
-      schemaRegistryUri: SchemaRegistryUrl,
-      configs: Map[String, Object],
+        schemaRegistryUri: SchemaRegistryUrl,
+        configs: Map[String, Object],
     ): F[Unit]
 
     final override def nextOffset(x: IncomingRecord[K, V] :+: S) =
@@ -72,19 +72,19 @@
       kv.eliminate(topic.coparse, tailCoparse)
 
     override def registerSchemas[F[_]: Sync](
-      schemaRegistryUri: SchemaRegistryUrl,
-      configs: Map[String, Object] = Map.empty,
+        schemaRegistryUri: SchemaRegistryUrl,
+        configs: Map[String, Object] = Map.empty,
     ): F[Unit] =
       topic.registerSchemas(schemaRegistryUri, configs) *>
-      tailRegisterSchemas(schemaRegistryUri, configs)
+        tailRegisterSchemas(schemaRegistryUri, configs)
 
     final override def setUp[F[_]: Sync](
         bootstrapServers: BootstrapServers,
         schemaRegistryUri: SchemaRegistryUrl,
         configs: Map[String, Object] = Map.empty,
     ): F[Unit] =
       topic.setUp(bootstrapServers, schemaRegistryUri, configs) *>
-      tailSetUp(bootstrapServers, schemaRegistryUri, configs)
+        tailSetUp(bootstrapServers, schemaRegistryUri, configs)
   }
 
   private final case class SingletonTopics[K, V](
@@ -95,8 +95,8 @@
     override def tailCoparse(kv: CNil) = kv.impossible
     override def tailNextOffset(x: CNil) = x.impossible
     override def tailRegisterSchemas[F[_]: Sync](
-      schemaRegistryUri: SchemaRegistryUrl,
-      configs: Map[String, Object],
+        schemaRegistryUri: SchemaRegistryUrl,
+        configs: Map[String, Object],
     ): F[Unit] = Applicative[F].unit
     override def tailSetUp[F[_]: Sync](
       bootstrapServers: BootstrapServers,
@@ -121,8 +121,8 @@
     override def tailCoparse(kv: T) = tail.coparse(kv)
 
     override def tailRegisterSchemas[F[_]: Sync](
-      schemaRegistryUri: SchemaRegistryUrl,
-      configs: Map[String, Object],
+        schemaRegistryUri: SchemaRegistryUrl,
+        configs: Map[String, Object],
     ): F[Unit] = tail.registerSchemas(schemaRegistryUri, configs)
 
     override def tailSetUp[F[_]: Sync](
core/src/main/scala/com/banno/kafka/admin/AdminApi.scala (5 additions & 3 deletions)
@@ -133,9 +133,11 @@ object AdminApi {
       topics: Iterable[NewTopic],
       configs: Map[String, Object] = Map.empty,
   ): F[CreateTopicsResult] =
-    AdminApi.resource[F](
-      Map[String, Object](BootstrapServers(bootstrapServers)) ++ configs
-    ).use(_.createTopicsIdempotent(topics))
+    AdminApi
+      .resource[F](
+        Map[String, Object](BootstrapServers(bootstrapServers)) ++ configs
+      )
+      .use(_.createTopicsIdempotent(topics))
 
   def createTopicsIdempotent[F[_]: Sync](
       bootstrapServers: String,
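For context, here is a minimal usage sketch of the helper reformatted above. It is an illustration only: the broker address and topic settings are made up, and NewTopic(name, partitions, replicationFactor) comes from the Kafka admin client.

import cats.effect.IO
import org.apache.kafka.clients.admin.NewTopic
import com.banno.kafka.admin.AdminApi

// Idempotently create a hypothetical 4-partition topic with replication factor 3.
val createTopics: IO[Unit] =
  AdminApi
    .createTopicsIdempotent[IO]("localhost:9092", List(new NewTopic("user-events", 4, 3.toShort)))
    .void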
core/src/main/scala/com/banno/kafka/configs.scala (6 additions & 0 deletions)
@@ -27,6 +27,7 @@ import io.confluent.kafka.serializers.subject.{
   TopicNameStrategy => KTopicNameStrategy,
   TopicRecordNameStrategy => KTopicRecordNameStrategy
 }
+import scala.concurrent.duration.FiniteDuration
 
 //TODO other configs... maybe we could auto generate these somehow?
 
@@ -77,6 +78,11 @@ object EnableAutoCommit {
     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> eac.b.toString
 }
 
+object AutoCommitInterval {
+  def apply(d: FiniteDuration): (String, AnyRef) =
+    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> d.toMillis.toString
+}
+
 case class KeySerializerClass(c: Class[?])
 object KeySerializerClass {
   implicit def toConfig(ksc: KeySerializerClass): (String, AnyRef) =
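For illustration, a minimal sketch of the new AutoCommitInterval helper in a consumer config list; BootstrapServers and EnableAutoCommit are existing helpers in this package, and the broker address is made up.

import scala.concurrent.duration._

// Each helper resolves to a (String, AnyRef) Kafka config entry;
// AutoCommitInterval(5.seconds) yields auto.commit.interval.ms -> "5000".
val consumerConfigs: List[(String, AnyRef)] = List(
  BootstrapServers("localhost:9092"),
  EnableAutoCommit(true),
  AutoCommitInterval(5.seconds),
)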
core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala (86 additions & 18 deletions)
@@ -33,16 +33,68 @@ import com.banno.kafka._
 import fs2.concurrent.{Signal, SignallingRef}
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
-sealed trait SeekTo {
-  def apply[F[_]](consumer: ConsumerApi[F, ?, ?], partitions: Iterable[TopicPartition]): F[Unit]
-}
-case object SeekToBeginning extends SeekTo {
-  def apply[F[_]](consumer: ConsumerApi[F, ?, ?], partitions: Iterable[TopicPartition]): F[Unit] =
-    consumer.seekToBeginning(partitions)
-}
-case object SeekToEnd extends SeekTo {
-  def apply[F[_]](consumer: ConsumerApi[F, ?, ?], partitions: Iterable[TopicPartition]): F[Unit] =
-    consumer.seekToEnd(partitions)
+sealed trait SeekTo
+
+object SeekTo {
+
+  private case object Beginning extends SeekTo
+  private case object End extends SeekTo
+  private case class Timestamps(timestamps: Map[TopicPartition, Long], default: SeekTo)
+      extends SeekTo
+  private case class Timestamp(timestamp: Long, default: SeekTo) extends SeekTo
+  private case class Committed(default: SeekTo) extends SeekTo
+  private case class Offsets(offsets: Map[TopicPartition, Long], default: SeekTo) extends SeekTo
+
+  def beginning: SeekTo = Beginning
+  def end: SeekTo = End
+  def timestamps(timestamps: Map[TopicPartition, Long], default: SeekTo): SeekTo =
+    Timestamps(timestamps, default)
+  def timestamp(timestamp: Long, default: SeekTo): SeekTo = Timestamp(timestamp, default)
+  def committed(default: SeekTo): SeekTo = Committed(default)
+  def offsets(offsets: Map[TopicPartition, Long], default: SeekTo): SeekTo =
+    Offsets(offsets, default)
+
+  def seek[F[_]: Monad](
+      consumer: ConsumerApi[F, _, _],
+      partitions: Iterable[TopicPartition],
+      seekTo: SeekTo
+  ): F[Unit] =
+    seekTo match {
+      case Beginning =>
+        consumer.seekToBeginning(partitions)
+      case End =>
+        consumer.seekToEnd(partitions)
+      case Offsets(offsets, default) =>
+        partitions.toList.traverse_(
+          tp =>
+            offsets
+              .get(tp)
+              //tp could be mapped to an explicit null value
+              .flatMap(Option(_))
+              .fold(SeekTo.seek(consumer, List(tp), default))(o => consumer.seek(tp, o))
+        )
+      case Timestamps(ts, default) =>
+        for {
+          offsets <- consumer.offsetsForTimes(ts)
+          () <- seek(
+            consumer,
+            partitions,
+            Offsets(offsets.view.mapValues(_.offset).toMap, default)
+          )
+        } yield ()
+      case Timestamp(timestamp, default) =>
+        val timestamps = partitions.map(p => (p, timestamp)).toMap
+        seek(consumer, partitions, Timestamps(timestamps, default))
+      case Committed(default) =>
+        for {
+          committed <- consumer.committed(partitions.toSet)
+          () <- seek(
+            consumer,
+            partitions,
+            Offsets(committed.view.mapValues(_.offset).toMap, default)
+          )
+        } yield ()
+    }
 }
 
 case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
@@ -58,19 +110,34 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
   def assign(
       topics: List[String],
       offsets: Map[TopicPartition, Long],
-      seekTo: SeekTo = SeekToBeginning
+      seekTo: SeekTo = SeekTo.beginning
   )(implicit F: Monad[F]): F[Unit] =
+    assignAndSeek(topics, SeekTo.offsets(offsets, seekTo))
+
+  def assign(topic: String, offsets: Map[TopicPartition, Long])(implicit F: Monad[F]): F[Unit] =
+    assign(List(topic), offsets)
+
+  def assignAndSeek(
+      topics: List[String],
+      seekTo: SeekTo,
+  )(implicit F: Monad[F]): F[Unit] =
     for {
       infos <- consumer.partitionsFor(topics)
       partitions = infos.map(_.toTopicPartition)
-      _ <- consumer.assign(partitions)
-      _ <- partitions.traverse_(
-        tp => offsets.get(tp).map(o => consumer.seek(tp, o)).getOrElse(seekTo(consumer, List(tp)))
-      )
+      () <- consumer.assign(partitions)
+      () <- SeekTo.seek(consumer, partitions, seekTo)
     } yield ()
 
-  def assign(topic: String, offsets: Map[TopicPartition, Long])(implicit F: Monad[F]): F[Unit] =
-    assign(List(topic), offsets)
+  def subscribeAndSeek(
+      topics: List[String],
+      seekTo: SeekTo,
+  )(implicit F: Monad[F]): F[Unit] =
+    for {
+      infos <- consumer.partitionsFor(topics)
+      partitions = infos.map(_.toTopicPartition)
+      () <- consumer.subscribe(topics)
+      () <- SeekTo.seek(consumer, partitions, seekTo)
+    } yield ()
 
   def positions[G[_]: Traverse](
       partitions: G[TopicPartition]
@@ -239,7 +306,8 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
       finalOffsets: Map[TopicPartition, Long],
       pollTimeout: FiniteDuration,
       maxZeroCount: Int
-  )(implicit
+  )(
+      implicit
       F: MonadError[F, Throwable]
   ): Stream[F, ConsumerRecords[K, V]] =
     //position is next offset consumer will read, assume already read up to offset before position
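Putting the new pieces together, a minimal sketch of the seek API introduced above, assuming ConsumerApi, ConsumerOps, and SeekTo are imported from this consumer package; the topic name, timestamp parameter, and helper name are made up for the example.

import cats.Monad

// Prefer each partition's committed offset; where none exists, fall back to
// the offset for the given timestamp; where that lookup is also empty, start
// from the beginning of the partition.
def startFrom[F[_]: Monad, K, V](consumer: ConsumerApi[F, K, V], sinceMillis: Long): F[Unit] = {
  val seekTo = SeekTo.committed(SeekTo.timestamp(sinceMillis, SeekTo.beginning))
  // assignAndSeek assigns partitions directly; subscribeAndSeek instead keeps
  // consumer-group subscription while still controlling the start position.
  ConsumerOps(consumer).assignAndSeek(List("user-events"), seekTo)
}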
(Diffs for the remaining 3 changed files not shown.)
