Skip to content

Commit

Permalink
Store requeue options on Strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
anero committed Nov 22, 2024
1 parent aa7b3b0 commit 3c062cc
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 254 deletions.
2 changes: 1 addition & 1 deletion lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def requeue_throttled(work)
job_class = Object.const_get(message.fetch("wrapped") { message.fetch("class") { return false } })

Registry.get job_class do |strategy|
strategy.requeue_throttled(work, **job_class.sidekiq_throttled_requeue_options)
strategy.requeue_throttled(work)
end
end
end
Expand Down
9 changes: 0 additions & 9 deletions lib/sidekiq/throttled/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ module Throttled
#
# @see ClassMethods
module Job
VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze

# Extends worker class with {ClassMethods}.
#
# @note Using `included` hook with extending worker with {ClassMethods}
Expand Down Expand Up @@ -91,13 +89,6 @@ module ClassMethods
# @see Registry.add
# @return [void]
def sidekiq_throttle(**kwargs)
requeue_options = Throttled.config.default_requeue_options.merge(kwargs.delete(:requeue) || {})
unless VALID_VALUES_FOR_REQUEUE_WITH.include?(requeue_options[:with])
raise ArgumentError, "requeue: #{requeue_options[:with]} is not a valid value for :with"
end

self.sidekiq_throttled_requeue_options = requeue_options

Registry.add(self, **kwargs)
end

Expand Down
74 changes: 49 additions & 25 deletions lib/sidekiq/throttled/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ module Throttled
# Meta-strategy that couples {Concurrency} and {Threshold} strategies.
#
# @private
class Strategy
class Strategy # rubocop:disable Metrics/ClassLength
# :enqueue means put the job back at the end of the queue immediately
# :schedule means schedule enqueueing the job for a later time when we expect to have capacity
VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze

# @!attribute [r] concurrency
# @return [Strategy::Concurrency, nil]
attr_reader :concurrency
Expand All @@ -24,14 +28,19 @@ class Strategy
# @return [Proc, nil]
attr_reader :observer

# @!attribute [r] requeue_options
# @return [Hash, nil]
attr_reader :requeue_options

# @param [#to_s] name
# @param [Hash] concurrency Concurrency options.
# See keyword args of {Strategy::Concurrency#initialize} for details.
# @param [Hash] threshold Threshold options.
# See keyword args of {Strategy::Threshold#initialize} for details.
# @param [#call] key_suffix Dynamic key suffix generator.
# @param [#call] observer Process called after throttled.
def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil)
# @param [#call] requeue What to do with jobs that are throttled.
def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) # rubocop:disable Metrics/MethodLength, Metrics/ParameterLists
@observer = observer

@concurrency = StrategyCollection.new(concurrency,
Expand All @@ -44,7 +53,9 @@ def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer
name: name,
key_suffix: key_suffix)

raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
@requeue_options = Throttled.config.default_requeue_options.merge(requeue || {})

validate!
end

# @return [Boolean] whenever strategy has dynamic config
Expand Down Expand Up @@ -72,26 +83,30 @@ def throttled?(jid, *job_args)
false
end

# Return throttled job to be executed later. Implementation depends on the value of `with`:
# :enqueue means put the job back at the end of the queue immediately
# :schedule means schedule enqueueing the job for a later time when we expect to have capacity
#
# @param [#to_s, #call] with How to handle the throttled job
# @param [#to_s, #call] to Name of the queue to re-queue the job to.
# If not specified, will use the job's original queue.
# @return [Proc, Symbol] How to requeue the throttled job
def requeue_with
requeue_options[:with]
end

# @return [String, nil] Name of the queue to re-queue the job to.
def requeue_to
requeue_options[:to]
end

# Return throttled job to be executed later. Implementation depends on the strategy's `requeue` options.
# @return [void]
def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength
# Resolve :with and :to arguments, calling them if they are Procs
def requeue_throttled(work) # rubocop:disable Metrics/MethodLength
# Resolve :with and :to options, calling them if they are Procs
job_args = JSON.parse(work.job)["args"]
requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with
target_queue = calc_target_queue(work, to)
with = requeue_with.respond_to?(:call) ? requeue_with.call(*job_args) : requeue_with
target_queue = calc_target_queue(work)

case requeue_with
case with
when :enqueue
re_enqueue_throttled(work, target_queue)
when :schedule
# Find out when we will next be able to execute this job, and reschedule for then.
reschedule_throttled(work, requeue_to: target_queue)
reschedule_throttled(work, target_queue)
else
raise "unrecognized :with option #{with}"
end
Expand All @@ -112,14 +127,23 @@ def reset!

private

def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength
target = case to
def validate!
unless VALID_VALUES_FOR_REQUEUE_WITH.include?(@requeue_options[:with]) ||
@requeue_options[:with].respond_to?(:call)
raise ArgumentError, "requeue: #{@requeue_options[:with]} is not a valid value for :with"
end

raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
end

