diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c5869fed..0d3447a0 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -4,16 +4,12 @@ on: push jobs: test: - name: Ruby ${{ matrix.ruby }} on ubuntu-latest - runs-on: ubuntu-latest + name: Ruby ${{ matrix.ruby }} on ubuntu-22.04 + runs-on: ubuntu-22.04 strategy: fail-fast: false matrix: - ruby: ['2.6', '2.7', '3.0', '3.1', '3.2', '3.3', 'ruby-head', 'jruby-9.4.8.0', 'truffleruby-22.3.1'] - # Windows on macOS builds started failing, so they are disabled for now - # platform: [windows-2019, macOS-10.14, ubuntu-18.04] - # exclude: - # ... + ruby: ['2.7', '3.0', '3.1', '3.2', '3.3', 'ruby-head', 'jruby-9.4.9.0', 'truffleruby-22.3.1'] steps: - uses: actions/checkout@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a7cd6cb..0c6c0c3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ section below. ## Unreleased changes +## Version 3.9.8 + +- [#390](https://github.com/Shopify/statsd-instrument/pull/391) - Fixing bug in Environment when using UDS. The max packet size option was not being passed to the +UDS connection, causing messages that were too large to be dropped (especially likely when used together with BatchedSink). + ## Version 3.9.7 - [#389](https://github.com/Shopify/statsd-instrument/pull/389) - Fixing bug with BatchedSink constructor when using UDS, the constructor was not properly passing the Sink to the BatchedSink. 
diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..9dc98f34 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +.PHONY: test lint update +test: + bundle exec rake test + +lint: + bundle exec rake lint_fix + +update: + bundle update + +check: update lint test \ No newline at end of file diff --git a/benchmark/local-udp-throughput b/benchmark/local-udp-throughput index 71fae2e5..812ebbc2 100755 --- a/benchmark/local-udp-throughput +++ b/benchmark/local-udp-throughput @@ -10,6 +10,24 @@ require "datadog/statsd" require "forwardable" require "vernier" +module Datadog + class Statsd + class Telemetry + def flush + ["bytes", "packets"].each do |kind| + ["sent", "dropped", "dropped_queue", "dropped_writer"].each do |measure| + var = "#{kind}_#{measure}" + puts "#{var}: #{instance_variable_get("@#{var}".to_sym)}" + end + end + puts "-" * 10 + + [] + end + end + end +end + class DatadogShim extend Forwardable @@ -222,3 +240,40 @@ benchmark_implementation( if ENABLE_PROFILING Vernier.stop_profile end + +if ENABLE_PROFILING + Vernier.start_profile(out: "tmp/benchmark_profile_datadog.json") +end +benchmark_implementation( + "Datadog client with UDP and delayed serialization", + { + buffer_max_payload_size: 4096, + delay_serialization: true, + telemetry_flush_interval: 1, + # The Datadog implementation will drop metrics if the queue is full; + # to imitate the behavior of the implementation of this gem, we set + # the queue size to infinity to avoid dropping metrics. 
+ sender_queue_size: Float::INFINITY, + }, + true, +) +if ENABLE_PROFILING + Vernier.stop_profile +end + +if ENABLE_PROFILING + Vernier.start_profile(out: "tmp/benchmark_profile_datadog.json") +end +benchmark_implementation( + "UDP - Datadog gem - using: delay_serialization, multi-threaded, allow dropping samples", + { + buffer_max_payload_size: 4096, + delay_serialization: true, + telemetry_flush_interval: 1, + sender_queue_size: 250_000, + }, + true, +) +if ENABLE_PROFILING + Vernier.stop_profile +end diff --git a/lib/statsd/instrument.rb b/lib/statsd/instrument.rb index 3a3229d7..0105081e 100644 --- a/lib/statsd/instrument.rb +++ b/lib/statsd/instrument.rb @@ -401,6 +401,7 @@ def extended(klass) require "statsd/instrument/helpers" require "statsd/instrument/assertions" require "statsd/instrument/expectation" +require "statsd/instrument/connection_behavior" require "statsd/instrument/uds_connection" require "statsd/instrument/udp_connection" require "statsd/instrument/sink" diff --git a/lib/statsd/instrument/client.rb b/lib/statsd/instrument/client.rb index 49623adf..4852dc36 100644 --- a/lib/statsd/instrument/client.rb +++ b/lib/statsd/instrument/client.rb @@ -459,7 +459,6 @@ def service_check(name, status, timestamp: nil, hostname: nil, tags: nil, messag # @note Supported by the Datadog implementation only. 
def event(title, text, timestamp: nil, hostname: nil, aggregation_key: nil, priority: nil, source_type_name: nil, alert_type: nil, tags: nil, no_prefix: false) - emit(datagram_builder(no_prefix: no_prefix)._e( title, text, diff --git a/lib/statsd/instrument/connection_behavior.rb b/lib/statsd/instrument/connection_behavior.rb new file mode 100644 index 00000000..4a651157 --- /dev/null +++ b/lib/statsd/instrument/connection_behavior.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +module StatsD + module Instrument + module ConnectionBehavior + def close + @socket&.close + rescue IOError, SystemCallError => e + StatsD.logger.debug do + "[#{self.class.name}] Error closing socket: #{e.class}: #{e.message}" + end + ensure + @socket = nil + end + + def send_buffer_size + if socket + send_buffer_size_from_socket(socket) + else + @max_packet_size + end + end + + def type + raise NotImplementedError, "#{self.class} must implement #type" + end + + private + + def send_buffer_size_from_socket(original_socket) + original_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int + end + + def setup_socket(original_socket) + original_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size.to_i) + if send_buffer_size_from_socket(original_socket) < @max_packet_size + StatsD.logger.warn do + "[#{self.class.name}] Could not set socket send buffer size to #{@max_packet_size} " \ + "allowed size by environment/OS is (#{send_buffer_size_from_socket(original_socket)})." 
+ end + end + original_socket + rescue IOError => e + StatsD.logger.debug do + "[#{self.class.name}] Failed to create socket: #{e.class}: #{e.message}" + end + nil + end + end + end +end diff --git a/lib/statsd/instrument/dogstatsd_datagram_builder.rb b/lib/statsd/instrument/dogstatsd_datagram_builder.rb index ec35113b..af564f54 100644 --- a/lib/statsd/instrument/dogstatsd_datagram_builder.rb +++ b/lib/statsd/instrument/dogstatsd_datagram_builder.rb @@ -33,7 +33,6 @@ def latency_metric_type # @see https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events def _e(title, text, timestamp: nil, hostname: nil, aggregation_key: nil, priority: nil, source_type_name: nil, alert_type: nil, tags: nil) - escaped_title = "#{@prefix}#{title}".gsub("\n", '\n') escaped_text = text.gsub("\n", '\n') diff --git a/lib/statsd/instrument/environment.rb b/lib/statsd/instrument/environment.rb index 0ded1e1e..8855f755 100644 --- a/lib/statsd/instrument/environment.rb +++ b/lib/statsd/instrument/environment.rb @@ -104,10 +104,10 @@ def statsd_uds_send? def statsd_max_packet_size if statsd_uds_send? - return Float(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE)) + Integer(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE)) + else + Integer(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)) end - - Float(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)) end def statsd_batch_statistics_interval @@ -140,19 +140,32 @@ def default_sink_for_environment case environment when "production", "staging" connection = if statsd_uds_send? 
- StatsD::Instrument::UdsConnection.new(statsd_socket_path) + StatsD::Instrument::UdsConnection.new( + statsd_socket_path, + max_packet_size: statsd_max_packet_size, + ) else host, port = statsd_addr.split(":") - StatsD::Instrument::UdpConnection.new(host, port.to_i) + StatsD::Instrument::UdpConnection.new( + host, + port.to_i, + max_packet_size: statsd_max_packet_size, + ) end sink = StatsD::Instrument::Sink.new(connection) if statsd_batching? - # if we are batching, wrap the sink in a batched sink + current_send_buffer_size = connection.send_buffer_size + if current_send_buffer_size < statsd_max_packet_size + StatsD.logger.warn do + "[StatsD::Instrument::Environment] Send buffer size #{current_send_buffer_size} differs from " \ + "max packet size #{statsd_max_packet_size}. Using send buffer size as max packet size." + end + end return StatsD::Instrument::BatchedSink.new( sink, buffer_capacity: statsd_buffer_capacity, - max_packet_size: statsd_max_packet_size, + max_packet_size: [current_send_buffer_size, statsd_max_packet_size].min, statistics_interval: statsd_batch_statistics_interval, ) end diff --git a/lib/statsd/instrument/expectation.rb b/lib/statsd/instrument/expectation.rb index 226ae12b..9d5f57d5 100644 --- a/lib/statsd/instrument/expectation.rb +++ b/lib/statsd/instrument/expectation.rb @@ -36,7 +36,6 @@ def histogram(name, value = nil, **options) def initialize(client: nil, type:, name:, value: nil, sample_rate: nil, tags: nil, no_prefix: false, times: 1) - @type = type @name = no_prefix ? 
name : StatsD::Instrument::Helpers.prefix_metric(name, client: client) @value = normalized_value_for_type(type, value) if value diff --git a/lib/statsd/instrument/sink.rb b/lib/statsd/instrument/sink.rb index 5b1aa58b..75bab340 100644 --- a/lib/statsd/instrument/sink.rb +++ b/lib/statsd/instrument/sink.rb @@ -40,12 +40,15 @@ def <<(datagram) connection.send_datagram(datagram) rescue SocketError, IOError, SystemCallError => error StatsD.logger.debug do - "[#{self.class.name}] Resetting connection because of #{error.class}: #{error.message}" + "[#{self.class.name}] [#{connection.class.name}] " \ + "Resetting connection because of #{error.class}: #{error.message}" end invalidate_connection if retried StatsD.logger.warn do - "[#{self.class.name}] Events were dropped because of #{error.class}: #{error.message}" + "[#{self.class.name}] [#{connection.class.name}] " \ + "Events were dropped (after retrying) because of #{error.class}: #{error.message}. " \ + "Message size: #{datagram.bytesize} bytes." 
end else retried = true diff --git a/lib/statsd/instrument/strict.rb b/lib/statsd/instrument/strict.rb index fb2c9f2b..94176fed 100644 --- a/lib/statsd/instrument/strict.rb +++ b/lib/statsd/instrument/strict.rb @@ -65,7 +65,6 @@ def service_check(name, status, tags: nil, no_prefix: false, hostname: nil, time def event(title, text, tags: nil, no_prefix: false, hostname: nil, timestamp: nil, aggregation_key: nil, priority: nil, source_type_name: nil, alert_type: nil) - super end diff --git a/lib/statsd/instrument/udp_connection.rb b/lib/statsd/instrument/udp_connection.rb index 01942ca5..a291319c 100644 --- a/lib/statsd/instrument/udp_connection.rb +++ b/lib/statsd/instrument/udp_connection.rb @@ -3,24 +3,22 @@ module StatsD module Instrument class UdpConnection + include ConnectionBehavior + DEFAULT_MAX_PACKET_SIZE = 1_472 attr_reader :host, :port - def initialize(host, port) + def initialize(host, port, max_packet_size: DEFAULT_MAX_PACKET_SIZE) @host = host @port = port + @max_packet_size = max_packet_size end def send_datagram(message) socket.send(message, 0) end - def close - @socket&.close - @socket = nil - end - def type :udp end @@ -29,9 +27,10 @@ def type def socket @socket ||= begin - socket = UDPSocket.new - socket.connect(@host, @port) - socket + udp_socket = UDPSocket.new + setup_socket(udp_socket)&.tap do |s| + s.connect(@host, @port) + end end end end diff --git a/lib/statsd/instrument/uds_connection.rb b/lib/statsd/instrument/uds_connection.rb index 4f6bbd0d..8339124d 100644 --- a/lib/statsd/instrument/uds_connection.rb +++ b/lib/statsd/instrument/uds_connection.rb @@ -3,6 +3,8 @@ module StatsD module Instrument class UdsConnection + include ConnectionBehavior + DEFAULT_MAX_PACKET_SIZE = 8_192 def initialize(socket_path, max_packet_size: DEFAULT_MAX_PACKET_SIZE) @@ -17,12 +19,7 @@ def initialize(socket_path, max_packet_size: DEFAULT_MAX_PACKET_SIZE) end def send_datagram(message) - socket.sendmsg(message, 0) - end - - def close - @socket&.close - 
@socket = nil + socket&.sendmsg(message, 0) end def host @@ -41,10 +38,10 @@ def type def socket @socket ||= begin - socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM) - socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size.to_i) - socket.connect(Socket.pack_sockaddr_un(@socket_path)) - socket + unix_socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM) + setup_socket(unix_socket)&.tap do |s| + s.connect(Socket.pack_sockaddr_un(@socket_path)) + end end end end diff --git a/lib/statsd/instrument/version.rb b/lib/statsd/instrument/version.rb index 2901907d..fb5ec9b5 100644 --- a/lib/statsd/instrument/version.rb +++ b/lib/statsd/instrument/version.rb @@ -2,6 +2,6 @@ module StatsD module Instrument - VERSION = "3.9.7" + VERSION = "3.9.8" end end diff --git a/test/environment_test.rb b/test/environment_test.rb index 36d95e33..30cac158 100644 --- a/test/environment_test.rb +++ b/test/environment_test.rb @@ -76,4 +76,93 @@ def test_client_from_env_uses_regular_udp_sink_when_buffer_capacity_is_0 ) assert_kind_of(StatsD::Instrument::Sink, env.client.sink) end + + def test_client_from_env_uses_uds_sink_with_correct_packet_size_in_production + skip_on_jruby("JRuby does not support UNIX domain sockets") + socket_path = "/tmp/statsd-test-#{Process.pid}.sock" + + # Create a UDS server socket + server = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM) + server.bind(Socket.pack_sockaddr_un(socket_path)) + + env = StatsD::Instrument::Environment.new( + "STATSD_ENV" => "production", + "STATSD_SOCKET_PATH" => socket_path, + "STATSD_MAX_PACKET_SIZE" => "65507", + "STATSD_USE_NEW_CLIENT" => "1", + ) + + begin + client = env.client + sink = client.sink + connection = sink.connection + + assert_kind_of(StatsD::Instrument::UdsConnection, connection) + assert_equal(65507, connection.instance_variable_get(:@max_packet_size)) + ensure + server.close + File.unlink(socket_path) if File.exist?(socket_path) + end + end + + def 
test_client_from_env_uses_default_packet_size_for_uds_when_not_specified + skip_on_jruby("JRuby does not support UNIX domain sockets") + socket_path = "/tmp/statsd-test-#{Process.pid}-default.sock" + + # Create a UDS server socket + server = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM) + server.bind(Socket.pack_sockaddr_un(socket_path)) + + env = StatsD::Instrument::Environment.new( + "STATSD_ENV" => "production", + "STATSD_SOCKET_PATH" => socket_path, + "STATSD_USE_NEW_CLIENT" => "1", + ) + + begin + client = env.client + sink = client.sink + connection = sink.connection + + assert_kind_of(StatsD::Instrument::UdsConnection, connection) + assert_equal( + StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE, + connection.instance_variable_get(:@max_packet_size), + ) + ensure + server.close + File.unlink(socket_path) if File.exist?(socket_path) + end + end + + def test_client_from_env_uses_batched_uds_sink_with_correct_packet_size + skip_on_jruby("JRuby does not support UNIX domain sockets") + socket_path = "/tmp/statsd-test-#{Process.pid}-batched.sock" + + # Create a UDS server socket + server = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM) + server.bind(Socket.pack_sockaddr_un(socket_path)) + + env = StatsD::Instrument::Environment.new( + "STATSD_ENV" => "production", + "STATSD_SOCKET_PATH" => socket_path, + "STATSD_MAX_PACKET_SIZE" => "65507", + "STATSD_BUFFER_CAPACITY" => "1000", + "STATSD_USE_NEW_CLIENT" => "1", + ) + + begin + client = env.client + sink = client.sink + assert_kind_of(StatsD::Instrument::BatchedSink, sink) + + underlying_sink = sink.instance_variable_get(:@sink) + connection = underlying_sink.connection + assert_kind_of(StatsD::Instrument::UdsConnection, connection) + assert_equal(65507, connection.instance_variable_get(:@max_packet_size)) + ensure + server.close + File.unlink(socket_path) if File.exist?(socket_path) + end + end end diff --git a/test/test_helper.rb b/test/test_helper.rb index aa5785c3..2ce5ec28 100644 --- 
a/test/test_helper.rb +++ b/test/test_helper.rb @@ -30,6 +30,15 @@ def strict_mode_enabled? end end +# Add helper methods available to all tests +module Minitest + class Test + def skip_on_jruby(message = "Test skipped on JRuby") + skip(message) if RUBY_ENGINE == "jruby" + end + end +end + StatsD.logger = Logger.new(File::NULL) Thread.abort_on_exception = true diff --git a/test/udp_sink_test.rb b/test/udp_sink_test.rb index 13e912e8..7fe9c2da 100644 --- a/test/udp_sink_test.rb +++ b/test/udp_sink_test.rb @@ -149,9 +149,27 @@ def test_socket_error_should_invalidate_socket UDPSocket.stubs(:new).returns(socket = mock("socket")) seq = sequence("connect_fail_connect_succeed") + + # First attempt + socket.expects(:setsockopt) + .with(Socket::SOL_SOCKET, Socket::SO_SNDBUF, StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE) + .in_sequence(seq) + socket.expects(:getsockopt) + .with(Socket::SOL_SOCKET, Socket::SO_SNDBUF) + .returns(mock(int: StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)) + .in_sequence(seq) socket.expects(:connect).with("localhost", 8125).in_sequence(seq) socket.expects(:send).raises(Errno::EDESTADDRREQ).in_sequence(seq) socket.expects(:close).in_sequence(seq) + + # Second attempt after error + socket.expects(:setsockopt) + .with(Socket::SOL_SOCKET, Socket::SO_SNDBUF, StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE) + .in_sequence(seq) + socket.expects(:getsockopt) + .with(Socket::SOL_SOCKET, Socket::SO_SNDBUF) + .returns(mock(int: StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)) + .in_sequence(seq) socket.expects(:connect).with("localhost", 8125).in_sequence(seq) socket.expects(:send).twice.returns(1).in_sequence(seq) socket.expects(:close).in_sequence(seq) @@ -161,7 +179,8 @@ def test_socket_error_should_invalidate_socket udp_sink << "bar:1|c" assert_equal( - "[#{@sink_class}] Resetting connection because of " \ + "[#{@sink_class}] [#{@sink_class.for_addr("localhost:8125").connection.class}] " \ + "Resetting 
connection because of " \ "Errno::EDESTADDRREQ: Destination address required\n", logs.string, ) diff --git a/test/uds_sink_test.rb b/test/uds_sink_test.rb index fef93206..a36be39d 100644 --- a/test/uds_sink_test.rb +++ b/test/uds_sink_test.rb @@ -35,10 +35,6 @@ def sink @sink ||= build_sink(@socket_path) end - def skip_on_jruby(message = "JRuby does not support UNIX domain sockets") - skip(message) if RUBY_PLATFORM == "java" - end - def read_datagrams(count, timeout: ENV["CI"] ? 5 : 1) datagrams = [] count.times do