上一篇:Sentinel源码阅读(一)
熔断降级前文说了,每一种规则都是责任链中的一个节点,对应不同的实现类,熔断降级的类就是DegradeSlot,位于com.alibaba.csp.sentinel.slots.block.degrade包下。目录结构如下:
除了DegradeSlot,还有
CircuitBreaker:断路器,并有异常数断路器ExceptionCircuitBreaker与RT断路器ResponseTimeCircuitBreaker
DegradeException:BlockException的子类,降级抛出的是这个异常类
DegradeRule:降级规则
DegradeSlot的entry方法首先执行performChecking方法,核心逻辑都在这里。
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { performChecking(context, resourceWrapper); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void performChecking(Context context, ResourceWrapper r) throws BlockException { ListcircuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } for (CircuitBreaker cb : circuitBreakers) { if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } }
第一步是根据资源标识获取断路器list。DegradeRuleManager.getCircuitBreakers是直接从其内部一个map get资源标识映射的断路器的。而这个map的初始化在DegradeRuleManager.RulePropertyListener::reloadFrom方法中
private synchronized void reloadFrom(Listlist) { Map > cbs = buildCircuitBreakers(list); Map > rm = new HashMap<>(cbs.size()); for (Map.Entry > e : cbs.entrySet()) { assert e.getValue() != null && !e.getValue().isEmpty(); Set rules = new HashSet<>(e.getValue().size()); for (CircuitBreaker cb : e.getValue()) { rules.add(cb.getRule()); } rm.put(e.getKey(), rules); } DegradeRuleManager.circuitBreakers = cbs; DegradeRuleManager.ruleMap = rm; }
DegradeRule是从配置中取的(或者手动构造)不再赘述,这个方法读取DegradeRule列表,并将其转化为CircuitBreaker。转换的方法比较简单,见DegradeRuleManager::newCircuitBreakerFrom。
然后实际上就是遍历断路器列表,执行其tryPass方法判断调用是否能通过,不通过则抛出DegradeException异常。因此下面重点看下断路器CircuitBreaker类。
Sentinel的断路器借鉴了一篇经典文章:CircuitBreaker
总的来说,断路器有三种状态:
open:开启状态(即熔断状态),直接返回false,如果开启时间超过了熔断时间,则转为半开状态
half-open:半开状态,这种状态下会允许下一个请求通过,并对其直接进行异常或RT的校验,而不考虑阈值,如果异常,则继续转为开启状态,如果正常,说明链路恢复了,转为关闭状态。
close:关闭状态,返回true
Sentinel的实现完全一致
public boolean tryPass(Context context) { // Template implementation. if (currentState.get() == State.CLOSED) { return true; } if (currentState.get() == State.OPEN) { // For half-open state we allow a request for probing. return retryTimeoutArrived() && fromOpenToHalfOpen(context); } return false; }
Entry在exit时,会调用CircuitBreaker的onRequestComplete方法,取异常或RT,如果在半开状态,若有异常或RT过高,则继续转为开启,否则关闭。以ResponseTimeCircuitBreaker为例
public void onRequestComplete(Context context) { SlowRequestCounter counter = slidingCounter.currentWindow().value(); Entry entry = context.getCurEntry(); if (entry == null) { return; } long completeTime = entry.getCompleteTimestamp(); if (completeTime <= 0) { completeTime = TimeUtil.currentTimeMillis(); } long rt = completeTime - entry.getCreateTimestamp(); if (rt > maxAllowedRt) { counter.slowCount.add(1); } counter.totalCount.add(1); handleStateChangeWhenThresholdExceeded(rt); } private void handleStateChangeWhenThresholdExceeded(long rt) { if (currentState.get() == State.OPEN) { return; } if (currentState.get() == State.HALF_OPEN) { // In detecting request // TODO: improve logic for half-open recovery if (rt > maxAllowedRt) { fromHalfOpenToOpen(1.0d); } else { fromHalfOpenToClose(); } return; } ... }
最后再讲讲异常数断路器ExceptionCircuitBreaker与RT断路器ResponseTimeCircuitBreaker是如何工作的。
不同断路器要实现的其实就是进行异常的统计,并在状态转换时,进行不同的 *** 作。
两者内部都内部维护了一个LeapArray:
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { private final int strategy; private final int minRequestAmount; private final double threshold; private final LeapArraystat; public ExceptionCircuitBreaker(DegradeRule rule) { this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs())); } ExceptionCircuitBreaker(DegradeRule rule, LeapArray stat) { super(rule); this.strategy = rule.getGrade(); boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT; AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count"); AssertUtil.notNull(stat, "stat cannot be null"); this.minRequestAmount = rule.getMinRequestAmount(); this.threshold = rule.getCount(); this.stat = stat; } }
LeapArray是一个滑动窗口算法的实现。这个类在Sentinel许多地方都用到了来进行统计,下一节会讲到。每次请求结束时,都会进行统计,将总数与异常数并写入滑动窗口中。以此作为计算是否到达阈值的依据。区别只是判断异常的方式,ExceptionCircuitBreaker根据是否抛出Exception判断,ResponseTimeCircuitBreaker根据记录的RT是否超过阈值判断。
List滑动窗口-LeapArraycounters = slidingCounter.values(); long slowCount = 0; long totalCount = 0; for (SlowRequestCounter counter : counters) { slowCount += counter.slowCount.sum(); totalCount += counter.totalCount.sum(); } if (totalCount < minRequestAmount) { return; } double currentRatio = slowCount * 1.0d / totalCount; if (currentRatio > maxSlowRequestRatio) { transformToOpen(currentRatio); } if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 && Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) { transformToOpen(currentRatio); }
由于在熔断降级这里首先看到这个类,那就在本文讲掉了。
LeapArray是滑动窗口算法的一个实现,用来做数据统计,支持任意窗口大小,任意窗口数量。
public abstract class LeapArray{ protected int windowLengthInMs; protected int sampleCount; protected int intervalInMs; private double intervalInSecond; protected final AtomicReferenceArray > array; private final ReentrantLock updateLock = new ReentrantLock(); }
几个核心变量
intervalInMs:统计的窗口大小,单位毫秒
intervalInSecond:同上,单位秒
sampleCount:一个窗口中的采样次数,或者叫bucket数
windowLengthInMs:每个bucket的长度,计算方式为intervalInMs/sampleCount
(这里吐槽一下命名,为什么不叫bucketLengthInMs呢,歧义很大啊!!!,其他地方也是,window、bucket傻傻分不清楚)
array:实际的窗口,存放统计数据。AtomicReferenceArray是对数组的封装,支持多种线程安全的方法。
updateLock:一个可重入锁,用于保证重置滑动窗口的线程安全。(官方文档中用类似Ringbuffer的环形窗口描述,那么也可以理解为回到起点时)。
在代码中有几个概念:
TimeIdx:就是某时刻对应桶的index。获取方法是取模。
private int calculateTimeIdx( long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); }
WindowStart:某个桶对应的起始时刻。也很好理解,依靠取模运算。
protected long calculateWindowStart( long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; }
isWindowDeprecated:桶是否过期。如果某个桶的起始时刻已经落后超过一个窗口大小(intervalInMS)则过期了。
public boolean isWindowDeprecated( WindowWrapwindowWrap) { return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap); } public boolean isWindowDeprecated(long time, WindowWrap windowWrap) { return time - windowWrap.windowStart() > intervalInMs; }
了解这几个概念后,我们看一下重点的currentWindow方法。在此之前我画一个示意图,便于理解。
||--------|--------|--------|--------|--------||-------- 0 100 200 300 400 500
窗口大小为500,采样5次,每个桶长度为100。
以下是代码:
public WindowWrapcurrentWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); long windowStart = calculateWindowStart(timeMillis); while (true) { WindowWrap old = array.get(idx); if (old == null) { WindowWrap window = new WindowWrap (windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { return window; } else { Thread.yield(); } } else if (windowStart == old.windowStart()) { return old; } else if (windowStart > old.windowStart()) { if (updateLock.tryLock()) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield(); } } else if (windowStart < old.windowStart()) { return new WindowWrap (windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
currentWindow方法的作用是找到或创建该时刻对应的桶(window)。
一行行看:
首先计算了时刻对应的TimeIdx与WindowStart
一个while循环,每次循环开始取Idx对应的桶 (这里产生了一个问题,为什么要while循环?)
判断
如果该桶为null(第一次循环,array还为空时),创建一个新桶返回。这边可能有多个线程同时尝试添加,因此用了乐观锁CAS保证线程安全。
如果该桶windowStart与时刻对应windowStart相同,表明桶已被创建可以直接使用,则直接返回该桶。
如果该桶的windowStart小于时刻的windowStart(由于取模 *** 作,会重新回到头节点,这时桶都是过期的),表明一个新的时间窗口的开启,此时执行resetWindowTo重置该桶。这边直接用了锁来保证重置 *** 作的线程安全性。
我思考后觉得可能会触发多次重置 *** 作,例如两个线程都已经走到了该分支,而一个reset之后马上释放锁,另一个此时刚好正在获取锁,则又会重置一次。因此实现的resetWindowTo方法要保证多次执行不会出问题。当然我感觉一般也不会出什么问题
最后一个分支一般不会走到,除非手动修改,代码里也这么说明的,不赘述了。
值得注意的是,在获取锁失败,或者CAS失败后,都会执行Thread.yield方法,查阅了一下这个方法是一个native方法,会释放当前线程cpu时间片。这下上面提到的为什么要while的问题就豁然开朗了,不就是一个自旋 *** 作吗!!这种代码在业务中很少使用到,还是看开源代码有收获。
另外values方法也值得注意一下:
public Listvalues() { return values(TimeUtil.currentTimeMillis()); } public List values(long timeMillis) { if (timeMillis < 0) { return new ArrayList (); } int size = array.length(); List result = new ArrayList (size); for (int i = 0; i < size; i++) { WindowWrap windowWrap = array.get(i); if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) { continue; } result.add(windowWrap.value()); } return result; }
它的作用是返回一整个窗口的数据。需要注意的是,由于窗口是循环利用的(类似环形),因此可能有上一个时间窗口的数据,所以它会调用isWindowDeprecated方法判断是否是当前窗口的数据。
算法应用
那么,算法原理部分讲完了,我们来看看LeapArray在熔断降级部分是如何应用的吧
LeapArray数据存储使用了泛型,比较灵活,以异常数熔断器为例。
static class SimpleErrorCounterLeapArray extends LeapArray{ public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); } @Override public SimpleErrorCounter newEmptyBucket(long timeMillis) { return new SimpleErrorCounter(); } @Override protected WindowWrap resetWindowTo(WindowWrap w, long startTime) { // Update the start time and reset value. w.resetTo(startTime); w.value().reset(); return w; } } static class SimpleErrorCounter { private LongAdder errorCount; private LongAdder totalCount; public SimpleErrorCounter() { this.errorCount = new LongAdder(); this.totalCount = new LongAdder(); } public LongAdder getErrorCount() { return errorCount; } public LongAdder getTotalCount() { return totalCount; } public SimpleErrorCounter reset() { errorCount.reset(); totalCount.reset(); return this; } }
定义了SimpleErrorCounter类作为数据统计,记录了错误的请求数量与总的请求数量。LongAdder是一个线程安全的高效的数据累加器,在Java concurrent包中。
然后实际使用中,熔断器在构造时会初始化一个LeapArray。
public ExceptionCircuitBreaker(DegradeRule rule) { this(rule, new SimpleErrorCounterLeapArray(1, 1000)); }
初始化了一个窗口大小为1000ms,桶数量为1的滑动窗口,也就是以1秒为单位直接统计。这里我也颇有微词,因为这样只统计了当前窗口。假如阈值是1000,第一秒内的后10ms,有900个错误,下一秒的前10ms,有900个错误,那在短短的20ms内,就有了1800个错误,几乎是阈值的两倍,但无法触发降级。如果划分两个桶,每次计算错误比例是靠当前桶与前一个桶加起来计算,就能避免这个问题。当然,或许作者觉得那样实现太麻烦,熔断没有必要那么做,也情有可原。
RT熔断器实现也是差不多的。都是1个桶,1000ms的窗口大小。统计的是总的请求数与超时的请求数,有兴趣的可以自己去看。
小结本文详细分析了熔断降级的原理与Sentinel中滑动窗口的实现与应用。而LeapArray这个类在许多其他地方也扮演着关键的作用,而熔断模块中对其的使用较为简单,限流、统计模块中有更高级的应用,未完待续。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)