diff --git a/.github/codecov.yml b/.github/codecov.yml new file mode 100644 index 00000000..82b0def4 --- /dev/null +++ b/.github/codecov.yml @@ -0,0 +1,4 @@ +ignore: + # Requires Sidekiq-Pro + - lib/sidekiq/throttled/patches/super_fetch.rb + - spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 77d54a7d..92e7db03 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,18 +8,13 @@ on: jobs: test: - name: "rspec (ruby:${{ matrix.ruby }}" - - strategy: - fail-fast: false - matrix: - ruby: [ "2.7", "3.0", "3.1", "3.2" ] + name: "test / ${{ matrix.ruby }} / ${{ matrix.redis }}" runs-on: ubuntu-latest services: redis: - image: redis + image: ${{ matrix.redis }} ports: - "6379:6379" options: >- @@ -28,26 +23,52 @@ jobs: --health-timeout 5s --health-retries 5 + env: + BUNDLE_GEMS__CONTRIBSYS__COM: ${{ secrets.BUNDLE_GEMS__CONTRIBSYS__COM }} + + strategy: + fail-fast: false + matrix: + ruby: [ ruby-2.7, ruby-3.0, ruby-3.1, ruby-3.2, ruby-3.3 ] + redis: [ "redis:6.2", "redis:7.0", "redis:7.2" ] + steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby }} bundler-cache: true - - name: RSpec - run: scripts/ci-rspec + - name: Run test suites + run: bundle exec rake test + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v4 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + # See: https://github.com/orgs/community/discussions/26822#discussioncomment-3305794 + test-finale: + name: "test" + + runs-on: ubuntu-latest + if: ${{ always() }} + + needs: [test] + + steps: + - run: test "success" = "${{ needs.test.result }}" - rubocop: + lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 with: - ruby-version: "3.2" + ruby-version: ruby-3.2 bundler-cache: true - - run: bundle exec rubocop + - run: bundle exec rake lint diff --git a/.rspec b/.rspec index d3ec40e0..283edea7 100644 --- a/.rspec +++ b/.rspec @@ -1,5 +1,2 @@ ---backtrace ---color ---format=documentation ---order random +--require simplecov --require spec_helper diff --git a/.rubocop.yml b/.rubocop.yml index 98f0a84e..a2e7d7c8 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -19,3 +19,7 @@ AllCops: - vendor/**/* NewCops: enable TargetRubyVersion: 2.7 + +# Broken: https://github.com/rubocop/rubocop/issues/12113 +Bundler/DuplicatedGroup: + Enabled: false diff --git a/.simplecov b/.simplecov new file mode 100644 index 00000000..193cb89e --- /dev/null +++ b/.simplecov @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +SimpleCov.start do + gemfile = File.basename(ENV.fetch("BUNDLE_GEMFILE", "Gemfile"), ".gemfile").strip + gemfile = nil if gemfile.empty? 
|| gemfile.casecmp?("gems.rb") || gemfile.casecmp?("Gemfile") + + command_name ["#{RUBY_ENGINE}-#{RUBY_ENGINE_VERSION}", gemfile].compact.join("/") + + enable_coverage :branch + + if ENV["CI"] + require "simplecov-cobertura" + formatter SimpleCov::Formatter::CoberturaFormatter + else + formatter SimpleCov::Formatter::MultiFormatter.new([ + SimpleCov::Formatter::SimpleFormatter, + SimpleCov::Formatter::HTMLFormatter + ]) + end + + add_filter "/demo/" + add_filter "/gemfiles/" + add_filter "/spec/" + add_filter "/vendor/" +end diff --git a/Appraisals b/Appraisals index 235564ab..4e63feb7 100644 --- a/Appraisals +++ b/Appraisals @@ -1,19 +1,43 @@ # frozen_string_literal: true -appraise "sidekiq-6.5.x" do - group :test do - gem "sidekiq", "~> 6.5.0" - end -end - appraise "sidekiq-7.0.x" do group :test do gem "sidekiq", "~> 7.0.0" + + # Sidekiq Pro license must be set in global bundler config + # or in BUNDLE_GEMS__CONTRIBSYS__COM env variable + install_if "-> { Bundler.settings['gems.contribsys.com']&.include?(':') }" do + source "https://gems.contribsys.com/" do + gem "sidekiq-pro", "~> 7.0.0" + end + end end end appraise "sidekiq-7.1.x" do group :test do gem "sidekiq", "~> 7.1.0" + + # Sidekiq Pro license must be set in global bundler config + # or in BUNDLE_GEMS__CONTRIBSYS__COM env variable + install_if "-> { Bundler.settings['gems.contribsys.com']&.include?(':') }" do + source "https://gems.contribsys.com/" do + gem "sidekiq-pro", "~> 7.1.0" + end + end + end +end + +appraise "sidekiq-7.2.x" do + group :test do + gem "sidekiq", "~> 7.2.0" + + # Sidekiq Pro license must be set in global bundler config + # or in BUNDLE_GEMS__CONTRIBSYS__COM env variable + install_if "-> { Bundler.settings['gems.contribsys.com']&.include?(':') }" do + source "https://gems.contribsys.com/" do + gem "sidekiq-pro", "~> 7.2.0" + end + end end end diff --git a/CHANGES.md b/CHANGES.md index 4b012655..276aa169 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,10 +8,88 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Change default cooldown period to `1.0` (was `2.0`), + and cooldown threshold to `100` (was `1`) + [#195](https://github.com/ixti/sidekiq-throttled/pull/195). + +### Removed + +- Drop Sidekiq < 7 support +- Remove deprecated `Sidekiq::Throttled.setup!` + +## [1.4.0] - 2024-04-07 + +### Fixed + +- Correctly unwrap `ActiveJob` arguments: + [#184](https://github.com/ixti/sidekiq-throttled/pull/184), + [#185](https://github.com/ixti/sidekiq-throttled/pull/185). + + +## [1.3.0] - 2024-01-18 + +### Added + +- Add Sidekiq Pro 7.0, 7.1, and 7.2 support +- Add Ruby 3.3 support + + +## [1.2.0] - 2023-12-18 + +### Added + +- Bring back Ruby-2.7.x support + + +## [1.1.0] - 2023-11-21 + +### Changed + +- Renamed `Sidekiq::Throttled::Middleware` to `Sidekiq::Throttled::Middlewares::Server` + +### Deprecated + +- `Sidekiq::Throttled.setup!` is now deprecated. If you need to change order of + the middleware, please manipulate middlewares chains directly. 
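  For example, a minimal sketch of direct chain manipulation (place it in your
  Sidekiq server configuration and adjust as needed):

  ```ruby
  Sidekiq.configure_server do |config|
    config.server_middleware do |chain|
      # Move the bundled middleware to the front instead of calling `setup!`
      chain.prepend(Sidekiq::Throttled::Middlewares::Server)
    end
  end
  ```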
+ + +## [1.0.1] - 2023-11-20 + +### Added + +- Bring back Sidekiq-6.5 support + + +## [1.0.0] - 2023-11-20 + +### Added + +- Add Sidekiq-7.2 support +- Revive queues cooldown logic + [#163](https://github.com/ixti/sidekiq-throttled/pull/163) + +### Changed + +- (BREAKING) Jobs inherit throttling strategy from their parent class, unless + explicitly overriden + +### Fixed + +- Correctly finalize throttled jobs when used with ActiveJob + [#151](https://github.com/ixti/sidekiq-throttled/pull/151) + +### Removed + +- (BREAKING) Drop Ruby-2.7.x support +- (BREAKING) Drop Sidekiq-6.x.x support +- (BREAKING) Removed `Sidekiq::Throttled.configuration` + ## [1.0.0.alpha.1] - 2023-06-08 -### Changes +### Changed - Upstream `Sidekiq::BasicFetch` is now infused with throttling directly, thus default fetch configuration should work on both Sidekiq and Sidekiq-Pro @@ -30,7 +108,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add Ruby 3.2 support -### Changes +### Changed - Switch README to Asciidoc format - Switch CHANGES to keepachangelog.com format @@ -46,6 +124,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Remove queue exclusion from fetcher pon throttled job -[unreleased]: https://github.com/ixti/sidekiq-throttled/compare/v1.0.0.alpha.1...main +[unreleased]: https://github.com/ixti/sidekiq-throttled/compare/v1.4.0...main +[1.4.0]: https://github.com/ixti/sidekiq-throttled/compare/v1.3.0...v1.4.0 +[1.3.0]: https://github.com/ixti/sidekiq-throttled/compare/v1.2.0...v1.3.0 +[1.2.0]: https://github.com/ixti/sidekiq-throttled/compare/v1.1.0...v1.2.0 +[1.1.0]: https://github.com/ixti/sidekiq-throttled/compare/v1.0.1...v1.1.0 +[1.0.1]: https://github.com/ixti/sidekiq-throttled/compare/v1.0.0...v1.0.1 +[1.0.0]: https://github.com/ixti/sidekiq-throttled/compare/v1.0.0.alpha.1...v1.0.0 [1.0.0.alpha.1]: https://github.com/ixti/sidekiq-throttled/compare/v1.0.0.alpha...v1.0.0.alpha.1 [1.0.0.alpha]: https://github.com/ixti/sidekiq-throttled/compare/v0.16.1...v1.0.0.alpha diff --git a/Gemfile b/Gemfile index 0589cf19..36d44f05 100644 --- a/Gemfile +++ b/Gemfile @@ -15,7 +15,10 @@ group :test do gem "rack-test" gem "rspec" + gem "simplecov" + gem "simplecov-cobertura" + gem "timecop" gem "rubocop", require: false diff --git a/LICENSE b/LICENSE.txt similarity index 100% rename from LICENSE rename to LICENSE.txt diff --git a/README.adoc b/README.adoc index 21508cea..b2a1653c 100644 --- a/README.adoc +++ b/README.adoc @@ -12,11 +12,7 @@ {doc-link}[image:{doc-badge}[API Documentation]] **** -NOTE: This is the 1.x *development* branch. For the 0.x *stable* branch, please - see: https://github.com/ixti/sidekiq-throttled/tree/0-x-stable[0-x-stable] - -Concurrency and threshold throttling for https://github.com/mperham/sidekiq[Sidekiq]. - +Concurrency and threshold throttling for https://github.com/sidekiq/sidekiq[Sidekiq]. == Installation @@ -35,7 +31,6 @@ Or install it yourself as: $ gem install sidekiq-throttled - == Usage Add somewhere in your app's bootstrap (e.g. `config/initializers/sidekiq.rb` if @@ -44,12 +39,8 @@ you are using Rails): [source,ruby] ---- require "sidekiq/throttled" -Sidekiq::Throttled.setup! ---- -Load order can be an issue if you are using other Sidekiq plugins and/or middleware. -To prevent any problems, add the `.setup!` call to the bottom of your init file. 
- Once you've done that you can include `Sidekiq::Throttled::Job` to your job classes and configure throttling: @@ -89,6 +80,70 @@ end ---- +=== Web UI + +To add a Throttled tab to your sidekiq web dashboard, require it durring your +application initialization. + +[source,ruby] +---- +require "sidekiq/throttled/web" +---- + + +=== Configuration + +[source,ruby] +---- +Sidekiq::Throttled.configure do |config| + # Period in seconds to exclude queue from polling in case it returned + # {config.cooldown_threshold} amount of throttled jobs in a row. Set + # this value to `nil` to disable cooldown manager completely. + # Default: 1.0 + config.cooldown_period = 1.0 + + # Exclude queue from polling after it returned given amount of throttled + # jobs in a row. + # Default: 100 (cooldown after hundredth throttled job in a row) + config.cooldown_threshold = 100 +end +---- + +[WARNING] +.Cooldown Settings +==== +If a queue contains a thousand jobs in a row that will be throttled, +the cooldown will kick-in 10 times in a row, meaning it will take 10 seconds +before all those jobs are put back at the end of the queue and you actually +start processing other jobs. + +You may want to adjust the cooldown_threshold and cooldown_period, +keeping in mind that this will also impact the load on your Redis server. +==== + +==== Middleware(s) + +`Sidekiq::Throttled` relies on following bundled middlewares: + +* `Sidekiq::Throttled::Middlewares::Server` + +The middleware is automatically injected when you require `sidekiq/throttled`. +In rare cases, when this causes an issue, you can change middleware order manually: + +[source,ruby] +---- +Sidekiq.configure_server do |config| + # ... + + config.server_middleware do |chain| + chain.prepend(Sidekiq::Throttled::Middlewares::Server) + end +end +---- + +See: https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/middleware/chain.rb + + === Observer You can specify an observer that will be called on throttling. To do so pass an @@ -236,6 +291,7 @@ This library aims to support and is tested against the following Ruby versions: * Ruby 3.0.x * Ruby 3.1.x * Ruby 3.2.x +* Ruby 3.3.x If something doesn't work on one of these versions, it's a bug. @@ -255,10 +311,15 @@ dropped. This library aims to support and work with following Sidekiq versions: -* Sidekiq 6.5.x * Sidekiq 7.0.x * Sidekiq 7.1.x +* Sidekiq 7.2.x +And the following Sidekiq Pro versions: + +* Sidekiq Pro 7.0.x +* Sidekiq Pro 7.1.x +* Sidekiq Pro 7.2.x == Development @@ -267,6 +328,11 @@ This library aims to support and work with following Sidekiq versions: bundle exec appraisal install bundle exec rake +=== Sidekiq-Pro + +If you're working on Sidekiq-Pro support make sure that you have Sidekiq-Pro +license set either in the global config, or in `BUNDLE_GEMS\__CONTRIBSYS__COM` +environment variable. == Contributing @@ -276,3 +342,11 @@ This library aims to support and work with following Sidekiq versions: * Send a pull request * If we like them we'll merge them * If we've accepted a patch, feel free to ask for commit access! + + +== Endorsement + +https://github.com/sensortower[image:sensortower.svg[SensorTower]] + +The initial work on the project was initiated to address the needs of +https://github.com/sensortower[SensorTower]. 
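The new `Config` object (see `lib/sidekiq/throttled/config.rb` below) also carries
`default_requeue_options`, the baseline that per-job `:requeue` settings are merged
into. A combined initializer could look roughly like this; it is a sketch with
illustrative values rather than recommendations:

[source,ruby]
----
# config/initializers/sidekiq.rb
require "sidekiq/throttled"

Sidekiq::Throttled.configure do |config|
  # Skip a queue for 2 seconds once it has returned 50 throttled jobs in a row
  config.cooldown_period    = 2.0
  config.cooldown_threshold = 50

  # Reschedule throttled jobs for later instead of pushing them back immediately
  config.default_requeue_options = { with: :schedule }
end
----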
diff --git a/Rakefile b/Rakefile index ba007163..2dfa7996 100644 --- a/Rakefile +++ b/Rakefile @@ -1,9 +1,30 @@ # frozen_string_literal: true -require "appraisal" require "bundler/gem_tasks" -require "rspec/core/rake_task" -RSpec::Core::RakeTask.new +desc "Run tests" +task :test do + rm_rf "coverage" + rm_rf "gemfiles" -task default: ENV["APPRAISAL_INITIALIZED"] ? %i[spec] : %i[appraisal rubocop] + Bundler.with_original_env do + sh "bundle exec appraisal generate" + + # XXX: `bundle exec appraisal install` fails on ruby-3.2 + Dir["gemfiles/*.gemfile"].each do |gemfile| + sh({ "BUNDLE_GEMFILE" => gemfile }, "bundle lock") + sh({ "BUNDLE_GEMFILE" => gemfile }, "bundle check") do |ok| + sh({ "BUNDLE_GEMFILE" => gemfile }, "bundle install") unless ok + end + end + + sh "bundle exec appraisal rspec --force-colour" + end +end + +desc "Lint codebase" +task :lint do + sh "bundle exec rubocop --color" +end + +task default: %i[test lint] diff --git a/lib/sidekiq/throttled.rb b/lib/sidekiq/throttled.rb index 8a549958..67c9016e 100644 --- a/lib/sidekiq/throttled.rb +++ b/lib/sidekiq/throttled.rb @@ -2,12 +2,15 @@ require "sidekiq" -require_relative "./throttled/version" -require_relative "./throttled/configuration" +require_relative "./throttled/config" +require_relative "./throttled/cooldown" +require_relative "./throttled/job" +require_relative "./throttled/message" +require_relative "./throttled/middlewares/server" require_relative "./throttled/patches/basic_fetch" +require_relative "./throttled/patches/super_fetch" require_relative "./throttled/registry" -require_relative "./throttled/job" -require_relative "./throttled/middleware" +require_relative "./throttled/version" require_relative "./throttled/worker" # @see https://github.com/mperham/sidekiq/ @@ -17,7 +20,6 @@ module Sidekiq # Just add somewhere in your bootstrap: # # require "sidekiq/throttled" - # Sidekiq::Throttled.setup! # # Once you've done that you can include {Sidekiq::Throttled::Job} to your # job classes and configure throttling: @@ -40,17 +42,38 @@ module Sidekiq # end # end module Throttled + MUTEX = Mutex.new + private_constant :MUTEX + + @config = Config.new.freeze + @cooldown = Cooldown[@config] + class << self - # @return [Configuration] - def configuration - @configuration ||= Configuration.new - end + # @api internal + # + # @return [Cooldown, nil] + attr_reader :cooldown - # Hooks throttler into sidekiq. + # @api internal # - # @return [void] - def setup! - Sidekiq::Throttled::Patches::BasicFetch.apply! + # @return [Config, nil] + attr_reader :config + + # @example + # Sidekiq::Throttled.configure do |config| + # config.cooldown_period = nil # Disable queues cooldown manager + # end + # + # @yieldparam config [Config] + def configure + MUTEX.synchronize do + config = @config.dup + + yield config + + @config = config.freeze + @cooldown = Cooldown[@config] + end end # Tells whenever job is throttled or not. @@ -58,24 +81,36 @@ def setup! 
# @param [String] message Job's JSON payload # @return [Boolean] def throttled?(message) - message = JSON.parse message - job = message.fetch("wrapped") { message.fetch("class") { return false } } - jid = message.fetch("jid") { return false } + message = Message.new(message) + return false unless message.job_class && message.job_id - Registry.get job do |strategy| - return strategy.throttled?(jid, *message["args"]) + Registry.get(message.job_class) do |strategy| + return strategy.throttled?(message.job_id, *message.job_args) end false - rescue + rescue StandardError false end + + # Return throttled job to be executed later, delegating the details of how to do that + # to the Strategy for that job. + # + # @return [void] + def requeue_throttled(work) + message = JSON.parse(work.job) + job_class = Object.const_get(message.fetch("wrapped") { message.fetch("class") { return false } }) + + Registry.get job_class do |strategy| + strategy.requeue_throttled(work, **job_class.sidekiq_throttled_requeue_options) + end + end end end configure_server do |config| config.server_middleware do |chain| - chain.add Sidekiq::Throttled::Middleware + chain.add(Sidekiq::Throttled::Middlewares::Server) end end end diff --git a/lib/sidekiq/throttled/config.rb b/lib/sidekiq/throttled/config.rb new file mode 100644 index 00000000..f21b0a90 --- /dev/null +++ b/lib/sidekiq/throttled/config.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +module Sidekiq + module Throttled + # Configuration object. + class Config + # Period in seconds to exclude queue from polling in case it returned + # {#cooldown_threshold} amount of throttled jobs in a row. + # + # Set this to `nil` to disable cooldown completely. + # + # @return [Float, nil] + attr_reader :cooldown_period + + # Amount of throttled jobs returned from the queue subsequently after + # which queue will be excluded from polling for the durations of + # {#cooldown_period}. + # + # @return [Integer] + attr_reader :cooldown_threshold + + # Specifies how we should return throttled jobs to the queue so they can be executed later. + # Expects a hash with keys that may include :with and :to + # For :with, options are `:enqueue` (put them on the end of the queue) and `:schedule` (schedule for later). + # For :to, the name of a sidekiq queue should be specified. If none is specified, jobs will by default be + # requeued to the same queue they were originally enqueued in. + # Default: {with: `:enqueue`} + # + # @return [Hash] + attr_reader :default_requeue_options + + def initialize + reset! + end + + # @!attribute [w] cooldown_period + def cooldown_period=(value) + raise TypeError, "unexpected type #{value.class}" unless value.nil? || value.is_a?(Float) + raise ArgumentError, "period must be positive" unless value.nil? || value.positive? + + @cooldown_period = value + end + + # @!attribute [w] cooldown_threshold + def cooldown_threshold=(value) + raise TypeError, "unexpected type #{value.class}" unless value.is_a?(Integer) + raise ArgumentError, "threshold must be positive" unless value.positive? + + @cooldown_threshold = value + end + + # @!attribute [w] default_requeue_options + def default_requeue_options=(options) + requeue_with = options.delete(:with).intern || :enqueue + + @default_requeue_options = options.merge({ with: requeue_with }) + end + + def reset! 
+ @cooldown_period = 1.0 + @cooldown_threshold = 100 + @default_requeue_options = { with: :enqueue } + end + end + end +end diff --git a/lib/sidekiq/throttled/configuration.rb b/lib/sidekiq/throttled/configuration.rb deleted file mode 100644 index 70c69679..00000000 --- a/lib/sidekiq/throttled/configuration.rb +++ /dev/null @@ -1,50 +0,0 @@ -# frozen_string_literal: true - -module Sidekiq - module Throttled - # Configuration holder. - class Configuration - # Class constructor. - def initialize - reset! - end - - # Reset configuration to defaults. - # - # @return [self] - def reset! - @inherit_strategies = false - - self - end - - # Instructs throttler to lookup strategies in parent classes, if there's - # no own strategy: - # - # class FooJob - # include Sidekiq::Job - # include Sidekiq::Throttled::Job - # - # sidekiq_throttle :concurrency => { :limit => 42 } - # end - # - # class BarJob < FooJob - # end - # - # By default in the example above, `Bar` won't have throttling options. - # Set this flag to `true` to enable this lookup in initializer, after - # that `Bar` will use `Foo` throttling bucket. - def inherit_strategies=(value) - @inherit_strategies = value ? true : false - end - - # Whenever throttled workers should inherit parent's strategies or not. - # Default: `false`. - # - # @return [Boolean] - def inherit_strategies? - @inherit_strategies - end - end - end -end diff --git a/lib/sidekiq/throttled/cooldown.rb b/lib/sidekiq/throttled/cooldown.rb new file mode 100644 index 00000000..11f38bda --- /dev/null +++ b/lib/sidekiq/throttled/cooldown.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +require "concurrent" + +require_relative "./expirable_set" + +module Sidekiq + module Throttled + # @api internal + # + # Queues cooldown manager. Tracks list of queues that should be temporarily + # (for the duration of {Config#cooldown_period}) excluded from polling. + class Cooldown + class << self + # Returns new {Cooldown} instance if {Config#cooldown_period} is not `nil`. + # + # @param config [Config] + # @return [Cooldown, nil] + def [](config) + new(config) if config.cooldown_period + end + end + + # @param config [Config] + def initialize(config) + @queues = ExpirableSet.new(config.cooldown_period) + @threshold = config.cooldown_threshold + @tracker = Concurrent::Map.new + end + + # Notify that given queue returned job that was throttled. + # + # @param queue [String] + # @return [void] + def notify_throttled(queue) + @queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ) + end + + # Notify that given queue returned job that was not throttled. + # + # @param queue [String] + # @return [void] + def notify_admitted(queue) + @tracker.delete(queue) + end + + # List of queues that should not be polled + # + # @return [Array] + def queues + @queues.to_a + end + end + end +end diff --git a/lib/sidekiq/throttled/expirable_set.rb b/lib/sidekiq/throttled/expirable_set.rb new file mode 100644 index 00000000..4242bf0d --- /dev/null +++ b/lib/sidekiq/throttled/expirable_set.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +require "concurrent" + +module Sidekiq + module Throttled + # @api internal + # + # Set of elements with expirations. 
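+    # Expired elements are evicted lazily: {#add} sweeps them away and {#each}
+    # skips them, so no background cleanup thread or timer is involved.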
+ # + # @example + # set = ExpirableSet.new(10.0) + # set.add("a") + # sleep(5) + # set.add("b") + # set.to_a # => ["a", "b"] + # sleep(5) + # set.to_a # => ["b"] + class ExpirableSet + include Enumerable + + # @param ttl [Float] expiration is seconds + # @raise [ArgumentError] if `ttl` is not positive Float + def initialize(ttl) + raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive? + + @elements = Concurrent::Map.new + @ttl = ttl + end + + # @param element [Object] + # @return [ExpirableSet] self + def add(element) + # cleanup expired elements to avoid mem-leak + horizon = now + expired = @elements.each_pair.select { |(_, sunset)| expired?(sunset, horizon) } + expired.each { |pair| @elements.delete_pair(*pair) } + + # add new element + @elements[element] = now + @ttl + + self + end + + # @yield [Object] Gives each live (not expired) element to the block + def each + return to_enum __method__ unless block_given? + + horizon = now + + @elements.each_pair do |element, sunset| + yield element unless expired?(sunset, horizon) + end + + self + end + + private + + # @return [Float] + def now + ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + end + + def expired?(sunset, horizon) + sunset <= horizon + end + end + end +end diff --git a/lib/sidekiq/throttled/job.rb b/lib/sidekiq/throttled/job.rb index a1a173be..57fcf9e1 100644 --- a/lib/sidekiq/throttled/job.rb +++ b/lib/sidekiq/throttled/job.rb @@ -14,7 +14,8 @@ module Throttled # include Sidekiq::Throttled::Job # # sidkiq_options :queue => :my_queue - # sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour } + # sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour }, + # :requeue => { :to => :other_queue, :with => :schedule } # # def perform # # ... @@ -23,14 +24,17 @@ module Throttled # # @see ClassMethods module Job + VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze + # Extends worker class with {ClassMethods}. # # @note Using `included` hook with extending worker with {ClassMethods} # in order to make API inline with `include Sidekiq::Job`. 
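+      # The hook also declares the `sidekiq_throttled_requeue_options` class
+      # attribute, which {ClassMethods#sidekiq_throttle} populates and the
+      # requeue logic later reads.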
# # @private - def self.included(worker) - worker.send(:extend, ClassMethods) + def self.included(base) + base.sidekiq_class_attribute :sidekiq_throttled_requeue_options + base.extend(ClassMethods) end # Helper methods added to the singleton class of destination @@ -71,9 +75,29 @@ module ClassMethods # }) # end # + # @example Allow max 123 MyJob jobs per hour; when jobs are throttled, schedule them for later in :other_queue + # + # class MyJob + # include Sidekiq::Job + # include Sidekiq::Throttled::Job + # + # sidekiq_throttle({ + # :threshold => { :limit => 123, :period => 1.hour }, + # :requeue => { :to => :other_queue, :with => :schedule } + # }) + # end + # + # @param [Hash] requeue What to do with jobs that are throttled # @see Registry.add # @return [void] def sidekiq_throttle(**kwargs) + requeue_options = Throttled.config.default_requeue_options.merge(kwargs.delete(:requeue) || {}) + unless VALID_VALUES_FOR_REQUEUE_WITH.include?(requeue_options[:with]) + raise ArgumentError, "requeue: #{requeue_options[:with]} is not a valid value for :with" + end + + self.sidekiq_throttled_requeue_options = requeue_options + Registry.add(self, **kwargs) end diff --git a/lib/sidekiq/throttled/message.rb b/lib/sidekiq/throttled/message.rb new file mode 100644 index 00000000..c5e77344 --- /dev/null +++ b/lib/sidekiq/throttled/message.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Sidekiq + module Throttled + class Message + def initialize(item) + @item = item.is_a?(Hash) ? item : parse(item) + end + + def job_class + @item.fetch("wrapped") { @item["class"] } + end + + def job_args + @item.key?("wrapped") ? @item.dig("args", 0, "arguments") : @item["args"] + end + + def job_id + @item["jid"] + end + + private + + def parse(item) + item = Sidekiq.load_json(item) + item.is_a?(Hash) ? item : {} + rescue JSON::ParserError + {} + end + end + end +end diff --git a/lib/sidekiq/throttled/middleware.rb b/lib/sidekiq/throttled/middleware.rb deleted file mode 100644 index a5ed7444..00000000 --- a/lib/sidekiq/throttled/middleware.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -# internal -require_relative "./registry" - -module Sidekiq - module Throttled - # Server middleware that notifies strategy that job was finished. - # - # @private - class Middleware - include Sidekiq::ServerMiddleware if Sidekiq::VERSION >= "6.5.0" - - # Called within Sidekiq job processing - def call(_worker, msg, _queue) - yield - ensure - Registry.get msg["class"] do |strategy| - strategy.finalize!(msg["jid"], *msg["args"]) - end - end - end - end -end diff --git a/lib/sidekiq/throttled/middlewares/server.rb b/lib/sidekiq/throttled/middlewares/server.rb new file mode 100644 index 00000000..e9bba5a0 --- /dev/null +++ b/lib/sidekiq/throttled/middlewares/server.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +# internal +require_relative "../message" +require_relative "../registry" + +module Sidekiq + module Throttled + module Middlewares + # Server middleware required for Sidekiq::Throttled functioning. 
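+      # It wraps every job execution and, from an `ensure` block, calls the
+      # matching strategy's `finalize!` so that concurrency slots are released
+      # even when the job raises.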
+ class Server + include Sidekiq::ServerMiddleware + + def call(_worker, msg, _queue) + yield + ensure + message = Message.new(msg) + + if message.job_class && message.job_id + Registry.get(message.job_class) do |strategy| + strategy.finalize!(message.job_id, *message.job_args) + end + end + end + end + end + end +end diff --git a/lib/sidekiq/throttled/patches/basic_fetch.rb b/lib/sidekiq/throttled/patches/basic_fetch.rb index c421520d..0cc904b3 100644 --- a/lib/sidekiq/throttled/patches/basic_fetch.rb +++ b/lib/sidekiq/throttled/patches/basic_fetch.rb @@ -3,58 +3,32 @@ require "sidekiq" require "sidekiq/fetch" +require_relative "./throttled_retriever" + module Sidekiq module Throttled module Patches module BasicFetch - class << self - def apply! - Sidekiq::BasicFetch.prepend(self) unless Sidekiq::BasicFetch.include?(self) - end - end - - # Retrieves job from redis. - # - # @return [Sidekiq::Throttled::UnitOfWork, nil] - def retrieve_work - work = super - - if work && Throttled.throttled?(work.job) - requeue_throttled(work) - return nil - end - - work + def self.prepended(base) + base.prepend(ThrottledRetriever) end private - # Pushes job back to the head of the queue, so that job won't be tried - # immediately after it was requeued (in most cases). - # - # @note This is triggered when job is throttled. So it is same operation - # Sidekiq performs upon `Sidekiq::Worker.perform_async` call. - # - # @return [void] - def requeue_throttled(work) - redis { |conn| conn.lpush(work.queue, work.job) } - end - # Returns list of queues to try to fetch jobs from. # # @note It may return an empty array. # @param [Array] queues # @return [Array] def queues_cmd - queues = super - - # TODO: Refactor to be prepended as an integration mixin during configuration stage - # Or via configurable queues reducer - queues -= Sidekiq::Pauzer.paused_queues.map { |name| "queue:#{name}" } if defined?(Sidekiq::Pauzer) + throttled_queues = Throttled.cooldown&.queues + return super if throttled_queues.nil? || throttled_queues.empty? - queues + super - throttled_queues end end end end end + +Sidekiq::BasicFetch.prepend(Sidekiq::Throttled::Patches::BasicFetch) diff --git a/lib/sidekiq/throttled/patches/super_fetch.rb b/lib/sidekiq/throttled/patches/super_fetch.rb new file mode 100644 index 00000000..d1d33a5f --- /dev/null +++ b/lib/sidekiq/throttled/patches/super_fetch.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +require "sidekiq" + +require_relative "./throttled_retriever" + +module Sidekiq + module Throttled + module Patches + module SuperFetch + def self.prepended(base) + base.prepend(ThrottledRetriever) + end + + private + + # Returns list of non-paused queues to try to fetch jobs from. + # + # @note It may return an empty array. + # @return [Array] + def active_queues + # Create a hash of throttled queues for fast lookup + throttled_queues = Throttled.cooldown&.queues&.to_h { |queue| [queue, true] } + return super if throttled_queues.nil? || throttled_queues.empty? 
+ + # Reject throttled queues from the list of active queues + super.reject { |queue, _private_queue| throttled_queues[queue] } + end + end + end + end +end + +begin + require "sidekiq/pro/super_fetch" + Sidekiq::Pro::SuperFetch.prepend(Sidekiq::Throttled::Patches::SuperFetch) +rescue LoadError + # Sidekiq Pro is not available +end diff --git a/lib/sidekiq/throttled/patches/throttled_retriever.rb b/lib/sidekiq/throttled/patches/throttled_retriever.rb new file mode 100644 index 00000000..87dd3240 --- /dev/null +++ b/lib/sidekiq/throttled/patches/throttled_retriever.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Sidekiq + module Throttled + module Patches + module ThrottledRetriever + # Retrieves job from redis. + # + # @return [Sidekiq::BasicFetch::UnitOfWork, nil] + def retrieve_work + work = super + + if work && Throttled.throttled?(work.job) + Throttled.cooldown&.notify_throttled(work.queue) + Throttled.requeue_throttled(work) + return nil + end + + Throttled.cooldown&.notify_admitted(work.queue) if work + + work + end + end + end + end +end diff --git a/lib/sidekiq/throttled/registry.rb b/lib/sidekiq/throttled/registry.rb index 1d46f351..b8e413a5 100644 --- a/lib/sidekiq/throttled/registry.rb +++ b/lib/sidekiq/throttled/registry.rb @@ -102,8 +102,6 @@ def find(name) # @param name [Class, #to_s] # @return [Strategy, nil] def find_by_class(name) - return unless Throttled.configuration.inherit_strategies? - const = name.is_a?(Class) ? name : Object.const_get(name) return unless const.is_a?(Class) @@ -112,6 +110,8 @@ def find_by_class(name) return strategy if strategy end + nil + rescue NameError nil end end diff --git a/lib/sidekiq/throttled/strategy.rb b/lib/sidekiq/throttled/strategy.rb index 6a1b1dea..a3681a39 100644 --- a/lib/sidekiq/throttled/strategy.rb +++ b/lib/sidekiq/throttled/strategy.rb @@ -31,7 +31,7 @@ class Strategy # See keyword args of {Strategy::Threshold#initialize} for details. # @param [#call] key_suffix Dynamic key suffix generator. # @param [#call] observer Process called after throttled. - def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) # rubocop:disable Metrics/MethodLength + def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) @observer = observer @concurrency = StrategyCollection.new(concurrency, @@ -44,9 +44,7 @@ def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer name: name, key_suffix: key_suffix) - return if @concurrency.any? || @threshold.any? - - raise ArgumentError, "Neither :concurrency nor :threshold given" + raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any? end # @return [Boolean] whenever strategy has dynamic config @@ -74,18 +72,111 @@ def throttled?(jid, *job_args) false end + # Return throttled job to be executed later. Implementation depends on the value of `with`: + # :enqueue means put the job back at the end of the queue immediately + # :schedule means schedule enqueueing the job for a later time when we expect to have capacity + # + # @param [#to_s, #call] with How to handle the throttled job + # @param [#to_s, #call] to Name of the queue to re-queue the job to. + # If not specified, will use the job's original queue. + # @return [void] + def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength + # Resolve :with and :to arguments, calling them if they are Procs + job_args = JSON.parse(work.job)["args"] + requeue_with = with.respond_to?(:call) ? 
with.call(*job_args) : with + target_queue = calc_target_queue(work, to) + + case requeue_with + when :enqueue + re_enqueue_throttled(work, target_queue) + when :schedule + # Find out when we will next be able to execute this job, and reschedule for then. + reschedule_throttled(work, requeue_to: target_queue) + else + raise "unrecognized :with option #{with}" + end + end + # Marks job as being processed. # @return [void] def finalize!(jid, *job_args) @concurrency&.finalize!(jid, *job_args) end - # Resets count of jobs of all avaliable strategies + # Resets count of jobs of all available strategies # @return [void] def reset! @concurrency&.reset! @threshold&.reset! end + + private + + def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength + target = case to + when Proc, Method + to.call(*JSON.parse(work.job)["args"]) + when NilClass + work.queue + when String, Symbol + to.to_s + else + raise ArgumentError, "Invalid argument for `to`" + end + + target = work.queue if target.nil? || target.empty? + + target.start_with?("queue:") ? target : "queue:#{target}" + end + + # Push the job back to the head of the queue. + def re_enqueue_throttled(work, requeue_to) + case work.class.name + when "Sidekiq::Pro::SuperFetch::UnitOfWork" + # Calls SuperFetch UnitOfWork's requeue to remove the job from the + # temporary queue and push job back to the head of the target queue, so that + # the job won't be tried immediately after it was requeued (in most cases). + work.queue = requeue_to if requeue_to + work.requeue + else + # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call. + Sidekiq.redis { |conn| conn.lpush(requeue_to, work.job) } + end + end + + def reschedule_throttled(work, requeue_to:) + message = JSON.parse(work.job) + job_class = message.fetch("wrapped") { message.fetch("class") { return false } } + job_args = message["args"] + + # Re-enqueue the job to the target queue at another time as a NEW unit of work + # AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned + # Technically, the job could processed twice if the process dies between the two lines, + # but your job should be idempotent anyway, right? + # The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk. + Sidekiq::Client.enqueue_to_in(requeue_to, retry_in(work), Object.const_get(job_class), *job_args) + work.acknowledge + end + + def retry_in(work) + message = JSON.parse(work.job) + jid = message.fetch("jid") { return false } + job_args = message["args"] + + # Ask both concurrency and threshold, if relevant, how long minimum until we can retry. + # If we get two answers, take the longer one. + intervals = [@concurrency&.retry_in(jid, *job_args), @threshold&.retry_in(*job_args)].compact + + raise "Cannot compute a valid retry interval" if intervals.empty? + + interval = intervals.max + + # Add a random amount of jitter, proportional to the length of the minimum retry time. + # This helps spread out jobs more evenly and avoid clumps of jobs on the queue. 
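+        # For instance, a computed 60 second wait becomes anything from 60 to
+        # just under 72 seconds.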
+ interval += rand(interval / 5) if interval > 10 + + interval + end end end end diff --git a/lib/sidekiq/throttled/strategy/base.rb b/lib/sidekiq/throttled/strategy/base.rb index 5ea016e4..b0ee0730 100644 --- a/lib/sidekiq/throttled/strategy/base.rb +++ b/lib/sidekiq/throttled/strategy/base.rb @@ -15,7 +15,7 @@ def key(job_args) return key unless @key_suffix key << ":#{@key_suffix.call(*job_args)}" - rescue => e + rescue StandardError => e Sidekiq.logger.error "Failed to get key suffix: #{e}" raise e end diff --git a/lib/sidekiq/throttled/strategy/concurrency.rb b/lib/sidekiq/throttled/strategy/concurrency.rb index 9d9663c7..2f4e9376 100644 --- a/lib/sidekiq/throttled/strategy/concurrency.rb +++ b/lib/sidekiq/throttled/strategy/concurrency.rb @@ -52,6 +52,16 @@ def throttled?(jid, *job_args) Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) } end + # @return [Float] How long, in seconds, before we'll next be able to take on jobs + def retry_in(_jid, *job_args) + job_limit = limit(job_args) + return 0.0 if !job_limit || count(*job_args) < job_limit + + oldest_jid_with_score = Sidekiq.redis { |redis| redis.zrange(key(job_args), 0, 0, withscores: true) }.first + expiry_time = oldest_jid_with_score.last.to_f + expiry_time - Time.now.to_f + end + # @return [Integer] Current count of jobs def count(*job_args) Sidekiq.redis { |conn| conn.zcard(key(job_args)) }.to_i diff --git a/lib/sidekiq/throttled/strategy/threshold.rb b/lib/sidekiq/throttled/strategy/threshold.rb index 11383d36..095f37e0 100644 --- a/lib/sidekiq/throttled/strategy/threshold.rb +++ b/lib/sidekiq/throttled/strategy/threshold.rb @@ -69,6 +69,25 @@ def throttled?(*job_args) Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) } end + # @return [Float] How long, in seconds, before we'll next be able to take on jobs + def retry_in(*job_args) + job_limit = limit(job_args) + return 0.0 if !job_limit || count(*job_args) < job_limit + + job_period = period(job_args) + job_key = key(job_args) + time_since_oldest = Time.now.to_f - Sidekiq.redis { |redis| redis.lindex(job_key, -1) }.to_f + if time_since_oldest > job_period + # The oldest job on our list is from more than the throttling period ago, + # which means we have not hit the limit this period. + 0.0 + else + # If we can only have X jobs every Y minutes, then wait until Y minutes have elapsed + # since the oldest job on our list. + job_period - time_since_oldest + end + end + # @return [Integer] Current count of jobs def count(*job_args) Sidekiq.redis { |conn| conn.llen(key(job_args)) }.to_i diff --git a/lib/sidekiq/throttled/strategy_collection.rb b/lib/sidekiq/throttled/strategy_collection.rb index ce9d7e34..d6fd0bc4 100644 --- a/lib/sidekiq/throttled/strategy_collection.rb +++ b/lib/sidekiq/throttled/strategy_collection.rb @@ -26,8 +26,8 @@ def initialize(strategies, strategy:, name:, key_suffix:) # @param [#call] block # Iterates each strategy in collection - def each(&block) - @strategies.each(&block) + def each(...) + @strategies.each(...) end # @return [Boolean] whenever any strategy in collection has dynamic config @@ -37,14 +37,19 @@ def dynamic? # @return [Boolean] whenever job is throttled or not # by any strategy in collection. - def throttled?(*args) - any? { |s| s.throttled?(*args) } + def throttled?(...) + any? { |s| s.throttled?(...) } + end + + # @return [Float] How long, in seconds, before we'll next be able to take on jobs + def retry_in(*args) + max { |s| s.retry_in(*args) } end # Marks job as being processed. 
# @return [void] - def finalize!(*args) - each { |c| c.finalize!(*args) } + def finalize!(...) + each { |c| c.finalize!(...) } end # Resets count of jobs of all avaliable strategies diff --git a/lib/sidekiq/throttled/version.rb b/lib/sidekiq/throttled/version.rb index 0069537a..8500452c 100644 --- a/lib/sidekiq/throttled/version.rb +++ b/lib/sidekiq/throttled/version.rb @@ -3,6 +3,6 @@ module Sidekiq module Throttled # Gem version - VERSION = "1.0.0.alpha.1" + VERSION = "1.4.0" end end diff --git a/rubocop/layout.yml b/rubocop/layout.yml index 9d97320d..6e5c7f1d 100644 --- a/rubocop/layout.yml +++ b/rubocop/layout.yml @@ -5,6 +5,10 @@ Layout/ArgumentAlignment: Layout/EmptyLinesAroundAttributeAccessor: Enabled: true +Layout/FirstArrayElementIndentation: + Enabled: true + EnforcedStyle: consistent + Layout/FirstHashElementIndentation: Enabled: true EnforcedStyle: consistent diff --git a/rubocop/rspec.yml b/rubocop/rspec.yml index c129a663..e9df608b 100644 --- a/rubocop/rspec.yml +++ b/rubocop/rspec.yml @@ -1,6 +1,11 @@ +RSpec/BeNil: + Enabled: true + EnforcedStyle: be + RSpec/ExampleLength: Enabled: true Max: 10 + CountAsOne: [array, hash, heredoc, method_call] RSpec/MultipleExpectations: Enabled: false diff --git a/rubocop/style.yml b/rubocop/style.yml index 47b505f0..3a7e90f7 100644 --- a/rubocop/style.yml +++ b/rubocop/style.yml @@ -46,6 +46,9 @@ Style/OptionalBooleanParameter: Style/RedundantAssignment: Enabled: true +Style/RedundantCurrentDirectoryInPath: + Enabled: false + Style/RedundantFetchBlock: Enabled: true @@ -64,7 +67,7 @@ Style/RegexpLiteral: Style/RescueStandardError: Enabled: true - EnforcedStyle: implicit + EnforcedStyle: explicit Style/SingleArgumentDig: Enabled: true diff --git a/scripts/ci-rspec b/scripts/ci-rspec deleted file mode 100755 index 1aedbca0..00000000 --- a/scripts/ci-rspec +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash - -set -Eeuxo pipefail - -rm -f ./Gemfile.lock -bundle install - -rm -f ./gemfiles/*.gemfile ./gemfiles/*.gemfile.lock -bundle exec appraisal generate - -# XXX: `bundle exec appraisal install` fails on CI with ruby-3.2 -for BUNDLE_GEMFILE in gemfiles/*.gemfile; do - export BUNDLE_GEMFILE - bundle check || bundle install -done - -bundle exec appraisal rspec diff --git a/sensortower.svg b/sensortower.svg new file mode 100644 index 00000000..0ae9f6af --- /dev/null +++ b/sensortower.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/sidekiq-throttled.gemspec b/sidekiq-throttled.gemspec index 46e01671..7980c359 100644 --- a/sidekiq-throttled.gemspec +++ b/sidekiq-throttled.gemspec @@ -19,19 +19,18 @@ Gem::Specification.new do |spec| spec.metadata["rubygems_mfa_required"] = "true" spec.files = Dir.chdir(__dir__) do - docs = %w[LICENSE README.adoc].freeze + docs = %w[LICENSE.txt README.adoc].freeze `git ls-files -z`.split("\x0").select do |f| f.start_with?("lib/") || docs.include?(f) end end - spec.bindir = "exe" - spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.7" - spec.add_runtime_dependency "redis-prescription", "~> 2.2" - spec.add_runtime_dependency "sidekiq", ">= 6.5" + spec.add_dependency "concurrent-ruby", ">= 1.2.0" + spec.add_dependency "redis-prescription", "~> 2.2" + spec.add_dependency "sidekiq", ">= 6.5" end diff --git a/spec/lib/sidekiq/throttled/config_spec.rb b/spec/lib/sidekiq/throttled/config_spec.rb new file mode 100644 index 00000000..f10ab700 --- /dev/null +++ b/spec/lib/sidekiq/throttled/config_spec.rb @@ -0,0 +1,58 @@ +# 
frozen_string_literal: true + +require "sidekiq/throttled/config" + +RSpec.describe Sidekiq::Throttled::Config do + subject(:config) { described_class.new } + + describe "#cooldown_period" do + subject { config.cooldown_period } + + it { is_expected.to eq 1.0 } + end + + describe "#cooldown_period=" do + it "updates #cooldown_period" do + expect { config.cooldown_period = 42.0 } + .to change(config, :cooldown_period).to(42.0) + end + + it "allows setting value to `nil`" do + expect { config.cooldown_period = nil } + .to change(config, :cooldown_period).to(nil) + end + + it "fails if given value is neither `NilClass` nor `Float`" do + expect { config.cooldown_period = 42 } + .to raise_error(TypeError, %r{unexpected type}) + end + + it "fails if given value is not positive" do + expect { config.cooldown_period = 0.0 } + .to raise_error(ArgumentError, %r{must be positive}) + end + end + + describe "#cooldown_threshold" do + subject { config.cooldown_threshold } + + it { is_expected.to eq 100 } + end + + describe "#cooldown_threshold=" do + it "updates #cooldown_threshold" do + expect { config.cooldown_threshold = 42 } + .to change(config, :cooldown_threshold).to(42) + end + + it "fails if given value is not `Integer`" do + expect { config.cooldown_threshold = 42.0 } + .to raise_error(TypeError, %r{unexpected type}) + end + + it "fails if given value is not positive" do + expect { config.cooldown_threshold = 0 } + .to raise_error(ArgumentError, %r{must be positive}) + end + end +end diff --git a/spec/lib/sidekiq/throttled/cooldown_spec.rb b/spec/lib/sidekiq/throttled/cooldown_spec.rb new file mode 100644 index 00000000..c2c1e9a7 --- /dev/null +++ b/spec/lib/sidekiq/throttled/cooldown_spec.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/cooldown" + +RSpec.describe Sidekiq::Throttled::Cooldown do + subject(:cooldown) { described_class.new(config) } + + let(:config) { Sidekiq::Throttled::Config.new } + + describe ".[]" do + subject { described_class[config] } + + it { is_expected.to be_an_instance_of described_class } + + context "when `cooldown_period` is nil" do + before { config.cooldown_period = nil } + + it { is_expected.to be(nil) } + end + end + + describe "#notify_throttled" do + before do + config.cooldown_threshold = 5 + + (config.cooldown_threshold - 1).times do + cooldown.notify_throttled("queue:the_longest_line") + end + end + + it "marks queue for exclusion once threshold is met" do + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + end + end + + describe "#notify_admitted" do + before do + config.cooldown_threshold = 5 + + (config.cooldown_threshold - 1).times do + cooldown.notify_throttled("queue:at_the_end_of") + cooldown.notify_throttled("queue:the_longest_line") + end + end + + it "resets threshold counter" do + cooldown.notify_admitted("queue:at_the_end_of") + + cooldown.notify_throttled("queue:at_the_end_of") + cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + end + end + + describe "#queues" do + before do + config.cooldown_period = 1.0 + config.cooldown_threshold = 1 + end + + it "keeps queue in the exclusion list for the duration of cooldown_period" do + monotonic_time = 0.0 + + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time } + + cooldown.notify_throttled("queue:at_the_end_of") + + monotonic_time += 0.9 + 
cooldown.notify_throttled("queue:the_longest_line") + + expect(cooldown.queues).to contain_exactly("queue:at_the_end_of", "queue:the_longest_line") + + monotonic_time += 0.1 + expect(cooldown.queues).to contain_exactly("queue:the_longest_line") + + monotonic_time += 1.0 + expect(cooldown.queues).to be_empty + end + end +end diff --git a/spec/lib/sidekiq/throttled/expirable_set_spec.rb b/spec/lib/sidekiq/throttled/expirable_set_spec.rb new file mode 100644 index 00000000..465c7ba9 --- /dev/null +++ b/spec/lib/sidekiq/throttled/expirable_set_spec.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/expirable_set" + +RSpec.describe Sidekiq::Throttled::ExpirableSet do + subject(:expirable_set) { described_class.new(2.0) } + + it { is_expected.to be_an Enumerable } + + describe ".new" do + it "raises ArgumentError if given TTL is not Float" do + expect { described_class.new(42) }.to raise_error(ArgumentError) + end + + it "raises ArgumentError if given TTL is not positive" do + expect { described_class.new(0.0) }.to raise_error(ArgumentError) + end + end + + describe "#add" do + it "returns self" do + expect(expirable_set.add("a")).to be expirable_set + end + + it "adds uniq elements to the set" do + expirable_set.add("a").add("b").add("b").add("a") + + expect(expirable_set).to contain_exactly("a", "b") + end + end + + describe "#each" do + subject { expirable_set.each } + + before do + monotonic_time = 0.0 + + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time } + + expirable_set.add("lorem") + expirable_set.add("ipsum") + + monotonic_time += 1 + + expirable_set.add("ipsum") + + monotonic_time += 1 + + expirable_set.add("dolor") + end + + it { is_expected.to be_an(Enumerator) } + it { is_expected.to contain_exactly("ipsum", "dolor") } + + context "with block given" do + it "yields each paused queue and returns self" do + yielded_elements = [] + + expect(expirable_set.each { |element| yielded_elements << element }).to be expirable_set + expect(yielded_elements).to contain_exactly("ipsum", "dolor") + end + end + end +end diff --git a/spec/lib/sidekiq/throttled/job_spec.rb b/spec/lib/sidekiq/throttled/job_spec.rb index fdb4d318..9b6772a0 100644 --- a/spec/lib/sidekiq/throttled/job_spec.rb +++ b/spec/lib/sidekiq/throttled/job_spec.rb @@ -1,7 +1,12 @@ # frozen_string_literal: true RSpec.describe Sidekiq::Throttled::Job do - let(:working_class) { Class.new { include Sidekiq::Throttled::Job } } + let(:working_class) do + Class.new do + include Sidekiq::Job + include Sidekiq::Throttled::Job + end + end it "aliased as Sidekiq::Throttled::Worker" do expect(Sidekiq::Throttled::Worker).to be described_class @@ -13,6 +18,89 @@ .to receive(:add).with(working_class, foo: :bar) working_class.sidekiq_throttle(foo: :bar) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue }) + end + + it "accepts and stores a requeue parameter including :with" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { with: :schedule }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule }) + end + + it "accepts and stores a requeue parameter including :to" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, 
with: :enqueue }) + end + + it "accepts and stores a requeue parameter including both :to and :with" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :schedule }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule }) + end + + it "raises an error when :with is not a valid value" do + expect do + working_class.sidekiq_throttle(foo: :bar, requeue: { with: :invalid_with_value }) + end.to raise_error(ArgumentError, "requeue: invalid_with_value is not a valid value for :with") + end + + context "when default_requeue_options are set" do + before do + Sidekiq::Throttled.configure do |config| + config.default_requeue_options = { with: :schedule } + end + end + + after do + Sidekiq::Throttled.configure(&:reset!) + end + + it "uses the default when not overridden" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule }) + end + + it "uses the default alongside a requeue parameter including :to" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule }) + end + + it "allows overriding the default" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { with: :enqueue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue }) + end + + it "allows overriding the default and including a :to parameter" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :enqueue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :enqueue }) + end end end diff --git a/spec/lib/sidekiq/throttled/message_spec.rb b/spec/lib/sidekiq/throttled/message_spec.rb new file mode 100644 index 00000000..2f7c849f --- /dev/null +++ b/spec/lib/sidekiq/throttled/message_spec.rb @@ -0,0 +1,183 @@ +# frozen_string_literal: true + +RSpec.describe Sidekiq::Throttled::Message do + subject(:message) do + described_class.new(item) + end + + let(:item) do + { + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + } + end + + describe "#job_class" do + subject { message.job_class } + + it { is_expected.to eq("ExcitingJob") } + + context "with serialized payload" do + let(:item) do + JSON.dump({ + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("ExcitingJob") } + end + + context "with ActiveJob payload" do + let(:item) do + { + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + } + end + + it { is_expected.to eq("ExcitingJob") } + end + + context "with serialized ActiveJob payload" do + let(:item) do + JSON.dump({ + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("ExcitingJob") } + end + + context "with invalid payload" do + let(:item) { "invalid" } + + it { is_expected.to be 
nil } + end + + context "with invalid serialized payload" do + let(:item) { JSON.dump("invalid") } + + it { is_expected.to be nil } + end + end + + describe "#job_args" do + subject { message.job_args } + + it { is_expected.to eq([42]) } + + context "with serialized payload" do + let(:item) do + JSON.dump({ + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq([42]) } + end + + context "with ActiveJob payload" do + let(:item) do + { + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + } + end + + it { is_expected.to eq([42]) } + end + + context "with serialized ActiveJob payload" do + let(:item) do + JSON.dump({ + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq([42]) } + end + + context "with invalid payload" do + let(:item) { "invalid" } + + it { is_expected.to be nil } + end + + context "with invalid serialized payload" do + let(:item) { JSON.dump("invalid") } + + it { is_expected.to be nil } + end + end + + describe "#job_id" do + subject { message.job_id } + + it { is_expected.to eq("deadbeef") } + + context "with serialized payload" do + let(:item) do + JSON.dump({ + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("deadbeef") } + end + + context "with ActiveJob payload" do + let(:item) do + { + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + } + end + + it { is_expected.to eq("deadbeef") } + end + + context "with serialized ActiveJob payload" do + let(:item) do + JSON.dump({ + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("deadbeef") } + end + + context "with invalid payload" do + let(:item) { "invalid" } + + it { is_expected.to be nil } + end + + context "with invalid serialized payload" do + let(:item) { JSON.dump("invalid") } + + it { is_expected.to be nil } + end + end +end diff --git a/spec/lib/sidekiq/throttled/middleware_spec.rb b/spec/lib/sidekiq/throttled/middleware_spec.rb deleted file mode 100644 index 15ab2c4e..00000000 --- a/spec/lib/sidekiq/throttled/middleware_spec.rb +++ /dev/null @@ -1,67 +0,0 @@ -# frozen_string_literal: true - -require "sidekiq/throttled/middleware" - -RSpec.describe Sidekiq::Throttled::Middleware do - subject(:middleware) { described_class.new } - - describe "#call" do - let(:payload) { { "class" => "foo", "jid" => "bar" } } - - context "when job class has strategy with concurrency constraint" do - let! :strategy do - Sidekiq::Throttled::Registry.add payload["class"], - concurrency: { limit: 1 } - end - - it "calls #finalize! of queue with jid of job being processed" do - expect(strategy).to receive(:finalize!).with "bar" - middleware.call(double, payload, double) { |*| } - end - - it "returns yields control to the given block" do - expect { |b| middleware.call(double, payload, double, &b) } - .to yield_control - end - - it "returns result of given block" do - expect(middleware.call(double, payload, double) { |*| :foobar }) - .to be :foobar - end - end - - context "when job class has strategy without concurrency constraint" do - let! :strategy do - Sidekiq::Throttled::Registry.add payload["class"], - threshold: { limit: 1, period: 1 } - end - - it "calls #finalize! 
of queue with jid of job being processed" do - expect(strategy).to receive(:finalize!).with "bar" - middleware.call(double, payload, double) { |*| } - end - - it "returns yields control to the given block" do - expect { |b| middleware.call(double, payload, double, &b) } - .to yield_control - end - - it "returns result of given block" do - expect(middleware.call(double, payload, double) { |*| :foobar }) - .to be :foobar - end - end - - context "when job class has no strategy" do - it "returns yields control to the given block" do - expect { |b| middleware.call(double, payload, double, &b) } - .to yield_control - end - - it "returns result of given block" do - expect(middleware.call(double, payload, double) { |*| :foobar }) - .to be :foobar - end - end - end -end diff --git a/spec/lib/sidekiq/throttled/middlewares/server_spec.rb b/spec/lib/sidekiq/throttled/middlewares/server_spec.rb new file mode 100644 index 00000000..23ff91b4 --- /dev/null +++ b/spec/lib/sidekiq/throttled/middlewares/server_spec.rb @@ -0,0 +1,143 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/middlewares/server" + +RSpec.describe Sidekiq::Throttled::Middlewares::Server do + subject(:middleware) { described_class.new } + + describe "#call" do + let(:args) { ["bar", 1] } + let(:payload) { { "class" => "foo", "jid" => "bar" } } + let(:payload_args) { { "class" => "foo", "jid" => "bar", "args" => args } } + + context "when job class has strategy with concurrency constraint" do + let! :strategy do + Sidekiq::Throttled::Registry.add payload["class"], + concurrency: { limit: 1 } + end + + it "calls #finalize! of queue with jid of job being processed" do + expect(strategy).to receive(:finalize!).with "bar" + middleware.call(double, payload, double) { |*| :foobar } + end + + it "calls #finalize! of queue with jid and args of job being processed" do + expect(strategy).to receive(:finalize!).with "bar", *args + middleware.call(double, payload_args, double) { |*| :foobar } + end + + it "returns yields control to the given block" do + expect { |b| middleware.call(double, payload, double, &b) } + .to yield_control + end + + it "returns result of given block" do + expect(middleware.call(double, payload, double) { |*| :foobar }) + .to be :foobar + end + end + + context "when job class has strategy with concurrency constraint and uses ActiveJob" do + let(:payload) do + { + "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", + "wrapped" => "wrapped-foo", + "jid" => "bar" + } + end + let(:payload_args) do + { + "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", + "wrapped" => "wrapped-foo", + "args" => [{ "job_class" => "foo", "arguments" => args }], + "jid" => "bar" + } + end + + let! :strategy do + Sidekiq::Throttled::Registry.add payload["wrapped"], + concurrency: { limit: 1 } + end + + it "calls #finalize! of queue with jid of job being processed" do + expect(strategy).to receive(:finalize!).with "bar" + middleware.call(double, payload, double) { |*| :foobar } + end + + it "calls #finalize! 
of queue with jid and args of job being processed" do + expect(strategy).to receive(:finalize!).with "bar", *args + middleware.call(double, payload_args, double) { |*| :foobar } + end + + it "returns yields control to the given block" do + expect { |b| middleware.call(double, payload, double, &b) } + .to yield_control + end + + it "returns result of given block" do + expect(middleware.call(double, payload, double) { |*| :foobar }) + .to be :foobar + end + end + + context "when job class has strategy without concurrency constraint" do + let! :strategy do + Sidekiq::Throttled::Registry.add payload["class"], + threshold: { limit: 1, period: 1 } + end + + it "calls #finalize! of queue with jid of job being processed" do + expect(strategy).to receive(:finalize!).with "bar" + middleware.call(double, payload, double) { |*| :foobar } + end + + it "returns yields control to the given block" do + expect { |b| middleware.call(double, payload, double, &b) } + .to yield_control + end + + it "returns result of given block" do + expect(middleware.call(double, payload, double) { |*| :foobar }) + .to be :foobar + end + end + + context "when job class has no strategy" do + it "returns yields control to the given block" do + expect { |b| middleware.call(double, payload, double, &b) } + .to yield_control + end + + it "returns result of given block" do + expect(middleware.call(double, payload, double) { |*| :foobar }) + .to be :foobar + end + end + + context "when message contains no job class" do + before do + allow(Sidekiq::Throttled::Registry).to receive(:get).and_call_original + payload.delete("class") + end + + it "does not attempt to retrieve any strategy" do + expect { |b| middleware.call(double, payload, double, &b) }.to yield_control + + expect(Sidekiq::Throttled::Registry).not_to receive(:get) + end + end + + context "when message contains no jid" do + before do + allow(Sidekiq::Throttled::Registry).to receive(:get).and_call_original + payload.delete("jid") + end + + it "does not attempt to retrieve any strategy" do + expect { |b| middleware.call(double, payload, double, &b) }.to yield_control + + expect(Sidekiq::Throttled::Registry).not_to receive(:get) + end + end + end +end diff --git a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb index 203daba2..de70974b 100644 --- a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb +++ b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb @@ -2,83 +2,93 @@ require "sidekiq/throttled/patches/basic_fetch" -class ThrottledTestJob - include Sidekiq::Job - include Sidekiq::Throttled::Job - - def perform(*); end -end - RSpec.describe Sidekiq::Throttled::Patches::BasicFetch do - subject(:fetch) do + let(:fetch) do if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0") Sidekiq.instance_variable_set(:@config, Sidekiq::DEFAULTS.dup) - Sidekiq.queues = %w[default] + Sidekiq.queues = %w[default critical] Sidekiq::BasicFetch.new(Sidekiq) else config = Sidekiq::Config.new - config.queues = %w[default] + config.queues = %w[default critical] Sidekiq::BasicFetch.new(config.default_capsule) end end - before { described_class.apply! 
} + before do + Sidekiq::Throttled.configure { |config| config.cooldown_period = nil } - describe "#retrieve_work" do - def enqueued_jobs(queue) - Sidekiq.redis do |conn| - conn.lrange("queue:#{queue}", 0, -1).map do |job| - JSON.parse(job).then do |payload| - [payload["class"], payload["args"]] - end - end - end - end + stub_job_class("TestJob") + stub_job_class("AnotherTestJob") { sidekiq_options(queue: :critical) } - before do - # Sidekiq is FIFO queue, with head on right side of the list, - # meaning jobs below will be stored in 3, 2, 1 order. - ThrottledTestJob.perform_bulk([[1], [2], [3]]) - end + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC).and_return(0.0) + + # Sidekiq is FIFO queue, with head on right side of the list, + # meaning jobs below will be stored in 3, 2, 1 order. + TestJob.perform_async(1) + TestJob.perform_async(2) + TestJob.perform_async(3) + AnotherTestJob.perform_async(4) + end + describe "#retrieve_work" do context "when job is not throttled" do it "returns unit of work" do - expect(fetch.retrieve_work).to be_an_instance_of(Sidekiq::BasicFetch::UnitOfWork) + expect(Array.new(4) { fetch.retrieve_work }).to all be_an_instance_of(Sidekiq::BasicFetch::UnitOfWork) end end - context "when job was throttled due to concurrency" do - before do - ThrottledTestJob.sidekiq_throttle(concurrency: { limit: 1 }) + shared_examples "requeues throttled job" do + it "returns nothing" do fetch.retrieve_work - end - it "returns nothing" do - expect(fetch.retrieve_work).to be_nil + expect(fetch.retrieve_work).to be(nil) end - it "pushes job back to the end queue" do + it "pushes job back to the head of the queue" do + fetch.retrieve_work + expect { fetch.retrieve_work } - .to change { enqueued_jobs("default") } - .to eq([["ThrottledTestJob", [2]], ["ThrottledTestJob", [3]]]) + .to change { enqueued_jobs("default") }.to([["TestJob", [2]], ["TestJob", [3]]]) + .and(keep_unchanged { enqueued_jobs("critical") }) end - end - context "when job was throttled due to threshold" do - before do - ThrottledTestJob.sidekiq_throttle(threshold: { limit: 1, period: 60 }) - fetch.retrieve_work - end + context "when queue cooldown kicks in" do + before do + Sidekiq::Throttled.configure do |config| + config.cooldown_period = 2.0 + config.cooldown_threshold = 1 + end - it "returns nothing" do - expect(fetch.retrieve_work).to be_nil - end + fetch.retrieve_work + end - it "pushes job back to the end queue" do - expect { fetch.retrieve_work } - .to change { enqueued_jobs("default") } - .to eq([["ThrottledTestJob", [2]], ["ThrottledTestJob", [3]]]) + it "updates cooldown queues" do + expect { fetch.retrieve_work } + .to change { enqueued_jobs("default") }.to([["TestJob", [2]], ["TestJob", [3]]]) + .and(change { Sidekiq::Throttled.cooldown.queues }.to(["queue:default"])) + end + + it "excludes the queue from polling" do + fetch.retrieve_work + + expect { fetch.retrieve_work } + .to change { enqueued_jobs("critical") }.to([]) + .and(keep_unchanged { enqueued_jobs("default") }) + end end end + + context "when job was throttled due to concurrency" do + before { TestJob.sidekiq_throttle(concurrency: { limit: 1 }) } + + include_examples "requeues throttled job" + end + + context "when job was throttled due to threshold" do + before { TestJob.sidekiq_throttle(threshold: { limit: 1, period: 60 }) } + + include_examples "requeues throttled job" + end end end diff --git a/spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb b/spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb new file 
mode 100644 index 00000000..4bc7219e --- /dev/null +++ b/spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +require "sidekiq/throttled/patches/super_fetch" + +RSpec.describe Sidekiq::Throttled::Patches::SuperFetch, :sidekiq_pro do + let(:base_queue) { "default" } + let(:critical_queue) { "critical" } + let(:config) do + config = Sidekiq.instance_variable_get(:@config) + config.super_fetch! + config.queues = [base_queue, critical_queue] + config + end + let(:fetch) do + config.default_capsule.fetcher + end + + before do + Sidekiq::Throttled.configure { |config| config.cooldown_period = nil } + + bq = base_queue + cq = critical_queue + stub_job_class("TestJob") { sidekiq_options(queue: bq) } + stub_job_class("AnotherTestJob") { sidekiq_options(queue: cq) } + + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC).and_return(0.0) + + # Give super_fetch a chance to finish its initialization, but also check that there are no pre-existing jobs + pre_existing_job = fetch.retrieve_work + raise "Found pre-existing job: #{pre_existing_job.inspect}" if pre_existing_job + + # Sidekiq is FIFO queue, with head on right side of the list, + # meaning jobs below will be stored in 3, 2, 1 order. + TestJob.perform_bulk([[1], [2], [3]]) + AnotherTestJob.perform_async(4) + end + + describe "#retrieve_work" do + context "when job is not throttled" do + it "returns unit of work" do + expect(Array.new(4) { fetch.retrieve_work }).to all be_an_instance_of(Sidekiq::Pro::SuperFetch::UnitOfWork) + end + end + + shared_examples "requeues throttled job" do + it "returns nothing" do + fetch.retrieve_work + + expect(fetch.retrieve_work).to be(nil) + end + + it "pushes job back to the head of the queue" do + fetch.retrieve_work + + expect { fetch.retrieve_work } + .to change { enqueued_jobs(base_queue) }.to([["TestJob", [2]], ["TestJob", [3]]]) + .and(keep_unchanged { enqueued_jobs(critical_queue) }) + end + + context "when queue cooldown kicks in" do + before do + Sidekiq::Throttled.configure do |config| + config.cooldown_period = 2.0 + config.cooldown_threshold = 1 + end + + fetch.retrieve_work + end + + it "updates cooldown queues" do + expect { fetch.retrieve_work } + .to change { enqueued_jobs(base_queue) }.to([["TestJob", [2]], ["TestJob", [3]]]) + .and(change { Sidekiq::Throttled.cooldown.queues }.to(["queue:#{base_queue}"])) + end + + it "excludes the queue from polling" do + fetch.retrieve_work + + expect { fetch.retrieve_work } + .to change { enqueued_jobs(critical_queue) }.to([]) + .and(keep_unchanged { enqueued_jobs(base_queue) }) + end + end + end + + context "when job was throttled due to concurrency" do + before { TestJob.sidekiq_throttle(concurrency: { limit: 1 }) } + + include_examples "requeues throttled job" + end + + context "when job was throttled due to threshold" do + before { TestJob.sidekiq_throttle(threshold: { limit: 1, period: 60 }) } + + include_examples "requeues throttled job" + end + end +end diff --git a/spec/lib/sidekiq/throttled/registry_spec.rb b/spec/lib/sidekiq/throttled/registry_spec.rb index 7155742f..e76b1c0d 100644 --- a/spec/lib/sidekiq/throttled/registry_spec.rb +++ b/spec/lib/sidekiq/throttled/registry_spec.rb @@ -59,7 +59,7 @@ def stub_class(name, *parent, &block) let(:name) { "foo" } context "when strategy is not registered" do - it { is_expected.to be_nil } + it { is_expected.to be(nil) } end context "when strategy was registered" do @@ -76,15 +76,7 @@ def stub_class(name, *parent, &block) before { 
described_class.add(parent_class.name, **threshold) } - it { is_expected.to be_nil } - - context "when configuration has inherit strategy turned on" do - before { Sidekiq::Throttled.configuration.inherit_strategies = true } - - after { Sidekiq::Throttled.configuration.reset! } - - it { is_expected.to be described_class.get("Parent") } - end + it { is_expected.to be described_class.get("Parent") } end end @@ -119,7 +111,7 @@ def stub_class(name, *parent, &block) describe ".each_with_static_keys" do before do described_class.add("foo", **threshold) - described_class.add("bar", **threshold.merge(key_suffix: ->(i) { i })) + described_class.add("bar", **threshold, key_suffix: ->(i) { i }) end it "yields once for each strategy without dynamic key suffixes" do diff --git a/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb b/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb index 5995e1c0..c0968bc0 100644 --- a/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb @@ -40,6 +40,62 @@ end end + describe "#retry_in" do + context "when limit is exceeded with all jobs starting just now" do + before { 5.times { strategy.throttled? jid } } + + it "tells us to wait roughly one ttl" do + expect(subject.retry_in(jid)).to be_within(0.1).of(900) + end + end + + context "when limit exceeded, with first job starting 800 seconds ago" do + before do + Timecop.travel(Time.now - 800) do + strategy.throttled? jid + end + 4.times { strategy.throttled? jid } + end + + it "tells us to wait 100 seconds" do + expect(subject.retry_in(jid)).to be_within(0.1).of(100) + end + end + + context "when limit not exceeded, because the oldest job was more than the ttl ago" do + before do + Timecop.travel(Time.now - 1000) do + strategy.throttled? jid + end + 4.times { strategy.throttled? jid } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in(jid)).to eq 0 + end + end + + context "when limit not exceeded, because there are fewer jobs than the limit" do + before do + 4.times { strategy.throttled? jid } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in(jid)).to eq 0 + end + end + + context "when dynamic limit returns nil" do + let(:strategy) { described_class.new :test, limit: proc { |*| } } + + before { 5.times { strategy.throttled? jid } } + + it "tells us we do not need to wait" do + expect(subject.retry_in(jid)).to eq 0 + end + end + end + describe "#count" do subject { strategy.count } diff --git a/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb b/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb index 3ab6d126..ab692be5 100644 --- a/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb @@ -40,6 +40,62 @@ end end + describe "#retry_in" do + context "when limit exceeded with all jobs happening just now" do + before { 5.times { strategy.throttled? } } + + it "tells us to wait roughly one period" do + expect(subject.retry_in).to be_within(0.1).of(10) + end + end + + context "when limit exceeded, with first job happening 8 seconds ago" do + before do + Timecop.travel(Time.now - 8) do + strategy.throttled? + end + 4.times { strategy.throttled? } + end + + it "tells us to wait 2 seconds" do + expect(subject.retry_in).to be_within(0.1).of(2) + end + end + + context "when limit not exceeded, because the oldest job was more than a period ago" do + before do + Timecop.travel(Time.now - 12) do + strategy.throttled? 
+ end + 4.times { strategy.throttled? } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in).to eq 0 + end + end + + context "when limit not exceeded, because there are fewer jobs than the limit" do + before do + 4.times { strategy.throttled? } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in).to eq 0 + end + end + + context "when there is no limit" do + subject(:strategy) { described_class.new :test, limit: -> {}, period: 10 } + + before { 5.times { strategy.throttled? } } + + it "tells us we do not need to wait" do + expect(subject.retry_in).to eq 0 + end + end + end + describe "#count" do subject { strategy.count } diff --git a/spec/lib/sidekiq/throttled/strategy_spec.rb b/spec/lib/sidekiq/throttled/strategy_spec.rb index 230150e0..4c17d497 100644 --- a/spec/lib/sidekiq/throttled/strategy_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy_spec.rb @@ -7,6 +7,10 @@ let(:concurrency) { { concurrency: { limit: 7 } } } let(:ten_seconds_ago) { Time.now - 10 } + before do + stub_job_class("ThrottledTestJob") + end + describe ".new" do it "fails if neither :threshold nor :concurrency given" do expect { described_class.new(:foo) }.to raise_error ArgumentError @@ -199,6 +203,738 @@ end end + describe "#requeue_throttled" do + def scheduled_redis_item_and_score + Sidekiq.redis do |conn| + # Depending on whether we have redis-client (for Sidekiq 7) or redis-rb (for older Sidekiq), + # zscan takes different arguments + if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0") + conn.zscan("schedule", 0).last.first + else + conn.zscan("schedule").first + end + end + end + + context "when using Sidekiq Pro's SuperFetch", :sidekiq_pro do + let(:sidekiq_config) do + config = Sidekiq::Config.new(queues: %w[default other_queue]) + config.super_fetch! + config + end + let(:fetcher) { sidekiq_config.default_capsule.fetcher } + + let(:work) { fetcher.retrieve_work } + + before do + pre_existing_job = fetcher.retrieve_work + raise "Found pre-existing job: #{pre_existing_job.inspect}" if pre_existing_job + + # Sidekiq is FIFO queue, with head on right side of the list, + # meaning jobs below will be stored in 3, 2, 1 order. 
+ ThrottledTestJob.perform_bulk([[1], [2], [3]]) + work + end + + describe "with parameter with: :enqueue" do + let(:options) { threshold } + + it "puts the job back on the queue" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "puts the job back on a different queue when specified" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: :other_queue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :with argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: ->(_arg) { :enqueue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :to argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: ->(_arg) { :other_queue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an 
ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :enqueue, to: invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :enqueue, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with parameter with: :schedule" do + context "when threshold constraints given" do + let(:options) { threshold } + + before do + allow(subject.threshold).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the threshold strategy says to, plus some jitter" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "reschedules for a different queue if specified" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: :other_queue) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :with argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: ->(_arg) { :schedule }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :to 
argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: ->(_arg) { :other_queue }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + context "when concurrency constraints given" do + let(:options) { concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the concurrency strategy says to, plus some jitter" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + context "when threshold and concurrency constraints given" do + let(:options) { threshold.merge concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + allow(subject.threshold).to receive(:retry_in).and_return(500.0) + end + + it "reschedules for the later of what the two say, plus some jitter" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :schedule, to: invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + 
end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :schedule, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with an invalid :with parameter" do + let(:options) { threshold } + + it "raises an error when :with is not a valid value" do + expect { subject.requeue_throttled(work, with: :invalid_with_value) } + .to raise_error(RuntimeError, "unrecognized :with option invalid_with_value") + end + end + + context "when :with is a Proc returning an invalid value" do + let(:options) { threshold } + + it "raises an error when Proc returns an unrecognized value" do + with_proc = ->(*_) { :invalid_value } + expect do + subject.requeue_throttled(work, with: with_proc) + end.to raise_error(RuntimeError, "unrecognized :with option #{with_proc}") + end + end + + context "when :with Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: faulty_proc) + end.to raise_error("Proc error") + end + end + + context "when :to resolves to nil or empty string" do + let(:options) { threshold } + + it "defaults to work.queue when :to returns nil" do + to_proc = ->(*_) {} + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "defaults to work.queue when :to returns an empty string" do + to_proc = ->(*_) { "" } + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + describe "#reschedule_throttled" do + let(:options) { threshold } + + context "when job_class is missing from work.job" do + before do + invalid_job_data = JSON.parse(work.job).tap do |msg| + msg.delete("class") + msg.delete("wrapped") + end + allow(work).to receive(:job).and_return(invalid_job_data.to_json) + end + + it "returns false and does not reschedule the job" do + expect(Sidekiq::Client).not_to receive(:enqueue_to_in) + expect(work).not_to receive(:acknowledge) + 
expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey + end + end + end + + describe "#retry_in" do + context "when both strategies return nil" do + let(:options) { concurrency.merge(threshold) } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(nil) + allow(subject.threshold).to receive(:retry_in).and_return(nil) + end + + it "raises an error indicating it cannot compute a valid retry interval" do + expect do + subject.send(:retry_in, work) + end.to raise_error("Cannot compute a valid retry interval") + end + end + end + end + + context "when using Sidekiq BasicFetch" do + let(:sidekiq_config) do + Sidekiq::Config.new(queues: %w[default]) + end + let(:fetcher) { sidekiq_config.default_capsule.fetcher } + + let(:work) { fetcher.retrieve_work } + + before do + pre_existing_job = fetcher.retrieve_work + raise "Found pre-existing job: #{pre_existing_job.inspect}" if pre_existing_job + + # Sidekiq is FIFO queue, with head on right side of the list, + # meaning jobs below will be stored in 3, 2, 1 order. + ThrottledTestJob.perform_bulk([[1], [2], [3]]) + work + end + + describe "with parameter with: :enqueue" do + let(:options) { threshold } + + it "puts the job back on the queue" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + + it "puts the job back on a different queue when specified" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: :other_queue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + end + + it "accepts a Proc for :with argument" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: ->(_arg) { :enqueue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + + it "accepts a Proc for :to argument" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: ->(_arg) { :other_queue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :enqueue, to: 
invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :enqueue, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with parameter with: :schedule" do + context "when threshold constraints given" do + let(:options) { threshold } + + before do + allow(subject.threshold).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the threshold strategy says to, plus some jitter" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + + it "reschedules for a different queue if specified" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: :other_queue) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + + it "accepts a Proc for :with argument" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: ->(_arg) { :schedule }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + + it "accepts a Proc for :to argument" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: ->(_arg) { :other_queue }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + end + + context "when concurrency constraints given" do + let(:options) { concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the concurrency strategy says to, plus some jitter" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + end + + context "when threshold and concurrency constraints given" do + let(:options) { threshold.merge concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + allow(subject.threshold).to 
receive(:retry_in).and_return(500.0) + end + + it "reschedules for the later of what the two say, plus some jitter" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0) + end + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :schedule, to: invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :schedule, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with an invalid :with parameter" do + let(:options) { threshold } + + it "raises an error when :with is not a valid value" do + expect { subject.requeue_throttled(work, with: :invalid_with_value) } + .to raise_error(RuntimeError, "unrecognized :with option invalid_with_value") + end + end + + context "when :with is a Proc returning an invalid value" do + let(:options) { threshold } + + it "raises an error when Proc returns an unrecognized value" do + with_proc = ->(*_) { :invalid_value } + expect do + subject.requeue_throttled(work, with: with_proc) + end.to raise_error(RuntimeError, "unrecognized :with option #{with_proc}") + end + end + + context "when :with Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: faulty_proc) + end.to raise_error("Proc error") + end + end + + context "when :to resolves to nil or empty string" do + let(:options) { threshold } + + it "defaults to work.queue when :to returns nil" do + to_proc = ->(*_) {} + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + + it "defaults to work.queue when :to returns an empty string" do + to_proc = ->(*_) { "" } + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + end + + describe "#reschedule_throttled" do + let(:options) { threshold } + + context "when job_class is missing from work.job" do + before do + invalid_job_data = JSON.parse(work.job).tap do |msg| + 
msg.delete("class") + msg.delete("wrapped") + end + allow(work).to receive(:job).and_return(invalid_job_data.to_json) + end + + it "returns false and does not reschedule the job" do + expect(Sidekiq::Client).not_to receive(:enqueue_to_in) + expect(work).not_to receive(:acknowledge) + expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey + end + end + end + + describe "#retry_in" do + context "when both strategies return nil" do + let(:options) { concurrency.merge(threshold) } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(nil) + allow(subject.threshold).to receive(:retry_in).and_return(nil) + end + + it "raises an error indicating it cannot compute a valid retry interval" do + expect do + subject.send(:retry_in, work) + end.to raise_error("Cannot compute a valid retry interval") + end + end + end + end + end + describe "#reset!" do context "when only concurrency constraint given" do let(:options) { concurrency } diff --git a/spec/lib/sidekiq/throttled_spec.rb b/spec/lib/sidekiq/throttled_spec.rb index 970122f8..8c8ae48f 100644 --- a/spec/lib/sidekiq/throttled_spec.rb +++ b/spec/lib/sidekiq/throttled_spec.rb @@ -2,29 +2,30 @@ require "json" -RSpec.describe Sidekiq::Throttled do - describe ".setup!" do - before do - require "sidekiq/processor" - allow(Sidekiq).to receive(:server?).and_return true - described_class.setup! - end +class ThrottledTestJob + include Sidekiq::Job + include Sidekiq::Throttled::Job - it "infuses Sidekiq::BasicFetch with our patches" do - expect(Sidekiq::BasicFetch).to include(Sidekiq::Throttled::Patches::BasicFetch) - end + def perform(*); end +end - it "injects Sidekiq::Throttled::Middleware server middleware" do - if Sidekiq::VERSION >= "7.0" - expect(Sidekiq.default_configuration.server_middleware.exists?(Sidekiq::Throttled::Middleware)) - .to be true - else - expect(Sidekiq.server_middleware.exists?(Sidekiq::Throttled::Middleware)) - .to be true - end +RSpec.describe Sidekiq::Throttled do + it "registers server middleware" do + require "sidekiq/processor" + allow(Sidekiq).to receive(:server?).and_return true + + if Sidekiq::VERSION >= "7.0" + expect(Sidekiq.default_configuration.server_middleware.exists?(Sidekiq::Throttled::Middlewares::Server)) + .to be true + else + expect(Sidekiq.server_middleware.exists?(Sidekiq::Throttled::Middlewares::Server)).to be true end end + it "infuses Sidekiq::BasicFetch with our patches" do + expect(Sidekiq::BasicFetch).to include(Sidekiq::Throttled::Patches::BasicFetch) + end + describe ".throttled?" do it "tolerates invalid JSON message" do expect(described_class.throttled?("][")).to be false @@ -52,6 +53,20 @@ described_class.throttled? message end + it "passes JID and arguments to registered strategy" do + strategy = Sidekiq::Throttled::Registry.add("foo", + threshold: { limit: 1, period: 1 }, + concurrency: { limit: 1 }) + + payload_jid = jid + args = ["foo", 1] + message = %({"class":"foo","jid":#{payload_jid.inspect},"args":#{args.inspect}}) + + expect(strategy).to receive(:throttled?).with payload_jid, *args + + described_class.throttled? message + end + it "unwraps ActiveJob-jobs default sidekiq adapter" do strategy = Sidekiq::Throttled::Registry.add("wrapped-foo", threshold: { limit: 1, period: 1 }, @@ -85,5 +100,50 @@ described_class.throttled? 
message end + + it "unwraps ActiveJob-jobs job parameters" do + strategy = Sidekiq::Throttled::Registry.add("wrapped-foo", + threshold: { limit: 1, period: 1 }, + concurrency: { limit: 1 }) + + payload_jid = jid + args = ["foo", 1] + message = JSON.dump({ + "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", + "wrapped" => "wrapped-foo", + "args" => [{ "job_class" => "TestJob", "arguments" => args }], + "jid" => payload_jid + }) + + expect(strategy).to receive(:throttled?).with payload_jid, *args + + described_class.throttled? message + end + end + + describe ".requeue_throttled" do + let(:sidekiq_config) do + if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0") + Sidekiq::DEFAULTS + else + Sidekiq::Config.new(queues: %w[default]).default_capsule + end + end + + let!(:strategy) { Sidekiq::Throttled::Registry.add("ThrottledTestJob", threshold: { limit: 1, period: 1 }) } + + before do + ThrottledTestJob.sidekiq_throttled_requeue_options = { to: :other_queue, with: :enqueue } + end + + it "invokes requeue_throttled on the strategy" do + payload_jid = jid + job = { class: "ThrottledTestJob", jid: payload_jid.inspect }.to_json + work = Sidekiq::BasicFetch::UnitOfWork.new("queue:default", job, sidekiq_config) + + expect(strategy).to receive(:requeue_throttled).with(work, to: :other_queue, with: :enqueue) + + described_class.requeue_throttled work + end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index ce733462..fd069beb 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,6 +1,5 @@ # frozen_string_literal: true -require_relative "support/simplecov" if ENV["CI"] || ENV["COVERAGE"] require_relative "support/sidekiq" require_relative "support/timecop" @@ -15,6 +14,21 @@ @strategies.clear @aliases.clear end + + # Reset config + Sidekiq::Throttled.configure do |throttled_config| + defaults = Sidekiq::Throttled::Config.new + + throttled_config.cooldown_period = defaults.cooldown_period + throttled_config.cooldown_threshold = defaults.cooldown_threshold + end + end + + # Sidekiq-Pro related specs require license set in Bundler + unless Bundler.settings["gems.contribsys.com"]&.include?(":") + config.define_derived_metadata(sidekiq_pro: true) do |metadata| + metadata[:skip] = "Sidekiq::Pro license not found or not supported" + end end # rspec-expectations config goes here. You can use an alternate diff --git a/spec/support/sidekiq.rb b/spec/support/sidekiq.rb index 95a1b013..c8f69b47 100644 --- a/spec/support/sidekiq.rb +++ b/spec/support/sidekiq.rb @@ -8,11 +8,66 @@ require "sidekiq" require "sidekiq/cli" -$TESTING = false # rubocop:disable Style/GlobalVars +begin + require "sidekiq-pro" +rescue LoadError + # Sidekiq Pro is not available +end + +$TESTING = true # rubocop:disable Style/GlobalVars REDIS_URL = ENV.fetch("REDIS_URL", "redis://localhost:6379") -module JidGenerator +module SidekiqThrottledHelper + def new_sidekiq_config + cfg = Sidekiq::Config.new + cfg.redis = { url: REDIS_URL } + cfg.logger = PseudoLogger.instance + cfg.logger.level = Logger::WARN + cfg.server_middleware do |chain| + chain.add(Sidekiq::Throttled::Middlewares::Server) + end + cfg + end + + # https://github.com/sidekiq/sidekiq/blob/7df28434f03fa1111e9e2834271c020205369f94/test/helper.rb#L30 + def reset_redis! 
+ if Sidekiq.default_configuration.instance_variable_defined?(:@redis) + existing_pool = Sidekiq.default_configuration.instance_variable_get(:@redis) + existing_pool&.shutdown(&:close) + end + + RedisClient.new(url: REDIS_URL).call("flushall") + + # After resetting redis, create a new Sidekiq::Config instance to avoid ConnectionPool::PoolShuttingDownError + Sidekiq.instance_variable_set :@config, new_sidekiq_config + new_sidekiq_config + end + + def stub_job_class(name, &block) + klass = stub_const(name, Class.new) + + klass.include(Sidekiq::Job) + klass.include(Sidekiq::Throttled::Job) + + klass.instance_exec do + def perform(*); end + end + + klass.instance_exec(&block) if block + end + + def enqueued_jobs(queue) + q = queue.start_with?("queue:") ? queue : "queue:#{queue}" + Sidekiq.redis do |conn| + conn.lrange(q, 0, -1).map do |job| + JSON.parse(job).then do |payload| + [payload["class"], payload["args"]] + end + end + end + end + def jid SecureRandom.hex 12 end @@ -35,15 +90,8 @@ def output end end -if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0") - Sidekiq[:queues] = %i[default] -else - Sidekiq.configure_server do |config| - config.queues = %i[default] - end -end - Sidekiq.configure_server do |config| + config.queues = %i[default] config.redis = { url: REDIS_URL } config.logger = PseudoLogger.instance end @@ -54,15 +102,11 @@ def output end RSpec.configure do |config| - config.include JidGenerator - config.extend JidGenerator + config.include SidekiqThrottledHelper config.before do PseudoLogger.instance.reset! - Sidekiq.redis do |conn| - conn.flushdb - conn.script("flush") - end + reset_redis! end end diff --git a/spec/support/simplecov.rb b/spec/support/simplecov.rb deleted file mode 100644 index ca4f6f67..00000000 --- a/spec/support/simplecov.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -require "simplecov" - -SimpleCov.start do - command_name "BUNDLE_GEMFILE=#{ENV.fetch('BUNDLE_GEMFILE')}" - - enable_coverage :branch - - add_filter "/gemfiles/" - add_filter "/spec/" - add_filter "/vendor/" -end
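
For reference, the behaviour exercised by the specs above can be sketched as a small, illustrative usage example. This is a hedged sketch inferred from the tests only: the job class name, queue names, and the specific limit values are made up; the `sidekiq_throttle` options (`concurrency`, `threshold`, `requeue` with `:to`/`:with`), `default_requeue_options`, and the cooldown settings are the ones the specs exercise.

# Illustrative sketch only; MyJob and :low_priority are hypothetical names.
require "sidekiq"
require "sidekiq/throttled"

Sidekiq::Throttled.configure do |config|
  # Used when a job's own `requeue:` option does not say otherwise
  # (the job_spec above shows per-job options overriding this default).
  config.default_requeue_options = { with: :schedule }

  # Example cooldown values matching those used in the fetch specs:
  # after 1 throttled fetch, skip polling that queue for 2 seconds.
  config.cooldown_period    = 2.0
  config.cooldown_threshold = 1
end

class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :default

  # At most one job at a time and at most ten jobs per minute; when a
  # fetched job is throttled, push it back onto :low_priority immediately
  # (`with: :enqueue`) instead of rescheduling it via the "schedule" set
  # (`with: :schedule`). Per the specs, :to and :with may also be Procs,
  # and an unrecognized :with value raises an ArgumentError.
  sidekiq_throttle(
    concurrency: { limit: 1 },
    threshold:   { limit: 10, period: 60 },
    requeue:     { to: :low_priority, with: :enqueue }
  )

  def perform(*args); end
end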