Transport Module: Parsing and Handling REST Requests
Registering REST handlers
Handling requests
HTTP request execution path
Registering REST handlers
Based on the ES 6.7.2 source code.
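Using the figure above, let us review how the communication module is initialized. Handling for REST requests is registered in ActionModule, inside the initRestHandlers method. The classes that handle REST requests follow the naming pattern Rest*Action. They all extend BaseRestHandler, and BaseRestHandler in turn implements the RestHandler interface.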
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    List<AbstractCatAction> catActions = new ArrayList<>();
    // Every handler passes through this consumer; cat actions are also collected
    Consumer<RestHandler> registerHandler = a -> {
        if (a instanceof AbstractCatAction) {
            catActions.add((AbstractCatAction) a);
        }
    };
    registerHandler.accept(new RestMainAction(settings, restController));
    registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
    registerHandler.accept(new RestNodesStatsAction(settings, restController));
    registerHandler.accept(new RestNodesUsageAction(settings, restController));
    registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
    registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
    registerHandler.accept(new RestClusterStatsAction(settings, restController));
    registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestClusterHealthAction(settings, restController));
    ... ... ...
    // Handlers contributed by plugins
    for (ActionPlugin plugin : actionPlugins) {
        for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
                settingsFilter, indexNameExpressionResolver, nodesInCluster)) {
            registerHandler.accept(handler);
        }
    }
    registerHandler.accept(new RestCatAction(settings, restController, catActions));
}
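The registration pattern above can be sketched outside of Elasticsearch. In the quoted code the consumer only collects the AbstractCatAction instances, which are later handed to RestCatAction. The following is a minimal sketch of that idea, not the ES implementation: SketchHandler, SketchCatAction, SketchMainAction, SketchCatHealthAction and RegistrationSketch are hypothetical stand-ins invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for RestHandler; not the real ES type.
interface SketchHandler {
    String name();
}

// Hypothetical stand-in for AbstractCatAction.
abstract class SketchCatAction implements SketchHandler {
}

class SketchMainAction implements SketchHandler {
    public String name() {
        return "main";
    }
}

class SketchCatHealthAction extends SketchCatAction {
    public String name() {
        return "cat_health";
    }
}

class RegistrationSketch {
    // Mirrors the shape of initRestHandlers: every handler passes through one
    // consumer, and handlers of the "cat" subtype are additionally collected.
    static List<String> collectCatActions(List<SketchHandler> handlers) {
        List<SketchCatAction> catActions = new ArrayList<>();
        Consumer<SketchHandler> registerHandler = h -> {
            if (h instanceof SketchCatAction) {
                catActions.add((SketchCatAction) h);
            }
        };
        for (SketchHandler h : handlers) {
            registerHandler.accept(h);
        }
        // Return only the names so the result is easy to inspect.
        List<String> names = new ArrayList<>();
        for (SketchCatAction a : catActions) {
            names.add(a.name());
        }
        return names;
    }
}
```

Passing a list that holds one SketchMainAction and one SketchCatHealthAction to collectCatActions returns a list containing only the cat handler's name.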
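Take RestClusterHealthAction as an example. Its constructor registers this (the handler itself) for requests whose method is GET and whose URI is either /_cluster/health or the placeholder form /_cluster/health/{index}.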
public RestClusterHealthAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(RestRequest.Method.GET, "/_cluster/health", this);
    controller.registerHandler(RestRequest.Method.GET, "/_cluster/health/{index}", this);
}
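To make the placeholder form concrete, here is a minimal sketch of how a method plus a path template such as /_cluster/health/{index} could be matched against an incoming request. It is an illustration under stated assumptions, not how RestController works internally: RouteSketch, its registerHandler and resolve methods, and the linear scan over routes are all hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class RouteSketch {
    // Key: "<METHOD> <template>", value: a handler name (hypothetical).
    private final Map<String, String> routes = new LinkedHashMap<>();

    void registerHandler(String method, String template, String handlerName) {
        routes.put(method + " " + template, handlerName);
    }

    // Returns the handler name for the request, or null if nothing matches.
    // Values captured from {placeholders} are written into params.
    String resolve(String method, String path, Map<String, String> params) {
        for (Map.Entry<String, String> e : routes.entrySet()) {
            String[] key = e.getKey().split(" ", 2);
            if (!key[0].equals(method)) {
                continue;
            }
            Map<String, String> captured = match(key[1], path);
            if (captured != null) {
                params.putAll(captured);
                return e.getValue();
            }
        }
        return null;
    }

    // Compares template and path segment by segment; a {name} segment
    // matches any value and records it under that name.
    private static Map<String, String> match(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return null;
        }
        Map<String, String> captured = new HashMap<>();
        for (int i = 0; i < t.length; i++) {
            if (t[i].startsWith("{") && t[i].endsWith("}")) {
                captured.put(t[i].substring(1, t[i].length() - 1), p[i]);
            } else if (!t[i].equals(p[i])) {
                return null;
            }
        }
        return captured;
    }
}
```

After registering both templates for GET, resolving GET /_cluster/health/my-index returns the handler and stores my-index under the key index, which is the value that request.param("index") would later read in the handler.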
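Because it extends BaseRestHandler, the class must implement the prepareRequest method. This method does the preparatory work when a request arrives, such as validating parameters and converting the REST request into an internal RPC request.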
Handling requests
HTTP request execution path
The handleRequest method of BaseRestHandler
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // Pre-process the request
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    if (request.hasContent() && request.isContentConsumed() == false) {
        deprecationLogger.deprecated(
            "request [{} {}] does not support having a body; Elasticsearch 7.x+ will reject such requests",
            request.method(), request.path());
    }

    usageCount.increment();
    // Execute the actual action
    action.accept(channel);
}
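The control flow of handleRequest is a template method: the subclass prepares a deferred action, the base class then rejects any parameter that was never read, and only after that is the action run. The following is a minimal sketch of that flow under simplifying assumptions. SketchRequest, SketchBaseHandler and SketchHealthHandler are hypothetical, a StringBuilder stands in for the channel, and response parameters are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical request type that remembers which parameters were read.
class SketchRequest {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    SketchRequest(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    // Reading a parameter marks it as consumed.
    String param(String key) {
        consumed.add(key);
        return params.get(key);
    }

    // Sorted so that error messages list parameters in a stable order.
    SortedSet<String> unconsumedParams() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);
        return rest;
    }
}

abstract class SketchBaseHandler {
    // Subclasses parse the request and return the deferred action.
    protected abstract Consumer<StringBuilder> prepareRequest(SketchRequest request);

    // Template method: prepare, validate leftovers, then run the action.
    public final void handleRequest(SketchRequest request, StringBuilder channel) {
        Consumer<StringBuilder> action = prepareRequest(request);
        SortedSet<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);
    }
}

class SketchHealthHandler extends SketchBaseHandler {
    @Override
    protected Consumer<StringBuilder> prepareRequest(SketchRequest request) {
        String index = request.param("index");
        return channel -> channel.append("health of ").append(index == null ? "_all" : index);
    }
}
```

Because validation happens after prepareRequest but before the action runs, a misspelled query parameter fails fast with an IllegalArgumentException instead of being silently ignored.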
The prepareRequest method of RestClusterHealthAction
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // Build the internal request, clusterHealthRequest
    ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
    clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
    clusterHealthRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterHealthRequest.masterNodeTimeout()));
    clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout()));
    String waitForStatus = request.param("wait_for_status");
    if (waitForStatus != null) {
        clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
    }
    clusterHealthRequest.waitForNoRelocatingShards(
        request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
    clusterHealthRequest.waitForNoInitializingShards(
        request.paramAsBoolean("wait_for_no_initializing_shards", clusterHealthRequest.waitForNoRelocatingShards()));
    if (request.hasParam("wait_for_relocating_shards")) {
        // wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
        throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +
            "use wait_for_no_relocating_shards [true/false] instead");
    }
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        clusterHealthRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
    if (request.param("wait_for_events") != null) {
        clusterHealthRequest.waitForEvents(Priority.valueOf(request.param("wait_for_events").toUpperCase(Locale.ROOT)));
    }
    // AbstractClient maps the REST request to a TCP (transport) action;
    // the matching handler class is looked up through ActionModule
    return channel -> client.admin().cluster().health(clusterHealthRequest, new RestStatusToXContentListener<>(channel));
}
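Two idioms recur in the parameter handling above: reading an optional parameter with a fallback default, and turning a string such as yellow into an enum constant with toUpperCase(Locale.ROOT). Locale.ROOT matters because case conversion is locale-sensitive; under a Turkish default locale, for example, a lowercase i upper-cases to a dotted capital İ, so a value like high would no longer match an enum constant named HIGH. Here is a minimal sketch of both idioms. ParamSketch and its Status enum are hypothetical stand-ins, and Boolean.parseBoolean is used for brevity, which may be more lenient than the parsing ES applies.

```java
import java.util.Locale;
import java.util.Map;

class ParamSketch {
    // Hypothetical stand-in for ClusterHealthStatus.
    enum Status { GREEN, YELLOW, RED }

    // Returns the parsed status, or null when the parameter is absent.
    // An unknown value makes Enum.valueOf throw IllegalArgumentException.
    static Status waitForStatus(Map<String, String> params) {
        String raw = params.get("wait_for_status");
        if (raw == null) {
            return null;
        }
        return Status.valueOf(raw.toUpperCase(Locale.ROOT));
    }

    // Returns defaultValue when the parameter is absent.
    static boolean paramAsBoolean(Map<String, String> params, String key, boolean defaultValue) {
        String raw = params.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }
}
```

With a parameter map containing wait_for_status=yellow and local=true, waitForStatus yields Status.YELLOW and paramAsBoolean(params, "local", false) yields true, while a missing key falls back to the supplied default.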
The health method of AbstractClient
@Override
public void health(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
    execute(ClusterHealthAction.INSTANCE, request, listener);
}
After that, NodeClient.executeLocally sends the request on, and processing moves into the TCP request handling path.
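public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
        GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
    // Look up the transport action registered for this action and execute it
    return transportAction(action).execute(request, listener);
}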
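The one-line body of executeLocally amounts to a registry lookup followed by an in-process call with a listener for the result. A minimal sketch of that dispatch follows. It is not the NodeClient implementation: DispatchSketch, its register method, the use of plain strings for requests and responses, and the error message are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class DispatchSketch {
    // Hypothetical registry: action name -> in-process implementation.
    // The implementation receives the request and a listener for the response.
    private final Map<String, BiConsumer<String, Consumer<String>>> actions = new HashMap<>();

    void register(String actionName, BiConsumer<String, Consumer<String>> impl) {
        actions.put(actionName, impl);
    }

    // Looks up the implementation registered for the action and runs it
    // locally, delivering the result through the listener.
    void executeLocally(String actionName, String request, Consumer<String> listener) {
        BiConsumer<String, Consumer<String>> impl = actions.get(actionName);
        if (impl == null) {
            throw new IllegalStateException("no action registered for [" + actionName + "]");
        }
        impl.accept(request, listener);
    }
}
```

Registering an implementation under a name such as cluster:monitor/health and then calling executeLocally with that name invokes the implementation directly and hands its response to the listener, without any HTTP involved at this stage.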
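From this it appears that REST is a wrapper layer on top of the internal communication. It exists to simplify client-side calls and make them more convenient for developers. The REST Client is the recommended client for ES.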