Go | 限流器实现

Go | 限流器实现,第1张

可作为面试时的亮点写在简历上

一、限流算法 🚀 1.0 总结

计数器固定窗口算法实现简单,容易理解。和漏斗算法相比,新来的请求也能够被马上处理到。但是流量曲线可能不够平滑,有“突刺现象”,在窗口切换时可能会产生两倍于阈值流量的请求;

计数器滑动窗口算法作为计数器固定窗口算法的一种改进,有效解决了窗口切换时可能会产生两倍于阈值流量请求的问题,但计算机很难处理滑动的行为,只能通过轮询的方式模拟(redis的过期时间本质上也是轮询),会占用CPU资源/内存浪费;

漏斗算法能够对流量起到整流的作用,让随机不稳定的流量以固定的速率流出,但是不能解决流量突发的问题;

令牌桶算法作为漏斗算法的一种改进,除了能够起到平滑流量的作用,还允许一定程度的流量突发。但可能需要初始化;

以上四种限流算法都有自身的特点,具体使用时还是要结合自身的场景进行选取。
令牌桶算法一般用于保护自身的系统,对调用者进行限流,保护自身的系统不被突发的流量打垮。如果自身的系统实际的处理能力强于配置的流量限制时,可以允许一定程度的流量突发,使得实际的处理速率高于配置的速率,充分利用系统资源。
漏斗算法一般用于保护第三方的系统,比如自身的系统需要调用第三方的接口,为了保护第三方的系统不被自身的调用打垮,便可以通过漏斗算法进行限流,保证自身的流量平稳的打到第三方的接口上。

1.1 计数器

维护一个计数器,开始处理请求时计数器+1,请求处理完毕后计数器-1

优点:实现简单,Java中原子类Atomic即可实现,分布式场景采用Redis incr
缺点:不能应对突发流量

1.2 固定窗口

计数器+定时更新

缺点:固定窗口临界问题,假设接口每秒的限流为100,但在0.55s~1.05s的0.5s中涌入200个请求可能会导致限流为100/s的系统崩溃

1.3 滑动窗口

记录时间窗口中每个请求到来的时间,保证任意时间窗口中请求数量均不超过系统限制

缺点:无法解决短时间内集中流量的冲击

boolean limiter() {
	long now = currentTimeMillis(); // 获取当前时间
	long counter = getCounterInTimeWindow(now)	// 根据当前时间获取时间窗口内的计数
	if (counter < threshold) { 		// 小于阈值
		addToTimeWindow(now);		// 记录当前时间
		return true;
	}
	return false;
}
1.4 Leaky Bucket 漏桶

使用队列保存全部request,新的request放入队列尾部,队列满时将其丢弃

优点

可以平滑处理突发流量容易实现队列/缓冲区大小恒定,内存使用率高

缺点

请求延迟出现突发流量,队列被old request占满时,new request会被饿死 1.5 令牌桶

定速的往桶内放入令牌,令牌数量超过桶的限制,丢弃。请求来了先向桶内索要令牌,索要成功则通过被处理,反之拒绝

缺点:存在突刺问题,导致服务器空闲状态下,存在大量资源浪费

二、单机限流器

rate/limiter.go中限流器的实现

令牌桶只要桶中还有 Token,请求就还可以一直进行。当突发量激增到一定程度,则才会按照预定速率进行消费。

2.1 内部结构 实现令牌桶的常规思路:单独维护一个Timer和BlockingQueue,从而实现每隔一段时间向队列中添加令牌、取出令牌;Go中的实现:通过计数的方式表示桶中剩余的令牌,采用lazyload的思想,每次取token之前会先根据上次更新令牌数的时间差更新桶中Token数量;
type Limiter struct {
	mu     sync.Mutex
	limit  Limit	// 控制产生令牌的速度 float64的别名
	burst  int		// 令牌桶的最大容量
	tokens float64	// 令牌数量
	last time.Time	// 上次更新令牌桶的时间
	lastEvent time.Time	// 上次发生限速器事件的时间
}
2.2 构造方法
// r:每秒更新token的数量
// b:令牌桶的最大容量
func NewLimiter(r Limit, b int) *Limiter {
	return &Limiter{
		limit: r,
		burst: b,
	}
}

创建Limiter对象的方式有两种:

