java中的限流

java中的限流,第1张

文章目录
  • 限流
    • 参考地址
    • 前言
    • 一、为什么要限流
      • 在限流中有两个概念需要了解
    • 二、限流算法实现分类
      • 限流的分类:
      • 6种具体的实现限流的手段:
      • 限流按照规模来分类:
      • Tomcat配置maxThreads限流
      • Nginx 限流
        • **控制速率**
        • **控制并发数**
    • 三、固定窗口计数法
      • 固定窗口计数法的思路是:
      • 特点
      • 突刺现象的具体表现
      • redis的lua脚本代码(适用于分布式系统)
    • 四、滑动窗口计数法
      • **引子**
      • **算法原理**
      • **特点**
      • **redis的lua脚本代码(适用于分布式系统)**
        • 使用List(列表)来实现
        • 使用SortedSet(有序集合)来实现
      • **多个阈值升级版**
        • 请求数的多个阈值
        • 请求流量的多个阈值
    • 五、漏桶算法(Leaky Bucket)
      • **算法原理**

限流 参考地址

redis中文命令网站:http://doc.redisfans.com/

redis官方incr介绍:https://redis.io/commands/incr/

redis的模块 redis-cell 参考地址:https://github.com/brandur/redis-cell

redis的简单限流:https://redis.com/redis-best-practices/basic-rate-limiting/

文档参考内容:https://www.cnblogs.com/shuai666/p/15693545.html

文档参考内容:https://blog.csdn.net/wangxy_job/article/details/106313398

文档参考内容:https://blog.csdn.net/legend050709/article/details/114917637

文档参考内容:https://www.cnblogs.com/shuai666/p/15693545.html

文档参考内容:https://blog.csdn.net/yangzsirr/article/details/119897405

文档参考内容:https://blog.csdn.net/qq_39954022/article/details/77866166

文档参考内容:https://www.csdn.net/tags/NtzaUgwsNjQ3NTgtYmxvZwO0O0OO0O0O.html

文档参考内容:https://blog.csdn.net/u013735734/article/details/123494680

文档参考内容:https://www.cnblogs.com/shoshana-kong/p/14759604.html

文档参考内容:https://blog.csdn.net/Dongguabai/article/details/114001220

文档参考内容:https://blog.csdn.net/u013735734/article/details/123494680

前言

最后换工作,新工作的第一个任务就是要做网关的限流工作。

其实之前也了解了一些限流的框架,例如:spring-cloud-gateway(spring-cloud-gateway-server.jar包中提供的限流)、 sentinel 和 Guava提供的限流工具类RateLimiter等,但也仅仅的局限于此了

趁着这个机会好好的啃一下这个骨头,毕竟知识在于积累嘛。

一、为什么要限流

众所周知,网关作为入口,不仅控制这流量的进入,其实还可以做很多的事情,比如,限流。

设想一下:当我们的系统被频繁的请求的时候,如果不做任何的限制,就有可能在一段时间涌入大量的请求从而将系统压垮。

而微服务之后,每个服务都是独立的,所以都有被压垮的危险,但是如果有了网关,那么就可以在网关系统做限流,因为所有的请求都需要先通过网关系统才能路由到微服务中,所以就能保护网关后的各个微服务。

限流可以认为是服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。

一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。

比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

在限流中有两个概念需要了解
  • 阈值:在一个单位时间内允许的请求量。如 QPS 限制为10,说明 1 秒内最多接受 10 次请求。
  • 拒绝策略:超过阈值的请求的拒绝策略,常见的拒绝策略有直接拒绝、排队等待等。
二、限流算法实现分类 限流的分类:

1)合法性验证限流:比如验证码、IP 黑名单等,这些手段可以有效的防止恶意攻击和爬虫采集;

2)容器限流:比如 Tomcat、Nginx 等限流手段,其中 Tomcat 可以设置最大线程数(maxThreads),当并发超过最大线程数会排队等待执行;而Nginx提供了两种限流手段:一是控制速率,二是控制并发连接数;

3)服务端限流:比如我们在服务器端通过限流算法实现限流。

6种具体的实现限流的手段:

1)Tomcat 使用 maxThreads 来实现限流。

2)Nginx 的 limit_req_zone 和 burst 来实现速率限流。

3)Nginx 的 limit_conn_zone 和 limit_conn 两个指令控制并发连接的总数。

4)时间窗口算法借助 Redis 的有序集合可以实现。

5)漏桶算法可以使用 Redis-Cell 来实现。

6)令牌算法可以解决Google的guava包来实现。

限流按照规模来分类:

1)单节点限流:限流的方案仅适用于单节点规模,再大规模集群下不适用

2)分布式系统:适用于大规模集群限流,当然单节点也支持

需要注意的是借助Redis实现的限流方案可用于分布式系统,而guava实现的限流只能应用于单机环境。

