简介
如果一般流量过大,下游系统反应不过来,这个时候就需要限流了,其实和上地铁是一样的,就是减慢上游访问下游的速度。
限制访问服务的频次或者频率,防止服务过载,被刷爆等。
Golang 官方扩展包 time(golang.org/x/time/rate) 中,提供了一个基于令牌桶等限流器实现。
原理概述
- 令牌:每次拿到令牌,才可访问
- 桶 ,桶的最大容量是固定的,以固定的频率向桶内增加令牌,直至加满
- 每个请求消耗一个令牌。
- 限流器初始化的时候,令牌桶一般是满的。
具体使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package limiter import ( "fmt" "testing" "time" "golang.org/x/time/rate" ) func TestLimter(t *testing.T) { limiter := rate.NewLimiter(rate.Every(time.Millisecond*31), 2) //time.Sleep(time.Second) for i := 0; i < 10; i++ { var ok bool if limiter.Allow() { ok = true } time.Sleep(time.Millisecond * 20) fmt.Println(ok, limiter.Burst()) } } |
执行结果:
=== RUN TestLimter
true 2
true 2
true 2
false 2
true 2
true 2
false 2
true 2
true 2
false 2
--- PASS: TestLimter (0.21s)
通过执行结果可以看到, 令牌桶开始是2个满的,由于令牌的间隔比请求的间隔多了11ms(31-20), 所以每两个请求会失败一次。
具体实现原理
先看下限流器的创建方法: NewLimiter
1
2
3
4
5
6
|
func NewLimiter(r Limit, b int) *Limiter { return &Limiter{ limit: r, burst: b, } } |
查看限流器数据结构 Limiter
1
2
3
4
5
6
7
8
9
10
11
|
// The methods AllowN, ReserveN, and WaitN consume n tokens. type Limiter struct { mu sync.Mutex limit Limit burst int tokens float64 // last is the last time the limiter's tokens field was updated last time.Time // lastEvent is the latest time of a rate-limited event (past or future) lastEvent time.Time } |
- burst 表示了桶的大小
- limit 表示放入桶的频率
- tokens 表示剩余令牌个数
- last 最近取走 token 的时间
- lastEvent 最近限流事件的时间
当令牌桶发放后,会保留在 Reservation 对象中, 定义如下, Reservation 对象,描述了一个达到 timeToAct 时间后,可以获取到的令牌的数量 tokens 数。
1
2
3
4
5
6
7
|
type Reservation struct { ok bool // 是否满足条件分配到了tokens lim *Limiter // 发送令牌的限流器 tokens int // tokens 的数量 timeToAct time.Time // 满足令牌发放的时间 limit Limit // 令牌发放速度 } |
限流器如何限流
官方提供的限流器有阻塞等待, 也有直接判断方式的, 还有提供维护预留式等。
如何实现限流的代码,在 reserveN 中。
使用时,每次都调用了 Allow() 方法
1
2
3
4
5
6
7
8
9
10
11
|
// Allow is shorthand for AllowN(time.Now(), 1). func (lim *Limiter) Allow() bool { return lim.AllowN(time.Now(), 1) } // AllowN reports whether n events may happen at time now. // Use this method if you intend to drop / skip events that exceed the rate limit. // Otherwise use Reserve or Wait. func (lim *Limiter) AllowN(now time.Time, n int) bool { return lim.reserveN(now, n, 0).ok } |
继续查看 reserverN 算法
方法说明:
- 三个参数: now, n, maxFutureReserve
- 在 now 时间需要拿到 n 个令牌,最多等待的时间为 maxFutureReserve
- 结果将返回一个预留令牌的对象 Reservation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
// maxFutureReserve specifies the maximum reservation wait duration allowed. // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() // 首先判断是否放入频次是否为无穷大,如果为无穷大,说明暂时不限流 if lim.limit == Inf { lim.mu.Unlock() return Reservation{ ok: true, lim: lim, tokens: n, timeToAct: now, } } // 拿到截止 now 时间时,可以获取的令牌 tokens 数量,上一次拿走令牌的时间是last now, last, tokens := lim.advance(now) // Calculate the remaining number of tokens resulting from the request. // 更新 tokens数量,把需要拿走的去掉 tokens -= float64(n) // Calculate the wait duration // 如果 tokens 数量为负数,说明需要等待,计算等待时间 WaitDuration var waitDuration time.Duration if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) } // Decide result // 计算是否满足分配要求 // 1. 需要分配的大小不超过桶容量 // 2. 等待时间不超过设定的等待时长 ok := n <= lim.burst && waitDuration <= maxFutureReserve // Prepare reservation // 最后构造一个 Resvervation 对象 r := Reservation{ ok: ok, lim: lim, limit: lim.limit, } if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) } // Update state // 需要更新当前 limit 的值 if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { lim.last = last } lim.mu.Unlock() return r } |
从实现上看, limiter 并不是每隔一段时间更新当前桶的数量,而是记录了上次访问时和当前桶中令牌的数量,当再次访问时,通过上次访问时间计算出当前令牌的数量,决定是否可以发放令牌。
到此这篇关于Go 基于令牌桶的限流器实现的文章就介绍到这了,更多相关Go 令牌桶限流器内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://juejin.cn/post/7025792604889415716