diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index ae844edbf4..917cdec7ae 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -245,7 +245,7 @@ describe LavinMQ::AMQP::StreamQueue do with_amqp_server do |s| StreamQueueSpecHelpers.publish(s, queue_name, offset + 1) offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) } - sleep 0.1 + sleep 0.1.seconds # consume again, should start from last offset automatically msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) @@ -268,7 +268,7 @@ describe LavinMQ::AMQP::StreamQueue do msg_store.store_consumer_offset(tag_prefix + i.to_s, offset) end msg_store.close - sleep 0.1 + wait_for { msg_store.@closed } msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) offsets.each_with_index do |offset, i| @@ -348,7 +348,7 @@ describe LavinMQ::AMQP::StreamQueue do StreamQueueSpecHelpers.publish(s, queue_name, 2) msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 - sleep 0.1 + sleep 0.1.seconds # should consume the same message again since tracking was not saved from last consume msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) @@ -367,7 +367,7 @@ describe LavinMQ::AMQP::StreamQueue do # get message without x-stream-offset, tracks offset msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 - sleep 0.1 + sleep 0.1.seconds # consume with x-stream-offset set, should consume the same message again msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) @@ -386,7 +386,7 @@ describe LavinMQ::AMQP::StreamQueue do # get message without x-stream-offset, tracks offset msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 - sleep 0.1 + sleep 0.1.seconds # consume with x-stream-offset set, should consume the same message again msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) @@ -407,10 +407,10 @@ describe LavinMQ::AMQP::StreamQueue do offsets.each_with_index do |offset, i| msg_store.store_consumer_offset(tag_prefix + i.to_s, offset) end - sleep 0.1 + sleep 0.1.seconds msg_store.cleanup_consumer_offsets msg_store.close - sleep 0.1 + sleep 0.1.seconds msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil @@ -475,7 +475,7 @@ describe LavinMQ::AMQP::StreamQueue do msgs.receive end - sleep 0.1 + sleep 0.1.seconds data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(c_tag).should eq nil