Skip to content

Commit

Permalink
Merge pull request #473 from Banno/assign-configs
Browse files Browse the repository at this point in the history
feat: configs seam for `RecordStream.assign`
  • Loading branch information
Kazark authored Aug 19, 2021
2 parents ee69768 + c5b44a9 commit b49a201
Showing 1 changed file with 49 additions and 1 deletion.
50 changes: 49 additions & 1 deletion core/src/main/scala/com/banno/kafka/RecordStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,16 @@ object RecordStream {
schemaRegistryUri: SchemaRegistryUrl,
clientId: String,
whetherCommits: WhetherCommits[P],
extraConfigs: Map[String, AnyRef],
): ChunkedAssigner[F, P] =
new ChunkedAssigner(
Batched.AssignerImpl(
BaseConfigs(
kafkaBootstrapServers,
schemaRegistryUri,
clientId,
whetherCommits
whetherCommits,
extraConfigs.toList: _*
)
)
)
Expand Down Expand Up @@ -400,6 +402,7 @@ object RecordStream {
schemaRegistryUri,
clientId,
WhetherCommits.No,
Map.empty,
)

def assign[F[_]: Concurrent: ContextShift](
Expand All @@ -412,6 +415,7 @@ object RecordStream {
schemaRegistryUri,
groupId.id,
WhetherCommits.May(groupId),
Map.empty,
)

def assign[F[_]: Concurrent: ContextShift](
Expand All @@ -425,6 +429,50 @@ object RecordStream {
schemaRegistryUri,
clientId,
WhetherCommits.May(groupId),
Map.empty,
)

def assign[F[_]: Concurrent: ContextShift](
kafkaBootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
clientId: String,
extraConfigs: Map[String, AnyRef],
): Assigner[F, Id, Stream] =
ChunkedAssigner(
kafkaBootstrapServers,
schemaRegistryUri,
clientId,
WhetherCommits.No,
extraConfigs,
)

def assign[F[_]: Concurrent: ContextShift](
kafkaBootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
groupId: GroupId,
extraConfigs: Map[String, AnyRef],
): Assigner[F, Id, RecordStream] =
ChunkedAssigner(
kafkaBootstrapServers,
schemaRegistryUri,
groupId.id,
WhetherCommits.May(groupId),
extraConfigs,
)

def assign[F[_]: Concurrent: ContextShift](
kafkaBootstrapServers: BootstrapServers,
schemaRegistryUri: SchemaRegistryUrl,
clientId: String,
groupId: GroupId,
extraConfigs: Map[String, AnyRef],
): Assigner[F, Id, RecordStream] =
ChunkedAssigner(
kafkaBootstrapServers,
schemaRegistryUri,
clientId,
WhetherCommits.May(groupId),
extraConfigs,
)

private def ephemeralTopicsSeekToEnd[F[_]: Monad](
Expand Down

0 comments on commit b49a201

Please sign in to comment.