
Commit a008e3c
Clean up integration tests
robbavey committed Jun 24, 2019
1 parent 7d79adc commit a008e3c
Showing 2 changed files with 75 additions and 136 deletions.
10 changes: 6 additions & 4 deletions lib/logstash/inputs/kafka.rb
@@ -131,10 +131,6 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   # The class name of the partition assignment strategy that the client will use to distribute
   # partition ownership amongst consumer instances
   config :partition_assignment_strategy, :validate => :string
-  # ID of the pipeline whose events you want to read from.
-  def pipeline_id
-    respond_to?(:execution_context) ? execution_context.pipeline_id : "main"
-  end
   # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
   config :receive_buffer_bytes, :validate => :string
   # The amount of time to wait before attempting to reconnect to a given host.
@@ -360,4 +356,10 @@ def set_sasl_config(props)
 
     props.put("sasl.kerberos.service.name",sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
   end
+
+  # ID of the pipeline whose events you want to read from.
+  def pipeline_id
+    respond_to?(:execution_context) ? execution_context.pipeline_id : "main"
+  end
+
 end #class LogStash::Inputs::Kafka
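
The relocated pipeline_id helper falls back to "main" whenever the plugin has no execution context, which is exactly the situation in plain unit specs. A minimal standalone sketch of that fallback; FakeContext and both wrapper classes are illustrative, not part of the plugin:

    # Hypothetical stand-in for the execution context Logstash injects into plugins.
    FakeContext = Struct.new(:pipeline_id)

    class InsidePipeline
      def execution_context
        FakeContext.new("nginx-ingest")
      end

      # Same guard as the plugin: prefer the context's pipeline id when present.
      def pipeline_id
        respond_to?(:execution_context) ? execution_context.pipeline_id : "main"
      end
    end

    class OutsidePipeline
      def pipeline_id
        respond_to?(:execution_context) ? execution_context.pipeline_id : "main"
      end
    end

    puts InsidePipeline.new.pipeline_id   # => "nginx-ingest"
    puts OutsidePipeline.new.pipeline_id  # => "main"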
201 changes: 69 additions & 132 deletions spec/integration/inputs/kafka_spec.rb
@@ -4,166 +4,103 @@
require "digest"
require "rspec/wait"

+def thread_it(kafka_input, queue)
+  Thread.new do
+    begin
+      kafka_input.run(queue)
+    end
+  end
+end
+
+def run_with_kafka(&block)
+  queue = Queue.new
+  t = thread_it(kafka_input, queue)
+  begin
+    wait(timeout_seconds).for {queue.length}.to eq(expected_num_events)
+    yield(queue)
+  ensure
+    t.kill
+    t.join(30_000)
+  end
+end
+
+shared_examples 'consumes all expected messages' do
+  it 'should consume all expected messages' do
+    run_with_kafka do |queue|
+      expect(queue.length).to eq(expected_num_events)
+    end
+  end
+end
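
Two notes on these new helpers. wait(timeout_seconds).for { ... } comes from the rspec-wait gem required above; it re-evaluates the block until the matcher passes or the timeout elapses, so slow consumers do not fail the expectation immediately. And because run_with_kafka reads kafka_input, timeout_seconds, and expected_num_events from the enclosing example group, a context only needs to override let(:config) before including the shared example. A sketch of the intended usage (topic and group names are illustrative, not from this diff):

    context 'from some illustrative topic' do
      let(:config) { super.merge({'topics' => ['some_topic'], 'group_id' => 'some_group'}) }
      it_behaves_like 'consumes all expected messages'
    end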

# Please run kafka_test_setup.sh prior to executing this integration test.
describe "inputs/kafka", :integration => true do
+  subject(:kafka_input) { LogStash::Inputs::Kafka.new(config) }
+  let(:execution_context) { double("execution_context")}
+
+  before :each do
+    allow(kafka_input).to receive(:execution_context).and_return(execution_context)
+    allow(execution_context).to receive(:pipeline_id).and_return(pipeline_id)
+  end

# Group ids to make sure that the consumers get all the logs.
let(:group_id_1) {rand(36**8).to_s(36)}
let(:group_id_2) {rand(36**8).to_s(36)}
let(:group_id_3) {rand(36**8).to_s(36)}
let(:group_id_4) {rand(36**8).to_s(36)}
let(:group_id_5) {rand(36**8).to_s(36)}
-  let(:plain_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) }
-  let(:snappy_config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:lz4_config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:pattern_config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
-  let(:decorate_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} }
-  let(:manual_commit_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} }
+  let(:pipeline_id) {rand(36**8).to_s(36)}
+  let(:config) { { 'codec' => 'plain', 'auto_offset_reset' => 'earliest'}}
let(:timeout_seconds) { 30 }
let(:num_events) { 103 }
+  let(:expected_num_events) { num_events }

describe "#kafka-topics" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
end
end
end

it "should consume all messages from plain 3-partition topic" do
kafka_input = LogStash::Inputs::Kafka.new(plain_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
end

it "should consume all messages from snappy 3-partition topic" do
kafka_input = LogStash::Inputs::Kafka.new(snappy_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
end
+  context 'from a plain 3 partition topic' do
+    let(:config) { super.merge({ 'topics' => ['logstash_topic_plain'], 'group_id' => group_id_1}) }
+    it_behaves_like 'consumes all expected messages'
+  end

it "should consume all messages from lz4 3-partition topic" do
kafka_input = LogStash::Inputs::Kafka.new(lz4_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
end
+  context 'from snappy 3 partition topic' do
+    let(:config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
+    it_behaves_like 'consumes all expected messages'
+  end

it "should consumer all messages with multiple consumers" do
kafka_input = LogStash::Inputs::Kafka.new(multi_consumer_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
kafka_input.kafka_consumers.each_with_index do |consumer, i|
expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-main")
end
ensure
t.kill
t.join(30_000)
end
end
+  context 'from lz4 3 partition topic' do
+    let(:config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
+    it_behaves_like 'consumes all expected messages'
+  end

describe "#kafka-topics-pattern" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
end
end
end
+  context 'manually committing' do
+    let(:config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_2, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} }
+    it_behaves_like 'consumes all expected messages'
+  end

it "should consume all messages from all 3 topics" do
kafka_input = LogStash::Inputs::Kafka.new(pattern_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(3*num_events)
expect(queue.length).to eq(3*num_events)
ensure
t.kill
t.join(30_000)
end
end
+  context 'using a pattern to consume from all 3 topics' do
+    let(:config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_3, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
+    let(:expected_num_events) { 3*num_events }
+    it_behaves_like 'consumes all expected messages'
+  end
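
This pattern context is the one case where the expected count changes: the topics_pattern regex matches all three pre-seeded test topics, each carrying num_events messages. The arithmetic, with topic names taken from the configs above:

    # 'logstash_topic_.*' matches the three test topics (presumably created
    # and seeded by kafka_test_setup.sh, per the comment at the top of the spec).
    matching_topics = %w[logstash_topic_plain logstash_topic_snappy logstash_topic_lz4]
    num_events = 103                        # per topic, matching the let above
    puts matching_topics.size * num_events  # => 309, the expected queue length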

describe "#kafka-decorate" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
context "with multiple consumers" do
let(:config) { super.merge({'topics' => ['logstash_topic_plain'], "group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) }
it 'should should consume all messages' do
run_with_kafka do |queue|
expect(queue.length).to eq(num_events)
kafka_input.kafka_consumers.each_with_index do |consumer, i|
expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-#{pipeline_id}")
end
end
end
end
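
This assertion is what motivated both the pipeline_id move and the stubbed execution context: each consumer thread's Kafka client id is suffixed with its thread index and the pipeline id, rather than the hard-coded "main" the old test expected. A sketch of the expected values, with "abc123" standing in for the randomly generated pipeline id:

    client_id   = "spec"
    pipeline_id = "abc123"  # illustrative; the spec generates a random value per run
    expected_ids = 3.times.map { |i| "#{client_id}-#{i}-#{pipeline_id}" }
    p expected_ids  # => ["spec-0-abc123", "spec-1-abc123", "spec-2-abc123"]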

+  context 'with decorate events set to true' do
+    let(:config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} }
it "should show the right topic and group name in decorated kafka section" do
start = LogStash::Timestamp.now.time.to_i
-      kafka_input = LogStash::Inputs::Kafka.new(decorate_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
+      run_with_kafka do |queue|
expect(queue.length).to eq(num_events)
event = queue.shift
expect(event.get("[@metadata][kafka][topic]")).to eq("logstash_topic_plain")
expect(event.get("[@metadata][kafka][consumer_group]")).to eq(group_id_3)
expect(event.get("[@metadata][kafka][timestamp]")).to be >= start
-      ensure
-        t.kill
-        t.join(30_000)
-      end
end
end
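
For reference, the three expectations above read fields that decorate_events => true adds under [@metadata][kafka]. A sketch of that read path using the same LogStash::Event getters the spec relies on (values illustrative; assumes a Logstash runtime where logstash-core is loadable):

    require "logstash/event"  # provided by logstash-core

    # Build an event carrying the same metadata fields the input decorates.
    event = LogStash::Event.new
    event.set("[@metadata][kafka][topic]", "logstash_topic_plain")
    event.set("[@metadata][kafka][consumer_group]", "example-group")

    event.get("[@metadata][kafka][topic]")          # => "logstash_topic_plain"
    event.get("[@metadata][kafka][consumer_group]") # => "example-group"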

describe "#kafka-offset-commit" do
def thread_it(kafka_input, queue)
Thread.new do
begin
kafka_input.run(queue)
end
end
end

it "should manually commit offsets" do
kafka_input = LogStash::Inputs::Kafka.new(manual_commit_config)
queue = Queue.new
t = thread_it(kafka_input, queue)
begin
t.run
wait(timeout_seconds).for {queue.length}.to eq(num_events)
expect(queue.length).to eq(num_events)
ensure
t.kill
t.join(30_000)
end
end
end
