一、限流算法 🚀 1.0 总结可作为面试时的亮点写在简历上
计数器固定窗口算法实现简单,容易理解。和漏斗算法相比,新来的请求也能够被马上处理到。但是流量曲线可能不够平滑,有“突刺现象”,在窗口切换时可能会产生两倍于阈值流量的请求;
计数器滑动窗口算法作为计数器固定窗口算法的一种改进,有效解决了窗口切换时可能会产生两倍于阈值流量请求的问题,但计算机很难处理滑动的行为,只能通过轮询的方式模拟(redis的过期时间本质上也是轮询),会占用CPU资源/内存浪费;
漏斗算法能够对流量起到整流的作用,让随机不稳定的流量以固定的速率流出,但是不能解决流量突发的问题;
令牌桶算法作为漏斗算法的一种改进,除了能够起到平滑流量的作用,还允许一定程度的流量突发。但可能需要初始化;
1.1 计数器以上四种限流算法都有自身的特点,具体使用时还是要结合自身的场景进行选取。
令牌桶算法一般用于保护自身的系统,对调用者进行限流,保护自身的系统不被突发的流量打垮。如果自身的系统实际的处理能力强于配置的流量限制时,可以允许一定程度的流量突发,使得实际的处理速率高于配置的速率,充分利用系统资源。
漏斗算法一般用于保护第三方的系统,比如自身的系统需要调用第三方的接口,为了保护第三方的系统不被自身的调用打垮,便可以通过漏斗算法进行限流,保证自身的流量平稳的打到第三方的接口上。
维护一个计数器,开始处理请求时计数器+1,请求处理完毕后计数器-1
优点:实现简单,Java中原子类Atomic即可实现,分布式场景采用Redis incr
缺点:不能应对突发流量
计数器+定时更新
缺点:固定窗口临界问题,假设接口每秒的限流为100,但在0.55s~1.05s的0.5s中涌入200个请求可能会导致限流为100/s的系统崩溃
记录时间窗口中每个请求到来的时间,保证任意时间窗口中请求数量均不超过系统限制
缺点:无法解决短时间内集中流量的冲击
boolean limiter() {
long now = currentTimeMillis(); // 获取当前时间
long counter = getCounterInTimeWindow(now) // 根据当前时间获取时间窗口内的计数
if (counter < threshold) { // 小于阈值
addToTimeWindow(now); // 记录当前时间
return true;
}
return false;
}
1.4 Leaky Bucket 漏桶
使用队列保存全部request,新的request放入队列尾部,队列满时将其丢弃
优点
可以平滑处理突发流量容易实现队列/缓冲区大小恒定,内存使用率高缺点
请求延迟出现突发流量,队列被old request占满时,new request会被饿死 1.5 令牌桶定速的往桶内放入令牌,令牌数量超过桶的限制,丢弃。请求来了先向桶内索要令牌,索要成功则通过被处理,反之拒绝
缺点:存在突刺问题,导致服务器空闲状态下,存在大量资源浪费
rate/limiter.go中限流器的实现
令牌桶只要桶中还有 Token,请求就还可以一直进行。当突发量激增到一定程度,则才会按照预定速率进行消费。
2.1 内部结构 实现令牌桶的常规思路:单独维护一个Timer和BlockingQueue,从而实现每隔一段时间向队列中添加令牌、取出令牌;Go中的实现:通过计数的方式表示桶中剩余的令牌,采用lazyload的思想,每次取token之前会先根据上次更新令牌数的时间差更新桶中Token数量;type Limiter struct {
mu sync.Mutex
limit Limit // 控制产生令牌的速度 float64的别名
burst int // 令牌桶的最大容量
tokens float64 // 令牌数量
last time.Time // 上次更新令牌桶的时间
lastEvent time.Time // 上次发生限速器事件的时间
}
2.2 构造方法
// r:每秒更新token的数量
// b:令牌桶的最大容量
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
创建Limiter对象的方式有两种:
// 1. 指定每秒产生token数量
limiter1 := rate.NewLimiter(10, 1)
// 2. 指定放置token的时间间隔:每100ms 产生一个token
limiter2 := rate.NewLimiter(rate.Every(100 * time.Millisecond), 1);
2.3 常用方法
Reserve
:根据限流器设定速率进行处理速度的限制,并不丢弃requestWait
:可以设置延迟的 deadline,超过延迟时丢弃 requestAllow
:直接丢弃超过限制速率的 request
Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
Allow 实际上就是 AllowN(time.Now(),1)
。
AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之返回不消费 Token,false,不进行等待。
通常对应这样的线上场景,如果请求速率过快,就直接丢弃某些请求
Wait/WaitNfunc (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
Wait 实际上就是 WaitN(ctx,1)
。
当使用 Wait 方法消费 Token 时,如果此时桶内 Token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件。如果充足则直接返回。
Wait 方法有一个 context 参数。我们可以设置 context 的 Deadline 或者 Timeout,来决定此次 Wait 的最长时间。
Reserve/ReserveNfunc (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
Reserve 相当于 ReserveN(time.Now(), 1)
ReserveN 调用完成后返回 Reservation * 对象
该对象中 Delay()
方法返回需要等待的时间,若等待时间为0,则不需等待,否则需要到达等待时间后才能继续后续action; Cancel()
取消等待,将token归还;
使用实例:
r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
// r.Cancel() // 取消等待
Act()
2.4 实现原理
Wait、Allow、Reserve均通过调用 reserveN() 方法实现,reserveN()源码如下:
type Reservation struct {
ok bool // 能否满足请求数量
lim *Limiter
tokens int // 桶中剩余的令牌数量
timeToAct time.Time // 满足令牌数量/结束等待的时间
limit Limit
}
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// 计算上次请求到本次请求之间应当放入的令牌数量
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result 判断当前需要的token是否超过limiter的最大容量、等待时长是否超过限制
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Calculate the new number of tokens, due to time that passed. 根据时间直接计算能产生的令牌数量
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
总结:整个过程利用了 Token 数量与时间之间的相互转化关系
计算从上次取 Token 的时间到当前时刻,期间一共新产生了多少 Token:我们只在取 Token 之前生成新的 Token,也就意味着每次取 Token 的间隔,实际上也是生成 Token 的时间间隔。我们可以利用 tokensFromDuration
, 轻易的算出这段时间一共产生 Token 的数目。
那么,当前 Token 数目 = 新产生的 Token 数目 + 之前剩余的 Token 数目 - 要消费的 Token 数目。
如果消费后剩余 Token 数目大于零,说明此时 Token 桶内仍不为空,此时 Token 充足,无需调用侧等待。
如果 Token 数目小于零,则需等待一段时间。
那么这个时候,我们可以利用 durationFromTokens
将当前负值的 Token 数转化为需要等待的时间。
将需要等待的时间等相关结果返回给调用方。
2.5 动态更新速率Limiter 支持可以调整速率和桶大小:
SetLimit(Limit) 改变放入 Token 的速率SetBurst(int) 改变 Token 桶大小可以通过当前系统环境如:机器数量、机器能够承载的最大QPS,进行定时调整
2.6 float精度问题在将时间间隔转换为对应产生的token数量时采用如下方法:
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}
由于 d.Seconds() 为float类型,因此上述方法出现两个float64类型相乘的情况,可能出现精度损失,详见该issue
修改方法:
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}
分别求出token数量的证书部分和小数部分,在进行相加,可得到精确的数值。
三、分布式限流器实现思路
借助 redis 做统一的 token 管理,每次访问都到 redis 处获取并新增
timestamp+业务信息
每次处理请求前 对相应的redis key进行 incr,当到达限流值时对流量进行drop
3.2 高QPS
在本地内存中缓存一部分redis数据,防止redis qps过高(例如:每次从redis中获取100个token,当token消耗完成在到redis中更新,超过阈值就进行限流)使用 go 协程定时进行数据更新
四、Sentinel流控
TODO
原理解析
官网流控文档
参考资料:
限流算法
Design a Scalable Rate Limiting Algorithm — System Design
限流算法总结
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)