Skip to content

Enhance SB polling intervals #4328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions app/jobs/reoccurring_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def maximum_duration_seconds=(duration)
end

def polling_interval_seconds
[@polling_interval || 0, default_polling_interval_seconds].max
@polling_interval || 0
end

def polling_interval_seconds=(interval)
interval = interval.to_i if interval.is_a? String
@polling_interval = interval.clamp(default_polling_interval_seconds, 24.hours)
@polling_interval = interval.clamp(default_polling_interval_seconds, maximum_polling_interval)
end

private
Expand All @@ -58,8 +58,16 @@ def default_polling_exponential_backoff
Config.config.get(:broker_client_async_poll_exponential_backoff_rate)
end

def maximum_polling_interval
Config.config.get(:broker_client_max_async_poll_interval_seconds)
end

def next_execution_in
polling_interval_seconds * (default_polling_exponential_backoff**retry_number)
# use larger polling_interval. Either from job or calculated.
polling_interval = [polling_interval_seconds, default_polling_interval_seconds * (default_polling_exponential_backoff**retry_number)].max

# cap polling interval at maximum_polling_interval
[polling_interval, maximum_polling_interval].min
end

def next_enqueue_would_exceed_maximum_duration?
Expand Down
1 change: 1 addition & 0 deletions config/cloud_controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ max_retained_revisions_per_app: 100
broker_client_default_async_poll_interval_seconds: 60
broker_client_max_async_poll_duration_minutes: 10080
broker_client_async_poll_exponential_backoff_rate: 1.0
broker_client_max_async_poll_interval_seconds: 86400