如果你觉得服务器端限流麻烦,可以在不改任何代码的情况下直接使用容器限流(Nginx或Tomcat),但前提是能满足项目中的业务需求。

Tomcat配置maxThreads限流

Tomcat 8.5 版本的最大线程数在 conf/server.xml 配置中,如下所示:

    <Connector port="8080" protocol="HTTP/1.1" URIEncoding="UTF-8"
               connectionTimeout="20000"
               maxThreads="150"
               redirectPort="8443" />

其中 maxThreads 就是 Tomcat 的最大线程数,当请求的并发大于此值(maxThreads)时,请求就会排队执行,这样就完成了限流的目的。

注意:

maxThreads 的值可以适当的调大一些,Tomcat默认为 150(Tomcat 版本 8.5),但这个值也不是越大越好,要看具体的服务器配置,需要注意的是每开启一个线程需要耗用 1MB 的 JVM 内存空间用于作为线程栈之用,并且线程越多 GC 的负担也越重。

最后需要注意一下, *** 作系统对于进程中的线程数有一定的限制,Windows 每个进程中的线程数不允许超过 2000,Linux 每个进程中的线程数不允许超过 1000。

Nginx 限流

Nginx 提供了两种限流方法:一种是控制速率,另一种是控制并发连接数。

控制速率

nginx可以使用 ngx_http_limit_req_module 模块的 limit_req_zone 指令进行限流访问,防止用户恶意攻击刷爆服务器。

ngx_http_limit_req_module 模块是nginx默认安装的,所以直接配置即可。

我们可以使用 limit_req_zone 用来限制单位时间内的请求数,即速率限制。

第一步:在nginx.conf文件中的http模块下配置:

limit_req_zone $binary_remote_addr zone=commonper:10m rate=10r/s;

配置参数:

  • $binary_remote_addr :表示通过 remote_addr 这个标识来做限制,“binary_”的目的是缩写内存占用量,是限制同一客户端ip地址,可以理解为(客户端IP)的二进制格式,固定占用4个字节。
  • zone=commonper:10m:表示生成一个大小为10M,名字为 commonper 的内存区域,用来存储访问的频次信息
  • rate=10r/s:表示允许相同标识的客户端的访问频次,这里限制的是每秒10 次,即每秒只处理十个请求,还可以有比如****30r/m的,即限制每2秒访问一次,即每2秒才处理一个请求****。

第二步:在http模块的子模块server下面配置

location ~^(.+\.php)(.*)$ {
   	limit_req zone=commonper burst=5 nodelay;
	...
}

配置参数:

  • zone=commonper :设置使用哪个配置区域来做限制,与上面 limit_req_zone 里的name对应
  • burst=5:重点说明一下这个配置,burst爆发的意思,这个配置的意思是设置一个大小为5的缓冲区当有大量请求(爆发)过来时,超过了 访问频次 限制的 请求可以先放到这个缓冲区内等待,但是这个等待区里的位置只有5个,超过的请求会直接报503的错误然后返回。如果不配置这个参数的话则会精确的控制总访问次数,但是在生产环境未免太苛刻了,实际情况下我们应该控制一个IP单位总时间内的总访问次数,而不是那样精确到毫秒,所以我们可以使用 burst 关键字开启设置
  • nodelay:字面的意思是不延迟,具体说是对用户发起的请求不做延迟处理,而是立即处理。
    • 如果设置,会在瞬时提供处理(burst + rate)个请求的能力,请求超过(burst + rate)的时候就会直接返回503,永远不存在请求需要等待的情况。(这里的rate的单位是:r/s)
    • 如果没有设置,则所有请求会依次等待排队

举例说明:

某一时刻有两个请求同时到达nginx,其中一个被处理,另一个放到了缓冲队列里。

虽然配置了nodelay导致第二个请求也被瞬间处理了,但还是占用了缓冲队列的一个长度,如果下一秒没有请求过来,这个占用burst一个长度的空间就会被释放,否则就只能继续占用着burst的空间,直到burst空间占用超过5之后,再来请求就会直接被nginx拒绝,返回503错误码。

可见,如果第二秒又来了两个请求,其中一个请求又占用了一个burst空间,第三秒、第四秒直到第五秒,每秒都有两个请求过来,虽然两个请求都被处理了(因为配置了nodelay),但其中一个请求仍然占用了一个burst长度,五秒后整个burst长度=5都被占用了。第六秒再过来两个请求,其中一个请求就被拒绝了

控制并发数

利用 limit_conn_zone 和 limit_conn 两个指令即可控制并发数,示例配置如下:

第一步:在nginx.conf文件中的http模块下配置:

limit_conn_zone $binary_remote_addr zone=conn_zone:1m;

配置参数:

  • $binary_remote_addr :表示通过 remote_addr 这个标识来做限制,“binary_”的目的是缩写内存占用量,是限制同一客户端ip地址,可以理解为(客户端IP)的二进制格式,固定占用4个字节。

  • zone=conn_zone:1m:表示生成一个大小为10M,名字为 conn_zone 的内存区域,用来存储访问的频次信息

