Sentinel源码阅读(二)​

Sentinel源码阅读(二)​,第1张

Sentinel源码阅读(二)​

上一篇:Sentinel源码阅读(一)

本文主要包括Sentinel的熔断降级与滑动窗口算法部分。

熔断降级

前文说了,每一种规则都是责任链中的一个节点,对应不同的实现类,熔断降级的类就是DegradeSlot,位于com.alibaba.csp.sentinel.slots.block.degrade包下。目录结构如下:

 除了DegradeSlot,还有

CircuitBreaker:断路器,并有异常数断路器ExceptionCircuitBreaker与RT断路器ResponseTimeCircuitBreaker

DegradeException:BlockException的子类,降级抛出的是这个异常类

DegradeRule:降级规则

DegradeSlot的entry方法首先执行performChecking方法,核心逻辑都在这里。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    performChecking(context, resourceWrapper);
​
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
​
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    List circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    for (CircuitBreaker cb : circuitBreakers) {
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}

第一步是根据资源标识获取断路器list。DegradeRuleManager.getCircuitBreakers是直接从其内部一个map get资源标识映射的断路器的。而这个map的初始化在DegradeRuleManager.RulePropertyListener::reloadFrom方法中

private synchronized void reloadFrom(List list) {
    Map> cbs = buildCircuitBreakers(list);
    Map> rm = new HashMap<>(cbs.size());
​
    for (Map.Entry> e : cbs.entrySet()) {
        assert e.getValue() != null && !e.getValue().isEmpty();
​
        Set rules = new HashSet<>(e.getValue().size());
        for (CircuitBreaker cb : e.getValue()) {
            rules.add(cb.getRule());
        }
        rm.put(e.getKey(), rules);
    }
​
    DegradeRuleManager.circuitBreakers = cbs;
    DegradeRuleManager.ruleMap = rm;
}

DegradeRule是从配置中取的(或者手动构造)不再赘述,这个方法读取DegradeRule列表,并将其转化为CircuitBreaker。转换的方法比较简单,见DegradeRuleManager::newCircuitBreakerFrom。

然后实际上就是遍历断路器列表,执行其tryPass方法判断调用是否能通过,不通过则抛出DegradeException异常。因此下面重点看下断路器CircuitBreaker类。

Sentinel的断路器借鉴了一篇经典文章:CircuitBreaker

总的来说,断路器有三种状态:

open:开启状态(即熔断状态),直接返回false,如果开启时间超过了熔断时间,则转为半开状态

half-open:半开状态,这种状态下会允许下一个请求通过,并对其直接进行异常或RT的校验,而不考虑阈值,如果异常,则继续转为开启状态,如果正常,说明链路恢复了,转为关闭状态。

close:关闭状态,返回true

 Sentinel的实现完全一致

public boolean tryPass(Context context) {
    // Template implementation.
    if (currentState.get() == State.CLOSED) {
        return true;
    }
    if (currentState.get() == State.OPEN) {
        // For half-open state we allow a request for probing.
        return retryTimeoutArrived() && fromOpenToHalfOpen(context);
    }
    return false;
}

Entry在exit时,会调用CircuitBreaker的onRequestComplete方法,取异常或RT,如果在半开状态,若有异常或RT过高,则继续转为开启,否则关闭。以ResponseTimeCircuitBreaker为例

public void onRequestComplete(Context context) {
    SlowRequestCounter counter = slidingCounter.currentWindow().value();
    Entry entry = context.getCurEntry();
    if (entry == null) {
        return;
    }
    long completeTime = entry.getCompleteTimestamp();
    if (completeTime <= 0) {
        completeTime = TimeUtil.currentTimeMillis();
    }
    long rt = completeTime - entry.getCreateTimestamp();
    if (rt > maxAllowedRt) {
        counter.slowCount.add(1);
    }
    counter.totalCount.add(1);
​
    handleStateChangeWhenThresholdExceeded(rt);
}
private void handleStateChangeWhenThresholdExceeded(long rt) {
    if (currentState.get() == State.OPEN) {
        return;
    }
    
    if (currentState.get() == State.HALF_OPEN) {
        // In detecting request
        // TODO: improve logic for half-open recovery
        if (rt > maxAllowedRt) {
            fromHalfOpenToOpen(1.0d);
        } else {
            fromHalfOpenToClose();
        }
        return;
    }
​
    ...
}

