diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb index 06e91d4b5..7f6181e50 100644 --- a/lib/kafka/protocol/message.rb +++ b/lib/kafka/protocol/message.rb @@ -88,7 +88,10 @@ def self.decode(decoder) # attributes. codec_id = attributes & 0b111 - new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: Time.at(timestamp / 1000.0)) + # The timestamp will be nil if the message was written in the Kafka 0.9 log format. + create_time = timestamp && Time.at(timestamp / 1000.0) + + new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time) end private diff --git a/spec/fixtures/message-0.9-format b/spec/fixtures/message-0.9-format new file mode 100644 index 000000000..a166ef946 Binary files /dev/null and b/spec/fixtures/message-0.9-format differ diff --git a/spec/protocol/message_spec.rb b/spec/protocol/message_spec.rb new file mode 100644 index 000000000..c1ad47a90 --- /dev/null +++ b/spec/protocol/message_spec.rb @@ -0,0 +1,29 @@ +describe Kafka::Protocol::Message do + it "encodes and decodes messages" do + message = Kafka::Protocol::Message.new( + value: "yolo", + key: "xx", + ) + + io = StringIO.new + encoder = Kafka::Protocol::Encoder.new(io) + message.encode(encoder) + data = StringIO.new(io.string) + decoder = Kafka::Protocol::Decoder.new(data) + + expect(Kafka::Protocol::Message.decode(decoder)).to eq message + end + + it "decodes messages written in the 0.9 format" do + data = File.open("spec/fixtures/message-0.9-format") + + decoder = Kafka::Protocol::Decoder.new(data) + message = Kafka::Protocol::Message.decode(decoder) + + expect(message.key).to eq "xx" + expect(message.value).to eq "yolo" + + # Messages didn't have timestamps back in the 0.9 days. + expect(message.create_time).to eq nil + end +end