第二步:在http模块的子模块server下面配置

location / {
    limit_conn conn_zone 1;
    ...
}

配置参数:

  • limit_conn conn_zone 1:设置使用哪个配置区域来做限制,与上面 limit_conn_zone 里的name对应,1 表示对单个IP限制同时存在一个连接,只有当 request header 被后端处理后,这个连接才进行计数。

上面的都是别人家开发好的,作为爱 “扒别人内裤”的程序猿,坑定要从代码层次来介绍一下 *** 作了。

三、固定窗口计数法 固定窗口计数法的思路是:
  1. 将时间划分为固定的窗口大小,例如1s
  2. 在窗口时间段内,每来一个请求,对计数器加1。
  3. 当计数器达到设定限制后,该窗口时间内的之后的请求都被丢弃处理。
  4. 该窗口时间结束后,计数器清零,从新开始计数。如上图所示,10s内限制1000个请求,在第11s的时候计数器会从0重新开始计数。
特点

优点:实现简单,并且内存占用小,我们只需要存储时间窗口中的计数即可;

缺点:流量曲线可能不够平滑,有“突刺现象”(2N)

突刺现象的示意图:

突刺现象的具体表现
  • 一段时间内(不超过时间窗口)系统服务不可用。
    • 例如:窗口大小为1s,限流大小为100,然后恰好在某个窗口的第1ms来了100个请求,然后第2ms-999ms的请求就都会被拒绝,这段时间用户会感觉系统服务不可用。
  • 瞬时流量的临界问题:窗口切换时可能会产生两倍于阈值流量的请求
    • 例如:窗口大小为1s,限流大小为100,然后恰好在某个窗口的第999ms来了100个请求,窗口前期没有请求,所以这100个请求都会通过。再恰好,下一个窗口的第1ms有来了100个请求,也全部通过了,那也就是在2ms之内通过了200个请求,而我们设定的阈值是100,通过的请求达到了阈值的两倍。
redis的lua脚本代码(适用于分布式系统)

思路:

  • 将开始时间存储到一个key中,每到一个新的窗口是重置一下这个时间(开始时间)
  • 获取“当前时间戳” - “窗口时间” 判断是否在开始时间内,如果在的话将当前存储的“请求值 + 1”,并设置有效期;如果不在则说明是一个新的窗口时间,设置当前请求值为:1,并设置缓存时间
  • 在存储上可以选择 String 存储结构,因为该存储结构中有 INCR 命令可以很方便的进行 + 1 *** 作;但是由于需要 *** 作多个 key,在集群环境下可能会存储在不同的节点,导致异常。
  • 或者在存储上可以选择 Hash 或者 SortedSet 存储结构,因为该存储结构中有 HINCRBYZINCRBY 命令可以很方便的进行 + 1 *** 作 ,因为有 fieldmember 字段,所以不会有存储在不同节点的异常。
  • 从简单程度上来说,最后决定选择使用 Hash 存储结构;存储 开始时间的 key 为:LAST_REQ_TIMES;而其他窗口内存储的请求数的key为:“开始时间_windows数”例如:“1650766902_60”

Hash存储结构的命令介绍:

  • HGET:返回哈希表 key 中给定域 field 的值。
  • HMSET:同时将多个 field-value (域-值)对设置到哈希表 key 中。
  • HINCRBY:为哈希表 key 中的域 field 的值加上增量 increment
  • HSETNX :将哈希表 key 中的域 field 的值设置为 value ,当且仅当域 field 不存在。
  • HDEL:删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略。
-- 获取key
local requestTimesKey = KEYS[1]
-- 窗口内可以访问的请求数量
local capacity = tonumber(ARGV[1])
-- 窗口期时间
local window = tonumber(ARGV[2])
-- 缓存的有效期
local ttl = math.floor(window * 1.5)

-- 告诉redis以redis命令的形式实现主从复制
redis.replicate_commands()
-- 获取服务器上的时间
local currentInfo = redis.call('time')
-- 返回的是秒和微秒两个元素
local currTime = tonumber(current_info[1])

-- 获取上一次开始时间
local lastReqTimes = tonumber(redis.call('HGET', requestTimesKey, "LAST_REQ_TIMES"));
if lastReqTimes == nil then
    lastReqTimes = currTime
end
-- 生成filedName值
local fieldName =  tostring(lastReqTimes) .. '_' .. window