最后再讲讲异常数断路器ExceptionCircuitBreaker与RT断路器ResponseTimeCircuitBreaker是如何工作的。

不同断路器要实现的其实就是进行异常的统计,并在状态转换时,进行不同的 *** 作。

两者内部都内部维护了一个LeapArray:

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
​
    private final int strategy;
    private final int minRequestAmount;
    private final double threshold;
​
    private final LeapArray stat;
​
    public ExceptionCircuitBreaker(DegradeRule rule) {
        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
    }
​
    ExceptionCircuitBreaker(DegradeRule rule, LeapArray stat) {
        super(rule);
        this.strategy = rule.getGrade();
        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.minRequestAmount = rule.getMinRequestAmount();
        this.threshold = rule.getCount();
        this.stat = stat;
    }
}

LeapArray是一个滑动窗口算法的实现。这个类在Sentinel许多地方都用到了来进行统计,下一节会讲到。每次请求结束时,都会进行统计,将总数与异常数并写入滑动窗口中。以此作为计算是否到达阈值的依据。区别只是判断异常的方式,ExceptionCircuitBreaker根据是否抛出Exception判断,ResponseTimeCircuitBreaker根据记录的RT是否超过阈值判断。

List counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
for (SlowRequestCounter counter : counters) {
    slowCount += counter.slowCount.sum();
    totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
    return;
}
double currentRatio = slowCount * 1.0d / totalCount;
if (currentRatio > maxSlowRequestRatio) {
    transformToOpen(currentRatio);
}
if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
        Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
    transformToOpen(currentRatio);
}
滑动窗口-LeapArray

由于在熔断降级这里首先看到这个类,那就在本文讲掉了。

LeapArray是滑动窗口算法的一个实现,用来做数据统计,支持任意窗口大小,任意窗口数量。

public abstract class LeapArray {
    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    private double intervalInSecond;
    protected final AtomicReferenceArray> array;
    private final ReentrantLock updateLock = new ReentrantLock();
}

几个核心变量

intervalInMs:统计的窗口大小,单位毫秒

intervalInSecond:同上,单位秒

sampleCount:一个窗口中的采样次数,或者叫bucket数

windowLengthInMs:每个bucket的长度,计算方式为intervalInMs/sampleCount

(这里吐槽一下命名,为什么不叫bucketLengthInMs呢,歧义很大啊!!!,其他地方也是,window、bucket傻傻分不清楚)

array:实际的窗口,存放统计数据。AtomicReferenceArray是对数组的封装,支持多种线程安全的方法。

updateLock:一个可重入锁,用于保证重置滑动窗口的线程安全。(官方文档中用类似Ringbuffer的环形窗口描述,那么也可以理解为回到起点时)。

在代码中有几个概念:

TimeIdx:就是某时刻对应桶的index。获取方法是取模。

private int calculateTimeIdx( long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
}

WindowStart:某个桶对应的起始时刻。也很好理解,依靠取模运算。

