sentinel源码分析

sentinel源码分析,第1张

为什么需要sentinel
  1. 微服务架构,但服务之间存在很强的依赖,如何避免下层服务挂掉后,不影响上层服务呢?
  2. 如何防止雪崩呢?
  3. 秒杀场景,服务器的性能都是有上线的,如何避免大流量,高并发的请求不拖垮服务,并返回给用户友好的界面反馈呢?
示例

客户端代码,使用了限流的get请求,并抛出异常

@RestController
@RequestMapping("/sentinel")
public class SentinelController {

    static {
      	// 初始化规则
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource("hello");
        rule1.setCount(2);
        rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    @GetMapping("hello")
    public String hello() throws BlockException {
      	//限流核心代码
        Entry entry = SphU.entry("hello");
        return "sucess";
    }
}

使用postman进行简单测试

sentinel的概念 基本概念
  1. 面向分布式服务架构的流量控制组件
  2. 以流量为切入点,从流量控制 熔断降级 系统负载保护等多维度保护服务的稳定性
  3. 应用场景:秒杀 流量削峰填谷 集群流量控制 实时熔断下游不可用应用等
组成部分
  • 核心库
  • 控制台(dashboard)后台负责监控配置规则等
核心结构

Sentinel 的核心骨架,将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和规则判断部分(rule checking)

重要概念 资源
  • Java程序中的任何内容,比如提供或调用的服务、方法 、一段代码等 。
  • 通过Sentinel API 定义包围的代码就是资源,可以使用方法签名、URL 、服务名称作为资源名来表示资源
规则
  • 包括流量控制规则、熔断降级规则、系统保护规则、来源访问控制规则和 热点参数规则
  • 规则是可以动态实时调整的(通过控制台,nacos,zk,apollo等配置中心…)
  • 持久化规则:实现datasource接口,自定义规则的存储数据源
修改规则方法
  • 通过 API 直接修改 (loadRules),仅保存在内存中
  • 通过 DataSource 适配不同数据源修改,保存在外部数据源中(比如可以保存到apollo中,并且从apollo中拉取数据)
Slot
  • Sentinel 的工作流程就是围绕着插槽来展开的, slot按一定的顺序执行,形成责任链的模式

  • 通过 SlotChainBuilder 作为 SPI 接口, 以通过实现 SlotsChainBuilder 接口加入自定义的 slot 并自定义编排各个 slot 之间的顺序,从而可以给 Sentinel 添加自定义的功能

  • 每个资源有一个全局的slot chain

context

维护着当前调用链的元数据

  • entranceNode:当前调用链的入口节点
  • curEntry:当前调用链的当前entry
  • node:与当前entry所对应的curNode
  • origin:当前调用链的调用源
Node

保存资源的实时统计数据,例如:passQps,blockQps,rt等实时数据

功能 流量控制


流量控制(flow control),其原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
FlowSlot 会根据预设的规则,结合前面 NodeSelectorSlot、ClusterNodeBuilderSlot、StatisticSlot 统计出来的实时信息进行流量控制。
限流抛出的异常是FlowException

策略
  • 基于QPS的流量控制

  • 基于并发数的流量控制

熔断降级


在调用链路中某个资源出现不稳定状态时(例如调用超时或异常比例升高),对这个资源的调用进行限制,让请求快速失败,降低调用链路中的不稳定性
熔断抛出的异常是DegradeException
根据请求响应时间或异常比例

策略
  • 平均响应时间

  • 异常比率