-- 允许的状态(allowedType):1为允许访问,2为拒绝访问
local allowedType = 1
-- 已经访问的数量数
local consumeRequested = 1
-- 说明是一个新的window,本来要考虑 currTime 和 上一次访问间隔多个 window,但是由于 EXPIRE 有效期可以不用考虑这个问题
if (currTime> lastReqTimes + window) then
    --	生成一个新的FieldName,假设上一个lastReqTimes为:0,window为:10,第一次的FieldName为:10_10, 下一次为:20_10, 再下一次为:30_10
    local newFieldName = tostring(lastReqTimes + window) .. '_' .. window
    -- 重置 上一次访问访问时间 并设置新的window期的请求数
    redis.call('HMSET', requestTimesKey, "LAST_REQ_TIMES", lastReqTimes, tostring(newFieldName), 1)    
    -- 删除上一个 window 的数据
    redis.call('HDEL', requestTimesKey, fieldName)
else
    -- 不是一个新的window,请求数 +1 即可
    redis.call('HINCRBY', requestTimesKey, fieldName, 1)
    -- 由于第一次的时候不存在,但是在同一个window的时候是存在的,所以这里使用:HSETNX(存在就不设置,否则的话就设置)
    redis.call('HSETNX', requestTimesKey, "LAST_REQ_TIMES", lastReqTimes)
    -- 由于怕之前数据不存在,所以在 +1 之后再获取,这个值一定存在,所以要进行 -1  *** 作
    consumeRequested = tonumber(redis.call('HGET', requestTimesKey, fieldName)) -1
    -- 这里判断一下是否超过了请求数
    if (consumeRequested > capacity) then
        allowedType = 2
    end
end

-- 重置一下有效时间
redis.call("EXPIRE", requestTimesKey, ttl)
return return {allowedType, consumeRequested}
四、滑动窗口计数法 引子

固定窗口计数器算法 由于其存在的临界问题(尖峰问题),可能在时间窗口的重置节点处接收大量流量,从而造成服务崩溃,为解决这个问题,我们引入了 滑动窗口计数法。

TCP协议中数据包的传输,同样也是采用滑动窗口来进行流量控制。

算法原理

滑动窗口算法在固定窗口的基础上,将一个计时窗口分成了若干个小窗口,然后每个小窗口维护一个独立的计数器。

当请求的时间大于当前窗口的最大时间时,则将计时窗口向前平移一个小窗口。

平移时,将第一个小窗口的数据丢弃,然后将第二个小窗口设置为第一个小窗口,同时在最后面新增一个小窗口,将新的请求放在新增的小窗口中。

同时要保证整个窗口中所有小窗口的请求数目之后不能超过设定的阈值。

很明显,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

总结滑动窗口计数法的思路是:

  1. 将时间划分为细粒度的区间,每个区间维持一个计数器,每进入一个请求则将计数器加一。
  2. 多个区间组成一个时间窗口,每流逝一个区间时间后,则抛弃最老的一个区间,纳入新区间。
  3. 若当前窗口的区间计数器总和超过设定的限制数量,则本窗口内的后续请求都被丢弃。
特点

优点:能够完美的解决 固定窗口的“突刺现象” 现象

缺点:

  • 不能解决请求分布不均的问题,即无法平滑流量
  • 实现更复杂,需要维护时间窗口,占用内存更多,计算时间复杂度也相应变大。
  • 从严格意义上来讲 滑动窗口算法 相当于是记录访问日志,如果设置 超长时间窗口超大量阈值 则会占用大量的内存或存储空间,进而在计算时会导致计算市场增加。例如在 30分钟 内允许 800万次 的访问,这样就必须至少保存 30分钟 的访问数据,保留的记录数可能会达到 800万条 以上,这样每次计算的时间可能会非常长。
redis的lua脚本代码(适用于分布式系统) 使用List(列表)来实现

思路:

  • 将数据存储到 List(列表) 中,其中的每个元素的格式为:访问时间_本次消耗的次数,例如:1650766902_1
  • 每次请求获取当前时间,遍历循环 List(列表) 所有数据,获取 当前时间 - 窗口期时间 中所有元素中的消耗次数进行累加,并记录所有不在窗口时间的索引,方便删除,加快运算的速度
  • 判断和 阈值容量 的大小,返回对应的状态和之前累加的数值,根据情况将当前 新的数据插入到 List(列表) 的头部
  • 最后删除不在窗口时间的索引数据
  • 设置缓存的有效时间

Hash存储结构的命令介绍:

  • HGET:返回哈希表 key 中给定域 field 的值。

  • LINDEX :返回列表 key 中,下标为 index 的元素。

  • LPUSH:将一个或多个值 value 插入到列表 key 的表头

  • LTRIM :对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。

-- 存储的key
local requestTimesKey = KEYS[1]
-- 窗口时间
local windowsTimeStr = ARGV[1]
-- 阈值容量
local capacity = ARGV[2]
-- 缓存的有效期
local ttl = math.floor(tonumber(window) * 1.5)