def calc_target_queue(work) # rubocop:disable Metrics/MethodLength
target = case requeue_to
when Proc, Method
to.call(*JSON.parse(work.job)["args"])
requeue_to.call(*JSON.parse(work.job)["args"])
when NilClass
work.queue
when String, Symbol
to.to_s
requeue_to.to_s
else
raise ArgumentError, "Invalid argument for `to`"
end
Expand All @@ -130,21 +154,21 @@ def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength
end

# Push the job back to the head of the queue.
def re_enqueue_throttled(work, requeue_to)
def re_enqueue_throttled(work, target_queue)
case work.class.name
when "Sidekiq::Pro::SuperFetch::UnitOfWork"
# Calls SuperFetch UnitOfWork's requeue to remove the job from the
# temporary queue and push job back to the head of the target queue, so that
# the job won't be tried immediately after it was requeued (in most cases).
work.queue = requeue_to if requeue_to
work.queue = target_queue if target_queue
work.requeue
else
# This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
Sidekiq.redis { |conn| conn.lpush(requeue_to, work.job) }
Sidekiq.redis { |conn| conn.lpush(target_queue, work.job) }
end
end

def reschedule_throttled(work, requeue_to:)
def reschedule_throttled(work, target_queue)
message = JSON.parse(work.job)
job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
job_args = message["args"]
Expand All @@ -154,7 +178,7 @@ def reschedule_throttled(work, requeue_to:)
# Technically, the job could processed twice if the process dies between the two lines,
# but your job should be idempotent anyway, right?
# The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk.
Sidekiq::Client.enqueue_to_in(requeue_to, retry_in(work), Object.const_get(job_class), *job_args)
Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args)
work.acknowledge
end

Expand Down
65 changes: 25 additions & 40 deletions spec/lib/sidekiq/throttled/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,36 @@
describe ".sidekiq_throttle" do
it "delegates call to Registry.register" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)
.to receive(:add).with(working_class, concurrency: { limit: 10 })

working_class.sidekiq_throttle(foo: :bar)

expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue })
working_class.sidekiq_throttle(concurrency: { limit: 10 })
end

it "accepts and stores a requeue parameter including :with" do
it "accepts and registers a strategy with a requeue parameter including :with" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { with: :schedule })
.to receive(:add).with(working_class, concurrency: { limit: 10 }, requeue: { with: :schedule })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule })
working_class.sidekiq_throttle(concurrency: { limit: 10 }, requeue: { with: :schedule })
end

it "accepts and stores a requeue parameter including :to" do
it "accepts and registers a strategy with a requeue parameter including :to" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)
.to receive(:add).with(working_class, concurrency: { limit: 10 }, requeue: { to: :other_queue })

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :enqueue })
working_class.sidekiq_throttle(concurrency: { limit: 10 }, requeue: { to: :other_queue })
end

it "accepts and stores a requeue parameter including both :to and :with" do
it "accepts and registers a strategy with a requeue parameter including both :to and :with" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :schedule })
.to receive(:add).with(working_class, concurrency: { limit: 10 },
requeue: { to: :other_queue, with: :schedule })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule })
working_class.sidekiq_throttle(concurrency: { limit: 10 }, requeue: { to: :other_queue, with: :schedule })
end

it "raises an error when :with is not a valid value" do
expect do
working_class.sidekiq_throttle(foo: :bar, requeue: { with: :invalid_with_value })
working_class.sidekiq_throttle(requeue: { with: :invalid_with_value })
end.to raise_error(ArgumentError, "requeue: invalid_with_value is not a valid value for :with")
end

Expand All @@ -67,39 +60,31 @@
end

it "uses the default when not overridden" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)
working_class.sidekiq_throttle(concurrency: { limit: 10 })

working_class.sidekiq_throttle(foo: :bar)

expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule })
strategy = Sidekiq::Throttled::Registry.get(working_class)
expect(strategy.requeue_options).to eq({ with: :schedule })
end

it "uses the default alongside a requeue parameter including :to" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue })
working_class.sidekiq_throttle(concurrency: { limit: 10 }, requeue: { to: :other_queue })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule })
strategy = Sidekiq::Throttled::Registry.get(working_class)
expect(strategy.requeue_options).to eq({ to: :other_queue, with: :schedule })
end

it "allows overriding the default" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)
working_class.sidekiq_throttle(concurrency: { limit: 10 }, requeue: { with: :enqueue })

working_class.sidekiq_throttle(foo: :bar, requeue: { with: :enqueue })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue })
strategy = Sidekiq::Throttled::Registry.get(working_class)
expect(strategy.requeue_options).to eq({ with: :enqueue })
end

it "allows overriding the default and including a :to parameter" do
expect(Sidekiq::Throttled::Registry)
.to receive(:add).with(working_class, foo: :bar)

working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :enqueue })
working_class.sidekiq_throttle(concurrency: { limit: 10 }, requeue: { to: :other_queue, with: :enqueue })

expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :enqueue })
strategy = Sidekiq::Throttled::Registry.get(working_class)
expect(strategy.requeue_options).to eq({ to: :other_queue, with: :enqueue })
end
end
end
Expand Down
Loading

0 comments on commit 3c062cc

Please sign in to comment.