回顾:TransportSearchAction#executeSearch
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards).start();
1.AbstractSearchAsyncAction
abstract class AbstractSearchAsyncAction extends InitialSearchPhase
AbstractSearchAsyncAction:方法1
/**
* This is the main entry point for a search. This method starts the search execution of the initial phase.
*/
public final void start() {
executePhase(this);
}
AbstractSearchAsyncAction:方法2
根据不同的SearchPhase 的子类执行,比如SearchQueryThenFetchAsyncAction
private void executePhase(SearchPhase phase) {
try {
phase.run();
} catch (Exception e) {
}
}
行父类的父类的方法InitialSearchPhase#run
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
abstract class AbstractSearchAsyncAction extends InitialSearchPhase
abstract class InitialSearchPhase extends SearchPhase
abstract class InitialSearchPhase extends SearchPhase
InitialSearchPhase:方法1
@Override
public final void run() throws IOException {
if (shardsIts.size() > 0) {
// 取最小分片
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
for (int index = 0; index < maxConcurrentShardRequests; index++) {
// 获取分片路由
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
if (!Objects.equals(shardRoutings.shardId().getIndexName(), ".kibana")) {
System.out.println("【search】InitialSearchPhase ->SearchPhase[抽象类] " +
"->CheckedRunnable[接口]#run,执行shard级请求");
}
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}
}
InitialSearchPhase:方法2
关注:onShardResult(result, shardIt)
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
final Thread thread = Thread.currentThread();
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
if (!Objects.equals(result.getSearchShardTarget().getShardId().getIndexName(),".kibana")){
System.out.println("InitialSearchPhase#performPhaseOnShard返回结果 -> 实现接口SearchActionListener#innerOnResponse,");
}
maybeFork(thread, () -> onShardResult(result, shardIt));
}
@Override
public void onFailure(Exception t) {
maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
}
});
} catch (final Exception e) {
}
}
3.SearchQueryThenFetchAsyncAction
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
转发分片,连接信息,节点信息,分片信息,任务信息
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}
4.SearchTransportService
public class SearchTransportService extends AbstractComponent
// 转发分片,连接信息,节点信息,分片信息,任务信息
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final SearchActionListener<SearchPhaseResult> listener) {
// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
// this used to be the QUERY_AND_FETCH which doesn't exist anymore
final boolean fetchDocuments = request.numberOfShards() == 1;
Supplier<SearchPhaseResult> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
final ActionListener handler = responseWrapper.apply(connection, listener);//比如 SearchExecutionStatsCollector
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(handler, supplier));
}
5.TransportService
public class TransportService extends AbstractLifecycleComponent
TransportService :方法:1
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportResponseHandler<T> handler) {
sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
}
TransportService :方法:2
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
request.setParentTask(localNode.getId(), parentTask.getId());
try {
sendRequest(connection, action, request, options, handler);
} catch (TaskCancelledException ex) {
}
}
TransportService :方法:3
异步发送请求
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
asyncSender.sendRequest(connection, action, request, options, handler);
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)