-- 定义一份方法来获取 redis 列表的索引和已经请求的请求数
local function getIndexAndConsumeRequested(key, timestamp, window)
    local idx, curr = 0, 0
    --	获取数据进行遍历循环
    local length = redis.call('LLEN', key)
    --	这里通过length来遍历循环数据
    for index=0, length-1, 1 do
        --	获取指定索引的数据
        local entry = redis.call('lindex', key, index)
        -- 判断一下空值
        if (entry ~= nil) then 
            -- 由于存储的结构为:时间_请求数量
            local entry_time_str, num_str = string.match(entry, "(%d+)_(%d+)");
            local entry_time = tonumber(entry_time_str);
			-- 由于是
            if (tonumber(timestamp) - tonumber(window) * 1000 < entry_time) then
                idx = idx + 1;
                curr = curr + tonumber(num_str);
            else
                return idx, curr
            end   
        end
    end 
    return idx, curr
end;

-- 告诉redis以redis命令的形式实现主从复制
redis.replicate_commands()
local current_info = redis.call('time')
-- 返回的是秒和微秒两个元素,
-- 返回的是秒(1970年1月1日午夜起到当前时间经过的秒数)和微秒两个元素值,例如:1650766902 和 672263(这里最大为6位数)
local curr_time = current_info[1] + current_info[2] / 1000000
-- 获取即将在redis中存储的index 和已经请求的请求数
local idx, curr = getIndexAndConsumeRequested(requestTimesKey, curr_time, windowsTimeStr)

-- 这里判断一下 *** 作的返回类型
local allowedType = 2
if (tonumber(capacity) > curr) then
    allowedType = 1
    -- 拼接生成一个新的数据
    local newElement = table.concat({ curr_time, '_', 1 });   
    redis.call('LPUSH', requestTimesKey, newElement);
end

-- trim *** 作,删除之外的其他数据
redis.call('ltrim', requestTimesKey, 0, idx);
-- 设置缓存的有效期
redis.call('expire', requestTimesKey, ttl);
return {allowedType, curr};
使用SortedSet(有序集合)来实现

思路:

  • 获取服务器的时间,包括 1970年1月1日午夜起到当前时间经过的秒数微秒 时间,由于 微秒 的最大值是6为,所以这里要除以 1000000 相当于获取小数
  • 将所有的数据写入到 SortedSet 中,其中的 score 值为当前的时间:秒值.微秒值,例如:1650766902.672263
  • 通过窗口期来计算一下 边界的开始时间 ,计算公式为:当前时间 - 窗口时间(或者叫周期时间)
  • 然后通过 ZCOUNT 命令获取成员的数量,也就是 已经处理的请求数量
  • 判断当前值是否大于 阈值,如果没有则将当前请求添加到 SortedSet 中,为了防止 member 重复,所以值也为当前的时间:秒值.微秒值,例如:1650766902.672263
  • 然后通过 ZREMRANGEBYSCORE 命令 删除窗口期之外的数据,节省一下存储的空间
  • 最后重置一下缓存的有效期

SortedSet存储结构的命令介绍:

  • ZCOUNT:返回有序集 key 中, score 值在 minmax 之间(默认包括 score 值等于 minmax )的成员的数量。
  • ZADD : 将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
  • ZREMRANGEBYSCORE:移除有序集 key 中,所有 score 值介于 minmax 之间(包括等于 minmax )的成员。
  • EXPIRE :为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。
-- 存储的key
local requestTimesKey = KEYS[1]
-- 窗口时间
local windowsTimes = tonumber(ARGV[1])
-- 阈值容量
local capacity = tonumber(ARGV[2])
-- 缓存的有效期
local ttl = math.floor(windowsTimes * 1.5)

-- 告诉redis以redis命令的形式实现主从复制
redis.replicate_commands()
local currentInfo = redis.call('time')
-- 返回的是秒(1970年1月1日午夜起到当前时间经过的秒数)和微秒两个元素值,例如:1650766902 和 672263(这里最大为6位数)
local currTime = currentInfo[1] + currentInfo[2] / 1000000
-- 获取边界的开始时间,当前时间 - 周期信息
local boundaryStartTime = currTime - tonumber(windowsTimes)

-- 获取之前消耗的个数
local consumeRequestTimes = redis.call('ZCOUNT', requestTimesKey, boundaryStartTime, currTime)
-- 这里判断一下 *** 作的返回类型
local allowedType = 2
-- 为了防止数据重复,这里将数字转为字符串
local currTimeString = tostring(currTime)
-- 判断一下状态
if capacity > consumeRequestTimes then
    allowedType = 1
    -- 如果可以正常访问的状态则将当前本次请求的数据写入到redis中
    redis.call('ZADD', requestTimesKey, currTime, currTimeString)
end
-- 移除之前已经失效的数据,加快下一次的访问速度
redis.call('ZREMRANGEBYSCORE', requestTimesKey, 0, math.floor(boundaryStartTime))
-- 重置新的生效期
redis.call("EXPIRE", requestTimesKey, ttl)
return {allowedType, consumeRequestTimes, tonumber(currentInfo[1])}
多个阈值升级版

