
Commit

testing drop with uds
Signed-off-by: Pedro Tanaka <[email protected]>
pedro-stanaka committed Feb 18, 2025
1 parent 6fd8c49 commit 94f98a2
Showing 2 changed files with 85 additions and 23 deletions.
46 changes: 23 additions & 23 deletions benchmark/local-udp-throughput
@@ -123,13 +123,13 @@ def benchmark_implementation(name, env = {}, datadog_client = false)
receiver_uds.bind(Socket.pack_sockaddr_un(SOCKET_PATH))
# with UDS we have to take data out of the socket, otherwise it will fill up
# and we will block writing to it (which is what we are testing)
consume = Thread.new do
loop do
receiver_uds.recv(32768)
rescue
# Ignored
end
end
# consume = Thread.new do
# loop do
# receiver_uds.recv(32768)
# rescue
# # Ignored
# end
# end

log_file = File.open(log_filename, "w+", level: Logger::WARN)
StatsD.logger = Logger.new(log_file)
@@ -170,7 +170,7 @@ def benchmark_implementation(name, env = {}, datadog_client = false)

duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

consume.kill
# consume.kill
receiver.close
receiver_uds.close

@@ -179,21 +179,21 @@ def benchmark_implementation(name, env = {}, datadog_client = false)
puts "events: #{(events_sent / duration).round(1).to_s.reverse.gsub(/(\d{3})(?=\d)/, '\\1,').reverse}/s"
end

if ENABLE_PROFILING
Vernier.start_profile(out: "tmp/benchmark_profile_udp_sync.json")
end
benchmark_implementation("UDP sync", "STATSD_BUFFER_CAPACITY" => "0")
if ENABLE_PROFILING
Vernier.stop_profile
end

if ENABLE_PROFILING
Vernier.start_profile(out: "tmp/benchmark_profile_udp_async.json")
end
benchmark_implementation("UDP batched")
if ENABLE_PROFILING
Vernier.stop_profile
end
# if ENABLE_PROFILING
# Vernier.start_profile(out: "tmp/benchmark_profile_udp_sync.json")
# end
# benchmark_implementation("UDP sync", "STATSD_BUFFER_CAPACITY" => "0")
# if ENABLE_PROFILING
# Vernier.stop_profile
# end

# if ENABLE_PROFILING
# Vernier.start_profile(out: "tmp/benchmark_profile_udp_async.json")
# end
# benchmark_implementation("UDP batched")
# if ENABLE_PROFILING
# Vernier.stop_profile
# end

if ENABLE_PROFILING
Vernier.start_profile(out: "tmp/benchmark_profile_uds_small_packet.json")
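
The comment near the top of this file's diff explains why the benchmark normally runs a consumer thread: with a UDS datagram socket, if nothing drains the receiver, the kernel buffer fills and writes block, which is exactly what this commit makes observable by commenting that thread out. Below is a minimal standalone sketch of that behaviour, not taken from the repository; the socket path, payload size, and one-second timeout are arbitrary choices for illustration, and the blocking assumes Linux-style backpressure on AF_UNIX datagram sockets.

require "socket"
require "timeout"

socket_path = "/tmp/uds-blocking-demo.socket" # hypothetical path, not used by the benchmark
File.unlink(socket_path) if File.exist?(socket_path)

# Datagram UDS receiver that nobody reads from.
receiver = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
receiver.bind(Socket.pack_sockaddr_un(socket_path))

writer = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
writer.connect(Socket.pack_sockaddr_un(socket_path))

# With no reader draining the socket, the kernel buffer eventually fills and
# sendmsg blocks; the timeout only exists to break out of the stalled write.
sent = 0
begin
  Timeout.timeout(1) do
    loop do
      writer.sendmsg("x" * 1024)
      sent += 1
    end
  end
rescue Timeout::Error
  puts "writer blocked after #{sent} datagrams"
end

writer.close
receiver.close
File.unlink(socket_path)

With the consumer thread in place (the code commented out above), the receiver is drained continuously and the writer never reaches this state; removing it is what lets the benchmark exercise the blocking and drop behaviour the commit message refers to.
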
62 changes: 62 additions & 0 deletions test/integration_test.rb
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "test_helper"
require "fileutils"

class IntegrationTest < Minitest::Test
def setup
@@ -103,4 +104,65 @@ def test_live_local_udp_socket_with_aggregation_sampled_scenario
assert_match(/counter:\d+|c/, packets.find { |packet| packet.start_with?("counter:") })
assert_match(/test_distribution:\d+:3|d/, packets.find { |packet| packet.start_with?("test_distribution:") })
end

def test_live_local_uds_socket
socket_path = "/tmp/statsd-test-#{Process.pid}.socket"
begin
# Set up a UDS datagram server with a deliberately small receive buffer
server = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
server.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
server.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 8192) # Using DEFAULT_MAX_PACKET_SIZE
server.bind(Socket.pack_sockaddr_un(socket_path))

# Get and print the actual receive buffer size
actual_rcvbuf = server.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int
puts "\nServer receive buffer size: #{actual_rcvbuf} bytes"

# Verify socket file exists
assert(File.exist?(socket_path), "Socket file should exist")

puts "Using socket path: #{socket_path}"
client = StatsD::Instrument::Environment.new(
"STATSD_SOCKET_PATH" => socket_path,
"STATSD_IMPLEMENTATION" => "dogstatsd",
"STATSD_ENV" => "production",
).client

logger = Logger.new($stdout)
logger.level = Logger::INFO

StatsD.logger = logger

# Send messages until we block
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Build a large sample message just to report its size; the actual traffic below goes through the StatsD client
large_tags = (1..100).map { |i| "tag#{i}:value#{i}" }
test_message = "overflow_counter:2|c|##{large_tags.join(",")}"
puts "\nTest message size: #{test_message.bytesize} bytes"

sent_messages = 0
begin
Timeout.timeout(1.0) do
100_000.times do
client.distribution("overflow_distribution", 299, tags: large_tags)
sent_messages += 1
end
end
rescue Timeout::Error
puts "Hit timeout as expected"
end
finish = Process.clock_gettime(Process::CLOCK_MONOTONIC)

total_time = finish - start
puts "\nTotal time: #{total_time}s"
puts "Messages received: #{messages_received}"
puts "Average time per message: #{(total_time / 100_000.0) * 1000}ms"

# Should have blocked and hit the timeout
assert_operator(finish - start, :>, 0.5, "Should have blocked when socket buffer is full")
ensure
# slow_reader&.kill
server&.close
File.unlink(socket_path) if File.exist?(socket_path)
end
end
end
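
The ensure block in this test still references a slow_reader (via the commented-out slow_reader&.kill) that is not defined anywhere in this version of the file. A hypothetical sketch of what such a reader might look like, intended to sit inside the begin block right after the server setup; the read size mirrors the 8192-byte receive buffer configured above, while the sleep interval and the messages_received counter are illustrative assumptions:

# Hypothetical slow reader for the test above: it drains `server`, but slowly,
# so the client's sends still back up against the 8 KiB receive buffer.
messages_received = 0
slow_reader = Thread.new do
  loop do
    server.recv(8192)   # same size as the SO_RCVBUF set on the server
    messages_received += 1
    sleep(0.05)         # artificial delay so the buffer stays near-full
  rescue IOError
    break               # server socket closed by the ensure block
  end
end

Started before the send loop, a thread like this keeps the buffer close to full without letting the writer run free; the existing slow_reader&.kill in the ensure block would clean it up, and the messages_received counter could then back a "Messages received" report alongside the sent count.
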
