Skip to content

Commit

Permalink
pref(particle): Token bucket limit only used redis lua.
Browse files Browse the repository at this point in the history
  • Loading branch information
yizzuide committed Dec 21, 2023
1 parent 6f55bdd commit fac3d19
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 120 deletions.
4 changes: 2 additions & 2 deletions Milkomeda/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<properties>
<java.version>1.8</java.version>
<kotlin.version>1.6.21</kotlin.version>
<project.release.version>3.15.2-SNAPSHOT</project.release.version>
<project.release.version>3.15.3-SNAPSHOT</project.release.version>
<spring-boot.version>2.7.6</spring-boot.version>
<spring-cloud.version>2021.0.5</spring-cloud.version>
<mybatis.starter.version>3.0.0</mybatis.starter.version>
Expand Down Expand Up @@ -73,7 +73,7 @@
<profile>
<id>sonatype-oss-release</id>
<properties>
<project.release.version>3.15.2</project.release.version>
<project.release.version>3.15.3</project.release.version>
</properties>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* but reject all requests if the bucket is full.
*
* @since 3.15.0
* @version 3.15.3
* @author yizzuide
* Create at 2023/05/21 18:20
*/
Expand All @@ -52,12 +53,12 @@ public class LeakyBucketLimiter extends LimitHandler implements LuaLoader {
/**
* Bucket size.
*/
private long bucketCapacity;
private long capacity;

/**
* loss water count per second (handle request per second).
*/
private long waterRate;
private long rate;

/**
* Lua script list.
Expand All @@ -68,7 +69,7 @@ public class LeakyBucketLimiter extends LimitHandler implements LuaLoader {
public <R> R limit(String key, Process<R> process) throws Throwable {
String limiterKey = key + POSTFIX;
RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScripts[0], Long.class);
Long waterCount = getJsonRedisTemplate().execute(redisScript, Collections.singletonList(limiterKey), getBucketCapacity(), getWaterRate(), System.currentTimeMillis());
Long waterCount = getJsonRedisTemplate().execute(redisScript, Collections.singletonList(limiterKey), getCapacity(), getRate(), System.currentTimeMillis());
if (Environment.isShowLog()) {
log.info("particle drop water from bucket, leave water count: {}", waterCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,18 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

/**
* The token bucket can solve the problem of double traffic per unit time which relative to {@link TimesLimiter} and sudden request rejected with {@link LeakyBucketLimiter}.
* At begin, full tokens in bucket before request come in.
* At init, full tokens in the bucket before request come in.
*
* @since 3.15.0
* @version 3.15.3
* @author yizzuide
* <br>
* Create at 2023/05/19 21:55
Expand All @@ -51,27 +48,10 @@
public class TokenBucketLimiter extends LimitHandler implements LuaLoader {

/**
* Decorated postfix for limiter key.
* Decorated postfix for a limiter key.
*/
private static final String POSTFIX = ":token_bucket";

/**
* Synchronized State for JMM.
*/
private volatile int startState = 0;

/**
* Sync Lock for wait task put tokens into bucket.
*/
private CountDownLatch countDownLatch = new CountDownLatch(1);

/**
* Task pool scheduler.
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
private ThreadPoolTaskScheduler taskScheduler;

/**
* Lua script list.
*/
Expand All @@ -85,65 +65,35 @@ public class TokenBucketLimiter extends LimitHandler implements LuaLoader {
/**
* Bucket size.
*/
private long bucketCapacity;
private long capacity;

/**
* Each time of put token count in bucket.
* The number of tokens required by the request.
*/
private long tokensPerTime;
private long tokensPerRequest = 1;

/**
* Interval of put token in bucket (second unit).
* Rate of put token in bucket (second unit).
*/
private long interval;
private long rate;

@Override
public <R> R limit(String key, Process<R> process) throws Throwable {
limiterKey = key + POSTFIX;
// first time, wait tokens added in the bucket.
if (startState == 0) {
synchronized (this) {
if (startState == 0) {
startState = 1;
startTask();
countDownLatch.await();
}
}
}
RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScripts[1], Long.class);
Long tokenCount = getJsonRedisTemplate().execute(redisScript, Collections.singletonList(limiterKey));
RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScripts[0], Long.class);
Long tokens = getJsonRedisTemplate().execute(redisScript, Collections.singletonList(limiterKey), getCapacity(), getTokensPerRequest(), getRate(), Instant.now().getEpochSecond());
if (Environment.isShowLog()) {
log.info("particle get tokens from bucket, leave token count: {}", tokenCount);
log.info("left tokens: {}", tokens);
}
// -1 is means bucket empty
boolean isOver = tokenCount == null || tokenCount == -1;
// -1 is means bucket not enough tokens
boolean isOver = tokens == null || tokens == -1;
Particle particle = new Particle(this.getClass(), isOver, null);
return next(particle, key, process);
}

