From f9391d9ada5a778d5336e29bc00bf7e5af74cce1 Mon Sep 17 00:00:00 2001 From: Chetan Tutika Date: Thu, 18 Apr 2024 19:31:29 +0000 Subject: [PATCH] finagle-memcached: Remove Compression Toggle Remove the toggle for enabling and disabling Lz4 compression as the compression has been tested on few services Differential Revision: https://phabricator.twitter.biz/D1137993 --- .../com.twitter.finagle.memcached.json | 8 +- .../CompressingMemcachedFilter.scala | 8 +- .../MemcachedCompressingClientTest.scala | 232 +++++++----------- .../unit/CompressingMemcachedFilterTest.scala | 3 - 4 files changed, 89 insertions(+), 162 deletions(-) diff --git a/finagle-memcached/src/main/resources/com/twitter/toggles/configs/com.twitter.finagle.memcached.json b/finagle-memcached/src/main/resources/com/twitter/toggles/configs/com.twitter.finagle.memcached.json index 27f3bf56a60..04b65254f2e 100644 --- a/finagle-memcached/src/main/resources/com/twitter/toggles/configs/com.twitter.finagle.memcached.json +++ b/finagle-memcached/src/main/resources/com/twitter/toggles/configs/com.twitter.finagle.memcached.json @@ -1,9 +1,3 @@ { - "toggles": [ - { - "id": "com.twitter.finagle.filter.CompressingMemcached", - "description": "Enable compressing filter for memcached values", - "fraction": 1.0 - } - ] + "toggles": [] } diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/CompressingMemcachedFilter.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/CompressingMemcachedFilter.scala index 696229dde53..06956cda687 100644 --- a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/CompressingMemcachedFilter.scala +++ b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/CompressingMemcachedFilter.scala @@ -18,9 +18,7 @@ import com.twitter.finagle.memcached.protocol.StorageCommand import com.twitter.finagle.memcached.protocol.Value import com.twitter.finagle.memcached.protocol.Values import com.twitter.finagle.memcached.protocol.ValuesAndErrors -import com.twitter.finagle.server.ServerInfo import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finagle.toggle.Toggle import com.twitter.io.Buf import com.twitter.util.Future import com.twitter.util.Return @@ -59,14 +57,10 @@ private[finagle] final class CompressingMemcachedFilter( private final val compressionFactory = CompressionProvider(compressionScheme, statsReceiver) - private val toggle: Toggle = Toggles("com.twitter.finagle.filter.CompressingMemcached") - - private val serverInfo = ServerInfo() - override def apply(command: Command, service: Service[Command, Response]): Future[Response] = { command match { case storageCommand: StorageCommand => - if (compressionScheme == Uncompressed || !toggle.isEnabled(serverInfo.id.hashCode)) { + if (compressionScheme == Uncompressed) { service(storageCommand) } else { service(compress(storageCommand)) } case nonStorageCommand: NonStorageCommand => diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedCompressingClientTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedCompressingClientTest.scala index dca8c143af2..c0087aa8280 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedCompressingClientTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/MemcachedCompressingClientTest.scala @@ -26,7 +26,6 @@ class MemcachedCompressingClientTest extends AnyFunSuite with BeforeAndAfter { val clientName = "test_client" val Timeout: Duration = 15.seconds val stats = new InMemoryStatsReceiver - private val useCompressionFilerToggleKey = "com.twitter.finagle.filter.CompressingMemcached" def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, Timeout) @@ -49,108 +48,53 @@ class MemcachedCompressingClientTest extends AnyFunSuite with BeforeAndAfter { test("withCompressionScheme Lz4 and toggled on") { - com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 1) { - val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) - val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress]) - - val client = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .connectionsPerEndpoint(1) - .withStatsReceiver(stats) - .withCompressionScheme(Lz4) - .newRichClient(Name.bound(address), clientName) - - awaitResult(client.set("foobar", alwaysCompressedData)) // will be compressed - awaitResult(client.set("baz", neverCompressedData)) // won't be compressed - - val alwaysCompressedServiceResponse: Response = - awaitResult[Response](server.service(Gets(Seq(Buf.Utf8("foobar"))))) - val neverCompressedServiceResponse = awaitResult(server.service(Gets(Seq(Buf.Utf8("baz"))))) - - val alwaysCompressedServiceData = getResponseBuf(alwaysCompressedServiceResponse).head - val neverCompressedServiceData = getResponseBuf(neverCompressedServiceResponse).head - - val results = awaitResult( - client.gets(Seq("foobar", "baz")) - ).flatMap { - case (key, (value1, _)) => - Map((key, value1)) - } - - val deletedResult = awaitResult { - for { - _ <- client.delete("foobar") - _ <- client.delete("baz") - r <- client.gets(Seq("foobar", "baz")) - } yield r - } - - assert(results("foobar") === alwaysCompressedData) - assert(results("baz") === neverCompressedData) - assert(deletedResult.isEmpty) - assert(alwaysCompressedData.length > alwaysCompressedServiceData.length) - assert(neverCompressedData.length == neverCompressedServiceData.length) - - assert(stats.counters(Seq(clientName, "lz4", "decompression", "attempted")) == 1) - assert(stats.counters(Seq(clientName, "lz4", "compression", "attempted")) == 1) - assert(stats.counters(Seq(clientName, "lz4", "compression", "skipped")) == 1) - - client.close() - server.stop() + val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) + val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress]) + + val client = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .connectionsPerEndpoint(1) + .withStatsReceiver(stats) + .withCompressionScheme(Lz4) + .newRichClient(Name.bound(address), clientName) + + awaitResult(client.set("foobar", alwaysCompressedData)) // will be compressed + awaitResult(client.set("baz", neverCompressedData)) // won't be compressed + + val alwaysCompressedServiceResponse: Response = + awaitResult[Response](server.service(Gets(Seq(Buf.Utf8("foobar"))))) + val neverCompressedServiceResponse = awaitResult(server.service(Gets(Seq(Buf.Utf8("baz"))))) + + val alwaysCompressedServiceData = getResponseBuf(alwaysCompressedServiceResponse).head + val neverCompressedServiceData = getResponseBuf(neverCompressedServiceResponse).head + + val results = awaitResult( + client.gets(Seq("foobar", "baz")) + ).flatMap { + case (key, (value1, _)) => + Map((key, value1)) } - } - test("withCompressionScheme Lz4 and toggled off") { - - com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 0) { - val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) - val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress]) - - val client = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .connectionsPerEndpoint(1) - .withStatsReceiver(stats) - .withCompressionScheme(Lz4) - .newRichClient(Name.bound(address), clientName) - - awaitResult(client.set("foobar", alwaysCompressedData)) // will be compressed - awaitResult(client.set("baz", neverCompressedData)) // won't be compressed - - val alwaysCompressedServiceResponse: Response = - awaitResult[Response](server.service(Gets(Seq(Buf.Utf8("foobar"))))) - val neverCompressedServiceResponse = awaitResult(server.service(Gets(Seq(Buf.Utf8("baz"))))) - - val alwaysCompressedServiceData = getResponseBuf(alwaysCompressedServiceResponse).head - val neverCompressedServiceData = getResponseBuf(neverCompressedServiceResponse).head - - val results = awaitResult( - client.gets(Seq("foobar", "baz")) - ).flatMap { - case (key, (value1, _)) => - Map((key, value1)) - } - - val deletedResult = awaitResult { - for { - _ <- client.delete("foobar") - _ <- client.delete("baz") - r <- client.gets(Seq("foobar", "baz")) - } yield r - } - - assert(results("foobar") === alwaysCompressedData) - assert(results("baz") === neverCompressedData) - assert(deletedResult.isEmpty) - assert(alwaysCompressedData.length == alwaysCompressedServiceData.length) - assert(neverCompressedData.length == neverCompressedServiceData.length) - - assert(stats.counters(Seq(clientName, "lz4", "decompression", "attempted")) == 0) - assert(stats.counters(Seq(clientName, "lz4", "compression", "attempted")) == 0) - assert(stats.counters(Seq(clientName, "lz4", "compression", "skipped")) == 0) - - client.close() - server.stop() + val deletedResult = awaitResult { + for { + _ <- client.delete("foobar") + _ <- client.delete("baz") + r <- client.gets(Seq("foobar", "baz")) + } yield r } + + assert(results("foobar") === alwaysCompressedData) + assert(results("baz") === neverCompressedData) + assert(deletedResult.isEmpty) + assert(alwaysCompressedData.length > alwaysCompressedServiceData.length) + assert(neverCompressedData.length == neverCompressedServiceData.length) + + assert(stats.counters(Seq(clientName, "lz4", "decompression", "attempted")) == 1) + assert(stats.counters(Seq(clientName, "lz4", "compression", "attempted")) == 1) + assert(stats.counters(Seq(clientName, "lz4", "compression", "skipped")) == 1) + + client.close() + server.stop() } test("withCompressionScheme Uncompressed") { @@ -207,51 +151,49 @@ class MemcachedCompressingClientTest extends AnyFunSuite with BeforeAndAfter { test("Clients with different compression schemes can decompress each other's values") { - com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 1) { - val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) - val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress]) - - val compressionClient = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .connectionsPerEndpoint(1) - .withStatsReceiver(stats) - .withCompressionScheme(Lz4) - .newRichClient(Name.bound(address), clientName) - - val uncompressedClient = Memcached.client - .configured(param.KeyHasher(KeyHasher.FNV1_32)) - .connectionsPerEndpoint(1) - .withStatsReceiver(stats) - .newRichClient(Name.bound(address), clientName) - - awaitResult(uncompressedClient.set("foobar", alwaysCompressedData)) - awaitResult(uncompressedClient.set("baz", neverCompressedData)) - - awaitResult(compressionClient.set("foo", alwaysCompressedData)) - awaitResult(compressionClient.set("bazbar", neverCompressedData)) - - val compressionClientResults = awaitResult( - compressionClient.gets(Seq("foobar", "baz")) - ).flatMap { - case (key, (value1, _)) => - Map((key, value1)) - } - - val clientResults = awaitResult( - uncompressedClient.gets(Seq("foo", "bazbar")) - ).flatMap { - case (key, (value1, _)) => - Map((key, value1)) - } - - assert(compressionClientResults("foobar") === alwaysCompressedData) - assert(compressionClientResults("baz") === neverCompressedData) - assert(clientResults("foo") === alwaysCompressedData) - assert(clientResults("bazbar") === neverCompressedData) - - uncompressedClient.close() - compressionClient.close() - server.stop() + val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) + val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress]) + + val compressionClient = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .connectionsPerEndpoint(1) + .withStatsReceiver(stats) + .withCompressionScheme(Lz4) + .newRichClient(Name.bound(address), clientName) + + val uncompressedClient = Memcached.client + .configured(param.KeyHasher(KeyHasher.FNV1_32)) + .connectionsPerEndpoint(1) + .withStatsReceiver(stats) + .newRichClient(Name.bound(address), clientName) + + awaitResult(uncompressedClient.set("foobar", alwaysCompressedData)) + awaitResult(uncompressedClient.set("baz", neverCompressedData)) + + awaitResult(compressionClient.set("foo", alwaysCompressedData)) + awaitResult(compressionClient.set("bazbar", neverCompressedData)) + + val compressionClientResults = awaitResult( + compressionClient.gets(Seq("foobar", "baz")) + ).flatMap { + case (key, (value1, _)) => + Map((key, value1)) } + + val clientResults = awaitResult( + uncompressedClient.gets(Seq("foo", "bazbar")) + ).flatMap { + case (key, (value1, _)) => + Map((key, value1)) + } + + assert(compressionClientResults("foobar") === alwaysCompressedData) + assert(compressionClientResults("baz") === neverCompressedData) + assert(clientResults("foo") === alwaysCompressedData) + assert(clientResults("bazbar") === neverCompressedData) + + uncompressedClient.close() + compressionClient.close() + server.stop() } } diff --git a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala index 201b99313f8..f7159f26635 100644 --- a/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala +++ b/finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/CompressingMemcachedFilterTest.scala @@ -29,7 +29,6 @@ class CompressingMemcachedFilterTest with ScalaCheckPropertyChecks { val TimeOut = 15.seconds - private val useCompressionFilerToggleKey = "com.twitter.finagle.filter.CompressingMemcached" private def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, TimeOut) @@ -43,7 +42,6 @@ class CompressingMemcachedFilterTest stats.clear() } - com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 1) { test("Add is correctly processed and compressed") { @@ -323,5 +321,4 @@ class CompressingMemcachedFilterTest } } } - } }