统一流控服务开源:基于.Net Core的流控服务统一流控服务开源-1:场景&业界做法&算法篇

统一流控服务开源:基于.Net Core的流控服务统一流控服务开源-1:场景&业界做法&算法篇,第1张

概述先前有一篇博文,梳理了流控服务场景、业界做法和常用算法 统一流控服务开源-1:场景&业界做法&算法篇 最近完成了流控服务的开发,并在生产系统进行了大半年的验证,稳定可靠。今天整理一下

先前有一篇博文,梳理了流控服务的场景、业界做法和常用算法

统一流控服务开源-1:场景&业界做法&算法篇

最近完成了流控服务的开发,并在生产系统进行了大半年的验证,稳定可靠。今天整理一下核心设计和实现思路,开源到Github上,分享给大家

     https://github.com/zhouguoqing/FlowControl

 一、令牌桶算法实现

  先回顾一下令牌桶算法示意图

  

  

  随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms) 往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),

 

  如果桶已经满了就不再加了. 新请求来临时, 会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

 

  令牌添加速度支持动态变化,实时控制处理的速率.

  令牌桶有两个关键的属性:令牌桶容量(大小)和时间间隔,

  有两个关键 *** 作,从令牌桶中取Token;令牌桶定时的reset重置。

  我们看TokenBucket类:

using System;namespace CZ.FlowControl.Service{     CZ.FlowControl.Spi;    /// <summary>    /// 令牌桶    </summary>    public abstract class TokenBucket : IThrottleStrategy    {        protected long bucketTokenCapacity;        private static Readonly object syncRoot = new object();         ticksRefillinterval;         nextRefillTime;        //number of tokens in the bucket         tokens;        protected TokenBucket(long bucketTokenCapacity,long refillinterval,1)"> refillintervalinMilliSeconds)        {            if (bucketTokenCapacity <= 0)                throw new ArgumentOutOfRangeException("bucketTokenCapacity",bucket token capacity can not be negative");            if (refillinterval < refillintervalRefill interval cannot be negativeif (refillintervalinMilliSeconds <= refillintervalinMilliSecondsRefill interval in milliseconds cannot be negative);            this.bucketTokenCapacity = bucketTokenCapacity;            ticksRefillinterval = TimeSpan.FromMilliseconds(refillinterval * refillintervalinMilliSeconds).Ticks;        }        <summary>         是否流控        </summary>        <param name="n"></param>        <returns></returns>        bool ShouldThrottle(long n = 1)        {            TimeSpan waitTime;            return ShouldThrottle(n,1)">out waitTime);        }        bool ShouldThrottle(long n,1)"> TimeSpan waitTime)        {            if (n <= 0) nShould be positive integerlock (syncRoot)            {                Updatetokens();                if (tokens < n)                {                    var timetoIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks;                    if (timetoIntervalEnd <  waitTime);                    waitTime = TimeSpan.FromTicks(timetoIntervalEnd);                    return true;                }                tokens -= n;                waitTime = TimeSpan.Zero;                false;            }        }         更新令牌        </summary>        voID Updatetokens();        return ShouldThrottle(1,1)"> waitTime);        }         CurrentTokenCount        {            get            {                 (syncRoot)                {                    Updatetokens();                    return tokens;                }            }        }    }}

 这个抽象类中,将Updatetoken作为抽象方法暴露出来,给实现类更多的灵活去控制令牌桶重置 *** 作。基于此实现了“固定令牌桶”FixedTokenBucket

    ///  固定令牌桶     FixedTokenBucket : TokenBucket    {        public FixedTokenBucket(long maxTokens,1)"> refillintervalinMilliSeconds)            : base(maxTokens,refillinterval,refillintervalinMilliSeconds)        {        }        overrIDe  Updatetokens()        {            var currentTime = SystemTime.UtcNow.Ticks;            if (currentTime < nextRefillTime)                ;            tokens = bucketTokenCapacity;            nextRefillTime = currentTime + ticksRefillinterval;        }    }

   固定令牌桶在每次取Token时,都要执行方法ShouldThrottle。这个方法中:

   并发取Token是线程安全的,这个地方用了Lock控制,损失了一部分性能。同时每次获取可用Token的时候,都会实时Check一下是否需要到达reset令牌桶的时间。

   获取到可用令牌后,令牌桶中令牌的数量-1。如果没有足够的可用令牌,则返回等待到下次reset令牌桶的时间。如下代码:

        );            lock (syncRoot)            {                Updatetokens();                ;                }                tokens -=;            }        }

   以上就是令牌桶算法的实现。我们继续看漏桶算法。

 二、漏桶算法实现

  首先回顾一下漏桶算法的原理:

  

  

  水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),

 

  当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,

 

  可以看出漏桶算法能强行限制数据的传输速率.

 

  有两个变量:

 

