elasticsearch源码关于TransportSearchAction【阶段二】

elasticsearch源码关于TransportSearchAction【阶段二】,第1张

回顾:TransportSearchAction#executeSearch

searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
    Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards).start();
1.AbstractSearchAsyncAction

abstract class AbstractSearchAsyncAction extends InitialSearchPhase
AbstractSearchAsyncAction:方法1

/**
 * This is the main entry point for a search. This method starts the search execution of the initial phase.
 */
public final void start() {
    executePhase(this);
}

AbstractSearchAsyncAction:方法2
根据不同的SearchPhase 的子类执行,比如SearchQueryThenFetchAsyncAction

  private void executePhase(SearchPhase phase) {
        try {
            phase.run(); 
        } catch (Exception e) {
        }
    }

行父类的父类的方法InitialSearchPhase#run
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
abstract class AbstractSearchAsyncAction extends InitialSearchPhase
abstract class InitialSearchPhase extends SearchPhase

2.InitialSearchPhase

abstract class InitialSearchPhase extends SearchPhase
InitialSearchPhase:方法1

 @Override
    public final void run() throws IOException {
        if (shardsIts.size() > 0) {
            // 取最小分片
            int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
            final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
            assert success;
            for (int index = 0; index < maxConcurrentShardRequests; index++) {
                // 获取分片路由
                final SearchShardIterator shardRoutings = shardsIts.get(index);
                assert shardRoutings.skip() == false;
                if (!Objects.equals(shardRoutings.shardId().getIndexName(), ".kibana")) {
                    System.out.println("【search】InitialSearchPhase ->SearchPhase[抽象类] " +
                        "->CheckedRunnable[接口]#run,执行shard级请求");
                }
                performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
            }
        }
    }

InitialSearchPhase:方法2
关注:onShardResult(result, shardIt)

 private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {

        final Thread thread = Thread.currentThread();
            try {
                executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
                    shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
                    @Override
                    public void innerOnResponse(FirstResult result) {
                        if (!Objects.equals(result.getSearchShardTarget().getShardId().getIndexName(),".kibana")){
                            System.out.println("InitialSearchPhase#performPhaseOnShard返回结果 -> 实现接口SearchActionListener#innerOnResponse,");
                        }
                        maybeFork(thread, () -> onShardResult(result, shardIt));
                    }

                    @Override
                    public void onFailure(Exception t) {
                        maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
                    }
                });
            } catch (final Exception e) {
            }
        }
3.SearchQueryThenFetchAsyncAction

final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
转发分片,连接信息,节点信息,分片信息,任务信息

 protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
                                       final SearchActionListener<SearchPhaseResult> listener) {
        getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
            buildShardSearchRequest(shardIt), getTask(), listener);
    }
4.SearchTransportService

public class SearchTransportService extends AbstractComponent

  //  转发分片,连接信息,节点信息,分片信息,任务信息
    public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
                                 final SearchActionListener<SearchPhaseResult> listener) {
        // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
        // this used to be the QUERY_AND_FETCH which doesn't exist anymore
        final boolean fetchDocuments = request.numberOfShards() == 1;
        Supplier<SearchPhaseResult> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

        final ActionListener handler = responseWrapper.apply(connection, listener);//比如 SearchExecutionStatsCollector
        transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
            new ActionListenerResponseHandler<>(handler, supplier));
    }
5.TransportService

public class TransportService extends AbstractLifecycleComponent
TransportService :方法:1

   public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
                                                               final TransportRequest request, final Task parentTask,
                                                               final TransportResponseHandler<T> handler) {
        sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
    }

TransportService :方法:2

public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
                                                               final TransportRequest request, final Task parentTask,
                                                               final TransportRequestOptions options,
                                                               final TransportResponseHandler<T> handler) {
        request.setParentTask(localNode.getId(), parentTask.getId());
        try {
            sendRequest(connection, action, request, options, handler);
        } catch (TaskCancelledException ex) {
        }

    }

TransportService :方法:3
异步发送请求

 public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler<T> handler) {

        asyncSender.sendRequest(connection, action, request, options, handler);
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/792598.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存