elasticsearch的归并以及重索引

elasticsearch的归并以及重索引,第1张

ES版本

7.9.1

归并索引

将索引的多个 segment 合并为少数几个 segment,以此提高查询性能,适合于查询多、数据变化少的场景

kibana 操作
#查询
GET /_cat/segments/kys_recommend?v
#归并操作
POST kys_recommend/_forcemerge?max_num_segments=1
java代码
    @Autowired
    public RestHighLevelClient restHighLevelClient;

    /**
     * Asynchronously force-merges the given index down to a single segment and flushes it
     * afterwards.
     *
     * <p>The call returns as soon as the request has been handed to the client; the outcome is
     * only reported through the log by the listener below.
     *
     * @param indexName name of the index to force-merge
     */
    private void forMergeIndex(String indexName) {
        ForceMergeRequest request = new ForceMergeRequest(indexName);
        request.maxNumSegments(1);
        request.flush(true);
        ActionListener<ForceMergeResponse> listener = new ActionListener<ForceMergeResponse>() {
            @Override
            public void onResponse(ForceMergeResponse forceMergeResponse) {
                logger.info("MergeIndex成功: ->{}", indexName);
            }

            @Override
            public void onFailure(Exception e) {
                // Log at ERROR and pass the exception as the last argument so the cause and
                // stack trace are kept; the message text itself is unchanged.
                logger.error("MergeIndex失败: ->{}", indexName, e);
            }
        };
        restHighLevelClient.indices().forcemergeAsync(request, RequestOptions.DEFAULT, listener);
    }
重索引

将现有索引数据导入另外一个索引,不受数据类型影响

kibana 操作
#1 重索引
POST _reindex?slices=3&refresh&wait_for_completion=false
{
  "conflicts": "proceed", 
  "source": {
    "index": "adgn-data-2021-08-08",
    "size": 1000
  },
  "dest": {
    "index": "adgn-data-new2021-08-08"
  }
}

#2 查看任务状态(使用第 1 步返回的 task id,例如 3WD6KSa3R2Gme7HQGINL1g:440620019)
GET /_tasks/3WD6KSa3R2Gme7HQGINL1g:440620019


#3 改别名(别名不能与已存在的索引同名,需先删除旧索引 adgn-data-2021-08-08,再执行下面的请求)
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "adgn-data-new2021-08-08",
        "alias": "adgn-data-2021-08-08"
      }
    }
  ]
}
java代码

传入多个日期,对这些日期对应的索引依次进行重索引

package com.adgn.timer.timer;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @Author sannian
 * @Date 2021/6/17 16:34
 * @Version 1.0
 */
@Component
@EnableScheduling
public class ReindexScheduledReady {
    public final static Logger logger = LoggerFactory.getLogger(ReindexScheduledReady.class);
    final static String indexName = "adgn-data-";
    //原索引日期后缀
    final public static BlockingQueue<String> queue = new LinkedBlockingDeque<>();
    final public static BlockingQueue<Map<String, String>> aliasesQueue = new LinkedBlockingDeque<>();
    @Autowired
    public RestHighLevelClient restHighLevelClient;

//    @Scheduled(cron = "0 0/20 * * * ?")
//    @Async("threadPoolTaskExecutor")
    public void init() {
        try {
            String word = queue.poll();
            logger.info("重索引获取的日期:" + word);
            String oldIndex = indexName + word;
            String newIndex = indexName + "new" + word;
            if (StringUtils.isNotBlank(word)) {
                ReindexRequest request = new ReindexRequest();
                request.setSourceIndices(oldIndex);
                request.setDestIndex(newIndex);
                request.setRefresh(true);
                request.setSlices(3);
                TaskSubmissionResponse reindexSubmission = restHighLevelClient
                        .submitReindexTask(request, RequestOptions.DEFAULT);
                Map<String, String> map = new HashMap<>(1);
                map.put(oldIndex, newIndex);
                aliasesQueue.add(map);
                String taskId = reindexSubmission.getTask();
                toAliase(oldIndex);
                logger.info("taskId->{}", taskId);
            } else {
                toAliase(oldIndex);
            }
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

    private void toAliase(String oldIndex) throws IOException {
        Map<String, String> aliaseMap = aliasesQueue.poll();
        if (aliaseMap != null) {
            if (StringUtils.isNotBlank(aliaseMap.get(oldIndex))) {
                aliasesQueue.add(aliaseMap);
            } else {
                AtomicReference<String> reindexOld = new AtomicReference<>();
                AtomicReference<String> reindexNew = new AtomicReference<>();
                aliaseMap.forEach((k, v) -> {
                    reindexOld.set(k);
                    reindexNew.set(v);
                });
                //查询旧索引数量
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                // 准确计数
                searchSourceBuilder.trackTotalHits(true);
                BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                SearchRequest searchRequest = new SearchRequest(reindexOld.get());
                searchSourceBuilder.timeout(new TimeValue(10, TimeUnit.SECONDS));
                searchSourceBuilder.query(queryBuilder);
                searchRequest.source(searchSourceBuilder);
                SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                long oldIndexNumber = searchResponse.getHits().getTotalHits().value;
                logger.info("reindexOld ->{},oldIndexNumber->{}", reindexOld, oldIndexNumber);
                //查询新索引数量
                SearchRequest searchRequest2 = new SearchRequest(reindexNew.get());
                searchRequest2.source(searchSourceBuilder);
                SearchResponse searchResponse2 = restHighLevelClient.search(searchRequest2, RequestOptions.DEFAULT);
                long newIndexNumber = searchResponse2.getHits().getTotalHits().value;
                logger.info("reindexNew ->{},newIndexNumber->{}", reindexNew, newIndexNumber);
                logger.info("oldIndexNumber ->{},newIndexNumber->{}", oldIndexNumber, newIndexNumber);
                //当数量相同时
                if (oldIndexNumber == newIndexNumber) {
//                    //给旧索引删除别名
//                    IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
//                    IndicesAliasesRequest.AliasActions aliasAction =
//                            new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE)
//                                    .index(reindexOld.get())
//                                    .alias(reindexOld.get());
//                    aliasesRequest.addAliasAction(aliasAction);
//                    AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);

                    //删除旧索引
                    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(reindexOld.get());
                    restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);

                    //给新索引添加别名
                    IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
                    IndicesAliasesRequest.AliasActions aliasAction =
                            new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                                    .index(reindexNew.get())
                                    .alias(reindexOld.get());
                    aliasesRequest.addAliasAction(aliasAction);
                    AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
                } else {
                    Map<String, String> map = new HashMap<>(1);
                    map.put(reindexOld.get(), reindexNew.get());
                    aliasesQueue.add(map);
                }
            }
        }
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/759626.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-01
下一篇 2022-05-01

发表评论

登录后才能评论

评论列表(0条)

保存