一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

 

   漏桶抽象类:LeakTokenBucket,继承与令牌桶抽象父类 TokenBucket,说明了获取令牌(漏出令牌)在底层的方式是一致的,不一样的是重置令牌的方式(务必理解这一点)

 CZ.FlowControl.Service{     漏桶     LeakyTokenBucket : TokenBucket    {         stepTokens;         ticksstepInterval;        protected LeakyTokenBucket(int refillintervalinMilliSeconds,long stepTokens,1)">long stepInterval,1)"> stepIntervalinMilliseconds)            : this.stepTokens = stepTokens;            if (stepInterval < stepIntervalStep interval cannot be negativeif (stepTokens < stepTokensStep tokens cannot be negativeif (stepIntervalinMilliseconds <= stepIntervalinMillisecondsStep interval in milliseconds cannot be negative);            ticksstepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalinMilliseconds).Ticks;        }    }}

    可以看出,漏桶是在令牌桶的基础上增加了二个重要的属性:这两个属性决定了重置令牌桶的方式

    stepTokens:每间隔时间内漏的数量

    ticksstepInterval:漏的间隔时间

    举个例子:TPS 100,即每秒漏出100个Token,stepTokens =100, ticksstepInterval=1000ms

    漏桶的具体实现有两种:空桶和满桶

    StepDownTokenBucket 满桶:即一把将令牌桶填充满

 漏桶(满桶)    </summary>    <remarks>     StepDownLeakyTokenBucketStrategy resembles a bucket which has been filled with tokens at the beginning but subsequently leaks tokens at a fixed interval    </remarks>     StepDownTokenBucket : LeakyTokenBucket    {        public StepDownTokenBucket(int refillintervalinMilliSeconds,1)">int stepIntervalinMilliseconds) : if (currentTime >= nextRefillTime)            {                set tokens to max                tokens = bucketTokenCapacity;                compute next refill time                nextRefillTime = currentTime + ticksRefillinterval;                ;            }            calculate max tokens possible till the end            var timetoNextRefill = nextRefillTime - currentTime;            var stepsToNextRefill = timetoNextRefill/ticksstepInterval;            var maxPossibletokens = stepsToNextRefill*stepTokens;            if ((timetoNextRefill%ticksstepInterval) > 0) maxPossibletokens +=if (maxPossibletokens < tokens) tokens = maxPossibletokens;        }    }}
VIEw Code

   StepUpLeakyTokenBucket 空桶:即每次只将stepTokens个数的令牌放到桶中   

 1  System; 2  3  CZ.FlowControl.Service 4 { 5     <summary> 6      漏桶(空桶) 7     </summary> 8     <remarks> 9       StepUpLeakyTokenBucketStrategy resemembles an empty bucket at the beginning but get filled will tokens over a fixed interval.10     </remarks>11      StepUpLeakyTokenBucket : LeakyTokenBucket12     {13          lastActivityTime;14 15         public StepUpLeakyTokenBucket( stepIntervalinMilliseconds) 16             : 17         {18         }19 20          Updatetokens()21 22              SystemTime.UtcNow.Ticks;23 24              nextRefillTime)25             {26                 tokens = stepTokens;27 28                 lastActivityTime = currentTime;29                 nextRefillTime = currentTime + ticksRefillinterval;30 31                 ;32             }33 34             calculate tokens at current step35 36             long elapsedtimeSinceLastActivity = currentTime -37             long elapsedStepsSinceLastActivity = elapsedtimeSinceLastActivity / ticksstepInterval;38 39             tokens += (elapsedStepsSinceLastActivity*stepTokens);40 41             if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity;42             lastActivityTime =43 44     }45 }