  • 异常数

系统自适应限流

系统维度的自适应保护能力,防止雪崩 ,让系统的入口流量和系统的负责达到一个平衡
异常是SystemBlockException

策略

热点参数限流

异常是ParamFlowException

来源访问控制

根据调用方来限制资源是否通过,可以使用 Sentinel 的黑白名单控制的功能。黑白名单根据资源的请求来源(origin)限制资源是否通过,若配置白名单则只有请求来源位于白名单内时才可通过;若配置黑名单则请求来源位于黑名单时不通过,其余的请求通过。

步骤
  1. 定义资源(基于框架 注解 等)
  2. 定义规则(流量控制规则 熔断降级规则 系统保护规则 访问控制规则 热点规则 )
  3. 检查规则是否生效(父类异常BlockException)
源码分析

开源项目的结构:

实现限流熔断的主要源码

com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object…)

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        // Using default context.
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // 重点 此处是获取责任链(通过spi自定义solt)
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        // 异常向上一层抛 异常的处理由业务层处理
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
获取责任链

最终获取责任链的源码:com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build

public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
		// 通过spi获取定义的slot 
    List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }

        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }

    return chain;
}
FlowSlot
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
	   // 集群限流
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    // 本地单个节点的限流
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
		// 根据配置的规则来获取相应的控制器
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
控制台

主要作用就是实时数据的监控和规则的配置(如果没有引入控制台,也可以通过日志来查看请求限流数据,默认日志文件路径:${user.home}/logs/csp/)

控制台启动
  1. GitHub上下载源码,进行打包,运行脚本:java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
  2. GitHub上下载源码,配置启动参数:-Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard ,直接运行main方法
客户端配置
  1. 客户端启动参数加上:-Dcsp.sentinel.dashboard.server=localhost:8080
  2. 通过http实现,引入jar:
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-simple-http</artifactId>
</dependency>
控制台和客户端的通信原理

底层通过Socket进行通讯。ps:客户端需要进行资源的请求,才能开始与控制台建立联系

客户端建立通讯的源码
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
    throws BlockException {
  	// 请求资源时 初始化Env实例中的静态方法
    return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
public class Env {

    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        // 下面方法中进行通讯的一些初始化
        InitExecutor.doInit();
    }

}
public static void doInit() {
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    try {
    		// 通过spi,获取InitFunc的所有实现类
        List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
        List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
        for (InitFunc initFunc : initFuncs) {
            RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
            insertSorted(initList, initFunc);
        }
        // 通过上面排序后得到initList里面的实现类是:
        // CommandCenterInitFunc,HeartbeatSenderInitFunc,MetricCallbackInit
        for (OrderWrapper w : initList) {
          	// 分别调用上面三个对象的初始化方法
            w.func.init();
            RecordLog.info("[InitExecutor] Executing {} with order {}",
                w.func.getClass().getCanonicalName(), w.order);
        }
    } catch (Exception ex) {
        RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
        ex.printStackTrace();
    } catch (Error error) {
        RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
        error.printStackTrace();
    }
}

其中CommandCenterInitFunc,HeartbeatSenderInitFunc实现客户端和控制台之间的通信,MetricCallbackInit主要实现统计slot的记录。下面依次分析CommandCenterInitFunc,HeartbeatSenderInitFunc,MetricCallbackInit

CommandCenterInitFunc.init()

控制台请求处理的入口