private void startTask() {
// task for put tokens in bucket
taskScheduler.scheduleAtFixedRate(() -> {
if (startState == 0) {
return;
}
RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScripts[0], Long.class);
long currentTimeMillis = System.currentTimeMillis();
Long tokenCount = getJsonRedisTemplate().execute(redisScript, Collections.singletonList(limiterKey), bucketCapacity, tokensPerTime, interval, currentTimeMillis);
if (Environment.isShowLog()) {
log.info("particle task put tokens into bucket, current token count: {}", tokenCount);
}
// release lock
if (countDownLatch != null && countDownLatch.getCount() > 0) {
countDownLatch.countDown();
countDownLatch = null;
}
}, Instant.now(), Duration.ofSeconds(interval));
}

@Override
public String[] luaFilenames() {
return new String[]{"particle_tokenBucket_limiter.lua", "particle_tokenBucket_consume_limiter.lua"};
return new String[]{"particle_tokenBucket_limiter.lua"};
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,42 +1,47 @@
local key = KEYS[1]
local bucketCapacity = tonumber(ARGV[1])
-- number of tokens added each time
local addToken = tonumber(ARGV[2])
-- token addition interval with second unit
local addInterval = tonumber(ARGV[3])
local currentMills = tonumber(ARGV[4])
local capacity = tonumber(ARGV[1])
-- the number of tokens required by the request
local requested = tonumber(ARGV[2])
-- put tokens per second
local rate = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

-- key of bucket last update time
local lastTimeKey = key..'_update_time'
local timestamp_key = key..'_update_time'
-- add full tokens need time
local fill_time = capacity / rate
-- tokens expire time
local ttl = math.floor(fill_time * 2)

local tokenCount = redis.call('get', key)
-- maximum time required for bucket reset
local resetTime = math.ceil(bucketCapacity / addToken) * addInterval;
-- has tokens
if tokenCount then
local lastTime = tonumber(redis.call('get', lastTimeKey))
-- bucket tokens recovery multiple
local multiple = math.floor((currentMills - lastTime) / addInterval)
local recoveryTokenCount = multiple * addToken
-- must not over the capacity size
tokenCount = math.min(bucketCapacity, tokenCount + recoveryTokenCount) - 1
if tokenCount < 0 then
return -1
end
-- reset expire time
redis.call('set', key, tokenCount, 'EX', resetTime)
redis.call('set', lastTimeKey, lastTime + multiple * addInterval, 'EX', resetTime)
return tokenCount
else
-- first time, full tokens in bucket
tokenCount = bucketCapacity -1
-- set expire time
redis.call('set', key, tokenCount, 'EX', resetTime)
redis.call('set', lastTimeKey, currentMills, 'EX', resetTime)
return tokenCount
-- get left token count
local last_tokens = tonumber(redis.call("get", key))
-- init full tokens in bucket when is empty or has not been used for a long time
if last_tokens == nil then
last_tokens = capacity
end
-- get or init last timestamp
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end





-- calc pass time
local delta = math.max(0, now - last_refreshed)
-- fill up tokens rate at pass time
local filled_tokens = math.min(capacity, last_tokens + (delta * rate))
-- whether the number of tokens in the token bucket meets the number of tokens required by the request
local allowed = filled_tokens >= requested
-- using for calc left token count
local left_tokens = filled_tokens
-- if return tokens is -1, then limit
local tokens = -1
if allowed then
left_tokens = filled_tokens - requested
tokens = left_tokens
end
-- record left tokens and last time
if ttl > 0 then
redis.call('set', key, left_tokens, 'EX', ttl)
redis.call('set', timestamp_key, now, 'EX', ttl)
end
return tokens
2 changes: 1 addition & 1 deletion MilkomedaDemo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>2021.0.5</spring-cloud.version>
<milkomeda.version>3.15.2-SNAPSHOT</milkomeda.version>
<milkomeda.version>3.15.3-SNAPSHOT</milkomeda.version>
<mybatis.starter>2.2.2</mybatis.starter>
<redission.version>3.17.7</redission.version>
<zookeeper.version>3.7.1</zookeeper.version>
Expand Down
13 changes: 7 additions & 6 deletions MilkomedaDemo/src/main/resources/application-milkomeda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ milkomeda:
- name: tokenLimiter
type: token_bucket
props:
bucketCapacity: 10
tokensPerTime: 2
interval: 5
# 桶容量为10个令牌
capacity: 10
# 1秒放5个令牌
rate: 5
key-tpl: limit_token_quota
include-urls: [ "/particle/quota" ]
response:
Expand All @@ -126,10 +127,10 @@ milkomeda:
- name: leakyLimiter
type: leaky_bucket
props:
# 桶可装10个请求
bucketCapacity: 10
# 桶容量为10个请求
capacity: 10
# 1秒处理2个请求(速率恒定)
waterRate: 2
rate: 2
key-tpl: limit_leaky_pull
include-urls: [ "/particle/pull" ]
response:
Expand Down

0 comments on commit fac3d19

Please sign in to comment.