Skip to content

Commit

Permalink
Set maxBytes in the NsqSink
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 27, 2023
1 parent b108007 commit 1d466cf
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object Run {
): F[ExitCode] = {
val eitherT = for {
config <- ConfigParser.fromPath[F, SinkConfig](path)
_ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config, telemetryInfo))
_ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, telemetryInfo, config))
} yield ExitCode.Success

eitherT.merge.handleErrorWith { e =>
Expand All @@ -56,8 +56,8 @@ object Run {
private def fromConfig[F[_]: Async: Tracking, SinkConfig](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
config: Config[SinkConfig],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo,
config: Config[SinkConfig]
): F[ExitCode] = {
val resources = for {
sinks <- mkSinks(config.streams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ object NsqCollector extends App[NsqSinkConfig](BuildInfo) {
override def mkSinks(config: Config.Streams[NsqSinkConfig]): Resource[IO, Sinks[IO]] =
for {
good <- NsqSink.create[IO](
config.sink.maxBytes,
config.sink,
config.good
)
bad <- NsqSink.create[IO](
config.sink.maxBytes,
config.sink,
config.bad
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,13 @@ class NsqSink[F[_]: Sync] private (
object NsqSink {

def create[F[_]: Sync](
maxBytes: Int,
nsqConfig: NsqSinkConfig,
topicName: String
): Resource[F, NsqSink[F]] =
Resource.make(
Sync[F].delay(
// MaxBytes is never used but is required by the sink interface definition,
// So just pass any int val in.
new NsqSink(0, nsqConfig, topicName)
new NsqSink(maxBytes, nsqConfig, topicName)
)
)(sink => Sync[F].delay(sink.shutdown()))
}

0 comments on commit 1d466cf

Please sign in to comment.