diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index 4ce411e..1ba6fbc 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -221,7 +221,7 @@ def register
 
   public
   def run(logstash_queue)
-    @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") }
+    @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}-#{pipeline_id}") }
     @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
     @runner_threads.each { |t| t.join }
   end # def run
@@ -356,4 +356,10 @@ def set_sasl_config(props)
 
     props.put("sasl.kerberos.service.name",sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
   end
+
+  # ID of the pipeline this input runs in; used to keep consumer client ids unique per pipeline.
+  def pipeline_id
+    respond_to?(:execution_context) ? execution_context.pipeline_id : "main"
+  end
+
 end #class LogStash::Inputs::Kafka
diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb
index 1fa3015..91f5572 100644
--- a/spec/integration/inputs/kafka_spec.rb
+++ b/spec/integration/inputs/kafka_spec.rb
@@ -4,166 +4,103 @@
 require "digest"
 require "rspec/wait"
 
+def thread_it(kafka_input, queue)
+  Thread.new do
+    begin
+      kafka_input.run(queue)
+    end
+  end
+end
+
+def run_with_kafka
+  queue = Queue.new
+  t = thread_it(kafka_input, queue)
+  begin
+    wait(timeout_seconds).for { queue.length }.to eq(expected_num_events)
+    yield(queue)
+  ensure
+    t.kill
+    t.join(30_000)
+  end
+end
+
+shared_examples 'consumes all expected messages' do
+  it 'should consume all expected messages' do
+    run_with_kafka do |queue|
+      expect(queue.length).to eq(expected_num_events)
+    end
+  end
+end
+
 # Please run kafka_test_setup.sh prior to executing this integration test.
 describe "inputs/kafka", :integration => true do
+  subject(:kafka_input) { LogStash::Inputs::Kafka.new(config) }
+  let(:execution_context) { double("execution_context") }
+
+  before :each do
+    allow(kafka_input).to receive(:execution_context).and_return(execution_context)
+    allow(execution_context).to receive(:pipeline_id).and_return(pipeline_id)
+  end
+
   # Group ids to make sure that the consumers get all the logs.
   let(:group_id_1) {rand(36**8).to_s(36)}
   let(:group_id_2) {rand(36**8).to_s(36)}
   let(:group_id_3) {rand(36**8).to_s(36)}
   let(:group_id_4) {rand(36**8).to_s(36)}
-  let(:group_id_5) {rand(36**8).to_s(36)}
-  let(:plain_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) }
-  let(:snappy_config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:lz4_config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
-  let(:pattern_config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
-  let(:decorate_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} }
-  let(:manual_commit_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} }
+  let(:pipeline_id) {rand(36**8).to_s(36)}
+  let(:config) { { 'codec' => 'plain', 'auto_offset_reset' => 'earliest' } }
   let(:timeout_seconds) { 30 }
   let(:num_events) { 103 }
+  let(:expected_num_events) { num_events }
 
-  describe "#kafka-topics" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
-        end
-      end
-    end
-
-    it "should consume all messages from plain 3-partition topic" do
-      kafka_input = LogStash::Inputs::Kafka.new(plain_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
-    end
-
-    it "should consume all messages from snappy 3-partition topic" do
-      kafka_input = LogStash::Inputs::Kafka.new(snappy_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
-    end
+  context 'from a plain 3-partition topic' do
+    let(:config) { super().merge({ 'topics' => ['logstash_topic_plain'], 'group_id' => group_id_1 }) }
+    it_behaves_like 'consumes all expected messages'
+  end
 
-    it "should consume all messages from lz4 3-partition topic" do
-      kafka_input = LogStash::Inputs::Kafka.new(lz4_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
-    end
+  context 'from a snappy 3-partition topic' do
+    let(:config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
+    it_behaves_like 'consumes all expected messages'
+  end
 
-    it "should consumer all messages with multiple consumers" do
-      kafka_input = LogStash::Inputs::Kafka.new(multi_consumer_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-        kafka_input.kafka_consumers.each_with_index do |consumer, i|
-          expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}")
-        end
-      ensure
-        t.kill
-        t.join(30_000)
-      end
-    end
+  context 'from an lz4 3-partition topic' do
+    let(:config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} }
+    it_behaves_like 'consumes all expected messages'
   end
 
-  describe "#kafka-topics-pattern" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
-        end
-      end
-    end
+  context 'manually committing' do
+    let(:config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_2, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} }
+    it_behaves_like 'consumes all expected messages'
+  end
 
-    it "should consume all messages from all 3 topics" do
-      kafka_input = LogStash::Inputs::Kafka.new(pattern_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(3*num_events)
-        expect(queue.length).to eq(3*num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
-    end
+  context 'using a pattern to consume from all 3 topics' do
+    let(:config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_3, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} }
+    let(:expected_num_events) { 3 * num_events }
+    it_behaves_like 'consumes all expected messages'
   end
 
-  describe "#kafka-decorate" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
+  context "with multiple consumers" do
+    let(:config) { super().merge({ 'topics' => ['logstash_topic_plain'], "group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3 }) }
+    it 'should consume all messages' do
+      run_with_kafka do |queue|
+        expect(queue.length).to eq(num_events)
+        kafka_input.kafka_consumers.each_with_index do |consumer, i|
+          expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-#{pipeline_id}")
         end
       end
     end
+  end
 
+  context 'with decorate events set to true' do
+    let(:config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} }
     it "should show the right topic and group name in decorated kafka section" do
       start = LogStash::Timestamp.now.time.to_i
-      kafka_input = LogStash::Inputs::Kafka.new(decorate_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
+      run_with_kafka do |queue|
         expect(queue.length).to eq(num_events)
         event = queue.shift
         expect(event.get("[@metadata][kafka][topic]")).to eq("logstash_topic_plain")
         expect(event.get("[@metadata][kafka][consumer_group]")).to eq(group_id_3)
         expect(event.get("[@metadata][kafka][timestamp]")).to be >= start
-      ensure
-        t.kill
-        t.join(30_000)
       end
     end
   end
-
-  describe "#kafka-offset-commit" do
-    def thread_it(kafka_input, queue)
-      Thread.new do
-        begin
-          kafka_input.run(queue)
-        end
-      end
-    end
-
-    it "should manually commit offsets" do
-      kafka_input = LogStash::Inputs::Kafka.new(manual_commit_config)
-      queue = Queue.new
-      t = thread_it(kafka_input, queue)
-      begin
-        t.run
-        wait(timeout_seconds).for {queue.length}.to eq(num_events)
-        expect(queue.length).to eq(num_events)
-      ensure
-        t.kill
-        t.join(30_000)
-      end
-    end
-  end
 end
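
Note on the client-id change above: a minimal sketch in plain Ruby (illustration only, not part of the patch; the pipeline names below are hypothetical) of the client ids the patched `run` method now derives, showing why two pipelines sharing one input config no longer collide on Kafka's `client.id`:

    # Mirrors the expression in the patched `run` method:
    #   create_consumer("#{client_id}-#{i}-#{pipeline_id}")
    client_id        = "spec"  # the input's `client_id` setting
    consumer_threads = 3       # the input's `consumer_threads` setting

    ["main", "ingest"].each do |pipeline_id|  # two pipelines, same input config
      ids = consumer_threads.times.map { |i| "#{client_id}-#{i}-#{pipeline_id}" }
      puts ids.inspect
    end
    # => ["spec-0-main", "spec-1-main", "spec-2-main"]
    # => ["spec-0-ingest", "spec-1-ingest", "spec-2-ingest"]

Without the `-#{pipeline_id}` suffix, both pipelines would register their consumers as "spec-0" through "spec-2", which is what the multiple-consumers spec asserts against via the consumers' "client-id" metric tags.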