public void init() throws Exception {
  	// 获取到实现类 SimpleHttpCommandCenter
    CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

    if (commandCenter == null) {
        RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
        return;
    }
		// 为SimpleHttpCommandCenter中的静态变量Map handlerMap赋值,
  	// 将所有的CommandHandler实现类放到handlerMap中,CommandHandler的实现类根据command不同处理不同的socket请求,使用了策略模式
    commandCenter.beforeStart();
  	// start方法中主要建立socket连接,然后通过异步线程去执行command的业务代码,下面具体讲解start()源码
    commandCenter.start();
    RecordLog.info("[CommandCenterInit] Starting command center: "
            + commandCenter.getClass().getCanonicalName());
}
// ****重点****
public void start() throws Exception {
    int nThreads = Runtime.getRuntime().availableProcessors();
    // bizExecutor是具体业务执行的线程池
    this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        new NamedThreadFactory("sentinel-command-center-service-executor"),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                CommandCenterLog.info("EventTask rejected");
                throw new RejectedExecutionException();
            }
        });

    Runnable serverInitTask = new Runnable() {
    	  // socket通信的端口号
        int port;

        {
            try {
                port = Integer.parseInt(TransportConfig.getPort());
            } catch (Exception e) {
                port = DEFAULT_PORT;
            }
        }

        @Override
        public void run() {
            boolean success = false;
            // 建立ServerSocket通信
            ServerSocket serverSocket = getServerSocketFromBasePort(port);

            if (serverSocket != null) {
                CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                socketReference = serverSocket;
              	// 创建一个异步线程,执行该异步线程,ServerThread线程主要时刻获取http请求,然后执行任务
                executor.submit(new ServerThread(serverSocket));
                success = true;
                port = serverSocket.getLocalPort();
            } else {
                CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
            }

            if (!success) {
                port = PORT_UNINITIALIZED;
            }

            TransportConfig.setRuntimePort(port);
            executor.shutdown();
        }

    };

    new Thread(serverInitTask).start();
}
class ServerThread extends Thread {

    private ServerSocket serverSocket;

    ServerThread(ServerSocket s) {
        this.serverSocket = s;
        setName("sentinel-courier-server-accept-thread");
    }

    @Override
    public void run() {
      	// 此处重点,通过while(true)时刻获取请求数据
        while (true) {
            Socket socket = null;
            try {
                socket = this.serverSocket.accept();
                setSocketSoTimeout(socket);
              	// 创建线程HttpEventTask,并且将该线程放入业务线程池 
                HttpEventTask eventTask = new HttpEventTask(socket);
                bizExecutor.submit(eventTask);
            } catch (Exception e) {
                CommandCenterLog.info("Server error", e);
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (Exception e1) {
                        CommandCenterLog.info("Error when closing an opened socket", e1);
                    }
                }
                try {
                    // In case of infinite log.
                    Thread.sleep(10);
                } catch (InterruptedException e1) {
                    // Indicates the task should stop.
                    break;
                }
            }
        }
    }
}
@Override
public void run() {
    if (socket == null) {
        return;
    }

    PrintWriter printWriter = null;
    InputStream inputStream = null;
    try {
        long start = System.currentTimeMillis();
        inputStream = new BufferedInputStream(socket.getInputStream());
        OutputStream outputStream = socket.getOutputStream();

        printWriter = new PrintWriter(
            new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

        String firstLine = readLine(inputStream);
        CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
            + ", addr: " + socket.getInetAddress());
        CommandRequest request = processQueryString(firstLine);

        if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
            // Deal with post method
            processPostRequest(inputStream, request);
        }

        // Validate the target command.
        String commandName = HttpCommandUtils.getTarget(request);
        if (StringUtil.isBlank(commandName)) {
            writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
            return;
        }

        // Find the matching command handler.
      	// 重点是在这里 ,前端的代码主要是获取请求参数,然后解析出commandName
      	// 从前面beforeStart()方法中产生的handlerMap 中根据commandName获取handler
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
        if (commandHandler != null) {	
          	// 执行handler的handle方法,主要业务逻辑的实现部分
            CommandResponse<?> response = commandHandler.handle(request);
          	// 将业务处理接口的数据返回到response中
            handleResponse(response, printWriter);
        } else {
            // No matching command handler.
            writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
        }

        long cost = System.currentTimeMillis() - start;
        CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
            + ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
    } catch (RequestException e) {
        writeResponse(printWriter, e.getStatusCode(), e.getMessage());
    } catch (Throwable e) {
        CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
        try {
            if (printWriter != null) {
                String errorMessage = SERVER_ERROR_MESSAGE;
                e.printStackTrace();
                if (!writtenHead) {
                    writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
                } else {
                    printWriter.println(errorMessage);
                }
                printWriter.flush();
            }
        } catch (Exception e1) {
            CommandCenterLog.warn("Failed to write error response", e1);
        }
    } finally {
        closeResource(inputStream);
        closeResource(printWriter);
        closeResource(socket);
    }
}