最近根据业务的需要,在网关中的限流,让做 请求数 和 流量 两方面来做限流,并且设置了三个阈值:警告阈值、断链阈值、锁定阈值,根据不同的阈值来返回不同的结果。

虽然在 redis 中有多种存储结构来存储数据,但是由于要获取一段时间内的数据,所以最终选择使用 SortedSet 结构来存储数据,因为 SortedSet 可以根据score 值来获取范围段的数据和进行count *** 作。

请求数的多个阈值

思路:

  1. 考虑到应用实例可能会有多个,并且部署的服务器时间可能不一致,所以此处使用的时间为 redis的服务器时间:redis.call(‘time’)
  2. 获取的时间值中包括 秒 和 微秒 两个值,为了防止 SortedSet 结构中的 key 出现重复,所以这里的key值设置为:秒.微妙,例如 1650766902.672263,并且对应的 score 值也为该值(为了计算一段时间内的请求量)
  3. 如果是锁定状态的话,则对应的 score 值为 -1,防止和正常的请求 key 出现重复,所以这里的格式为:秒,例如:1650766902
  4. 请求进来先判断之前是否锁定,如果是锁定的状态,则需要重置一下锁定的时间 和 redis的缓存,并且为了加快下一次访问的速度,则删除掉不在下一次计算时间内的数据(例如:要求 60 秒钟允许 100 次访问请求,2022年2月25日 00:01:00 访问了一次,这里要获取 2022年2月25日 00:00:00 到现在已经请求的数据量,而 2022年2月25日 00:00:00 以前的数据则为不在计算时间的内的数据,所以可以删除了)
  5. 然后要计算一下周期的开始时间(2022年2月25日 00:01:00 减去我们的计算周期 60s),然后计算这段时间的请求量(通过 ZCOUNT 命令进行)
  6. 然后判断一下之前的请求量是否超过了阈值:如果没有超过 警告阈值 则说明返回正常状态(返回状态值为:1),而对应的 警告、断链、锁定的状态分别为:2、3、4,
  7. 为了方便调试,特别将 已经请求的数量 和 当前redis时间的秒钟 返回调用者

SortedSet存储结构的命令介绍:

  • ZCOUNT: 返回有序集 keyscore 值在 minmax 之间(默认包括 score 值等于 minmax )的成员的数量
  • ZREMRANGEBYSCORE:移除有序集 key 中 所有 score 值介于 minmax 之间(包括等于 minmax )的成员
  • ZRANGEBYSCORE:返回有序集 key 中所有 score 值介于 minmax 之间(包括等于 minmax )的成员
  • ZADD:将一个或多个 member 元素及其 score 值加入到有序集 key 当中
  • EXPIRE:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。
-- 这里使用滑动窗口计数法来实现该功能
-- 存储到redis中的key
local requestTimesKey = KEYS[1]

-- 参数1:告警阀值请求次数
local alarmRequestTimes = tonumber(ARGV[1])

-- 参数2:断链阀值请求次数
local brokenReqTimes = tonumber(ARGV[2])

-- 参数3:当前时间戳
local nowTimestamp = tonumber(ARGV[3])

-- 参数4:间隔,例如:60秒作为一个统计周期
local period = tonumber(ARGV[4])

-- 参数5:断链锁定时间
local brokenLinkLockTime = tonumber(ARGV[5])

-- 参数6:锁定时的请求次数
local lockReqTimes = tonumber(ARGV[6])

-- 缓存的有效期
local ttl = math.floor(math.max(period, brokenLinkLockTime) * 1.5)

-- 告诉redis以redis命令的形式实现主从复制
redis.replicate_commands()
-- 获取服务器上的时间
local currentInfo = redis.call('time')

-- 获取锁定的时间
local lockTableResult = redis.call("ZRANGEBYSCORE", requestTimesKey, -1, -1)
local lastLockTime = 0
if next(lockTableResult) then
    -- 由于返回的结果为列表,所以这里获取第一个元素
    lastLockTime = tonumber(lockTableResult[1])
end

-- 返回的是秒(1970年1月1日午夜起到当前时间经过的秒数)和微秒两个元素值,例如:1650766902 和 672263(这里最大为6位数)
local currTime = currentInfo[1] + (currentInfo[2] / 1000000)
-- 获取边界的开始时间,当前时间 - 周期信息
local boundaryStartTime = currTime - tonumber(period)

-- 判断一下是否还在锁定的期限
if lastLockTime + brokenLinkLockTime >= tonumber(currentInfo[1]) then
    -- 重置新的生效期
    redis.call("EXPIRE", requestTimesKey, ttl)

    -- 移除之前已经失效的数据,加快下一次的访问速度(顺便将之前的锁定时间删除)
    redis.call('ZREMRANGEBYSCORE', requestTimesKey, -1, math.floor(boundaryStartTime))

    -- 重置锁定的时间
    redis.call('ZADD', requestTimesKey, -1, currentInfo[1])
    -- 这里直接返回锁定的状态
    return {4, -1, tonumber(currentInfo[1])}
