Skip to content

Commit

Permalink
Update the Pubsub UserAgent format (close #362)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Nov 9, 2023
1 parent c34826d commit 0f51448
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 6 deletions.
3 changes: 3 additions & 0 deletions pubsub/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
startupCheckInterval = 1 second
retryInterval = 10 seconds
buffer = ${streams.buffer}
# Settings used to build the User-Agent header of the Pub/Sub publisher.
gcpUserAgent {
# Product name placed before "/collector" in the User-Agent value.
productName = "Snowplow OSS"
}
}

buffer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
import com.permutive.pubsub.producer.{Model, PubsubProducer}
import com.snowplowanalytics.snowplow.collector.core.{Config, Sink}
import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.BuilderOps._
import org.threeten.bp.Duration
import org.typelevel.log4cats.Logger
Expand Down Expand Up @@ -106,14 +105,17 @@ object PubSubSink {
onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error"),
customizePublisher = Some {
_.setRetrySettings(retrySettings(sinkConfig.backoffPolicy))
.setHeaderProvider(FixedHeaderProvider.create("User-Agent", BuildInfo.dockerAlias))
.setHeaderProvider(FixedHeaderProvider.create("User-Agent", createUserAgent(sinkConfig.gcpUserAgent)))
.setProvidersForEmulator()
}
)

GooglePubsubProducer.of[F, Array[Byte]](ProjectId(sinkConfig.googleProjectId), Topic(topicName), config)
}

/**
  * Builds the value sent in the `User-Agent` header of the Pub/Sub publisher.
  *
  * The result has the shape `<productName>/collector (GPN:Snowplow;)`.
  *
  * @param gcpUserAgent configuration carrying the product name to advertise
  * @return the formatted User-Agent string
  */
private[sinks] def createUserAgent(gcpUserAgent: PubSubSinkConfig.GcpUserAgent): String = {
  val product = gcpUserAgent.productName
  s"$product/collector (GPN:Snowplow;)"
}

private def retrySettings(backoffPolicy: PubSubSinkConfig.BackoffPolicy): RetrySettings =
RetrySettings
.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig.BackoffPolicy
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig._
import io.circe.Decoder
import io.circe.config.syntax.durationDecoder
import io.circe.generic.semiauto._
Expand All @@ -12,7 +12,8 @@ final case class PubSubSinkConfig(
googleProjectId: String,
backoffPolicy: BackoffPolicy,
startupCheckInterval: FiniteDuration,
retryInterval: FiniteDuration
retryInterval: FiniteDuration,
gcpUserAgent: GcpUserAgent
)

object PubSubSinkConfig {
Expand All @@ -26,7 +27,11 @@ object PubSubSinkConfig {
maxRpcTimeout: Long,
rpcTimeoutMultiplier: Double
)

/**
  * Configuration for the User-Agent header value.
  *
  * @param productName text placed before `/collector` in the string built by
  *                    `PubSubSink.createUserAgent`
  */
final case class GcpUserAgent(productName: String)

// The field decoders come first: deriving the PubSubSinkConfig decoder needs a decoder for its
// `backoffPolicy` and `gcpUserAgent` fields, so those vals are declared (and therefore
// initialised) before the derivation that depends on them instead of being forward-referenced.
implicit val backoffPolicyConfigDecoder: Decoder[BackoffPolicy] =
  deriveDecoder[BackoffPolicy]
implicit val gcpUserAgentDecoder: Decoder[GcpUserAgent] = deriveDecoder[GcpUserAgent]
implicit val configDecoder: Decoder[PubSubSinkConfig] = deriveDecoder[PubSubSinkConfig]
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ object ConfigSpec {
rpcTimeoutMultiplier = 2
),
startupCheckInterval = 1.second,
retryInterval = 10.seconds
retryInterval = 10.seconds,
gcpUserAgent = PubSubSinkConfig.GcpUserAgent(productName = "Snowplow OSS")
)
),
bad = Config.Sink(
Expand All @@ -145,7 +146,8 @@ object ConfigSpec {
rpcTimeoutMultiplier = 2
),
startupCheckInterval = 1.second,
retryInterval = 10.seconds
retryInterval = 10.seconds,
gcpUserAgent = PubSubSinkConfig.GcpUserAgent(productName = "Snowplow OSS")
)
)
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

import java.util.regex.Pattern

import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig._

import org.specs2.mutable.Specification

/** Checks the User-Agent string produced by `PubSubSink.createUserAgent`. */
class GcpUserAgentSpec extends Specification {

  "createUserAgent" should {
    "create user agent string correctly" in {
      // Case-insensitive pattern; its only capture group is the text that follows "gpn:".
      val gpnPattern = Pattern.compile(
        """(?iU)(?:[^\(\)\/]+\/[^\/]+\s+)*(?:[^\s][^\(\)\/]+\/[^\/]+\s?\([^\(\)]*)gpn:(.*)[;\)]"""
      )

      val userAgent = PubSubSink.createUserAgent(GcpUserAgent(productName = "Snowplow OSS"))

      // The group is only read when the pattern was actually found in the header value.
      val gpnValue = Some(gpnPattern.matcher(userAgent)).filter(_.find()).map(_.group(1))

      userAgent must beEqualTo("Snowplow OSS/collector (GPN:Snowplow;)")
      // The capture is greedy, so the trailing ';' is part of the extracted value.
      gpnValue must beSome("Snowplow;")
    }
  }
}

0 comments on commit 0f51448

Please sign in to comment.