From e37817fc7daea3ce8573da91054f02490cd0c8bb Mon Sep 17 00:00:00 2001 From: Daniel Schierbeck Date: Thu, 26 Oct 2017 10:37:06 +0200 Subject: [PATCH 1/2] Add a spec for encoding and decoding a message --- spec/protocol/message_spec.rb | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 spec/protocol/message_spec.rb diff --git a/spec/protocol/message_spec.rb b/spec/protocol/message_spec.rb new file mode 100644 index 000000000..4aa271c32 --- /dev/null +++ b/spec/protocol/message_spec.rb @@ -0,0 +1,16 @@ +describe Kafka::Protocol::Message do + it "encodes and decodes messages" do + message = Kafka::Protocol::Message.new( + value: "yolo", + key: "xx", + ) + + io = StringIO.new + encoder = Kafka::Protocol::Encoder.new(io) + message.encode(encoder) + data = StringIO.new(io.string) + decoder = Kafka::Protocol::Decoder.new(data) + + expect(Kafka::Protocol::Message.decode(decoder)).to eq message + end +end From ba8fa96ff87f0bf1aafe8acbbc01f2c722f73b83 Mon Sep 17 00:00:00 2001 From: Daniel Schierbeck Date: Thu, 26 Oct 2017 10:28:52 +0200 Subject: [PATCH 2/2] Don't assume there's a timestamp on messages Old messages may not have one. --- lib/kafka/protocol/message.rb | 5 ++++- spec/fixtures/message-0.9-format | Bin 0 -> 32 bytes spec/protocol/message_spec.rb | 13 +++++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 spec/fixtures/message-0.9-format diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb index 06e91d4b5..7f6181e50 100644 --- a/lib/kafka/protocol/message.rb +++ b/lib/kafka/protocol/message.rb @@ -88,7 +88,10 @@ def self.decode(decoder) # attributes. codec_id = attributes & 0b111 - new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: Time.at(timestamp / 1000.0)) + # The timestamp will be nil if the message was written in the Kafka 0.9 log format. + create_time = timestamp && Time.at(timestamp / 1000.0) + + new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time) end private diff --git a/spec/fixtures/message-0.9-format b/spec/fixtures/message-0.9-format new file mode 100644 index 0000000000000000000000000000000000000000..a166ef946c15e1bcb73ae9b20e4148f445b6891b GIT binary patch literal 32 gcmezW9|9N{7(~{2pPmDx85o!