broker_client_response_parser:
log_errors: true
Expand Down
1 change: 1 addition & 0 deletions lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class ApiSchema < VCAP::Config
broker_client_default_async_poll_interval_seconds: Integer,
broker_client_max_async_poll_duration_minutes: Integer,
broker_client_async_poll_exponential_backoff_rate: Numeric,
broker_client_max_async_poll_interval_seconds: Integer,
optional(:broker_client_response_parser) => {
log_errors: bool,
log_validators: bool,
Expand Down
1 change: 1 addition & 0 deletions lib/cloud_controller/config_schemas/base/worker_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class WorkerSchema < VCAP::Config
broker_client_default_async_poll_interval_seconds: Integer,
broker_client_max_async_poll_duration_minutes: Integer,
broker_client_async_poll_exponential_backoff_rate: Numeric,
broker_client_max_async_poll_interval_seconds: Integer,
optional(:broker_client_response_parser) => {
log_errors: bool,
log_validators: bool,
Expand Down
205 changes: 181 additions & 24 deletions spec/unit/jobs/reoccurring_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def perform
end
end

it 'keeps the polling interval within the bounds' do
it 'keeps the polling interval within the default bounds' do
job = FakeJob.new
job.polling_interval_seconds = 5
expect(job.polling_interval_seconds).to eq(60)
Expand All @@ -118,11 +118,25 @@ def perform
expect(job.polling_interval_seconds).to eq(24.hours)
end

context 'exponential backoff rate' do
context 'updates the polling interval' do
it 'when changing exponential backoff rate only' do
context 'when maximum polling interval is configured' do
before do
TestConfig.config[:broker_client_max_async_poll_interval_seconds] = 1800
end

it 'limits the polling interval to the configured maximum' do
job = FakeJob.new
job.polling_interval_seconds = 10.days
expect(job.polling_interval_seconds).to eq(1800)
end
end

describe 'exponential backoff rate' do
context 'when changing exponential backoff rate only' do
before do
TestConfig.config[:broker_client_async_poll_exponential_backoff_rate] = 2.0
end

it 'updates the polling interval' do
enqueued_time = 0

Timecop.freeze do
Expand All @@ -141,11 +155,15 @@ def perform
end
end
end
end

it 'when changing exponential backoff rate and default polling interval' do
context 'when changing exponential backoff rate and default polling interval' do
before do
TestConfig.config[:broker_client_async_poll_exponential_backoff_rate] = 1.3
TestConfig.config[:broker_client_default_async_poll_interval_seconds] = 10
end

it 'updates the polling interval' do
enqueued_time = 0

Timecop.freeze do
Expand All @@ -164,36 +182,175 @@ def perform
end
end
end
end

it 'when changing exponential backoff rate and retry_after from the job' do
TestConfig.config[:broker_client_async_poll_exponential_backoff_rate] = 1.3
TestConfig.config[:broker_client_default_async_poll_interval_seconds] = 10
describe 'changing exponential backoff rate and retry_after from the job' do
context 'when retry-after is larger than calculated backoff' do
let(:fake_job) { FakeJob.new(retry_after: [20, 30]) }

enqueued_time = 0
before do
TestConfig.config[:broker_client_async_poll_exponential_backoff_rate] = 1.3
TestConfig.config[:broker_client_default_async_poll_interval_seconds] = 10
end

Timecop.freeze do
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(FakeJob.new(retry_after: [20, 30]))
execute_all_jobs(expected_successes: 1, expected_failures: 0)
enqueued_time = Time.now
it 'uses retry-after interval' do
enqueued_time = 0

Timecop.freeze do
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(fake_job)
execute_all_jobs(expected_successes: 1, expected_failures: 0)
enqueued_time = Time.now
end

# the job should run after 20s (20s > 10 * 1.3^0)
Timecop.freeze(19.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(21.seconds.after(enqueued_time)) do
enqueued_time = Time.now
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end

# the job should run after 30s (30s > 10 * 1.3^1)
Timecop.freeze(29.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(31.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end
end
end

# the job should run after 20s * 1.3^0 = 20 seconds
Timecop.freeze(19.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
context 'when retry-after is smaller than calculated backoff' do
let(:fake_job) { FakeJob.new(retry_after: [10, 20]) }

before do
TestConfig.config[:broker_client_async_poll_exponential_backoff_rate] = 1.3
TestConfig.config[:broker_client_default_async_poll_interval_seconds] = 30
end

Timecop.freeze(21.seconds.after(enqueued_time)) do
enqueued_time = Time.now
execute_all_jobs(expected_successes: 1, expected_failures: 0)
it 'uses calculated interval' do
enqueued_time = 0

Timecop.freeze do
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(fake_job)
execute_all_jobs(expected_successes: 1, expected_failures: 0)
enqueued_time = Time.now
end

# the job should run after 30s (30s > 10s)
Timecop.freeze(29.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(31.seconds.after(enqueued_time)) do
enqueued_time = Time.now
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end

# the job should run after 30s (30s * 1.3^1 = 39 > 20s)
Timecop.freeze(38.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(40.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end
end
end

context 'when calculated backoff gets larger than retry-after' do
let(:fake_job) { FakeJob.new(retry_after: [15, 15, 15]) }

# the job should run after 30s * 1.3^1 = 39 seconds
Timecop.freeze(38.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
before do
TestConfig.config[:broker_client_async_poll_exponential_backoff_rate] = 2
TestConfig.config[:broker_client_default_async_poll_interval_seconds] = 5
end

Timecop.freeze(40.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 1, expected_failures: 0)
it 'uses retry-after until calculated backoff is larger' do
enqueued_time = 0

Timecop.freeze do
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(fake_job)
execute_all_jobs(expected_successes: 1, expected_failures: 0)
enqueued_time = Time.now
end

# the job should run after 15s (15s > 5s (5 * 2^0))
Timecop.freeze(14.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(16.seconds.after(enqueued_time)) do
enqueued_time = Time.now
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end

# the job should run after 15s (15s > 10s (5 * 2^1))
Timecop.freeze(14.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(16.seconds.after(enqueued_time)) do
enqueued_time = Time.now
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end

# the job should run after 20s (20s > 15s (5 * 2^2))
Timecop.freeze(19.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(21.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end
end

context 'when maximum polling interval is configured' do
before do
TestConfig.config[:broker_client_max_async_poll_interval_seconds] = 18
end

it 'limits the polling interval to the configured maximum' do
enqueued_time = 0

Timecop.freeze do
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(fake_job)
execute_all_jobs(expected_successes: 1, expected_failures: 0)
enqueued_time = Time.now
end

# the job should run after 15s (15s > 5s (5 * 2^0))
Timecop.freeze(14.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(16.seconds.after(enqueued_time)) do
enqueued_time = Time.now
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end

# the job should run after 15s (15s > 10s (5 * 2^1))
Timecop.freeze(14.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(16.seconds.after(enqueued_time)) do
enqueued_time = Time.now
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end

# the job should run after 18s (capped at )
Timecop.freeze(17.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 0, expected_failures: 0)
end

Timecop.freeze(19.seconds.after(enqueued_time)) do
execute_all_jobs(expected_successes: 1, expected_failures: 0)
end
end
end
end
end
Expand Down