Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support queue option #5

Merged
merged 5 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ appraise "sidekiq-7.2.x" do
gem "sidekiq", "~> 7.2.0"
end
end

appraise "sidekiq-7.3.x" do
group :test do
gem "sidekiq", "~> 7.3.0"
end
end
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ gem "rake", "~> 13.0"

gem "rspec", "~> 3.0"

gem "rubocop", "~> 1.21"
gem "rubocop"
gem "rubocop-performance"
gem "rubocop-rake"
gem "rubocop-rspec"
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ class MyJob

sidekiq_rescue CustomAppException # defaults to 60 seconds delay and 10 retries
sidekiq_rescue AnotherCustomAppException, delay: ->(counter) { counter * 2 }
sidekiq_rescue CustomInfrastructureException, delay: 5.minutes
sidekiq_rescue ActiveRecord::Deadlocked, delay: 5.seconds, limit: 3
sidekiq_rescue CustomInfrastructureException, delay: 5.minutes.to_i
sidekiq_rescue ActiveRecord::Deadlocked, delay: 5.seconds.to_i, limit: 3
sidekiq_rescue TooManyRequestsError, queue: "slow"
sidekiq_rescue Net::OpenTimeout, Timeout::Error, limit: 10 # retries at most 10 times for Net::OpenTimeout and Timeout::Error combined

