Skip to content

Commit

Permalink
Fix UDS socket configuration and improve error handling (#391)
Browse files Browse the repository at this point in the history
* Fixing bug on UDS connection with packet size

we were not passing the correct value from the environment in the constructor

Signed-off-by: Pedro Tanaka <[email protected]>

* Adding datadog gem benchmarks for cross comparison

Signed-off-by: Pedro Tanaka <[email protected]>

* appeasing rubocop

Signed-off-by: Pedro Tanaka <[email protected]>

fixing ci

Signed-off-by: Pedro Tanaka <[email protected]>

* add changelog, bump version

Signed-off-by: Pedro Tanaka <[email protected]>

* Introduce method for send buffer size and extract module

Signed-off-by: Pedro Tanaka <[email protected]>

* Improve handling of setting the send buffer vs the batched sink buffer

Signed-off-by: Pedro Tanaka <[email protected]>

* skipping UDS tests in jruby

Signed-off-by: Pedro Tanaka <[email protected]>

---------

Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka authored Dec 19, 2024
1 parent fa7b330 commit 4045921
Show file tree
Hide file tree
Showing 19 changed files with 285 additions and 45 deletions.
10 changes: 3 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@ on: push

jobs:
test:
name: Ruby ${{ matrix.ruby }} on ubuntu-latest
runs-on: ubuntu-latest
name: Ruby ${{ matrix.ruby }} on ubuntu-22.04
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
ruby: ['2.6', '2.7', '3.0', '3.1', '3.2', '3.3', 'ruby-head', 'jruby-9.4.8.0', 'truffleruby-22.3.1']
# Windows on macOS builds started failing, so they are disabled for now
# platform: [windows-2019, macOS-10.14, ubuntu-18.04]
# exclude:
# ...
ruby: ['2.7', '3.0', '3.1', '3.2', '3.3', 'ruby-head', 'jruby-9.4.9.0', 'truffleruby-22.3.1']

steps:
- uses: actions/checkout@v4
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ section below.

## Unreleased changes

## Version 3.9.8

- [#390](https://github.com/Shopify/statsd-instrument/pull/391) - Fixing bug in Environment when using UDS. The max packet size option was not being passed to the
UDS connection, causing messages that were too large to be dropped (specially sensitive when used together with BatchedSink).

## Version 3.9.7

- [#389](https://github.com/Shopify/statsd-instrument/pull/389) - Fixing bug with BatchedSink constructor when using UDS, the constructor was not properly passing the Sink to the BatchedSink.
Expand Down
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
.PHONY: test lint update
test:
bundle exec rake test

lint:
bundle exec rake lint_fix

update:
bundle update

check: update lint test
55 changes: 55 additions & 0 deletions benchmark/local-udp-throughput
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ require "datadog/statsd"
require "forwardable"
require "vernier"

module Datadog
class Statsd
class Telemetry
def flush
["bytes", "packets"].each do |kind|
["sent", "dropped", "dropped_queue", "dropped_writer"].each do |measure|
var = "#{kind}_#{measure}"
puts "#{var}: #{instance_variable_get("@#{var}".to_sym)}"
end
end
puts "-" * 10

[]
end
end
end
end

class DatadogShim
extend Forwardable

Expand Down Expand Up @@ -222,3 +240,40 @@ benchmark_implementation(
if ENABLE_PROFILING
Vernier.stop_profile
end

if ENABLE_PROFILING
Vernier.start_profile(out: "tmp/benchmark_profile_datadog.json")
end
benchmark_implementation(
"Datadog client with UDP and delayed serialization",
{
buffer_max_payload_size: 4096,
delay_serialization: true,
telemetry_flush_interval: 1,
# The datadog implemtation will drop metrics if the queue is full
# to imitate the behavior of the implementation of this gem, we set
# the queue size to infinity to avoid dropping metrics.
sender_queue_size: Float::INFINITY,
},
true,
)
if ENABLE_PROFILING
Vernier.stop_profile
end

if ENABLE_PROFILING
Vernier.start_profile(out: "tmp/benchmark_profile_datadog.json")
end
benchmark_implementation(
"UDP - Datadog gem - using: delay_serialization, multi-threaded, allow dropping samples",
{
buffer_max_payload_size: 4096,
delay_serialization: true,
telemetry_flush_interval: 1,
sender_queue_size: 250_000,
},
true,
)
if ENABLE_PROFILING
Vernier.stop_profile
end
1 change: 1 addition & 0 deletions lib/statsd/instrument.rb
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ def extended(klass)
require "statsd/instrument/helpers"
require "statsd/instrument/assertions"
require "statsd/instrument/expectation"
require "statsd/instrument/connection_behavior"
require "statsd/instrument/uds_connection"
require "statsd/instrument/udp_connection"
require "statsd/instrument/sink"
Expand Down
1 change: 0 additions & 1 deletion lib/statsd/instrument/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ def service_check(name, status, timestamp: nil, hostname: nil, tags: nil, messag
# @note Supported by the Datadog implementation only.
def event(title, text, timestamp: nil, hostname: nil, aggregation_key: nil, priority: nil,
source_type_name: nil, alert_type: nil, tags: nil, no_prefix: false)

emit(datagram_builder(no_prefix: no_prefix)._e(
title,
text,
Expand Down
51 changes: 51 additions & 0 deletions lib/statsd/instrument/connection_behavior.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

module StatsD
module Instrument
module ConnectionBehavior
def close
@socket&.close
rescue IOError, SystemCallError => e
StatsD.logger.debug do
"[#{self.class.name}] Error closing socket: #{e.class}: #{e.message}"
end
ensure
@socket = nil
end

def send_buffer_size
if socket
send_buffer_size_from_socket(socket)
else
@max_packet_size
end
end

def type
raise NotImplementedError, "#{self.class} must implement #type"
end

private

def send_buffer_size_from_socket(original_socket)
original_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int
end

def setup_socket(original_socket)
original_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size.to_i)
if send_buffer_size_from_socket(original_socket) < @max_packet_size
StatsD.logger.warn do
"[#{self.class.name}] Could not set socket send buffer size to #{@max_packet_size} " \
"allowed size by environment/OS is (#{send_buffer_size_from_socket(original_socket)})."
end
end
original_socket
rescue IOError => e
StatsD.logger.debug do
"[#{self.class.name}] Failed to create socket: #{e.class}: #{e.message}"
end
nil
end
end
end
end
1 change: 0 additions & 1 deletion lib/statsd/instrument/dogstatsd_datagram_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def latency_metric_type
# @see https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events
def _e(title, text, timestamp: nil, hostname: nil, aggregation_key: nil, priority: nil,
source_type_name: nil, alert_type: nil, tags: nil)

escaped_title = "#{@prefix}#{title}".gsub("\n", '\n')
escaped_text = text.gsub("\n", '\n')

Expand Down
27 changes: 20 additions & 7 deletions lib/statsd/instrument/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ def statsd_uds_send?

def statsd_max_packet_size
if statsd_uds_send?
return Float(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE))
Integer(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE))
else
Integer(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE))
end

Float(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE))
end

def statsd_batch_statistics_interval
Expand Down Expand Up @@ -140,19 +140,32 @@ def default_sink_for_environment
case environment
when "production", "staging"
connection = if statsd_uds_send?
StatsD::Instrument::UdsConnection.new(statsd_socket_path)
StatsD::Instrument::UdsConnection.new(
statsd_socket_path,
max_packet_size: statsd_max_packet_size,
)
else
host, port = statsd_addr.split(":")
StatsD::Instrument::UdpConnection.new(host, port.to_i)
StatsD::Instrument::UdpConnection.new(
host,
port.to_i,
max_packet_size: statsd_max_packet_size,
)
end

sink = StatsD::Instrument::Sink.new(connection)
if statsd_batching?
# if we are batching, wrap the sink in a batched sink
current_send_buffer_size = connection.send_buffer_size
if current_send_buffer_size < statsd_max_packet_size
StatsD.logger.warn do
"[StatsD::Instrument::Environment] Send buffer size #{current_send_buffer_size} differs from " \
"max packet size #{statsd_max_packet_size}. Using send buffer size as max packet size."
end
end
return StatsD::Instrument::BatchedSink.new(
sink,
buffer_capacity: statsd_buffer_capacity,
max_packet_size: statsd_max_packet_size,
max_packet_size: [current_send_buffer_size, statsd_max_packet_size].min,
statistics_interval: statsd_batch_statistics_interval,
)
end
Expand Down
1 change: 0 additions & 1 deletion lib/statsd/instrument/expectation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def histogram(name, value = nil, **options)

def initialize(client: nil, type:, name:, value: nil,
sample_rate: nil, tags: nil, no_prefix: false, times: 1)

@type = type
@name = no_prefix ? name : StatsD::Instrument::Helpers.prefix_metric(name, client: client)
@value = normalized_value_for_type(type, value) if value
Expand Down
7 changes: 5 additions & 2 deletions lib/statsd/instrument/sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ def <<(datagram)
connection.send_datagram(datagram)
rescue SocketError, IOError, SystemCallError => error
StatsD.logger.debug do
"[#{self.class.name}] Resetting connection because of #{error.class}: #{error.message}"
"[#{self.class.name}] [#{connection.class.name}] " \
"Resetting connection because of #{error.class}: #{error.message}"
end
invalidate_connection
if retried
StatsD.logger.warn do
"[#{self.class.name}] Events were dropped because of #{error.class}: #{error.message}"
"[#{self.class.name}] [#{connection.class.name}] " \
"Events were dropped (after retrying) because of #{error.class}: #{error.message}. " \
"Message size: #{datagram.bytesize} bytes."
end
else
retried = true
Expand Down
1 change: 0 additions & 1 deletion lib/statsd/instrument/strict.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def service_check(name, status, tags: nil, no_prefix: false, hostname: nil, time

def event(title, text, tags: nil, no_prefix: false,
hostname: nil, timestamp: nil, aggregation_key: nil, priority: nil, source_type_name: nil, alert_type: nil)

super
end

Expand Down
17 changes: 8 additions & 9 deletions lib/statsd/instrument/udp_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,22 @@
module StatsD
module Instrument
class UdpConnection
include ConnectionBehavior

DEFAULT_MAX_PACKET_SIZE = 1_472

attr_reader :host, :port

def initialize(host, port)
def initialize(host, port, max_packet_size: DEFAULT_MAX_PACKET_SIZE)
@host = host
@port = port
@max_packet_size = max_packet_size
end

def send_datagram(message)
socket.send(message, 0)
end

def close
@socket&.close
@socket = nil
end

def type
:udp
end
Expand All @@ -29,9 +27,10 @@ def type

def socket
@socket ||= begin
socket = UDPSocket.new
socket.connect(@host, @port)
socket
udp_socket = UDPSocket.new
setup_socket(udp_socket)&.tap do |s|
s.connect(@host, @port)
end
end
end
end
Expand Down
17 changes: 7 additions & 10 deletions lib/statsd/instrument/uds_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module StatsD
module Instrument
class UdsConnection
include ConnectionBehavior

DEFAULT_MAX_PACKET_SIZE = 8_192

def initialize(socket_path, max_packet_size: DEFAULT_MAX_PACKET_SIZE)
Expand All @@ -17,12 +19,7 @@ def initialize(socket_path, max_packet_size: DEFAULT_MAX_PACKET_SIZE)
end

def send_datagram(message)
socket.sendmsg(message, 0)
end

def close
@socket&.close
@socket = nil
socket&.sendmsg(message, 0)
end

def host
Expand All @@ -41,10 +38,10 @@ def type

def socket
@socket ||= begin
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size.to_i)
socket.connect(Socket.pack_sockaddr_un(@socket_path))
socket
unix_socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
setup_socket(unix_socket)&.tap do |s|
s.connect(Socket.pack_sockaddr_un(@socket_path))
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/statsd/instrument/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module StatsD
module Instrument
VERSION = "3.9.7"
VERSION = "3.9.8"
end
end
Loading

0 comments on commit 4045921

Please sign in to comment.