Elasticsearch 版本:7.9.1
归并索引(force merge)将索引的多个 segment 合并为少数几个 segment,以此提高查询性能,适合于查询多、数据变化少的场景
kibana操作 #查询
GET /_cat/segments/kys_recommend?v
#归并操作
POST kys_recommend/_forcemerge?max_num_segments=1
java代码
// Elasticsearch high-level REST client; injected by Spring through @Autowired.
@Autowired
public RestHighLevelClient restHighLevelClient;
/**
 * Asynchronously force-merges the given index down to a single segment.
 *
 * <p>The request asks for at most one segment and for a flush after the merge. The method
 * returns as soon as the request has been handed to the client; the outcome is reported only
 * through the log, by the listener below.
 *
 * @param indexName name of the index to force-merge
 */
private void forMergeIndex(String indexName) {
    ForceMergeRequest request = new ForceMergeRequest(indexName);
    request.maxNumSegments(1);
    request.flush(true);
    ActionListener<ForceMergeResponse> listener = new ActionListener<ForceMergeResponse>() {
        @Override
        public void onResponse(ForceMergeResponse forceMergeResponse) {
            logger.info("MergeIndex成功: ->{}", indexName);
        }

        @Override
        public void onFailure(Exception e) {
            // Log at error level and pass the exception as the last argument so that the
            // cause and stack trace of a failed merge are not lost.
            logger.error("MergeIndex失败: ->{}", indexName, e);
        }
    };
    restHighLevelClient.indices().forcemergeAsync(request, RequestOptions.DEFAULT, listener);
}
重索引
将现有索引数据导入另外一个索引,不受数据类型影响
kibana操作 #1 重索引
POST _reindex?slices=3&refresh&wait_for_completion=false
{
"conflicts": "proceed",
"source": {
"index": "adgn-data-2021-08-08",
"size": 1000
},
"dest": {
"index": "adgn-data-new2021-08-08"
}
}
#2 上一步会返回任务ID(task),例如:3WD6KSa3R2Gme7HQGINL1g:2989403,查看任务状态时请替换为实际返回的任务ID
#查看任务状态
GET /_tasks/3WD6KSa3R2Gme7HQGINL1g:440620019
#3 改别名
POST /_aliases
{
"actions": [
{
"add": {
"index": "adgn-data-new2021-08-08",
"alias": "adgn-data-2021-08-08"
}
}
]
}
java代码
传入多个日期,对各日期对应的索引依次进行重索引
package com.adgn.timer.timer;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Reindexes date-suffixed {@code adgn-data-} indices into {@code adgn-data-new<date>} copies and,
 * once a copy holds as many documents as its source, replaces the source index with an alias of
 * the same name that points at the copy.
 *
 * <p>Dates to process are taken from {@link #queue}. Every submitted reindex is remembered as a
 * single-entry map (old index name to new index name) in {@link #aliasesQueue} until the swap
 * has been carried out.
 *
 * @Author sannian
 * @Date 2021/6/17 16:34
 * @Version 1.0
 */
@Component
@EnableScheduling
public class ReindexScheduledReady {
    public final static Logger logger = LoggerFactory.getLogger(ReindexScheduledReady.class);

    /** Common prefix of every index handled here; the date suffix is appended to it. */
    final static String indexName = "adgn-data-";

    /** Date suffixes of the source indices that still have to be reindexed. */
    final public static BlockingQueue<String> queue = new LinkedBlockingDeque<>();

    /** Single-entry maps (old index name to new index name) waiting for the alias swap. */
    final public static BlockingQueue<Map<String, String>> aliasesQueue = new LinkedBlockingDeque<>();

    @Autowired
    public RestHighLevelClient restHighLevelClient;

    /**
     * Runs one processing step: submits a reindex task for the next queued date, if there is
     * one, and then tries to finish one pending alias swap.
     *
     * <p>Any exception is logged and not propagated.
     */
    // Scheduling is currently switched off; the previously used trigger was:
    // @Scheduled(cron = "0 0/20 * * * ?")
    // @Async("threadPoolTaskExecutor")
    public void init() {
        try {
            String word = queue.poll();
            logger.info("重索引获取的日期:{}", word);
            String oldIndex = indexName + word;
            if (StringUtils.isBlank(word)) {
                // Nothing new to reindex; only look after pending swaps.
                toAliase(oldIndex);
                return;
            }

            String newIndex = indexName + "new" + word;
            ReindexRequest request = new ReindexRequest();
            request.setSourceIndices(oldIndex);
            request.setDestIndex(newIndex);
            request.setRefresh(true);
            request.setSlices(3);
            TaskSubmissionResponse reindexSubmission = restHighLevelClient
                    .submitReindexTask(request, RequestOptions.DEFAULT);
            // Log the task id right away so it is not lost if the swap step below fails.
            logger.info("taskId->{}", reindexSubmission.getTask());

            Map<String, String> map = new HashMap<>(1);
            map.put(oldIndex, newIndex);
            aliasesQueue.add(map);
            toAliase(oldIndex);
        } catch (Exception e) {
            // Pass the exception itself so that the stack trace ends up in the log.
            logger.error("重索引失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Takes one pending (old index, new index) pair from {@link #aliasesQueue} and, when both
     * indices report the same document count, replaces the old index with an alias.
     *
     * <p>The pair is put back into the queue when it belongs to {@code oldIndex} (its reindex
     * task was only just submitted), when the counts differ, or when counting fails.
     *
     * @param oldIndex name of the index whose reindex was submitted in the current run; a pair
     *                 keyed by this name is skipped
     * @throws IOException if a request to Elasticsearch fails or a count is incomplete
     */
    private void toAliase(String oldIndex) throws IOException {
        Map<String, String> aliaseMap = aliasesQueue.poll();
        if (aliaseMap == null) {
            return;
        }
        if (StringUtils.isNotBlank(aliaseMap.get(oldIndex))) {
            aliasesQueue.add(aliaseMap);
            return;
        }
        if (aliaseMap.isEmpty()) {
            // The queue is public, so guard against an entry that carries no pair at all.
            logger.warn("aliasesQueue contained an empty entry, ignoring it");
            return;
        }

        Map.Entry<String, String> pair = aliaseMap.entrySet().iterator().next();
        String reindexOld = pair.getKey();
        String reindexNew = pair.getValue();

        long oldIndexNumber;
        long newIndexNumber;
        try {
            oldIndexNumber = countDocuments(reindexOld);
            logger.info("reindexOld ->{},oldIndexNumber->{}", reindexOld, oldIndexNumber);
            newIndexNumber = countDocuments(reindexNew);
            logger.info("reindexNew ->{},newIndexNumber->{}", reindexNew, newIndexNumber);
        } catch (IOException | RuntimeException e) {
            // Keep the pair for a later run instead of silently forgetting it.
            aliasesQueue.add(aliaseMap);
            throw e;
        }
        logger.info("oldIndexNumber ->{},newIndexNumber->{}", oldIndexNumber, newIndexNumber);

        if (oldIndexNumber != newIndexNumber) {
            // Counts differ, so the copy is not treated as complete yet; check again later.
            aliasesQueue.add(aliaseMap);
            return;
        }
        replaceIndexWithAlias(reindexOld, reindexNew);
    }

    /**
     * Returns the number of documents in {@code index}.
     *
     * @param index name of the index to count
     * @return total hit count of a query without any clauses, tracked exactly
     * @throws IOException if the request fails, or if the search timed out or had failed shards,
     *                     in which case the reported total cannot be relied on
     */
    private long countDocuments(String index) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // Exact total instead of the capped default.
        searchSourceBuilder.trackTotalHits(true);
        // Only the total is needed, so do not fetch any hits.
        searchSourceBuilder.size(0);
        searchSourceBuilder.timeout(new TimeValue(10, TimeUnit.SECONDS));
        searchSourceBuilder.query(QueryBuilders.boolQuery());

        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.isTimedOut() || searchResponse.getFailedShards() > 0) {
            // A partial total must never be the basis for deleting an index.
            throw new IOException("Incomplete document count for index " + index);
        }
        return searchResponse.getHits().getTotalHits().value;
    }

    /**
     * Removes {@code oldIndex} and adds an alias with that name to {@code newIndex} in a single
     * aliases request, so that the removal and the alias are applied together.
     *
     * @param oldIndex index to remove; its name becomes the alias name
     * @param newIndex index the alias points at
     * @throws IOException if the request to Elasticsearch fails
     */
    private void replaceIndexWithAlias(String oldIndex, String newIndex) throws IOException {
        IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
        aliasesRequest.addAliasAction(
                new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE_INDEX)
                        .index(oldIndex));
        aliasesRequest.addAliasAction(
                new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                        .index(newIndex)
                        .alias(oldIndex));
        AcknowledgedResponse acknowledgedResponse =
                restHighLevelClient.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
        if (!acknowledgedResponse.isAcknowledged()) {
            logger.warn("alias update not acknowledged: index->{},alias->{}", newIndex, oldIndex);
        }
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)