Commit
feat: Add exponential backoff functionality (#82)
* feat: take Exponential

* wip

* return the wait backoff time from take_exponential

* verify basic exponential backoff

* multiple exponential backoff tests

* configure exp backoff from bucket

* remove extra debugging stuff

* update eslint and fix issues

* fixed window & small fixes
stefanhts authored Dec 18, 2024
1 parent 6c9ee68 commit 79d594c
Showing 14 changed files with 271 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.yml
@@ -2,6 +2,8 @@ env:
es6: true
node: true
extends: 'eslint:recommended'
parserOptions:
ecmaVersion: 2020
rules:
indent:
- error
4 changes: 4 additions & 0 deletions lib/client.js
@@ -107,6 +107,10 @@ class LimitdRedis extends EventEmitter {
this.handler('takeElevated', type, key, opts, cb);
}

takeExponential(type, key, opts, cb) {
this.handler('takeExponential', type, key, opts, cb);
}

wait(type, key, opts, cb) {
this.handler('wait', type, key, opts, cb);
}
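A minimal call sketch for the new client method (the bucket type 'user_login' and key are hypothetical; it assumes a LimitdRedis client named limitd has already been constructed with a bucket that enables exponential_backoff, and follows the same (type, key, opts, callback) pattern as take and wait):

// Sketch only — 'user_login' and 'user-123' are illustrative values.
limitd.takeExponential('user_login', 'user-123', {}, (err, result) => {
  if (err) return console.error(err);
  if (result.conformant) {
    console.log(`allowed, ${result.remaining} tokens left`);
  } else {
    // delta_backoff_time is the wait in ms before the next token can be taken.
    console.log(`backed off, retry in ~${result.delta_backoff_time} ms`);
  }
});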
67 changes: 58 additions & 9 deletions lib/db.js
@@ -9,11 +9,13 @@ const { validateParams, validateERLParams } = require('./validation');
const { calculateQuotaExpiration, resolveElevatedParams, isFixedWindowEnabled, removeHashtag } = require('./utils');
const EventEmitter = require('events').EventEmitter;

const TAKE_LUA = fs.readFileSync(`${__dirname}/take.lua`, "utf8");
const TAKE_ELEVATED_LUA = fs.readFileSync(`${__dirname}/take_elevated.lua`, "utf8");
const PUT_LUA = fs.readFileSync(`${__dirname}/put.lua`, "utf8");
const TAKE_LUA = fs.readFileSync(`${__dirname}/take.lua`, 'utf8');
const TAKE_ELEVATED_LUA = fs.readFileSync(`${__dirname}/take_elevated.lua`, 'utf8');
const TAKE_EXPONENTIAL_LUA = fs.readFileSync(`${__dirname}/take_exponential.lua`, 'utf8');
const PUT_LUA = fs.readFileSync(`${__dirname}/put.lua`, 'utf8');

const DEFAULT_COMMAND_TIMEOUT = 125; // Milliseconds
const BACKOFF_FACTOR_DEFAULT = 2;
const MULTIPLE_UNIT_DEFAULT = 1000;
const DEFAULT_KEEPALIVE = 10000; // Milliseconds

class LimitDBRedis extends EventEmitter {
@@ -78,6 +80,11 @@ class LimitDBRedis extends EventEmitter {
lua: TAKE_LUA
});

this.redis.defineCommand('takeExponential', {
numberOfKeys: 1,
lua: TAKE_EXPONENTIAL_LUA
});

this.redis.defineCommand('takeElevated', {
numberOfKeys: 3,
lua: TAKE_ELEVATED_LUA
@@ -200,7 +207,7 @@ class LimitDBRedis extends EventEmitter {
const prevCall = this.callCounts.get(key);

if (prevCall) {
const shouldGoToRedis = prevCall?.count >= bucketKeyConfig.skip_n_calls
const shouldGoToRedis = prevCall?.count >= bucketKeyConfig.skip_n_calls;


if (!shouldGoToRedis) {
@@ -220,7 +227,7 @@
}
}

takeFunc(key, bucketKeyConfig, count)
takeFunc(key, bucketKeyConfig, count);
}

/**
Expand Down Expand Up @@ -260,7 +267,49 @@ class LimitDBRedis extends EventEmitter {
}
return callback(null, res);
});
})
});
}

takeExponential(params, callback) {
this._doTake(params, callback, (key, bucketKeyConfig, count) => {
const useFixedWindow = isFixedWindowEnabled(bucketKeyConfig.fixed_window, params.fixed_window);
this.redis.takeExponential(key,
bucketKeyConfig.ms_per_interval || 0,
bucketKeyConfig.size,
count,
Math.ceil(bucketKeyConfig.ttl || this.globalTTL),
bucketKeyConfig.drip_interval || 0,
bucketKeyConfig.backoff_factor || BACKOFF_FACTOR_DEFAULT,
bucketKeyConfig.multiple_unit || MULTIPLE_UNIT_DEFAULT,
useFixedWindow ? bucketKeyConfig.interval : 0,
(err, results) => {
if (err) {
return callback(err);
}
const remaining = parseInt(results[0], 10);
const conformant = parseInt(results[1], 10) ? true : false;
const current_timestamp_ms = parseInt(results[2], 10);
const reset = parseInt(results[3], 10);
const backoff_factor = parseInt(results[4], 10);
const backoff_time = parseInt(results[5], 10);
const next_token_ms = parseInt(results[6], 10);
const res = {
conformant,
remaining,
reset: Math.ceil(reset / 1000),
limit: bucketKeyConfig.size,
delayed: false,
backoff_factor,
backoff_time,
delta_reset_ms: Math.max(reset - current_timestamp_ms, 0),
delta_backoff_time: next_token_ms - current_timestamp_ms,
};
if (bucketKeyConfig.skip_n_calls > 0) {
this.callCounts.set(key, { res, count: 0 });
}
return callback(null, res);
});
});
}

takeElevated(params, callback) {
Expand All @@ -270,7 +319,7 @@ class LimitDBRedis extends EventEmitter {
erlParams = utils.getERLParams(params.elevated_limits);
const valError = validateERLParams(erlParams);
if (valError) {
return callback(valError)
return callback(valError);
}
}

@@ -325,7 +374,7 @@
}
return callback(null, res);
});
})
});
}

/**
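For reference, the takeExponential handler above resolves the Lua reply into an object shaped like the following (values are illustrative, assuming a bucket of size 10 with backoff_factor 2 and multiple_unit 1000; they are not from a real run):

// Illustrative shape of the callback result from takeExponential.
const exampleResult = {
  conformant: false,          // not enough tokens, or the backoff wait has not elapsed
  remaining: 7,
  reset: 1734528000,          // unix seconds
  limit: 10,
  delayed: false,
  backoff_factor: 2,
  backoff_time: 8000,         // ceil(2 ^ (10 - 7)) * 1000 ms
  delta_reset_ms: 3000,
  delta_backoff_time: 5200    // ms until the next token becomes available
};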
82 changes: 82 additions & 0 deletions lib/take_exponential.lua
@@ -0,0 +1,82 @@
local tokens_per_ms = tonumber(ARGV[1])
local bucket_size = tonumber(ARGV[2])
local new_content = tonumber(ARGV[2])
local tokens_to_take = tonumber(ARGV[3])
local ttl = tonumber(ARGV[4])
local drip_interval = tonumber(ARGV[5])
local backoff_factor = tonumber(ARGV[6])
local mult_unit = tonumber(ARGV[7])
local fixed_window = tonumber(ARGV[8])

local current_time = redis.call('TIME')
local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000

local current = redis.pcall('HMGET', KEYS[1], 'd', 'r')

if current.err ~= nil then
current = {}
end

-- calculate the time of next available token
local last_token_ms = current[1] or 0
local remaining_tokens = 0
if current[2] then
remaining_tokens = tonumber(current[2])
else
remaining_tokens = bucket_size
end

local backoff_step = bucket_size - remaining_tokens
local backoff_time = math.ceil(backoff_factor ^ backoff_step) * mult_unit
local next_token_ms = last_token_ms + backoff_time
local is_passed_wait_time = current_timestamp_ms >= next_token_ms

if current[1] and tokens_per_ms then
-- drip bucket

if fixed_window > 0 then
-- fixed window for granting new tokens
local interval_correction = (current_timestamp_ms - last_token_ms) % fixed_window
current_timestamp_ms = current_timestamp_ms - interval_correction
end

is_passed_wait_time = current_timestamp_ms >= next_token_ms

if not is_passed_wait_time then
new_content = tonumber(current[2])
last_token_ms = current[1]
else
local last_drip = current[1]
local content = current[2]
local delta_ms = math.max(current_timestamp_ms - last_drip, 0)
local drip_amount = delta_ms * tokens_per_ms
new_content = math.min(content + drip_amount, bucket_size)
end
elseif current[1] and tokens_per_ms == 0 and is_passed_wait_time then
-- fixed bucket
new_content = current[2]
end

local enough_tokens = (new_content >= tokens_to_take) and is_passed_wait_time

if enough_tokens then
new_content = new_content - 1
last_token_ms = current_timestamp_ms
end

-- https://redis.io/commands/EVAL#replicating-commands-instead-of-scripts
redis.replicate_commands()

redis.call('HMSET', KEYS[1],
'd', last_token_ms,
'r', new_content)
redis.call('EXPIRE', KEYS[1], ttl)

local reset_ms = 0
if fixed_window > 0 then
reset_ms = current_timestamp_ms + fixed_window
elseif drip_interval > 0 then
reset_ms = math.ceil(current_timestamp_ms + (bucket_size - new_content) * drip_interval)
end

return { new_content, enough_tokens, current_timestamp_ms, reset_ms, backoff_factor, backoff_time, next_token_ms }
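The script charges a wait that grows with how depleted the bucket is: backoff_step = bucket_size - remaining, and the wait before the next grant is ceil(backoff_factor ^ backoff_step) * mult_unit milliseconds. The arguments lib/db.js passes after the key (ms_per_interval, size, count, ttl, drip_interval, backoff_factor, multiple_unit, fixed_window) arrive here as ARGV[1] through ARGV[8] via ioredis's defineCommand. A small sketch of that arithmetic, using the defaults of 2 and 1000 from lib/db.js:

// Mirrors the Lua formula above; factor and unit default to 2 and 1000 as in lib/db.js.
const backoffMs = (size, remaining, factor = 2, unitMs = 1000) =>
  Math.ceil(factor ** (size - remaining)) * unitMs;

// For a bucket of size 5, the wait doubles with every token consumed:
[5, 4, 3, 2, 1].map((r) => backoffMs(5, r)); // => [1000, 2000, 4000, 8000, 16000]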
38 changes: 25 additions & 13 deletions lib/utils.js
@@ -11,6 +11,11 @@ const INTERVAL_TO_MS = {

const INTERVAL_SHORTCUTS = Object.keys(INTERVAL_TO_MS);

const EXPONENTIAL_BACKOFF_DEFAULTS = {
backoff_factor: 0,
multiple_unit: 1
};

const ERL_QUOTA_INTERVAL_PER_CALENDAR_MONTH = 'quota_per_calendar_month';
const ERL_QUOTA_INTERVALS = {
[ERL_QUOTA_INTERVAL_PER_CALENDAR_MONTH]: () => endOfMonthTimestamp()
@@ -24,7 +29,8 @@ function normalizeTemporals(params) {
'size',
'unlimited',
'skip_n_calls',
'fixed_window'
'fixed_window',
'exponential_backoff',
]);

INTERVAL_SHORTCUTS.forEach(intervalShortcut => {
@@ -39,6 +45,12 @@
type.size = type.per_interval;
}

if(type.exponential_backoff) {
type.backoff_factor= type.exponential_backoff.backoff_factor || EXPONENTIAL_BACKOFF_DEFAULTS.backoff_factor;
type.multiple_unit = type.exponential_backoff.multiple_unit || EXPONENTIAL_BACKOFF_DEFAULTS.multiple_unit;
delete type.exponential_backoff;
}

if (type.per_interval) {
type.ttl = ((type.size * type.interval) / type.per_interval) / 1000;
type.ms_per_interval = type.per_interval / type.interval;
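A hedged sketch of what this normalization consumes (the bucket name, numbers, and the per_minute shortcut are illustrative): the nested backoff_factor and multiple_unit are hoisted onto the type and the exponential_backoff wrapper is dropped, so take_exponential.lua can read them directly.

// Illustrative bucket declaration.
const buckets = {
  user_login: {
    size: 10,
    per_minute: 10,
    exponential_backoff: {
      backoff_factor: 2,   // hoisted to type.backoff_factor
      multiple_unit: 1000  // hoisted to type.multiple_unit (base wait in ms)
    }
  }
};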
@@ -100,9 +112,9 @@ function normalizeType(params) {

function normalizeElevatedOverrides(type, override) {
// If the override doesn't provide elevated_limits use the ones defined in the base type (if any)
const normalizedOverride = {}
const normalizedOverride = {};
if (!override.elevated_limits) {
Object.assign(normalizedOverride, override, { elevated_limits: type.elevated_limits })
Object.assign(normalizedOverride, override, { elevated_limits: type.elevated_limits });
return normalizedOverride;
}

@@ -111,11 +123,11 @@
if (typeof override.unlimited === 'undefined'
&& typeof override.size === 'undefined'
&& typeof override.per_interval === 'undefined') {
Object.assign(normalizedOverride,
override,
_.omit(type, 'overrides', 'overridesMatch'),
{elevated_limits: override.elevated_limits}
);
Object.assign(normalizedOverride,
override,
_.omit(type, 'overrides', 'overridesMatch'),
{elevated_limits: override.elevated_limits}
);
}
return normalizedOverride;
}
@@ -138,7 +150,7 @@
}

function functionOrFalse(fun) {
return !!(fun && fun.constructor && fun.call && fun.apply)
return fun && fun.constructor && fun.call && fun.apply
? fun
: false;
}
@@ -194,20 +206,20 @@ function resolveElevatedParams(erlParams, bucketKeyConfig, key, prefix) {
erl_configured_for_bucket: !!(erlParams && bucketKeyConfig.elevated_limits?.erl_configured_for_bucket),
};

elevatedLimits.erl_is_active_key = replicateHashtag(key, prefix, elevatedLimits.erl_is_active_key)
elevatedLimits.erl_quota_key = replicateHashtag(key, prefix, elevatedLimits.erl_quota_key)
elevatedLimits.erl_is_active_key = replicateHashtag(key, prefix, elevatedLimits.erl_is_active_key);
elevatedLimits.erl_quota_key = replicateHashtag(key, prefix, elevatedLimits.erl_quota_key);

return elevatedLimits;
}

function replicateHashtag(baseKey, prefix, key) {
const prefixedBaseKey = key + `:{${prefix}${baseKey}}`;
const idxOpenBrace = baseKey.indexOf('{')
const idxOpenBrace = baseKey.indexOf('{');
if (idxOpenBrace < 0) {
return prefixedBaseKey;
}

const idxCloseBrace = baseKey.indexOf('}', idxOpenBrace)
const idxCloseBrace = baseKey.indexOf('}', idxOpenBrace);
if ( idxCloseBrace <= idxOpenBrace ) {
return prefixedBaseKey;
}
1 change: 1 addition & 0 deletions test/cb.tests.js
@@ -1,3 +1,4 @@
/* eslint-disable */
var assert = require('assert'),
cb = require('../lib/cb');

1 change: 1 addition & 0 deletions test/client.clustermode.tests.js
@@ -1,3 +1,4 @@
/* eslint-disable */
/* eslint-env node, mocha */
const _ = require('lodash');
const assert = require('chai').assert;
1 change: 1 addition & 0 deletions test/client.standalonemode.tests.js
@@ -1,3 +1,4 @@
/* eslint-disable */
/* eslint-env node, mocha */
const _ = require('lodash');
const assert = require('chai').assert;
1 change: 1 addition & 0 deletions test/client.tests.js
@@ -1,3 +1,4 @@
/* eslint-disable */
/* eslint-env node, mocha */
const _ = require('lodash');
const assert = require('chai').assert;
1 change: 1 addition & 0 deletions test/db.clustermode.tests.js
@@ -1,3 +1,4 @@
/* eslint-disable */
const LimitDB = require('../lib/db');
const _ = require('lodash');
const { tests: dbTests } = require('./db.tests');
1 change: 1 addition & 0 deletions test/db.standalonemode.tests.js
@@ -1,3 +1,4 @@
/* eslint-disable */
const LimitDB = require('../lib/db');
const _ = require('lodash');
const { tests: dbTests, buckets} = require('./db.tests');
