{ "size": 0, "aggs": { "all_speaker": { "terms": { "field": "speaker", "size": 5 }, "aggs": { "avg_speech": { "avg": { "field": "speech_number" } } } } } }
Step 1: the first pass uses org.elasticsearch.search.aggregations.LeafBucketCollector#collect to gather the documents that fall into each all_speaker bucket:
return new LeafBucketCollector() {
    int lastDoc = 0;

    @Override
    public void collect(int doc, long bucket) throws IOException {
        // Lazily initialize the per-segment state on the first collected document.
        if (context == null) {
            context = ctx;
            docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
            bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
        }
        // Store the gap to the previous doc ID rather than the absolute ID,
        // so the packed representation stays compact.
        docDeltasBuilder.add(doc - lastDoc);
        bucketsBuilder.add(bucket);
        lastDoc = doc;
        maxBucket = Math.max(maxBucket, bucket);
    }
};
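Within a Lucene segment, documents are collected in ascending doc-ID order, so storing the gap to the previous doc instead of the absolute ID yields small non-negative numbers that PackedLongValues can compress well. Here is a minimal, self-contained sketch of the same idea (plain Java; DeltaBuffer and its methods are hypothetical stand-ins, not the Elasticsearch classes):

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PackedLongValues builders used above.
class DeltaBuffer {
    private final List<Long> docDeltas = new ArrayList<>();
    private final List<Long> buckets = new ArrayList<>();
    private int lastDoc = 0;

    // Mirrors LeafBucketCollector#collect: record the gap to the previous doc ID.
    void collect(int doc, long bucket) {
        docDeltas.add((long) (doc - lastDoc));
        buckets.add(bucket);
        lastDoc = doc;
    }

    // Replay: rebuild absolute doc IDs with a running sum over the deltas.
    void replay() {
        long doc = 0;
        for (int i = 0; i < docDeltas.size(); i++) {
            doc += docDeltas.get(i);
            System.out.println("doc=" + doc + " bucket=" + buckets.get(i));
        }
    }

    public static void main(String[] args) {
        DeltaBuffer buf = new DeltaBuffer();
        buf.collect(3, 0);   // stores delta 3
        buf.collect(10, 1);  // stores delta 7
        buf.collect(42, 0);  // stores delta 32
        buf.replay();        // prints doc=3, doc=10, doc=42 with their buckets
    }
}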
Step 2: the buffered data is replayed, but only for the buckets that survived the first round, i.e. the selectedBuckets passed to prepareSelectedBuckets:
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
    if (finished == false) {
        throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
    }
    if (this.selectedBuckets != null) {
        throw new IllegalStateException("Already been replayed");
    }

    // Register the surviving bucket ordinals in a hash so each one gets a
    // dense, rebased ordinal in [0, selectedBuckets.length).
    final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
    for (long bucket : selectedBuckets) {
        hash.add(bucket);
    }
    this.selectedBuckets = hash;

    boolean needsScores = scoreMode().needsScores();
    Weight weight = null;
    if (needsScores) {
        Query query = isGlobal ? new MatchAllDocsQuery() : searchContext.query();
        weight = searchContext.searcher().createWeight(searchContext.searcher().rewrite(query), ScoreMode.COMPLETE, 1f);
    }

    for (Entry entry : entries) {
        assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0";
        try {
            final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
            DocIdSetIterator scoreIt = null;
            if (needsScores) {
                Scorer scorer = weight.scorer(entry.context);
                // We don't need to check if the scorer is null
                // since we are sure that there are documents to replay (entry.docDeltas is not empty).
                scoreIt = scorer.iterator();
                leafCollector.setScorer(scorer);
            }
            final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
            final PackedLongValues.Iterator buckets = entry.buckets.iterator();
            int doc = 0;
            for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
                // Rebuild the absolute doc ID from the stored deltas.
                doc += docDeltaIterator.next();
                final long bucket = buckets.next();
                // Only replay documents whose bucket survived the first pass.
                final long rebasedBucket = hash.find(bucket);
                if (rebasedBucket != -1) {
                    if (needsScores) {
                        if (scoreIt.docID() < doc) {
                            scoreIt.advance(doc);
                        }
                        // aggregations should only be replayed on matching documents
                        assert scoreIt.docID() == doc;
                    }
                    leafCollector.collect(doc, rebasedBucket);
                }
            }
        } catch (CollectionTerminatedException e) {
            // collection was terminated prematurely
            // continue with the following leaf
        }
    }
    collector.postCollection();
}
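The heart of the replay is hash.find(bucket): LongHash assigns each surviving ordinal a dense id in insertion order, so sub-aggregators only need to allocate state for selectedBuckets.length buckets rather than for every bucket seen in the first pass. A minimal sketch of that rebasing, with a plain HashMap standing in for LongHash (names here are illustrative, not the ES API):

import java.util.HashMap;
import java.util.Map;

class BucketRebaser {
    private final Map<Long, Long> rebased = new HashMap<>();

    // Mirrors building the LongHash from selectedBuckets: insertion order
    // assigns each surviving ordinal a dense id 0..n-1.
    BucketRebaser(long... selectedBuckets) {
        for (long bucket : selectedBuckets) {
            rebased.putIfAbsent(bucket, (long) rebased.size());
        }
    }

    // Mirrors hash.find(bucket): -1 means the bucket was pruned,
    // so the document is skipped during replay.
    long find(long bucket) {
        return rebased.getOrDefault(bucket, -1L);
    }

    public static void main(String[] args) {
        // Suppose the first pass kept ordinals {0, 3, 7} as the top buckets.
        BucketRebaser rebaser = new BucketRebaser(0, 3, 7);
        System.out.println(rebaser.find(3));  // 1  -> replayed under dense ordinal 1
        System.out.println(rebaser.find(5));  // -1 -> pruned, document not replayed
    }
}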