Elasticsearch: Aggregation Query That Groups First, Then Sorts, with Pagination


Requirement
Group the detail records by waybillId, sort each group by eventTime in descending order, and keep only the latest record of each group.
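
To make the requirement concrete, here is a minimal in-memory sketch of the same "latest record per group" logic written with plain Java streams. It is not how Elasticsearch executes the query. The `Event` class, its `status` field, and the `latestByWaybill` helper are hypothetical names chosen for illustration, and `eventTime` is assumed to be an epoch timestamp.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

final class LatestPerWaybill {

    // Hypothetical stand-in for one tracking-detail document.
    static final class Event {
        final String waybillId;
        final long eventTime; // assumed: epoch milliseconds
        final String status;

        Event(String waybillId, long eventTime, String status) {
            this.waybillId = waybillId;
            this.eventTime = eventTime;
            this.status = status;
        }
    }

    // Group by waybillId and keep only the event with the largest eventTime per group.
    static Map<String, Event> latestByWaybill(List<Event> events) {
        return events.stream()
            .collect(Collectors.toMap(
                e -> e.waybillId,
                e -> e,
                BinaryOperator.maxBy(Comparator.comparingLong((Event e) -> e.eventTime))));
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("W1", 100L, "PICKED_UP"),
            new Event("W1", 300L, "DELIVERED"),
            new Event("W2", 200L, "IN_TRANSIT"));
        System.out.println(latestByWaybill(events).get("W1").status); // prints DELIVERED
    }
}
```

The query in the following sections asks Elasticsearch to do the equivalent work on the server side, so only one document per waybillId is returned.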

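Bucket aggregation
The purpose of a bucket aggregation in Elasticsearch is to group data: documents are first split into groups (buckets) according to a given criterion, and statistics are then computed for each group.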
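
As a rough analogy only, the sketch below builds buckets in memory and computes one statistic per bucket (a count, comparable to the `doc_count` that a terms aggregation reports). `BucketCountSketch` and `docCounts` are hypothetical names. The result is ordered by key here, whereas a terms aggregation orders buckets by document count by default.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class BucketCountSketch {

    // One bucket per distinct key; the value is the number of members in that bucket.
    static Map<String, Long> docCounts(List<String> waybillIds) {
        return waybillIds.stream()
            .collect(Collectors.groupingBy(id -> id, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(docCounts(List.of("W1", "W2", "W1", "W3", "W1")));
        // prints {W1=3, W2=1, W3=1}
    }
}
```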

  1. Query DSL version

     {
       "query": {
         "bool": {
           "must": [
             {
               "match_all": {}
             }
           ]
         }
       },
       "aggs": {
         "waybillIdAgg": {
           "terms": {
             "field": "waybillId",
             "size": 1000,
             "min_doc_count": 1
           },
           "aggs": {
             "top1": {
               "top_hits": {
                 "size": 1,
                 "sort": [
                   {
                     "eventTime": {
                       "order": "desc"
                     }
                   }
                 ]
               }
             }
           }
         }
       }
     }
    
  2. Java version (RestHighLevelClient) and result parsing

     // Query conditions
     BoolQueryBuilder queryBool = QueryBuilders.boolQuery();

     // IN-style filter: a document matches if its waybillId equals any of the given IDs
     BoolQueryBuilder inFilter = new BoolQueryBuilder();
     waybillIds.forEach(
         waybillId -> inFilter.should(QueryBuilders.termQuery("waybillId", waybillId)));
     queryBool.must(inFilter);
    
     // Bucket aggregation: group by waybillId
     TermsAggregationBuilder termsAggregationBuilder =
         AggregationBuilders.terms("waybillIdAgg").field("waybillId").size(1000).minDocCount(1);
     // Sub-aggregation: within each bucket, sort by eventTime descending and keep the first hit
     TopHitsAggregationBuilder sort =
         AggregationBuilders.topHits("top1").size(1).sort("eventTime", SortOrder.DESC);
     termsAggregationBuilder.subAggregation(sort);
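     // Paging: from/size apply to the top-level hits only; the terms aggregation
     // still returns up to 1000 buckets regardless of these values
     SearchSourceBuilder searchSourceBuilder =
         SearchSourceBuilder.searchSource()
             .query(queryBool)
             .aggregation(termsAggregationBuilder)
             .from(pageIndex <= 1 ? 0 : ((pageIndex - 1) * pageSize))
             .size(pageSize);
     // Search request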
     SearchRequest searchRequest = new SearchRequest(esIndexConfig.getIndexNameTrackingDetail());
     searchRequest.source(searchSourceBuilder);
    
     // Execute the search
     SearchResponse searchResponse =
         restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
         
     // Parse the aggregation result: one TrackingDetail per waybillId bucket
     Aggregations aggregations = searchResponse.getAggregations();
     Terms terms = aggregations.get("waybillIdAgg");
     List<TrackingDetail> trackingDetails =
         terms.getBuckets().stream()
             .map(
                 t -> {
                   Aggregation top1 = t.getAggregations().get("top1");
                   Optional<SearchHit> first =
                       Arrays.stream(((ParsedTopHits) top1).getHits().getHits()).findFirst();
                   if (first.isPresent()) {
                     Map<String, Object> trackingDetailMap = first.get().getSourceAsMap();
                     return BeanUtil.fillBeanWithMap(trackingDetailMap, new TrackingDetail(), false);
                   }
                   return null;
                 })
             // Drop buckets that produced no hit so the list contains no nulls
             .filter(Objects::nonNull)
             .collect(Collectors.toList());
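
One caveat about the paging step: `from` and `size` on the search request page the top-level hits, not the buckets of the terms aggregation, so the list built from the buckets is not limited by them. The following is a minimal sketch of one workaround, assuming it is acceptable to fetch the (at most 1000) buckets and slice them in memory. `BucketPager` and `page` are hypothetical names, and the offset formula mirrors the one used in the request above. For larger key sets, server-side options such as the composite aggregation (with its `after` key) or the `bucket_sort` pipeline aggregation may be a better fit.

```java
import java.util.Collections;
import java.util.List;

final class BucketPager {

    // Returns the slice of buckets for a 1-based pageIndex; out-of-range pages are empty.
    static <T> List<T> page(List<T> buckets, int pageIndex, int pageSize) {
        if (pageSize <= 0) {
            return Collections.emptyList();
        }
        // Same offset rule as the request: page 1 (or lower) starts at offset 0.
        long from = pageIndex <= 1 ? 0L : (long) (pageIndex - 1) * pageSize;
        if (from >= buckets.size()) {
            return Collections.emptyList();
        }
        int to = (int) Math.min(from + pageSize, (long) buckets.size());
        return buckets.subList((int) from, to);
    }

    public static void main(String[] args) {
        List<String> buckets = List.of("W1", "W2", "W3", "W4", "W5");
        System.out.println(page(buckets, 3, 2)); // prints [W5]
    }
}
```

Applied to the result above, this would be called on `trackingDetails` with the same `pageIndex` and `pageSize` values.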
    


Original article: https://outofmemory.cn/langs/923362.html