end

-- 为了防止数据重复,这里将数字转为字符串
local currTimeString = tostring(currTime)
-- 获取之前消耗的个数
local consumeRequestTimes = redis.call('ZCOUNT', requestTimesKey, boundaryStartTime, currTime)
-- 这里判断一下 *** 作的返回类型
local allowedType = 1
-- 判断一下是不是锁定的状态
if consumeRequestTimes >= lockReqTimes then
    allowedType = 4
    -- 由于是锁定状态,所以将上次锁定的时间修改为当前时间
    redis.call('ZADD', requestTimesKey, -1, currentInfo[1])
elseif consumeRequestTimes >= brokenReqTimes then
    allowedType = 3
elseif consumeRequestTimes >= alarmRequestTimes then
    allowedType = 2
end

-- 设置值
redis.call('ZADD', requestTimesKey, currTime, currTimeString)

-- 移除之前已经失效的数据,加快下一次的访问速度
redis.call('ZREMRANGEBYSCORE', requestTimesKey, 0, math.floor(boundaryStartTime))

-- 重置新的生效期
redis.call("EXPIRE", requestTimesKey, ttl)

return {allowedType, consumeRequestTimes, tonumber(currentInfo[1])}
请求流量的多个阈值

思路:

  1. 考虑到应用实例可能会有多个,并且部署的服务器时间可能不一致,所以此处使用的时间为 redis的服务器时间:redis.call(‘time’)
  2. 获取的时间值中包括 秒 和 微秒 两个值,为了防止 SortedSet 结构中的 key 出现重复,并且需要记录本次的请求数,所以这里的key值格式为:“秒.微妙_当前请求流量字节数”,例如 “1650766902.672263_1024”,并且对应的 score 值格式为:“秒.微妙”,例如:“1650766902.672263”(为了计算一段时间内的请求量)
  3. 如果是锁定状态的话,则对应的 score 值为 -1,防止和正常的请求 key 出现重复,所以这里的格式为:秒,例如:1650766902
  4. 请求进来先判断之前是否锁定,如果是锁定的状态,则需要重置一下锁定的时间 和 redis的缓存,并且为了加快下一次访问的速度,则删除掉不在下一次计算时间内的数据(例如:要求 60 秒钟允许 100 次访问请求,2022年2月25日 00:01:00 访问了一次,这里要获取 2022年2月25日 00:00:00 到现在已经请求的数据量,而 2022年2月25日 00:00:00 以前的数据则为不在计算时间的内的数据,所以可以删除了)
  5. 然后要计算一下周期的开始时间(2022年2月25日 00:01:00 减去我们的计算周期 60s),然后计算这段时间的 key, 进而通过正则获取到上面存储的请求字节数(string.match(tostring(value), ‘_(%d+)’))进行 sum计算。
  6. 然后判断一下之前的请求流量是否超过了阈值:如果没有超过 警告阈值 则说明返回正常状态(返回状态值为:1),而对应的 警告、断链、锁定的状态分别为:2、3、4,
  7. 为了方便调试,特别将 已经请求的流量 和 当前redis时间的秒钟 返回调用者

SortedSet存储结构的命令介绍:

  • ZCOUNT: 返回有序集 keyscore 值在 minmax 之间(默认包括 score 值等于 minmax )的成员的数量

  • ZREMRANGEBYSCORE:移除有序集 key 中 所有 score 值介于 minmax 之间(包括等于 minmax )的成员

  • ZRANGEBYSCORE:返回有序集 key 中所有 score 值介于 minmax 之间(包括等于 minmax )的成员

  • ZADD:将一个或多个 member 元素及其 score 值加入到有序集 key 当中

  • EXPIRE:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。

-- 这里使用滑动窗口计数法来实现该功能
-- 存储到redis中的key
local flowByteKey = KEYS[1]

-- 参数1:当前的流量字节 byte
local currentBytes = tonumber(ARGV[1])

-- 参数2:警告的字节流 byte
local alarmFlowBytes = tonumber(ARGV[2])

-- 参数3:断流的字节流 byte
local brokenFlowBytes = tonumber(ARGV[3])

-- 参数4:当前时间戳
local nowTimestamp = tonumber(ARGV[4])

-- 参数5:缓存的有效期
local ttl = tonumber(ARGV[5])

-- 参数6:间隔,例如:60秒作为一个统计周期
local period = tonumber(ARGV[6])

-- 参数7:断链锁定时间
local brokenLinkLockTime = tonumber(ARGV[7])

-- 参数8:锁定时请求的字节数
local lockFlowBytes = tonumber(ARGV[8])