总结上面代码主要功能

  1. 将所有的CommandHandler(策略模式)放入一个map中
  2. 异步线程建立socket通信
  3. 通过参数commandName来获取相应的handler
  4. handler处理结果,比如实时监控数据,链路数据的处理返回
HeartbeatSenderInitFunc.init()

主要实现心跳上报到控制台

public void init() {
  	// 获取心跳发送的实现类 SimpleHttpHeartbeatSender
    HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
    if (sender == null) {
        RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
        return;
    }
		// 创建一个定时任务线程池
    initSchedulerIfNeeded();
  	// 获取心跳发送时间间隔,默认时间10秒,可通过配置csp.sentinel.heartbeat.interval.ms来配置
    long interval = retrieveInterval(sender);
    setIntervalIfNotExists(interval);
  	// 发送心跳
    scheduleHeartbeatTask(sender, interval);
}
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
    pool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
              	// 每隔interval毫秒执行一次发送心跳
                sender.sendHeartbeat();
            } catch (Throwable e) {
                RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
            }
        }
    }, 5000, interval, TimeUnit.MILLISECONDS);
    RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
        + sender.getClass().getCanonicalName());
}
public boolean sendHeartbeat() throws Exception {
    if (TransportConfig.getRuntimePort() <= 0) {
        RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
        return false;
    }
  	// 封装了控制台的host post等信息
    Endpoint addrInfo = getAvailableAddress();
    if (addrInfo == null) {
        return false;
    }
		// 控制台接受心跳请求地址默认为:/registry/machine,也可通过配置项csp.sentinel.heartbeat.api.path配置
    SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
  	// 发送请求参数
    request.setParams(heartBeat.generateCurrentMessage());
    try {
      	// 发送post请求到控制台
        SimpleHttpResponse response = httpClient.post(request);
        if (response.getStatusCode() == OK_STATUS) {
            return true;
        } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
            RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo
                + ", http status code: " + response.getStatusCode());
        }
    } catch (Exception e) {
        RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e);
    }
    return false;
}

总结上面代码主要功能:

  1. 创建一个定时任务线程池(根据配置的时间间隔定时请求控制台接口)
  2. 将心跳数据通过post请求到控制台(控制台接受心跳请求地址默认为:/registry/machine,可自定义配置)
控制台功能 实时监控
  1. 前端每隔10秒请求控制台后端接口获取监控数据

  2. 控制台后端通过定时任务,每隔1秒请求客户端获取统计数据,commandName=“metric”

  3. 客户端通过commandName="metric"指定SendMetricCommandHandler来处理控制台的请求

    上面第2步骤的源码

执行方法:com.alibaba.csp.sentinel.dashboard.metric.MetricFetcher#start