VIEw Code

 三、流控服务封装

  第二章节,详细介绍了令牌桶和漏桶的具体实现。基于以上,要重点介绍接口:IThrottleStrategy:流控的具体方式

 CZ.FlowControl.Spi{     流量控制算法策略    interface IThrottleStrategy    {        );        <param name="waitTime"></param>         TimeSpan waitTime);         当前令牌个数        long CurrentTokenCount { ; }    }}

    有了这个流控方式接口后,我们还需要一个流控策略定义类:FlowControlStrategy

    即定义具体的流控策略:以下是这个类的详细属性和成员:  不仅定义了流控策略类型,还定义了流控的维度信息和流控阈值,这样流控就做成依赖注入的方式了! 

 System.Collections.Generic; System.Text; 流控策略     FlowControlStrategy    {         标识        string ID { get; set; }         名称        string name {  流控策略类型        public FlowControlStrategyType StrategyType {  流控阈值-Int        long IntThreshold {  流控阈值-Double        decimal DoubleThreshold {  时间区间跨度        public FlowControlTimespan TimeSpan { ; }        private Dictionary<string,1)">string> flowControlConfigs;         流控维度信息        public Dictionary< FlowControlConfigs        {            if (flowControlConfigs == null)                    flowControlConfigs = new Dictionary<();                 flowControlConfigs;            }                        {                flowControlConfigs = value;            }        }         描述        string Descriptions {  触发流控后是否直接拒绝请求        </summary>                bool IsRefusedRequest {  创建时间        public DateTime CreateTime {  创建人        string Creator {  最后修改时间        public DateTime LastModifyTime {  最后修改人        string LastModifIEr { ; }    }}

   同时,流控策略类型,我们抽象了一个枚举:FlowControlStrategyType

   支持3种流控策略:TPS、Sum(指定时间段内请求的次数),Delay延迟

 流控策略类型枚举    enum FlowControlStrategyType    {         TPS控制策略                TPS, 总数控制策略                Sum,1)"> 延迟控制策略                Delay    }}

  面向每种流控策略类型,提供了一个对应的流控器,比如说TPS的流控器

TPSFlowController,内部使用了固定令牌桶算法
 CZ.FlowControl.Spi;     TPS流量控制器     TPSFlowController : IFlowController    {        public IThrottleStrategy InnerThrottleStrategy        {            ;        }        public FlowControlStrategy FlowControlStrategy { return InnerThrottleStrategy.ShouldThrottle(n,1)"> TPSFlowController(FlowControlStrategy strategy)        {            FlowControlStrategy = strategy;            InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold,1)">1000);        }    }}

  Sum(指定时间段内请求的次数)流控器:

  

 System.IO; System.linq; 一段时间内合计值流量控制器     SumFlowController : IFlowController    {         SumFlowController(FlowControlStrategy strategy)        {            FlowControlStrategy = strategy;            var refillinterval = GetTokenBucketRefillinterval(strategy);            InnerThrottleStrategy = );        }         GetTokenBucketRefillinterval(FlowControlStrategy strategy)        {            long refillinterval = ;            switch (strategy.TimeSpan)            {                case FlowControlTimespan.Second:                    refillinterval = ;                    break;                 FlowControlTimespan.Minute:                    refillinterval = 60 FlowControlTimespan.Hour:                    refillinterval = 60 *  FlowControlTimespan.Day:                    refillinterval = 24 *  refillinterval;        }    }}

  同时,通过一个创建者工厂,根据不同的流控策略,创建对应的流控器(做了一层缓存,性能更好):

 流控策略工厂     FlowControllerFactory    {        static Dictionary< fcControllers;        object syncObj = ();        static FlowControllerFactory instance;        private FlowControllerFactory()        {            fcControllers = ();        }         FlowControllerFactory GetInstance()        {            if (instance == )            {                 (syncObj)                {                    )                    {                        instance = new FlowControllerFactory();                    }                }            }             instance;        }         IFlowController GetorCreateFlowController(FlowControlStrategy strategy)        {            if (strategy == new ArgumentNullException(FlowControllerFactory.GetorCreateFlowController.strategyif (!fcControllers.ContainsKey(strategy.ID))            {                fcControllers.ContainsKey(strategy.ID))                    {                        var fcController = CreateFlowController(strategy);                        if (fcController != )                            fcControllers.Add(strategy.ID,fcController);                    }                }            }            if (fcControllers.ContainsKey(strategy.ID))            {                var controller = fcControllers[strategy.ID];                 controller;            }             IFlowController CreateFlowController(FlowControlStrategy strategy)        {            FlowControllerFactory.CreateFlowController.strategy);            IFlowController controller =  (strategy.StrategyType)            {                 FlowControlStrategyType.TPS:                    controller =  TPSFlowController(strategy);                     FlowControlStrategyType.Delay:                    controller =  DelayFlowController(strategy);                     FlowControlStrategyType.Sum:                    controller =  SumFlowController(strategy);                    default:                     controller;        }    }}

 

   有了流控策略定义、我们更进一步,继续封装了流控Facade服务,这样把流控的变化封装到内部。对外只提供流控服务接口,流控时动态传入流控策略和流控个数:FlowControlService

   

 CZ.FlowControl.Spi;     System.Threading;     统一流控服务     FlowControlService    {         流控        <param name="strategy">流控策略</param>        <param name="count">请求次数</param>        voID FlowControl(FlowControlStrategy strategy,1)">int count = )        {             FlowControllerFactory.GetInstance().GetorCreateFlowController(strategy);            TimeSpan waitTimespan = TimeSpan.Zero;            var result = controller.ShouldThrottle(count,1)"> waitTimespan);             (result)            {                if (strategy.IsRefusedRequest == false && waitTimespan != TimeSpan.Zero)                {                    WaitForAvailable(strategy,controller,waitTimespan,count);                }                else  (strategy.IsRefusedRequest)                {                    new Exception(触发流控!);                }            }        }         等待可用        <param name="controller">流控器<param name="waitTimespan">等待时间voID WaitForAvailable(FlowControlStrategy strategy,IFlowController controller,TimeSpan waitTimespan,1)"> count)        {            var timespan = waitTimespan;            if (strategy.StrategyType == FlowControlStrategyType.Delay)            {                Thread.Sleep(timespan);                while (controller.ShouldThrottle(count,1)"> timespan))            {                Thread.Sleep(timespan);            }        }    }}

  以上,统一流控服务完成了第一个版本的封装。接下来我们看示例代码

 四、示例代码

    先安装Nuget:

Install-Package CZ.FlowControl.Service -Version 1.0.0

 

    

   

    是不是很简单。

    大家如果希望了解详细的代码,请参考这个项目的GitHub地址:

    https://github.com/zhouguoqing/FlowControl

    同时也欢迎大家一起改进完善。

    

 

周国庆

2019/8/9

    

总结

以上是内存溢出为你收集整理的统一流控服务开源:基于.Net Core的流控服务 统一流控服务开源-1:场景&业界做法&算法篇全部内容,希望文章能够帮你解决统一流控服务开源:基于.Net Core的流控服务 统一流控服务开源-1:场景&业界做法&算法篇所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1214901.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-05
下一篇 2022-06-05

发表评论

登录后才能评论

评论列表(0条)