Skip to content

Commit

Permalink
Add timeout to RMQ
Browse files Browse the repository at this point in the history
ENG-227
  • Loading branch information
kasesalum committed Jun 30, 2023
1 parent 6f854e9 commit b17b7aa
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
3 changes: 2 additions & 1 deletion lib/freddy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def initialize(connection, logger, max_concurrency)
# handler.error(message: 'Can not do')
# end
# end
def respond_to(destination, &callback)
def respond_to(destination, &callback, timeout=30)
@logger.info "Listening for requests on #{destination}"

channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
Expand All @@ -91,6 +91,7 @@ def respond_to(destination, &callback)
channel: channel,
handler_adapter_factory: handler_adapter_factory
},
timeout,
&callback
)
end
Expand Down
22 changes: 13 additions & 9 deletions lib/freddy/consumers/respond_to_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
class Freddy
module Consumers
class RespondToConsumer
def self.consume(**attrs, &block)
new(**attrs).consume(&block)
def self.consume(**attrs, timeout, &block)
new(**attrs).consume(timeout, &block)
end

def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
Expand All @@ -14,8 +14,8 @@ def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
@handler_adapter_factory = handler_adapter_factory
end

def consume
consumer = consume_from_destination do |delivery|
def consume(timeout)
consumer = consume_from_destination(timeout) do |delivery|
adapter = @handler_adapter_factory.for(delivery)

msg_handler = MessageHandler.new(adapter, delivery)
Expand All @@ -27,16 +27,20 @@ def consume

private

def consume_from_destination(&block)
def consume_from_destination(timeout, &block)
@channel.queue(@destination).subscribe(manual_ack: true) do |delivery|
process_message(delivery, &block)
process_message(delivery, timeout, &block)
end
end

def process_message(delivery)
def process_message(delivery, timeout)
@consume_thread_pool.post do
delivery.in_span do
yield(delivery)
Timeout.timeout(timeout) do
delivery.in_span do
yield(delivery)
end
rescue Timeout::Error
# log a warning here maybe?
end
ensure
@channel.acknowledge(delivery.tag, false)
Expand Down

0 comments on commit b17b7aa

Please sign in to comment.