def perform(*args)
# Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific
# Might raise ActiveRecord::Deadlocked when a local db deadlock is detected
# Might raise Net::OpenTimeout or Timeout::Error when the remote service is down
# Might raise TooManyRequestsError when the rate limit is exceeded
end
end
```
Expand Down Expand Up @@ -74,7 +76,7 @@ end

## Configuration

You can configure the number of retries and the delay (in seconds) between retries:
You can configure the number of retries and the delay *in seconds* between retries:

```ruby
class MyJob
Expand All @@ -89,7 +91,12 @@ class MyJob
end
```

The `delay` is not the exact time between retries, but a minimum delay. The actual delay calculates based on jitter and `delay` value. The formula is `delay + delay * jitter * rand` seconds. Randomization is used to avoid retry storms. The `jitter` represents the upper bound of possible wait time (expressed as a percentage) and defaults to 0.15 (15%).
* `delay` - the delay between retries in seconds
* `limit` - the number of retries. The number of attempts includes the original job execution.
* `jitter` - represents the upper bound of possible wait time (expressed as a percentage) and defaults to 0.15 (15%)
* `queue` - the queue to which the job will be enqueued if it fails

The `delay` is not the exact time between retries, but a minimum delay. The actual delay calculates based on jitter and `delay` value. The formula is `delay + delay * jitter * rand` seconds. Randomization is used to avoid retry storms.

The default values are:
- `delay`: 60 seconds
Expand Down
22 changes: 17 additions & 5 deletions lib/sidekiq/rescue/dsl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ module ClassMethods
# @raise [ArgumentError] if delay is not an Integer or Float
# @raise [ArgumentError] if limit is not an Integer
# @raise [ArgumentError] if jitter is not an Integer or Float
# @raise [ArgumentError] if queue is not a String
# @example
# sidekiq_rescue NetworkError, delay: 60, limit: 10
def sidekiq_rescue(*errors, delay: Sidekiq::Rescue.config.delay, limit: Sidekiq::Rescue.config.limit,
jitter: Sidekiq::Rescue.config.jitter)
jitter: Sidekiq::Rescue.config.jitter, queue: nil)
unpacked_errors = validate_and_unpack_error_argument(errors)
validate_delay_argument(delay)
validate_limit_argument(limit)
validate_jitter_argument(jitter)
assign_sidekiq_rescue_options(errors: unpacked_errors, delay: delay, limit: limit, jitter: jitter)
validate_queue_argument(queue)
assign_sidekiq_rescue_options(
errors: unpacked_errors, delay: delay, limit: limit, jitter: jitter, queue: queue
)
end

# Find the error group and options for the given exception.
Expand Down Expand Up @@ -77,10 +81,18 @@ def validate_jitter_argument(jitter)
"jitter must be integer or float"
end

def assign_sidekiq_rescue_options(errors:, delay:, limit:, jitter:)
def validate_queue_argument(queue)
return if queue.nil? || queue.is_a?(String)

raise ArgumentError,
"queue must be a string"
end

def assign_sidekiq_rescue_options(errors:, delay:, limit:, jitter:, queue:)
self.sidekiq_rescue_options ||= {}
self.sidekiq_rescue_options = self.sidekiq_rescue_options.merge(errors => { delay: delay, limit: limit,
jitter: jitter })
self.sidekiq_rescue_options = self.sidekiq_rescue_options.merge(errors => {
delay: delay, limit: limit, jitter: jitter, queue: queue
}.compact)
end
end
end
Expand Down
16 changes: 15 additions & 1 deletion lib/sidekiq/rescue/rspec/matchers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ module Matchers
str = "expected #{actual} to be rescueable with #{expected}"
str += " and delay #{@delay}" if @delay
str += " and limit #{@limit}" if @limit
str += " and jitter #{@jitter}" if @jitter
str += " and queue #{@queue}" if @queue
str
end
failure_message_when_negated { |actual| "expected #{actual} not to be rescueable with #{expected}" }
Expand All @@ -27,6 +29,14 @@ module Matchers
@limit = limit
end

chain :with_jitter do |jitter|
@jitter = jitter
end

chain :with_queue do |queue|
@queue = queue
end

match do |actual|
matched = actual.is_a?(Class) &&
actual.include?(Sidekiq::Rescue::Dsl) &&
Expand All @@ -38,12 +48,16 @@ module Matchers
_error_group, options = actual.sidekiq_rescue_error_group_with_options_by(expected.new)

(@delay.nil? || options.fetch(:delay) == @delay) &&
(@limit.nil? || options.fetch(:limit) == @limit)
(@limit.nil? || options.fetch(:limit) == @limit) &&
(@jitter.nil? || options.fetch(:jitter) == @jitter) &&
(@queue.nil? || options.fetch(:queue) == @queue)
end

match_when_negated do |actual|
raise NotImplementedError, "it's confusing to use `not_to be_rescueable` with `with_delay`" if @delay
raise NotImplementedError, "it's confusing to use `not_to be_rescueable` with `with_limit`" if @limit
raise NotImplementedError, "it's confusing to use `not_to be_rescueable` with `with_jitter`" if @jitter
raise NotImplementedError, "it's confusing to use `not_to be_rescueable` with `with_queue`" if @queue

actual.is_a?(Class) &&
actual.include?(Sidekiq::Rescue::Dsl) &&
Expand Down
7 changes: 5 additions & 2 deletions lib/sidekiq/rescue/server_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ def sidekiq_rescue(job_payload, job_class)

def rescue_error(error, error_group, options, job_payload)
delay, limit, jitter = options.fetch_values(:delay, :limit, :jitter)
queue = options.fetch(:queue, job_payload["queue"])

rescue_counter = increment_rescue_counter_for(error_group, job_payload)
raise error if rescue_counter > limit

calculated_delay = calculate_delay(delay, rescue_counter, jitter)
log_reschedule_info(rescue_counter, error, calculated_delay)
reschedule_job(job_payload: job_payload, delay: calculated_delay, rescue_counter: rescue_counter,
error_group: error_group)
error_group: error_group, queue: queue)
end

def increment_rescue_counter_for(error_group, job_payload)
Expand All @@ -63,10 +65,11 @@ def log_reschedule_info(rescue_counter, error, delay)
"#{error.message}; rescheduling in #{delay} seconds")
end

def reschedule_job(job_payload:, delay:, rescue_counter:, error_group:)
def reschedule_job(job_payload:, delay:, rescue_counter:, error_group:, queue:)
payload = job_payload.dup
payload["at"] = Time.now.to_f + delay if delay.positive?
payload["sidekiq_rescue_exceptions_counter"] = { error_group.to_s => rescue_counter }
payload["queue"] = queue
Sidekiq::Client.push(payload)
end
end
Expand Down
6 changes: 6 additions & 0 deletions spec/sidekiq/rescue/dsl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ def define_dsl(...)
expect(job_class.sidekiq_rescue_options.dig([TestError], :delay)).to be_a(Proc)
end

it "sets the queue" do
define_dsl { sidekiq_rescue TestError, queue: "slow" }

expect(job_class.sidekiq_rescue_options.dig([TestError], :queue)).to eq("slow")
end

it "raises an ArgumentError if delay proc has no arguments" do
expect { define_dsl { sidekiq_rescue TestError, delay: -> { 10 } } }.to raise_error(
ArgumentError,
Expand Down
9 changes: 9 additions & 0 deletions spec/sidekiq/rescue/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,13 @@
end
end
end

context "with custom queue" do
let(:job_class) { WithCustomQueueJob }

it "reschedules the job with correct queue" do
expect { perform_async }.not_to raise_error
expect(last_job["queue"]).to eq("custom_queue")
end
end
end
10 changes: 10 additions & 0 deletions spec/sidekiq/rescue/rspec/matchers_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,14 @@
job_class = Class.new(BaseJob).tap { |klass| klass.sidekiq_rescue TestError, delay: 10, limit: 20 }
expect(job_class).to have_sidekiq_rescue(TestError).with_delay(10).with_limit(20)
end

it "works with jitter" do
job_class = Class.new(BaseJob).tap { |klass| klass.sidekiq_rescue TestError, jitter: 0.1 }
expect(job_class).to have_sidekiq_rescue(TestError).with_jitter(0.1)
end

it "works with queue" do
job_class = Class.new(BaseJob).tap { |klass| klass.sidekiq_rescue TestError, queue: "custom_queue" }
expect(job_class).to have_sidekiq_rescue(TestError).with_queue("custom_queue")
end
end
19 changes: 19 additions & 0 deletions spec/sidekiq/rescue/server_middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,23 @@
)
end
end

context "with custom queue" do
subject(:call_with_custom_queue) do
middleware.call(job_instance, job_payload, "default") { raise TestError }
end

let(:job_instance) { WithCustomQueueJob.new }

it "reschedules the job on expected error and increments counter" do
allow(Sidekiq::Client).to receive(:push)

call_with_custom_queue
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => anything,
"sidekiq_rescue_exceptions_counter" => { "[TestError]" => 1 },
"queue" => "custom_queue")
)
end
end
end
8 changes: 8 additions & 0 deletions spec/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,11 @@ class WithCustomJitterJob < BaseJob
class WithZeroJitterAndDelayJob < BaseJob
sidekiq_rescue TestError, delay: 0, jitter: 0
end

class WithCustomQueueJob < BaseJob
sidekiq_rescue TestError, queue: "custom_queue"

def perform(*)
raise TestError
end
end