Skip to content

Commit

Permalink
Fmt fix
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 27, 2023
1 parent d3298c5 commit d18e499
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,30 @@ import com.snowplowanalytics.snowplow.collector.core.model.Sinks

object Run {

type MkSinks[F[_], SinkConfig] = Config.Streams[SinkConfig] => Resource[F, Sinks[F]]

type TelemetryInfo[SinkConfig] = Config[SinkConfig] => Telemetry.TelemetryInfo

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
mkSinks: MkSinks[F, SinkConfig],
telemetryInfo: TelemetryInfo[SinkConfig]
): Opts[F[ExitCode]] = {
val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
}

private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo,
mkSinks: MkSinks[F, SinkConfig],
telemetryInfo: TelemetryInfo[SinkConfig],
path: Option[Path]
): F[ExitCode] = {
val eitherT = for {
config <- ConfigParser.fromPath[F, SinkConfig](path)
_ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config, telemetryInfo))
_ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, telemetryInfo, config))
} yield ExitCode.Success

eitherT.merge.handleErrorWith { e =>
Expand All @@ -55,9 +59,9 @@ object Run {

private def fromConfig[F[_]: Async: Tracking, SinkConfig](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
config: Config[SinkConfig],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
mkSinks: MkSinks[F, SinkConfig],
telemetryInfo: TelemetryInfo[SinkConfig],
config: Config[SinkConfig]
): F[ExitCode] = {
val resources = for {
sinks <- mkSinks(config.streams)
Expand Down

0 comments on commit d18e499

Please sign in to comment.