【ES源码分析】Transport模块之REST的解析与处理

【ES源码分析】Transport模块之REST的解析与处理,第1张

【ES源码分析】Transport模块之REST的解析与处理

文章目录

Transport模块之REST的解析与处理

注册REST处理处理请求

HTTP请求执行路径

Transport模块之REST的解析与处理

基于ES源码6.7.2

注册REST处理

借由上图回顾一下通信模块的初始化过程,在ActionModule下对Rest请求处理进行了注册,注册过程在initRestHandlers方法中。可以发现对REST请求执行处理的类的命名是Rest*Action,同时可以发现这些处理类都继承了baseRestHandler,而baseRestHandler继承了RestHandler。

    public void initRestHandlers(Supplier nodesInCluster) {
        List catActions = new ArrayList<>();
        Consumer registerHandler = a -> {
            if (a instanceof AbstractCatAction) {
                catActions.add((AbstractCatAction) a);
            }
        };
        registerHandler.accept(new RestMainAction(settings, restController));
        registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
        registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
        registerHandler.accept(new RestNodesStatsAction(settings, restController));
        registerHandler.accept(new RestNodesUsageAction(settings, restController));
        registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
        registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
        registerHandler.accept(new RestClusterStatsAction(settings, restController));
        registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
        registerHandler.accept(new RestClusterHealthAction(settings, restController));
        ...
        ...
        ...
        for (ActionPlugin plugin : actionPlugins) {
            for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
                    settingsFilter, indexNameexpressionResolver, nodesInCluster)) {
                registerHandler.accept(handler);
            }
        }
        registerHandler.accept(new RestCatAction(settings, restController, catActions));
    }

以RestClusterHealthAction为例,在其构造函数中对请求头中请求方法为GET,URI为/_cluster/health和拥有占位符/_cluster/health/{index}的处理类为this(即自己)。

    public RestClusterHealthAction(Settings settings, RestController controller) {
        super(settings);

        controller.registerHandler(RestRequest.Method.GET, "/_cluster/health", this);
        controller.registerHandler(RestRequest.Method.GET, "/_cluster/health/{index}", this);
    }

由于继承了baseRestHandler,所以必须实现prepareRequest方法,用于在接收到请求时,做一些前置工作,比如验证参数,转换为内部RPC请求等。

处理请求 HTTP请求执行路径

baseRestHandler的handleRequest方法

    @Override
    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        // 对请求的预处理
        final RestChannelConsumer action = prepareRequest(request, client);

        // validate unconsumed params, but we must exclude params used to format the response
        // use a sorted set so the unconsumed parameters appear in a reliable sorted order
        final SortedSet unconsumedParams =
            request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

        // validate the non-response params
        if (!unconsumedParams.isEmpty()) {
            final Set candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams());
            throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
        }

        if (request.hasContent() && request.isContentConsumed() == false) {
            deprecationLogger.deprecated(
                    "request [{} {}] does not support having a body; Elasticsearch 7.x+ will reject such requests",
                    request.method(),
                    request.path());
        }

        usageCount.increment();
        // 实际执行真正的Action
        action.accept(channel);
    }

RestClusterHealthAction的prepareRequest

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // 构造内部请求clusterHealthRequest
    ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
    clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
    clusterHealthRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterHealthRequest.masterNodeTimeout()));
    clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout()));
    String waitForStatus = request.param("wait_for_status");
    if (waitForStatus != null) {
        clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
    }
    clusterHealthRequest.waitForNoRelocatingShards(
        request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
    clusterHealthRequest.waitForNoInitializingShards(
        request.paramAsBoolean("wait_for_no_initializing_shards", clusterHealthRequest.waitForNoRelocatingShards()));
    if (request.hasParam("wait_for_relocating_shards")) {
        // wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
        throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +
                                           "use wait_for_no_relocating_shards [true/false] instead");
    }
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        clusterHealthRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
    if (request.param("wait_for_events") != null) {
        clusterHealthRequest.waitForEvents(Priority.valueOf(request.param("wait_for_events").toUpperCase(Locale.ROOT)));
    }
    // 在AbstractClient中将REST和tcpAction,在ActionModule找到对应的处理类
    return channel -> client.admin().cluster().health(clusterHealthRequest, new RestStatusToXContentListener<>(channel));
}

AbstractClient的health

        @Override
        public void health(final ClusterHealthRequest request, final ActionListener listener) {
            execute(ClusterHealthAction.INSTANCE, request, listener);
        }

之后通过NodeClient.executeLocally将请求发出去。开始转入TCP请求处理了。

public  Task executeLocally(GenericAction action, Request request, ActionListener listener) {
        return transportAction(action).execute(request, listener);
}

可以看出REST应该是在内部通信上有包装了一层。为了简化和方便开发人员通过客户端调用。ES的客户端建议使用REST Client。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5708712.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存