一、首先也是最重要的:(说几个注意事项):
1.由于是通过maven整合elasticsearch,所以需要保证maven引入的客户端版本和你在虚拟机或者docker上拉下来运行的elasticsearch版本的大版本保持一致。例如:
我就出现了由于docker上安装的版本比较高导致客户端java代码报错的情况。所以我删除了docker内的镜像和容器。重新pull了一个7.8.0版本的elasticsearch
2.注意:当引入的maven包内的elasticsearch版本不一致的时候,需要在springboot项目的pom文件的properties标签内新增统一的elasticsearch版本:
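<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-boot.version>2.2.5.RELEASE</spring-boot.version>
    <elasticsearch.version>7.2.0</elasticsearch.version> <!--新增-->
</properties>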
二、配置
1.pom文件配置
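<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.2.0</version>
</dependency>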
2.application-dev.yml内的参数配置–下面会使用
elasticsearch1:
  address: 192.175.1.196:9200
3.ElasticSearch的config配置
package com.client.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Spring configuration that exposes the Elasticsearch client beans used by the application:
 * a {@link RestClientBuilder}, a {@link RestHighLevelClient} named {@code highLevelClient},
 * and a {@link BulkProcessor} named {@code bulkProcessor}.
 *
 * <p>The node address is read from the {@code elasticsearch1.address} property and must be in
 * {@code host:port} form.
 */
@Slf4j
@Configuration
public class ElasticSearchConfig {

    /** Elasticsearch node address in {@code host:port} form. */
    @Value("${elasticsearch1.address}")
    private String address;

    /**
     * Parses {@link #address} into an {@link HttpHost}.
     *
     * @return the host built from the configured host name and port
     * @throws IllegalStateException if the configured address has no {@code :port} part
     * @throws NumberFormatException if the port part is not a valid integer
     */
    private HttpHost esHttpHost() {
        String[] parts = address.trim().split(":");
        if (parts.length < 2) {
            // Fail with a readable message instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalStateException(
                    "elasticsearch1.address must be in host:port form, got: " + address);
        }
        int port = Integer.parseInt(parts[1].trim());
        return new HttpHost(parts[0], port);
    }

    /**
     * Creates the low-level client builder pointing at the configured node.
     *
     * @return a builder targeting the host returned by {@link #esHttpHost()}
     */
    @Bean
    public RestClientBuilder restClientBuilder() {
        return RestClient.builder(esHttpHost());
    }

    /**
     * Creates the high-level REST client.
     *
     * @param restClientBuilder builder for the underlying low-level client
     * @return the high-level client bean
     */
    @Bean(name = "highLevelClient")
    public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder) {
        return new RestHighLevelClient(restClientBuilder);
    }

    /**
     * Creates a {@link BulkProcessor} that sends its batches through
     * {@link RestHighLevelClient#bulkAsync} and logs the outcome of every batch.
     *
     * <p>A batch is flushed when it reaches 10000 actions, 8 MB, or every 10 seconds,
     * whichever comes first. Up to 8 batches may be in flight concurrently, and the processor
     * is configured with a constant backoff of 1 second, at most 3 retries.
     *
     * @param restHighLevelClient client used to execute the bulk requests
     * @return the configured bulk processor
     */
    @Bean(name = "bulkProcessor")
    public BulkProcessor bulkProcessor(
            @Qualifier("highLevelClient") @Autowired RestHighLevelClient restHighLevelClient) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest bulkRequest) {
                log.info("1.【before builprocessor】批次携带{}请求数量{}",
                        executionId, bulkRequest.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                if (!bulkResponse.hasFailures()) {
                    // getTook() is the duration of the whole bulk call. getIngestTookInMillis()
                    // only covers ingest pipelines and is -1 when none is used.
                    log.info("2.【after bulk-成功】bulkprocessor - 批量【{}】完成在{}秒!",
                            executionId, bulkResponse.getTook().getMillis() / 1000);
                    return;
                }
                for (BulkItemResponse item : bulkResponse.getItems()) {
                    if (item.isFailed()) {
                        // Item failures are errors, same level as the whole-batch failure below.
                        log.error("2.【after bulk-失败】bulkprocessor - 批量【{}】失败,原因是{}!",
                                executionId, item.getFailureMessage());
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
                // The whole batch failed; log the ids of every request it carried.
                List<DocWriteRequest<?>> requestList = bulkRequest.requests();
                List<String> esIds = requestList.stream()
                        .map(DocWriteRequest::id)
                        .collect(Collectors.toList());
                log.error("3.【afterbulk-failed失败】es执行失败,失败的esid为:{}", esIds, throwable);
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(
                (bulkRequest, bulkResponseListener) ->
                        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseListener),
                listener);
        // Flush once 10000 actions have been queued.
        builder.setBulkActions(10000);
        // Flush once the queued requests reach 8 MB.
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        // Flush at least every 10 seconds.
        builder.setFlushInterval(TimeValue.timeValueSeconds(10));
        // Allow up to 8 bulk requests in flight at once.
        builder.setConcurrentRequests(8);
        // Constant backoff: 1 second delay, at most 3 retries.
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }
}
4.具体操作
package com.client.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.client.entity.User; import com.client.entity.response.Response; import com.client.entity.response.ResponseLwLog; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.Timevalue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.script; import org.elasticsearch.script.scriptType; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.*; import java.io.IOException; import java.time.LocalDateTime; import java.util.*; @Slf4j @RequestMapping("/testEs") @RestController public class ElasticSearchController { private final 
RestHighLevelClient restHighLevelClient; private final BulkProcessor bulkProcessor; @Autowired public ElasticSearchController(@Qualifier("highLevelClient") RestHighLevelClient restHighLevelClient, @Qualifier("bulkProcessor") BulkProcessor bulkProcessor) { this.restHighLevelClient = restHighLevelClient; this.bulkProcessor = bulkProcessor; } @GetMapping("/createEs") public ResponseLwLogcreateEs(){ //构建-indexRequest IndexRequest indexRequest = new IndexRequest("people","test"); String json = JSON.toJSONString(new User(10,"小蓝",95)); //设置数据格式 indexRequest.source(json, XContentType.JSON); indexRequest.id("people-1"); //1秒 indexRequest.timeout(Timevalue.timevalueSeconds(1)); indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //存在相同id时是覆盖还是报错 indexRequest.create(true); indexRequest.id("12"); try { restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } return Response.success("success"); } @GetMapping("/esInsertAsync") public void esInsertAsync(){ IndexRequest indexRequest = new IndexRequest("people"); indexRequest.source(JSON.toJSONString(new User(10,"小蓝",95)),XContentType.JSON); indexRequest.timeout(Timevalue.timevalueSeconds(1)); indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //数据存数而不是更新//存在相同id时是覆盖还是报错 indexRequest.create(false); indexRequest.id("19"); restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener () { @Override public void onResponse(IndexResponse indexResponse) { ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if(shardInfo.getFailed() > 0){ for (ReplicationResponse.ShardInfo.Failure sharef:shardInfo.getFailures()) { log.error("将id为:{}存入ES时发生失败,原因是:{}",indexRequest.id(),sharef.getCause()); } } } @Override public void onFailure(Exception e) { log.error("数据存入ES时发生异常:{},原因是:{}",indexRequest.id(),e); } }); } @GetMapping("/getEsData") public void saveListEs(){ List userList = Arrays.asList(new User(1,"小明",93), 
new User(3,"小老",91), new User(7,"小子",91), new User(119,"小路",92), new User(102,"小蓝",95), new User(115,"小刚",96),new User(125,"小紫",196), new User(5,"小小",960),new User(15,"李红",66)); //设置构建 List requestList = new ArrayList<>(); userList.forEach(e->{ IndexRequest indexRequest = new IndexRequest("people"); //id indexRequest.id(e.getId()+""); indexRequest.source(JSON.toJSONString(e),XContentType.JSON); indexRequest.opType(DocWriteRequest.OpType.CREATE); requestList.add(indexRequest); }); requestList.forEach(bulkProcessor::add); } @GetMapping("/updateEs") public void updateDataEs() throws IOException { UpdateRequest updateRequest = new UpdateRequest("people","1"); //通过map-属性-值进行更新 Map map = new HashMap<>(); map.put("name","小明明"); updateRequest.doc(JSON.toJSONString(map),XContentType.JSON); updateRequest.timeout(Timevalue.timevalueSeconds(3)); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //数据存储不是更新 restHighLevelClient.update(updateRequest,RequestOptions.DEFAULT); } @GetMapping("/condUpdateEs") public void updateEsCond() throws IOException { UpdateByQueryRequest update = new UpdateByQueryRequest(); update.indices("people"); //根据ID进行 *** 作 update.setQuery(new TermQueryBuilder("id","115")); //设置 update.setscript(new script(scriptType.INLINE, "painless", "ctx._source.scope=100", Collections.emptyMap())); restHighLevelClient.updateByQuery(update,RequestOptions.DEFAULT); } @GetMapping("/attachEsUpdate") public void attachUpdate(){ List requestList = new ArrayList<>(); //更新的数据 List updateUser = Arrays.asList(new User(3,"小老",91) , new User(7,"小子",91)); // *** 作 updateUser.forEach(u->{ UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("people"); //id updateRequest.id(u.getId()+""); //更新后的数据 Map map = new HashMap<>(); map.put("scope","150"); updateRequest.doc(JSON.toJSONString(map),XContentType.JSON); requestList.add(updateRequest); }); //更新 requestList.forEach(bulkProcessor::add); } @PostMapping("/delEs") public void delEs() throws 
IOException { DeleteRequest deleteRequest = new DeleteRequest(); deleteRequest.index("people"); deleteRequest.id("102"); restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT); } @PostMapping("/delEsCond") public void selEsCond() throws IOException { DeleteByQueryRequest delete = new DeleteByQueryRequest(); delete.indices("people"); delete.setQuery(new TermQueryBuilder("id",5)); //分词是删除 restHighLevelClient.deleteByQuery(delete,RequestOptions.DEFAULT); } @GetMapping("/selEsInfo") public ResponseLwLog > queryEsList(){ //构建查询参数 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //返回字段 String[] include = {"name","scope","id"}; String[] exclude = {}; searchSourceBuilder.fetchSource(include,exclude); //构建查询条件 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); // searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.termQuery("id",7))); //时间查询范围 // searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("create_time").from(LocalDateTime.of(2021,11,22,12,12,30)))); // searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("create_time").to(LocalDateTime.now()))); //分页 Integer size = 50; Integer start = 0; if(start + size > 10000){ throw new RuntimeException("查询数量超过10000"); } searchSourceBuilder.size(size); searchSourceBuilder.from(start); searchSourceBuilder.query(boolQueryBuilder); //构建请求 SearchRequest request = new SearchRequest(); request.source(searchSourceBuilder); //发起请求 SearchResponse response = null; try { response = restHighLevelClient.search(request,RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } //解析返回数据 // long totalhit = response.getHits().getTotalHits().value; log.info("shuju:{}",JSON.toJSONString(response)); List
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)