private void start() {
  	// 定时任务线程池,每隔intervalSecond=1 秒执行调客户端获取实时数据
    fetchScheduleService.scheduleAtFixedRate(() -> {
        try {
          	// 该方法中,为每个客户端创建一个线程执行doFetchAppMetric方法
          	// 在doFetchAppMetric方法中,又通过异步线程调用方法fetchOnce(app, finalLastFetchMs, finalEndTime, 5);
            fetchAllApp();
        } catch (Exception e) {
            logger.info("fetchAllApp error:", e);
        }
    }, 10, intervalSecond, TimeUnit.SECONDS);
}
private void fetchOnce(String app, long startTime, long endTime, int maxWaitSeconds) {
    ...
    // 定义计数器 数量为客户端总机器数
    final CountDownLatch latch = new CountDownLatch(machines.size());
    // 循环遍历每个客户端机器
    for (final MachineInfo machine : machines) {
        // auto remove
        if (machine.isDead()) {
            latch.countDown();
            appManagement.getDetailApp(app).removeMachine(machine.getIp(), machine.getPort());
            logger.info("Dead machine removed: {}:{} of {}", machine.getIp(), machine.getPort(), app);
            continue;
        }
        if (!machine.isHealthy()) {
            latch.countDown();
            unhealthy.incrementAndGet();
            continue;
        }
      	// 请求客户端 METRIC_URL_PATH = "metric"  ,即commandName="metric"
        final String url = "http://" + machine.getIp() + ":" + machine.getPort() + "/" + METRIC_URL_PATH
            + "?startTime=" + startTime + "&endTime=" + endTime + "&refetch=" + false;
        final HttpGet httpGet = new HttpGet(url);
        httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
        httpclient.execute(httpGet, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(final HttpResponse response) {
                try {
                  	// 封装返回结果
                    handleResponse(response, machine, metricMap);
                    success.incrementAndGet();
                } catch (Exception e) {
                    logger.error(msg + " metric " + url + " error:", e);
                } finally {
                    latch.countDown();
                }
            }

            ...
        });
    }
    try {
        // 重点 在maxWaitSeconds的时间范围内,线程阻塞直到计数器为0,即对所有的客户端机器请求好。
        latch.await(maxWaitSeconds, TimeUnit.SECONDS);
    } catch (Exception e) {
        logger.info(msg + " metric, wait http client error:", e);
    }
    long cost = System.currentTimeMillis() - start;
    //logger.info("finished " + msg + " metric for " + app + ", time intervalMs [" + startTime + ", " + endTime
    //    + "], total machines=" + machines.size() + ", dead=" + dead + ", fetch success="
    //    + success + ", fetch fail=" + fail + ", time cost=" + cost + " ms");
  	// 将客户端返回的统计数据记录到内存中,所存储的对象:InMemoryMetricsRepository#allMetrics
    writeMetric(metricMap);
}
簇点链路

控制台通过异步执行http请求到客户端 ,客户端处理的commandName=“jsonTree”,对应的handler是FetchJsonTreeCommandHandler。下面是控制台发出请求的源码方法是:com.alibaba.csp.sentinel.dashboard.client.SentinelApiClient#executeCommand(java.lang.String, java.lang.String, int, java.lang.String, java.util.Map, boolean)。上面方法返回一个CompletableFuture,通过get()获取返回结果

规则配置(没有外接动态数据源)


查询规则

@GetMapping("/rules")
@AuthAction(PrivilegeType.READ_RULE)
public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
                                                         @RequestParam String ip,
                                                         @RequestParam Integer port) {

    ...
    try {
    		// 从客户端查询规则
        List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);
        // 保存规则于控制台内(先清除控制台内的规则数据,再将客户端查询返回的规则重新保存)
        rules = repository.saveAll(rules);
        return Result.ofSuccess(rules);
    } catch (Throwable throwable) {
        logger.error("Error when querying flow rules", throwable);
        return Result.ofThrowable(-1, throwable);
    }
}

新建规则

@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<AuthorityRuleEntity> apiAddAuthorityRule(@RequestBody AuthorityRuleEntity entity) {
    Result<AuthorityRuleEntity> checkResult = checkEntityInternal(entity);
    ...
    try {
    		// 本地保存规则
        entity = repository.save(entity);
    } catch (Throwable throwable) {
        logger.error("Failed to add authority rule", throwable);
        return Result.ofThrowable(-1, throwable);
    }
    // publishRules推动规则到客户端
    if (!publishRules(entity.getApp(), entity.getIp(), entity.getPort())) {
        logger.info("Publish authority rules failed after rule add");
    }
    return Result.ofSuccess(entity);
}
  1. sentinel官方文档:https://sentinelguard.io/zh-cn/docs/quick-start.html
  2. sentinel和hystrix:https://mp.weixin.qq.com/s/D8RKfnzofM-br_y4fTLIaA
  3. ognl官网:https://commons.apache.org/proper/commons-ognl/language-guide.html

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/732813.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存