// 1. 指定每秒产生token数量
limiter1 := rate.NewLimiter(10, 1)
// 2. 指定放置token的时间间隔:每100ms 产生一个token
limiter2 := rate.NewLimiter(rate.Every(100 * time.Millisecond), 1);
2.3 常用方法 Reserve:根据限流器设定速率进行处理速度的限制,并不丢弃requestWait :可以设置延迟的 deadline,超过延迟时丢弃 requestAllow :直接丢弃超过限制速率的 request Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

Allow 实际上就是 AllowN(time.Now(),1)

AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之返回不消费 Token,false,不进行等待。

通常对应这样的线上场景,如果请求速率过快,就直接丢弃某些请求

Wait/WaitN
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

Wait 实际上就是 WaitN(ctx,1)

当使用 Wait 方法消费 Token 时,如果此时桶内 Token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件。如果充足则直接返回。

Wait 方法有一个 context 参数。我们可以设置 context 的 Deadline 或者 Timeout,来决定此次 Wait 的最长时间。

Reserve/ReserveN
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

Reserve 相当于 ReserveN(time.Now(), 1)

ReserveN 调用完成后返回 Reservation * 对象

该对象中 Delay() 方法返回需要等待的时间,若等待时间为0,则不需等待,否则需要到达等待时间后才能继续后续action; Cancel() 取消等待,将token归还;

使用实例:

	r := lim.ReserveN(time.Now(), 1)
	if !r.OK() {
      // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
	  return
	}
	time.Sleep(r.Delay())
	// r.Cancel()  // 取消等待
	Act()
2.4 实现原理

Wait、Allow、Reserve均通过调用 reserveN() 方法实现,reserveN()源码如下:

type Reservation struct {
	ok        bool		// 能否满足请求数量
	lim       *Limiter
	tokens    int		// 桶中剩余的令牌数量
	timeToAct time.Time	// 满足令牌数量/结束等待的时间
	limit Limit
}

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// 计算上次请求到本次请求之间应当放入的令牌数量
	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result 判断当前需要的token是否超过limiter的最大容量、等待时长是否超过限制
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Calculate the new number of tokens, due to time that passed. 根据时间直接计算能产生的令牌数量
	elapsed := now.Sub(last)
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return now, last, tokens
}

总结:整个过程利用了 Token 数量与时间之间的相互转化关系

计算从上次取 Token 的时间到当前时刻,期间一共新产生了多少 Token:我们只在取 Token 之前生成新的 Token,也就意味着每次取 Token 的间隔,实际上也是生成 Token 的时间间隔。我们可以利用 tokensFromDuration, 轻易的算出这段时间一共产生 Token 的数目。
那么,当前 Token 数目 = 新产生的 Token 数目 + 之前剩余的 Token 数目 - 要消费的 Token 数目。

如果消费后剩余 Token 数目大于零,说明此时 Token 桶内仍不为空,此时 Token 充足,无需调用侧等待。
如果 Token 数目小于零,则需等待一段时间。
那么这个时候,我们可以利用 durationFromTokens 将当前负值的 Token 数转化为需要等待的时间。

将需要等待的时间等相关结果返回给调用方。

2.5 动态更新速率

Limiter 支持可以调整速率和桶大小:

SetLimit(Limit) 改变放入 Token 的速率SetBurst(int) 改变 Token 桶大小

可以通过当前系统环境如:机器数量、机器能够承载的最大QPS,进行定时调整

2.6 float精度问题

在将时间间隔转换为对应产生的token数量时采用如下方法:

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	return d.Seconds() * float64(limit)
}

由于 d.Seconds() 为float类型,因此上述方法出现两个float64类型相乘的情况,可能出现精度损失,详见该issue

修改方法:

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	sec := float64(d/time.Second) * float64(limit)
	nsec := float64(d%time.Second) * float64(limit)
	return sec + nsec/1e9
}

分别求出token数量的证书部分和小数部分,在进行相加,可得到精确的数值。

三、分布式限流器

实现思路
借助 redis 做统一的 token 管理,每次访问都到 redis 处获取并新增

3.1 较低QPS 设置每个时间段一个 redis key,例如限制每秒1000个请求,则redis key为 timestamp+业务信息每次处理请求前 对相应的redis key进行 incr,当到达限流值时对流量进行drop 3.2 高QPS 在本地内存中缓存一部分redis数据,防止redis qps过高(例如:每次从redis中获取100个token,当token消耗完成在到redis中更新,超过阈值就进行限流)使用 go 协程定时进行数据更新 四、Sentinel流控

TODO
原理解析
官网流控文档

参考资料:
限流算法
Design a Scalable Rate Limiting Algorithm — System Design

限流算法总结

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/993993.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存