diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index 4f5f69103c..2a12e04247 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -33,7 +33,7 @@ module StreamQueueSpecHelpers end end -describe LavinMQ::StreamQueue do +describe LavinMQ::AMQP::StreamQueue do stream_queue_args = LavinMQ::AMQP::Table.new({"x-queue-type": "stream"}) describe "Consume" do @@ -263,14 +263,14 @@ describe LavinMQ::StreamQueue do StreamQueueSpecHelpers.publish(s, queue_name, 1) data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) offsets.each_with_index do |offset, i| msg_store.store_consumer_offset(tag_prefix + i.to_s, offset) end msg_store.close sleep 0.1 - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) offsets.each_with_index do |offset, i| msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset end @@ -286,7 +286,7 @@ describe LavinMQ::StreamQueue do StreamQueueSpecHelpers.publish(s, queue_name, 1) data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) offsets.each do |offset| msg_store.store_consumer_offset(consumer_tag, offset) end @@ -305,13 +305,13 @@ describe LavinMQ::StreamQueue do StreamQueueSpecHelpers.publish(s, queue_name, 1) data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) offsets.each do |offset| msg_store.store_consumer_offset(consumer_tag, offset) end msg_store.close - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last bytesize = consumer_tag.bytesize + 1 + 8 msg_store.@consumer_offsets.size.should eq bytesize @@ -326,7 +326,7 @@ describe LavinMQ::StreamQueue do with_amqp_server do |s| StreamQueueSpecHelpers.publish(s, queue_name, 1) data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) bytesize = consumer_tag.bytesize + 1 + 8 offsets = (LavinMQ::Config.instance.segment_size / bytesize).to_i32 + 1 @@ -403,7 +403,7 @@ describe LavinMQ::StreamQueue do StreamQueueSpecHelpers.publish(s, queue_name, 1) data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) offsets.each_with_index do |offset, i| msg_store.store_consumer_offset(tag_prefix + i.to_s, offset) end @@ -412,7 +412,7 @@ describe LavinMQ::StreamQueue do msg_store.close sleep 0.1 - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0] msg_store.close @@ -444,7 +444,7 @@ describe LavinMQ::StreamQueue do msgs.receive end - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2 with_channel(s) do |ch| @@ -452,7 +452,7 @@ describe LavinMQ::StreamQueue do 2.times { q.publish_confirm msg_body } end - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil end end @@ -477,7 +477,7 @@ describe LavinMQ::StreamQueue do sleep 0.1 data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(c_tag).should eq nil end end @@ -488,7 +488,7 @@ describe LavinMQ::StreamQueue do with_amqp_server do |s| StreamQueueSpecHelpers.publish(s, queue_name, 1) data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) one_offset_bytesize = "#{consumer_tag_prefix}#{1000}".bytesize + 1 + 8 offsets = (LavinMQ::Config.instance.segment_size / one_offset_bytesize).to_i32 + 1 bytesize = 0 diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 4240cf9770..22d73d666e 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -1,4 +1,5 @@ require "./stream_queue" +require "../stream_consumer" module LavinMQ::AMQP class StreamQueue < DurableQueue @@ -170,7 +171,7 @@ module LavinMQ::AMQP @replicator.try &.replace_file @consumer_offsets.path end - def shift?(consumer : Client::Channel::StreamConsumer) : Envelope? + def shift?(consumer : AMQP::StreamConsumer) : Envelope? raise ClosedError.new if @closed if env = shift_requeued(consumer.requeued) diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr index 152ce98f2d..eb270bbd2a 100644 --- a/src/lavinmq/amqp/stream_consumer.cr +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -9,11 +9,13 @@ module LavinMQ property segment : UInt32 property pos : UInt32 getter requeued = Deque(SegmentPosition).new + @track_offset = false def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) + @tag = frame.consumer_tag validate_preconditions(frame) offset = frame.arguments["x-stream-offset"]? - @offset, @segment, @pos = stream_queue.find_offset(offset) + @offset, @segment, @pos = stream_queue.find_offset(offset, @tag, @track_offset) super end @@ -34,7 +36,10 @@ module LavinMQ raise LavinMQ::Error::PreconditionFailed.new("x-priority not supported on stream queues") end case frame.arguments["x-stream-offset"]? - when Nil, Int, Time, "first", "next", "last" + when Nil + @track_offset = true unless @tag.starts_with?("amq.ctag-") + when Int, Time, "first", "next", "last" + @track_offset = true if frame.arguments["x-stream-automatic-offset-tracking"]? else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'") end end @@ -82,6 +87,11 @@ module LavinMQ @queue.as(StreamQueue) end + def ack(sp) + stream_queue.store_consumer_offset(@tag, @offset) if @track_offset + super + end + def reject(sp, requeue : Bool) super if requeue diff --git a/src/lavinmq/client/channel/stream_consumer.cr b/src/lavinmq/client/channel/stream_consumer.cr deleted file mode 100644 index 2dfc4a5edd..0000000000 --- a/src/lavinmq/client/channel/stream_consumer.cr +++ /dev/null @@ -1,105 +0,0 @@ -require "./consumer" -require "../../segment_position" - -module LavinMQ - class Client - class Channel - class StreamConsumer < LavinMQ::Client::Channel::Consumer - property offset : Int64 - property segment : UInt32 - property pos : UInt32 - getter requeued = Deque(SegmentPosition).new - @track_offset = false - - def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) - @tag = frame.consumer_tag - validate_preconditions(frame) - offset = frame.arguments["x-stream-offset"]? - @offset, @segment, @pos = stream_queue.find_offset(offset, @tag, @track_offset) - super - end - - private def validate_preconditions(frame) - if frame.exclusive - raise Error::PreconditionFailed.new("Stream consumers must not be exclusive") - end - if frame.no_ack - raise Error::PreconditionFailed.new("Stream consumers must acknowledge messages") - end - if @channel.prefetch_count.zero? - raise Error::PreconditionFailed.new("Stream consumers must have a prefetch limit") - end - unless @channel.global_prefetch_count.zero? - raise Error::PreconditionFailed.new("Stream consumers does not support global prefetch limit") - end - if frame.arguments.has_key? "x-priority" - raise Error::PreconditionFailed.new("x-priority not supported on stream queues") - end - case frame.arguments["x-stream-offset"]? - when Nil - @track_offset = true unless @tag.starts_with?("amq.ctag-") - when Int, Time, "first", "next", "last" - @track_offset = true if frame.arguments["x-stream-automatic-offset-tracking"]? - else raise Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'") - end - end - - private def deliver_loop - i = 0 - loop do - wait_for_capacity - loop do - raise ClosedError.new if @closed - next if wait_for_queue_ready - next if wait_for_paused_queue - next if wait_for_flow - break - end - {% unless flag?(:release) %} - @log.debug { "Getting a new message" } - {% end %} - stream_queue.consume_get(self) do |env| - deliver(env.message, env.segment_position, env.redelivered) - end - Fiber.yield if (i &+= 1) % 32768 == 0 - end - rescue ex : ClosedError | Queue::ClosedError | Client::Channel::ClosedError | ::Channel::ClosedError - @log.debug { "deliver loop exiting: #{ex.inspect}" } - end - - private def wait_for_queue_ready - if @offset > stream_queue.last_offset && @requeued.empty? - @log.debug { "Waiting for queue not to be empty" } - select - when stream_queue.new_messages.receive - @log.debug { "Queue is not empty" } - when @has_requeued.receive - @log.debug { "Got a requeued message" } - when @notify_closed.receive - end - return true - end - end - - @has_requeued = ::Channel(Nil).new - - private def stream_queue : StreamQueue - @queue.as(StreamQueue) - end - - def ack(sp) - stream_queue.store_consumer_offset(@tag, @offset) if @track_offset - super - end - - def reject(sp, requeue : Bool) - super - if requeue - @requeued.push(sp) - @has_requeued.try_send? nil if @requeued.size == 1 - end - end - end - end - end -end