From 0f51448b04c71ec06ab17310b3310c577dd8dad8 Mon Sep 17 00:00:00 2001 From: spenes Date: Thu, 9 Nov 2023 01:42:01 +0300 Subject: [PATCH] Update the Pubsub UserAgent format (close #362) --- pubsub/src/main/resources/application.conf | 3 ++ .../sinks/PubSubSink.scala | 6 ++-- .../sinks/PubSubSinkConfig.scala | 9 +++-- .../ConfigSpec.scala | 6 ++-- .../sinks/GcpUserAgentSpec.scala | 36 +++++++++++++++++++ 5 files changed, 54 insertions(+), 6 deletions(-) create mode 100644 pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GcpUserAgentSpec.scala diff --git a/pubsub/src/main/resources/application.conf b/pubsub/src/main/resources/application.conf index 6b33a1d32..530d04f48 100644 --- a/pubsub/src/main/resources/application.conf +++ b/pubsub/src/main/resources/application.conf @@ -22,6 +22,9 @@ startupCheckInterval = 1 second retryInterval = 10 seconds buffer = ${streams.buffer} + gcpUserAgent { + productName = "Snowplow OSS" + } } buffer { diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala index 387efaaa0..4a92a1f36 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala @@ -19,7 +19,6 @@ import com.permutive.pubsub.producer.encoder.MessageEncoder import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig} import com.permutive.pubsub.producer.{Model, PubsubProducer} import com.snowplowanalytics.snowplow.collector.core.{Config, Sink} -import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.BuilderOps._ import org.threeten.bp.Duration import org.typelevel.log4cats.Logger @@ -106,7 +105,7 @@ object PubSubSink { onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error"), customizePublisher = Some { _.setRetrySettings(retrySettings(sinkConfig.backoffPolicy)) - .setHeaderProvider(FixedHeaderProvider.create("User-Agent", BuildInfo.dockerAlias)) + .setHeaderProvider(FixedHeaderProvider.create("User-Agent", createUserAgent(sinkConfig.gcpUserAgent))) .setProvidersForEmulator() } ) @@ -114,6 +113,9 @@ object PubSubSink { GooglePubsubProducer.of[F, Array[Byte]](ProjectId(sinkConfig.googleProjectId), Topic(topicName), config) } + private[sinks] def createUserAgent(gcpUserAgent: PubSubSinkConfig.GcpUserAgent): String = + s"${gcpUserAgent.productName}/collector (GPN:Snowplow;)" + private def retrySettings(backoffPolicy: PubSubSinkConfig.BackoffPolicy): RetrySettings = RetrySettings .newBuilder() diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala index d467121bd..da491033b 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala @@ -1,6 +1,6 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig.BackoffPolicy +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig._ import io.circe.Decoder import io.circe.config.syntax.durationDecoder import io.circe.generic.semiauto._ @@ -12,7 +12,8 @@ final case class PubSubSinkConfig( googleProjectId: String, backoffPolicy: BackoffPolicy, startupCheckInterval: FiniteDuration, - retryInterval: FiniteDuration + retryInterval: FiniteDuration, + gcpUserAgent: GcpUserAgent ) object PubSubSinkConfig { @@ -26,7 +27,11 @@ object PubSubSinkConfig { maxRpcTimeout: Long, rpcTimeoutMultiplier: Double ) + + final case class GcpUserAgent(productName: String) + implicit val configDecoder: Decoder[PubSubSinkConfig] = deriveDecoder[PubSubSinkConfig] implicit val backoffPolicyConfigDecoder: Decoder[BackoffPolicy] = deriveDecoder[BackoffPolicy] + implicit val gcpUserAgentDecoder: Decoder[GcpUserAgent] = deriveDecoder[GcpUserAgent] } diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala index 38014e157..bc5ce9d8f 100644 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -122,7 +122,8 @@ object ConfigSpec { rpcTimeoutMultiplier = 2 ), startupCheckInterval = 1.second, - retryInterval = 10.seconds + retryInterval = 10.seconds, + gcpUserAgent = PubSubSinkConfig.GcpUserAgent(productName = "Snowplow OSS") ) ), bad = Config.Sink( @@ -145,7 +146,8 @@ object ConfigSpec { rpcTimeoutMultiplier = 2 ), startupCheckInterval = 1.second, - retryInterval = 10.seconds + retryInterval = 10.seconds, + gcpUserAgent = PubSubSinkConfig.GcpUserAgent(productName = "Snowplow OSS") ) ) ), diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GcpUserAgentSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GcpUserAgentSpec.scala new file mode 100644 index 000000000..902775713 --- /dev/null +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GcpUserAgentSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2013-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import java.util.regex.Pattern + +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig._ + +import org.specs2.mutable.Specification + +class GcpUserAgentSpec extends Specification { + + "createUserAgent" should { + "create user agent string correctly" in { + val gcpUserAgent = GcpUserAgent(productName = "Snowplow OSS") + val resultUserAgent = PubSubSink.createUserAgent(gcpUserAgent) + val expectedUserAgent = s"Snowplow OSS/collector (GPN:Snowplow;)" + + val userAgentRegex = Pattern.compile( + """(?iU)(?:[^\(\)\/]+\/[^\/]+\s+)*(?:[^\s][^\(\)\/]+\/[^\/]+\s?\([^\(\)]*)gpn:(.*)[;\)]""" + ) + val matcher = userAgentRegex.matcher(resultUserAgent) + val matched = if (matcher.find()) Some(matcher.group(1)) else None + val expectedMatched = "Snowplow;" + + resultUserAgent must beEqualTo(expectedUserAgent) + matched must beSome(expectedMatched) + } + } +}