Skip to content

Commit

Permalink
Set maxBytes in the NsqSink
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 27, 2023
1 parent c6982c8 commit e8526a4
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ object NsqCollector extends App[NsqSinkConfig](BuildInfo) {
override def mkSinks(config: Config.Streams[NsqSinkConfig]): Resource[IO, Sinks[IO]] =
for {
good <- NsqSink.create[IO](
config.sink.maxBytes,
config.sink,
config.good
)
bad <- NsqSink.create[IO](
config.sink.maxBytes,
config.sink,
config.bad
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,13 @@ class NsqSink[F[_]: Sync] private (
object NsqSink {

def create[F[_]: Sync](
maxBytes: Int,
nsqConfig: NsqSinkConfig,
topicName: String
): Resource[F, NsqSink[F]] =
Resource.make(
Sync[F].delay(
// MaxBytes is never used but is required by the sink interface definition,
// So just pass any int val in.
new NsqSink(0, nsqConfig, topicName)
new NsqSink(maxBytes, nsqConfig, topicName)
)
)(sink => Sync[F].delay(sink.shutdown()))
}

0 comments on commit e8526a4

Please sign in to comment.