protected long calculateWindowStart( long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

isWindowDeprecated:桶是否过期。如果某个桶的起始时刻已经落后超过一个窗口大小(intervalInMS)则过期了。

public boolean isWindowDeprecated( WindowWrap windowWrap) {
    return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
}
public boolean isWindowDeprecated(long time, WindowWrap windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}

了解这几个概念后,我们看一下重点的currentWindow方法。在此之前我画一个示意图,便于理解。

||--------|--------|--------|--------|--------||-------- 0 100 200 300 400 500

窗口大小为500,采样5次,每个桶长度为100。

以下是代码:

public WindowWrap currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    int idx = calculateTimeIdx(timeMillis);
    long windowStart = calculateWindowStart(timeMillis);
    while (true) {
        WindowWrap old = array.get(idx);
        if (old == null) {
            WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            return old;
        } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

currentWindow方法的作用是找到或创建该时刻对应的桶(window)。

一行行看:

首先计算了时刻对应的TimeIdx与WindowStart

一个while循环,每次循环开始取Idx对应的桶 (这里产生了一个问题,为什么要while循环?)

判断

如果该桶为null(第一次循环,array还为空时),创建一个新桶返回。这边可能有多个线程同时尝试添加,因此用了乐观锁CAS保证线程安全。

如果该桶windowStart与时刻对应windowStart相同,表明桶已被创建可以直接使用,则直接返回该桶。

如果该桶的windowStart小于时刻的windowStart(由于取模 *** 作,会重新回到头节点,这时桶都是过期的),表明一个新的时间窗口的开启,此时执行resetWindowTo重置该桶。这边直接用了锁来保证重置 *** 作的线程安全性。

我思考后觉得可能会触发多次重置 *** 作,例如两个线程都已经走到了该分支,而一个reset之后马上释放锁,另一个此时刚好正在获取锁,则又会重置一次。因此实现的resetWindowTo方法要保证多次执行不会出问题。当然我感觉一般也不会出什么问题

最后一个分支一般不会走到,除非手动修改,代码里也这么说明的,不赘述了。

值得注意的是,在获取锁失败,或者CAS失败后,都会执行Thread.yield方法,查阅了一下这个方法是一个native方法,会释放当前线程cpu时间片。这下上面提到的为什么要while的问题就豁然开朗了,不就是一个自旋 *** 作吗!!这种代码在业务中很少使用到,还是看开源代码有收获。

另外values方法也值得注意一下:

public List values() {
    return values(TimeUtil.currentTimeMillis());
}
​
public List values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList();
    }
    int size = array.length();
    List result = new ArrayList(size);
​
    for (int i = 0; i < size; i++) {
        WindowWrap windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}

它的作用是返回一整个窗口的数据。需要注意的是,由于窗口是循环利用的(类似环形),因此可能有上一个时间窗口的数据,所以它会调用isWindowDeprecated方法判断是否是当前窗口的数据。

算法应用

那么,算法原理部分讲完了,我们来看看LeapArray在熔断降级部分是如何应用的吧

LeapArray数据存储使用了泛型,比较灵活,以异常数熔断器为例。

static class SimpleErrorCounterLeapArray extends LeapArray {
​
    public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }
​
    @Override
    public SimpleErrorCounter newEmptyBucket(long timeMillis) {
        return new SimpleErrorCounter();
    }
​
    @Override
    protected WindowWrap resetWindowTo(WindowWrap w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}
​
static class SimpleErrorCounter {
    private LongAdder errorCount;
    private LongAdder totalCount;
​
    public SimpleErrorCounter() {
        this.errorCount = new LongAdder();
        this.totalCount = new LongAdder();
    }
​
    public LongAdder getErrorCount() {
        return errorCount;
    }
​
    public LongAdder getTotalCount() {
        return totalCount;
    }
​
    public SimpleErrorCounter reset() {
        errorCount.reset();
        totalCount.reset();
        return this;
    }
}

定义了SimpleErrorCounter类作为数据统计,记录了错误的请求数量与总的请求数量。LongAdder是一个线程安全的高效的数据累加器,在Java concurrent包中。

然后实际使用中,熔断器在构造时会初始化一个LeapArray。

public ExceptionCircuitBreaker(DegradeRule rule) {
    this(rule, new SimpleErrorCounterLeapArray(1, 1000));
}

初始化了一个窗口大小为1000ms,桶数量为1的滑动窗口,也就是以1秒为单位直接统计。这里我也颇有微词,因为这样只统计了当前窗口。假如阈值是1000,第一秒内的后10ms,有900个错误,下一秒的前10ms,有900个错误,那在短短的20ms内,就有了1800个错误,几乎是阈值的两倍,但无法触发降级。如果划分两个桶,每次计算错误比例是靠当前桶与前一个桶加起来计算,就能避免这个问题。当然,或许作者觉得那样实现太麻烦,熔断没有必要那么做,也情有可原。

RT熔断器实现也是差不多的。都是1个桶,1000ms的窗口大小。统计的是总的请求数与超时的请求数,有兴趣的可以自己去看。

小结

本文详细分析了熔断降级的原理与Sentinel中滑动窗口的实现与应用。而LeapArray这个类在许多其他地方也扮演着关键的作用,而熔断模块中对其的使用较为简单,限流、统计模块中有更高级的应用,未完待续。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5721740.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存