From c5b44a9c9ba13563ebead0f0b83b709e3c958bef Mon Sep 17 00:00:00 2001 From: Keith Pinson Date: Thu, 19 Aug 2021 11:25:39 -0400 Subject: [PATCH] feat: configs seam for `RecordStream.assign` --- .../scala/com/banno/kafka/RecordStream.scala | 50 ++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/banno/kafka/RecordStream.scala b/core/src/main/scala/com/banno/kafka/RecordStream.scala index 7a01da5af..a42043bfd 100644 --- a/core/src/main/scala/com/banno/kafka/RecordStream.scala +++ b/core/src/main/scala/com/banno/kafka/RecordStream.scala @@ -344,6 +344,7 @@ object RecordStream { schemaRegistryUri: SchemaRegistryUrl, clientId: String, whetherCommits: WhetherCommits[P], + extraConfigs: Map[String, AnyRef], ): ChunkedAssigner[F, P] = new ChunkedAssigner( Batched.AssignerImpl( @@ -351,7 +352,8 @@ object RecordStream { kafkaBootstrapServers, schemaRegistryUri, clientId, - whetherCommits + whetherCommits, + extraConfigs.toList: _* ) ) ) @@ -400,6 +402,7 @@ object RecordStream { schemaRegistryUri, clientId, WhetherCommits.No, + Map.empty, ) def assign[F[_]: Concurrent: ContextShift]( @@ -412,6 +415,7 @@ object RecordStream { schemaRegistryUri, groupId.id, WhetherCommits.May(groupId), + Map.empty, ) def assign[F[_]: Concurrent: ContextShift]( @@ -425,6 +429,50 @@ object RecordStream { schemaRegistryUri, clientId, WhetherCommits.May(groupId), + Map.empty, + ) + + def assign[F[_]: Concurrent: ContextShift]( + kafkaBootstrapServers: BootstrapServers, + schemaRegistryUri: SchemaRegistryUrl, + clientId: String, + extraConfigs: Map[String, AnyRef], + ): Assigner[F, Id, Stream] = + ChunkedAssigner( + kafkaBootstrapServers, + schemaRegistryUri, + clientId, + WhetherCommits.No, + extraConfigs, + ) + + def assign[F[_]: Concurrent: ContextShift]( + kafkaBootstrapServers: BootstrapServers, + schemaRegistryUri: SchemaRegistryUrl, + groupId: GroupId, + extraConfigs: Map[String, AnyRef], + ): Assigner[F, Id, RecordStream] = + ChunkedAssigner( + kafkaBootstrapServers, + schemaRegistryUri, + groupId.id, + WhetherCommits.May(groupId), + extraConfigs, + ) + + def assign[F[_]: Concurrent: ContextShift]( + kafkaBootstrapServers: BootstrapServers, + schemaRegistryUri: SchemaRegistryUrl, + clientId: String, + groupId: GroupId, + extraConfigs: Map[String, AnyRef], + ): Assigner[F, Id, RecordStream] = + ChunkedAssigner( + kafkaBootstrapServers, + schemaRegistryUri, + clientId, + WhetherCommits.May(groupId), + extraConfigs, ) private def ephemeralTopicsSeekToEnd[F[_]: Monad](