From f53cb0e6a9ba3804d20276916a1a3a9ca12be7ae Mon Sep 17 00:00:00 2001 From: Rain Date: Wed, 7 Feb 2024 23:14:40 +0800 Subject: [PATCH] docs: :memo: add rate limiter implement doc --- .gitignore | 4 +- src/blogs/backend/GolangSync.md | 866 +++++++++++++++++++++++++++++++ src/blogs/backend/rateLimiter.md | 334 ++++++++++++ 3 files changed, 1203 insertions(+), 1 deletion(-) create mode 100644 src/blogs/backend/GolangSync.md create mode 100644 src/blogs/backend/rateLimiter.md diff --git a/.gitignore b/.gitignore index 16ef6953..e7a3cf1d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ node_modules/ .DS_Store yarn-error.log -yarn.lock \ No newline at end of file +yarn.lock +.obsidian +template \ No newline at end of file diff --git a/src/blogs/backend/GolangSync.md b/src/blogs/backend/GolangSync.md new file mode 100644 index 00000000..30f8c984 --- /dev/null +++ b/src/blogs/backend/GolangSync.md @@ -0,0 +1,866 @@ +--- +title: Golang并发控制 +author: Rain +tags: + - Golang +categories: + - 后端 +date: 2024-02-06 23:34:42 +--- + + +Go中可以使用一个`go`关键字让程序异步执行 + +一个比较常见的场景:逐个异步调用多个函数,或者循环中异步调用 + +```go + +func main() { + + go do1() + + go do2() + + go do3() + +} + +// 或者 + +func main() { + +for i := range []int{1,2,3}{ + + go do(i) + +} + +} + +``` + + + +如果了解Go并发机制,就知道`main`在其他goroutine运行完成之前就已经结束了,所以上面代码的运行结果是不符合预期的。我们需要使用一种叫做并发控制的手段,来保证程序正确运行 + +举个例子: + +已知有一个现成的函数`search`,能够按照关键词执行搜索 + +期望实现一个新的函数`coSearch`能够进行批量查询 + +```go + +package main + + + +import ( + +"context" + +"errors" + +"fmt" + +"sync" + +) + + + +func search(ctx context.Context, word string) (string, error) { + +if word == "Go" { + +return "", errors.New("error: Go") // 模拟结果 + +} + +return fmt.Sprintf("result: %s", word), nil // 模拟结果 + +} + + + +func coSearch(ctx context.Context, words []string) (results []string, err error) { + +//tbd + + + +return + +} + + + +func main() { + +words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"} + +results, err := coSearch(context.Background(), words) + +if err != nil { + +fmt.Println(err) + +return + +} + + + +fmt.Println(results) + +} + +``` + +## **并发控制基础** + +`sync.WaitGroup`是Go标准库中用来控制并发的结构,这里放一个使用`WaitGroup`实现`coSearch`的示例 + + +```go + +package main + + + +import ( + +"context" + +"errors" + +"fmt" + +"sync" + +) + + + +func search(ctx context.Context, word string) (string, error) { + +if word == "Go" { + +return "", errors.New("error: Go") // 模拟结果 + +} + +return fmt.Sprintf("result: %s", word), nil // 模拟结果 + +} + + + +func coSearch(ctx context.Context, words []string) ([]string, error) { + +var ( + +wg = sync.WaitGroup{} + +once = sync.Once{} + +results = make([]string, len(words)) + +err error + +) + + + +for i, word := range words { + +wg.Add(1) + + + +go func(word string, i int) { + +defer wg.Done() + + + +result, e := search(ctx, word) + +if e != nil { + +once.Do(func() { + +err = e + +}) + + + +return + +} + + + +results[i] = result + +}(word, i) + +} + + + +wg.Wait() + + + +return results, err + +} + + + +func main() { + +words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"} + +results, err := coSearch(context.Background(), words) + +if err != nil { + +fmt.Println(err) + +return + +} + + + +fmt.Println(results) + +} + +``` + + + +上面的代码中有非常多的细节,来逐个聊一聊 + + + +### `sync.WaitGroup{}`并发控制 + + + +`sync.WaitGroup{}`的用法非常简洁 + + + +- 当新运行一个goroutine时,我们需要调用`wg.Add(1)` + +- 当一个goroutine运行完成的时候,我们需要调用`wg.Done()` + +- `wg.Wait()`让程序阻塞在此处,直到所有的goroutine运行完毕。 + + + +对于`coSearch`来说,等待所有goroutine运行完成,也就完成了函数的任务,返回最终的结果 + + + +```go + +var ( + +wg = sync.WaitGroup{} + +//...省略其他代码 + +) + + + +for i, word := range words { + +wg.Add(1) + + + +go func(word string, i int) { + +defer wg.Done() + +//...省略其他代码 + +}(word, i) + +} + + + +wg.Wait() + +``` + + + +### `for`循环中的goroutine + + + +这是一个Go经典错误,如果goroutine中使用了`for`迭代的变量,所有goroutine都会获得最后一次循环的值。例如下面的示例,并不会输出"a", "b", "c" 而是输出 "c", "c", "c" + + + +```go + +func main() { + +done := make(chan bool) + + + +values := []string{"a", "b", "c"} + +for _, v := range values { + +go func() { + +fmt.Println(v) + +done <- true + +}() + +} + + + +// wait for all goroutines to complete before exiting + +for _ = range values { + +<-done + +} + +} + +``` + + + +正确的做法就是像上文示例一样,将迭代的变量赋值给函数参数,或者赋值给新的变量 + + + +```go + +for i, word := range words { + +// ... + +go func(word string, i int) { + +// fmt.Println(word, i) + +}(word, i) + +} + + + +for i, word := range words { + +i, word := i, word + +go func() { + +// fmt.Println(word, i) + +}() + +} + +``` + + +> 由于这个错误实在太常见,从Go 1.22开始Go已经修正了这个经典的错误:Fixing For Loops in Go 1.22。 + +> 不过Go 1.22默认不会开启修正,需要设置环境变量`GOEXPERIMENT=loopvar`才会 开启 + +### 并发安全 + +简单理解:当多个goroutine对同一个内存区域进行读写时,就会产生并发安全的问题,它会导致程序运行的结果不符合预期 + +上面的示例把最终的结果放入了`results = make([]string, len(words))`中。虽然我们在goroutine中并发的对于`results`变量进行写入,但因为每一个goroutine都写在了独立的位置,且没有任何读取的操作,因此`results[i] = result`是并发安全的 + +```text + +results = [ xxxxxxxx, xxxxxxxx, xxxxxxxx, .... ] + +^ ^ ^ + +| | | + +goroutine1 goroutine2 goroutine3 + +``` + + +这也意味着如果使用`results = append(results, result)`的方式并发赋值,因为会涉及到slice的扩容等操作,所以并不是并发安全的,需要利用`sync.Mutex{}`进行加锁 + +如果想尽可能的提高程序的并发性能,推荐使用 `results[i] = result`这种方式赋值 + +### `sync.Once{}`单次赋值 + +示例`coSearch`中,会返回第一个出错的`search`的`error`。`err`是一个全局变量,在并发goroutine中赋值是并发不安全的操作 + +```go + +//...省略其他代码 + +go func(word string, i int) { + +defer wg.Done() + + + +result, e := search(ctx, word) + +if e != nil && err == nil { + +err = e + + + +return + +} + + + +results[i] = result + +}(word, i) + +//...省略其他代码 + +``` + +对于全局变量的赋值比较常规做法就是利用`sync.Mutex{}`进行加锁。但示例的逻辑为单次赋值,我们刚好可以利用同在`sync`库的`sync.Once{}`来简化代码 + +`sync.Once{}`功能如其名,将我们要执行的逻辑放到它的`Do()`方法中,无论多少并发都只会执行一次 + +```go + +//...省略其他代码 + +go func(word string, i int) { + +defer wg.Done() + + + +result, e := search(ctx, word) + +if e != nil { + +once.Do(func() { + +err = e + +}) + + + +return + +} + + + +results[i] = result + +}(word, i) + +//...省略其他代码 + +``` + + +### goroutine数量控制 + +`coSearch`入参的数组可能非常大,如果不加以控制可能导致我们的服务器资源耗尽,我们需要控制并发的数量 + +利用带缓冲channel可以实现 + +```go + +tokens := make(chan struct{}, 10) + + + +for i, word := range words { + +tokens <- struct{}{} // 新增 + +wg.Add(1) + + + +go func(word string, i int) { + +defer func() { + +wg.Done() + +<-tokens // 新增 + +}() + + + +result, e := search(ctx, word) + +if e != nil { + +once.Do(func() { + +err = e + +}) + + + +return + +} + + + +results[i] = result + +}(word, i) + +} + + + +wg.Wait() + +``` + + + +如上,代码中创建了10个缓冲区的channel,当channel被填满时,继续写入会被阻塞;当goroutine运行完成之后,除了原有的`wg.Done()`,我们需要从channel读取走一个数据,来允许新的goroutine运行 + + + +通过这种方式,我们控制了`coSearch`最多只能运行10个goroutine,当超过10个时需要等待前面运行的goroutine结束 + + + +### context.Context + + + +并发执行的goroutine只要有一个出错,其他goroutine就可以停止,没有必要继续执行下去了。如何把取消的事件传导到其他goroutine呢?`context.Context`就是用来传递类似上下文信息的结构 + + + +```go + +ctx, cancel := context.WithCancelCause(ctx) // 新增 + +defer cancel(nil) // 新增 + + + +for i, word := range words { + +tokens <- struct{}{} + +wg.Add(1) + + + +go func(word string, i int) { + +defer func() { + +wg.Done() + +<-tokens + +}() + + + +result, e := search(ctx, word) + +if e != nil { + +once.Do(func() { + +err = e + +cancel(e) // 新增 + +}) + + + +return + +} + + + +results[i] = result + +}(word, i) + +} + + + +wg.Wait() + +``` + + + +## **完整的代码** + + + +最终完成的效果如下 + + + +```go + +package main + + + +import ( + +"context" + +"errors" + +"fmt" + +"sync" + +) + + + +func search(ctx context.Context, word string) (string, error) { + +select { + +case <-ctx.Done(): + +return "", ctx.Err() + +default: + +if word == "Go" || word == "Java" { + +return "", errors.New("Go or Java") + +} + +return fmt.Sprintf("result: %s", word), nil // 模拟结果 + +} + +} + + + +func coSearch(ctx context.Context, words []string) ([]string, error) { + +ctx, cancel := context.WithCancelCause(ctx) + +defer cancel(nil) + + + +var ( + +wg = sync.WaitGroup{} + +once = sync.Once{} + + + +results = make([]string, len(words)) + +tokens = make(chan struct{}, 2) + + + +err error + +) + + + +for i, word := range words { + +tokens <- struct{}{} + +wg.Add(1) + + + +go func(word string, i int) { + +defer func() { + +wg.Done() + +<-tokens + +}() + + + +result, e := search(ctx, word) + +if e != nil { + +once.Do(func() { + +err = e + +cancel(e) + +}) + + + +return + +} + + + +results[i] = result + +}(word, i) + +} + + + +wg.Wait() + + + +return results, err + +} + +``` + + + +## **并发控制库errgroup** + + + +可以看到要实现一个较为完备的并发控制,需要做的工作非常多。不过Go官方团队为大家准备了 golang.org/x/sync/errgroup + + + +`errgroup`提供的能力和上文的示例类似,实现方式也类似,包含并发控制,错误传递,`context.Context`传递等 + + + +```go + +package main + + + +import ( + +"context" + +"fmt" + +"sync" + + + +"golang.org/x/sync/errgroup" + +) + + + +func coSearch(ctx context.Context, words []string) ([]string, error) { + +g, ctx := errgroup.WithContext(ctx) + +g.SetLimit(10) + +results := make([]string, len(words)) + + + +for i, word := range words { + +i, word := i, word + + + +g.Go(func() error { + +result, err := search(ctx, word) + +if err != nil { + +return err + +} + + + +results[i] = result + +return nil + +}) + +} + + + +err := g.Wait() + + + +return results, err + +} + +``` + + + +`errgroup`的用法也很简单 + +- 使用`g, ctx := errgroup.WithContext(ctx)`来创建goroutine的管理器 + +- `g.SetLimit()`可以设置允许的最大的goroutine数量 + +- 类似于`go`关键词,`g.Go`异步执行函数 + +- `g.Wait()`和`sync.WaitGroup{}`的`wg.Wait()`类似,会阻塞直到所有goroutine都运行完成,并返回其中一个goroutine的错误 \ No newline at end of file diff --git a/src/blogs/backend/rateLimiter.md b/src/blogs/backend/rateLimiter.md new file mode 100644 index 00000000..a4c53b7f --- /dev/null +++ b/src/blogs/backend/rateLimiter.md @@ -0,0 +1,334 @@ +--- +title: 限流组件实现 +author: Rain +tags: + - Golang + - 高并发 +categories: + - 后端 +date: 2024-02-07 23:01:07 +--- +## **一、服务流量限制的重要性** + +随着业务规模的增长,服务的流量也会激增,大流量可能会压垮服务器,导致服务瘫痪。因此需对服务的流量进行限制,确保在大流量的情况下也能正常运行。 + +当流量激增时,会占用大量服务器资源和带宽,可能会压垮整个系统。比如流量激增期间数据库连接用尽,会导致服务无法访问数据库而宕机。用限制流量可以有效防止流量暴增压垮系统。 + +没有限流时,流量激增期间会启动很多无用的任务占用服务器资源,造成不必要的浪费。适当限流可以排队或拒绝无效请求,有效控制资源消耗使用。 + +## **二、常见的限流算法** + +限流的基本思想是通过算法预先设置阈值,当流量达到阈值时自动限制,常见的限流算法有: + +1. 计数器算法 + +计数器算法根据时间窗口内的请求数进行限制,基本思路是设置一个计数器统计时间窗口内的请求数,当请求数达到限流阈值时,就拒绝服务或者排队。 + +**优点: **实现简单,资源消耗少。 + +**缺点: **无法处理突发流量,时间窗口比较难确定合适的值。 + +2. 漏桶算法 + +漏桶算法是限制请求通过的速率,基本思路是设置桶的容量和流出速率,如果请求流入速率过大会被桶阻止,根据固定速率流出,起到平滑调节速率的作用。 + +**优点: **可以很好地应对突发流量。 + +**缺点: **需要实时处理每一个请求,对系统资源消耗较大。 + +3. 令牌桶算法 + +令牌桶算法按照固定速率向桶中放入令牌,每次请求需要消耗一个令牌,只有拿到令牌的请求才允许通过。放令牌的速率可以通过调节,实现不同速率的请求限流。 + +**优点: **拥堵控制能力更强,可自定义速率。 + +**缺点: **需要实时处理请求,系统资源消耗依然比较大。 + +## **三、限流实战** + +1. 使用计数器限制总请求数 + +计数器算法主要是基于时间窗口和计数器来进行限制的。下面来看一个基于计数器进行限流的实现: + +```go +package limit + +import ( + "sync/atomic" + "time" +) + +// 计数器限流器 +type CounterLimiter struct { + count uint64 // 当前计数 + lastUpdate int64 // 上次更新的时间 + + limitPerSec uint64 // 每秒限制的请求数 +} + +// 创建计数器限流器 +func NewCounterLimiter(limitPerSec uint64) *CounterLimiter { + return &CounterLimiter{ + count: 0, + lastUpdate: time.Now().Unix(), + limitPerSec: limitPerSec, + } +} + +// 实现限流器接口的Allow() +func (c *CounterLimiter) Allow() bool { + now := time.Now().Unix() + elapse := now - c.lastUpdate + + c.lastUpdate = now + addedCount := elapse * c.limitPerSec + + c.count += uint64(addedCount) + if c.count > c.limitPerSec { + c.count = c.limitPerSec + } + + if c.count < c.limitPerSec { + c.count++ + return true + } + + return false +} +``` + +基于计数器的限流器 CounterLimiter,主要逻辑是 + +创建限流器时指定限流频率 limitPerSec + +Allow() 方法校验是否限流 + +- 计算当前时间和上次更新时间差 elapsed +- 根据时间差计算这段时间新增的限流额度 addedCount +- 计数器统计值增加 addedCount +- 判断计数器是否达到阈值 +- 到达阈值则限流,未到则计数 +1 并放行 + +只需要创建一个限流器,并在请求处理前调用 Allow() 方法判断是否限流即可。 + +```go +func httpHandler(w http.ResponseWriter, r *http.Request) { + + limiter := NewCounterLimiter(10) + + if !limiter.Allow() { + + http.Error(w, http.StatusText(429), + http.StatusTooManyRequests) + + return + } + + // 核心逻辑 +} +``` + +2. 使用漏桶算法限制请求通过速率 + +漏桶算法需要设置一个桶的容量 capacity,和漏出流量的速率 flow per second。如果请求流入速度过快,会被桶的容量限制而丢弃。 + +使用漏桶算法实现请求通过速率限流的示例: + +```go +type LeakyBucket struct { + capacity int64 // 桶容量 + used int64 // 当前已使用 + mu sync.Mutex + lastLeakTime time.Time // 上次漏水时间 + + flow int64 // 每秒流速 +} + +// 创建漏桶限流器 +func NewLeakyBucket(capacity, flow int64) *LeakyBucket { + return &LeakyBucket{ + capacity: capacity, + used: 0, + flow: flow, + } +} + +// 实现限流器接口的Allow() +func (l *LeakyBucket) Allow() bool { + l.mu.Lock() + defer l.mu.Unlock() + + now := time.Now() + l.leak(now) + + if l.used >= l.capacity { + return false + } + + l.used += 1 + return true +} + +// 漏水处理 +func (l *LeakyBucket) leak(now time.Time) { + + delta := now.Sub(l.lastLeakTime) + + leaked := delta.Seconds() * float64(l.flow) + + // 计算这段间隔内的漏水量 + leakedInt := int64(leaked) + if leakedInt > (l.used) { + // 漏出了全部水 + l.used = 0 + } else { + l.used -= leakedInt + } + + l.lastLeakTime = now +} +``` + +漏桶算法的实现主要分为两个部分: + +- A llow() 方法检查当前使用量是否达到桶容量,未到则请求数 +1 并返回 true,已到则返回 false 限流 +- leak() 方法按照固定流速进行漏水,对应速率进行的限流 + +可通过设置桶容量和流速,来限制请求通过系统的速率了。 + +3. 使用令牌桶算法释放固定数额的令牌 + +令牌桶算法的主要逻辑是按照一定速率往桶中放入令牌。 + +请求在处理前需要先获取令牌,如果没有可用令牌则丢弃该请求或进入队列等待。 + +下面是使用令牌桶算法实现的限流器: + +```go +package limit + +import ( + "sync" + "time" +) + +// 令牌桶算法限流器 +type TokenBucket struct { + capacity int64 // 桶容量 + rate int64 // 令牌放入速率 + tokens int64 // 当前令牌数 + lastUpdate int64 // 上次添加令牌的时间 + mu sync.Mutex +} + +// 创建令牌桶限流器 +func NewTokenBucket(capacity, rate int64) *TokenBucket { + return &TokenBucket{capacity: capacity, rate: rate, tokens: capacity, lastUpdate: time.Now().Unix()} +} + +// Allow 方法实现限流器接口 +func (t *TokenBucket) Allow() bool { + t.addTokens() + t.mu.Lock() + defer t.mu.Unlock() + if t.tokens <= 0 { + return false + } + t.tokens-- + return true +} + +// 按速率添加令牌 +func (t *TokenBucket) addTokens() { + now := time.Now().Unix() + elapse := now - t.lastUpdate + add := elapse * t.rate + t.lastUpdate = now + t.tokens += add + if t.tokens > t.capacity { + t.tokens = t.capacity + } +} +``` + +主要逻辑分为两部分: + +- Allow() 方法处理请求前获取令牌 +- addTokens() 方法按照速率往桶中添加令牌 + +调整桶容量和添加令牌的速率,来达到平滑限流的效果。 + +4. 封装限流中间件,便于业务复用 + +在上例中,实现了通用的限流器接口,包含创建限流器和 Allow() 校验方法。可以基于这个接口,进一步封装成限流中间件。 + +```go +package limit + +import "net/http" + +// 中间件实现 +func LimitMiddleware(handler http.Handler, limiter Limiter) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, + r *http.Request) { + + if !limiter.Allow() { + + http.Error(w, http.StatusText(429), + http.StatusTooManyRequests) + + return + } + + handler.ServeHTTP(w, r) + }) +} + +func main() { + // 使用方式 + limiter := NewTokenBucket(capacity, rate) + http.Handle("/", LimitMiddleware(myHandler, limiter)) +} +``` + +## **四、优化限流策略** + +很多时候业务访问量会有周期性波动或突发变化, 需要能够检测实时流量,动态调整限流参数。比如可以根据每分钟的请求量实时调整下一分钟的限流阈值。 + +根据业务需要可以设置自定义策略,比如对重要接口限流预留更多资源,对次要接口限流更严格等。允许更加细粒度地控制限流。 + +可以根据服务器负载、平均响应时间等指标,动态决定是否需要限流以保护系统。在流量大幅增长时自动跟进限制。 + +## **五、使用 Redis 实现分布式限流** + +之前的限流方式都是在单机上通过计数或时间实现,存在一定的不足。可使用 Redis 实现分布式限流。 + +Redis 的性能和扩展性优势: + +Redis 单机可以达到 10 万 + QPS 的性能, pipeline 批量操作可以进一步提升这一指标。此外 Redis 很容易通过主从复制和分片来进行扩展。正是得益于这些优势,才使得它非常适合实现分布式限流。 + +利用 Redis 的计数器和定时任务实现分布式限制 ,主要的思想是: + +- 对每个唯一请求路径维护一个计数器 +- 每次请求计数器 +1 +- 当计数器达到阈值则返回限流 +- 通过定时任务定期重置计数器计数 + +客户端请求时通过 Lua 脚本 canRequest.lua 来进行判断: + +```lua +-- canRequest.lua 限流判断脚本 +local key = KEYS[1] +local limit = tonumber(ARGV[1]) +local current = tonumber(redis.call('get', key) or "0") + +if current + 1 > limit then + return 0 +else + redis.call("INCRBY", key, 1) + redis.call("expire", key, 1) + return 1 +end +``` + +使用一个唯一键 key 存储计数器,并设置 key 的过期时间,比如 1 秒。根据 key 的当前计数是否达到阈值来拒绝请求。可轻松通过 Redis 实现分布式限流,并可以横向扩展提高效率。 \ No newline at end of file