-- 定义一个方法用来获取访问量和索引角标
local function getConsumerFlowBytes(key, boundaryStartTime, currTime)
  -- 定义一个初始化的值, limitCount 表示每次获取 50 个结果
  local consumerFlowBytes, limitCount  = 0, 50
  -- 获取符合条件的总数
  local totalNum = redis.call('ZCOUNT', flowByteKey, boundaryStartTime, currTime)
  -- 符合条件的总数为 0,则直接返回
  if totalNum == 0 then
    return consumerFlowBytes
  end
  -- 获取一下循环的次数
  local loopNum = math.ceil(totalNum/limitCount)
  -- 分页遍历循环数据
  for loopIndex = 1, loopNum do
    -- 获取所有符合的数据
    local arrayResult = redis.call('ZRANGEBYSCORE', key, boundaryStartTime, currTime, "LIMIT", (loopIndex-1) * limitCount, limitCount)
    -- 遍历循环数据
    for i,value in ipairs(arrayResult) do
      --  正则字符串获取数据
      local flowByteStr = string.match(tostring(value), '_(%d+)')
      -- 判断字符串不为空
      if flowByteStr ~= nil then
        consumerFlowBytes = consumerFlowBytes + tonumber(flowByteStr)
      end
    end
  end
  return consumerFlowBytes
end

-- 告诉redis以redis命令的形式实现主从复制
redis.replicate_commands()
-- 获取服务器上的时间
local currentInfo = redis.call('time')

-- 获取锁定的时间
local lockTableResult = redis.call("ZRANGEBYSCORE", flowByteKey, -1, -1)
local lastLockTime = 0
if next(lockTableResult) then
  -- 由于返回的结果为列表,所以这里获取第一个元素
  lastLockTime = tonumber(lockTableResult[1])
end

-- 返回的是秒(1970年1月1日午夜起到当前时间经过的秒数)和微秒两个元素值,例如:1650766902 和 672263(这里最大为6位数)
local currTime = currentInfo[1] + (currentInfo[2] / 1000000)
-- 获取边界的开始时间,当前时间 - 周期信息
local boundaryStartTime = currTime - tonumber(period)

-- 判断一下是否还在锁定的期限
if lastLockTime + brokenLinkLockTime >= tonumber(currentInfo[1]) then
  -- 重置新的生效期
  redis.call("EXPIRE", flowByteKey, ttl)

  -- 移除之前已经失效的数据,加快下一次的访问速度(顺便将之前的锁定时间删除)
  redis.call('ZREMRANGEBYSCORE', flowByteKey, -1, math.floor(boundaryStartTime))

  -- 重置锁定的时间
  redis.call('ZADD', flowByteKey, -1, currentInfo[1])
  -- 这里直接返回锁定的状态
  return {4, -1, tonumber(currentInfo[1])}
end

-- 获取之前消耗的个数
local consumeFlowBytes = getConsumerFlowBytes(flowByteKey, boundaryStartTime, currTime)

-- 这里判断一下 *** 作的返回类型
local allowedType = 1
-- 判断一下是不是锁定的状态
if consumeFlowBytes >= lockFlowBytes then
  allowedType = 4
  -- 由于是锁定状态,所以将上次锁定的时间修改为当前时间
  redis.call('ZADD', flowByteKey, -1, currentInfo[1])
elseif consumeFlowBytes >= brokenFlowBytes then
  allowedType = 3
elseif consumeFlowBytes >= alarmFlowBytes then
  allowedType = 2
end

-- 格式为:1650766902.672263_1232
local keyStr = tostring(currTime) .. '_' .. currentBytes

-- 设置值,这里的key的
redis.call('ZADD', flowByteKey, currTime, keyStr)

-- 移除之前已经失效的数据,加快下一次的访问速度
redis.call('ZREMRANGEBYSCORE', flowByteKey, 0, math.floor(boundaryStartTime))

-- 重置新的生效期
redis.call("EXPIRE", flowByteKey, ttl)
return {allowedType, consumeFlowBytes, tonumber(currentInfo[1])}
五、漏桶算法(Leaky Bucket) 算法原理

就像一个漏斗,上面口大,下面口小,上面可以一直加很多水的水,但是下面小口的口径是固定的,只会有固定单位的水通过。而且上面水量过大,漏斗放不下会溢出丢失。

将水和访问量换一下,就明白了 (核心思想就是请求收到后,会先进入漏斗,然后再按照限定速度请求服务,及可以达到限流的目的,也可以保证后台收到的请求都是平稳的。 但是也有一个缺点,就是突然流量的时候,会导致处理时间太长。当然流量更大的时候会被拒绝,这个是正常的)。

可以看出漏桶算法能强行限制数据的传输速率.

示意图内容如下:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/743297.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-29
下一篇 2022-04-29

发表评论

登录后才能评论

评论列表(0条)

保存