springboot整合elasticsearch-7.8.0版本----从配置到基础的增删改查

springboot整合elasticsearch-7.8.0版本----从配置到基础的增删改查,第1张

springboot整合elasticsearch-7.8.0版本----从配置到基础的增删改查

一、首先也是最重要的:(说几个注意事项):
1.由于是通过maven整合elasticsearch,所以需要保证maven引入的客户端版本和你在虚拟机或者docker中运行的elasticsearch服务端的大版本保持一致。例如:
我就出现了由于docker上安装的版本比较高导致客户端java代码报错的情况。所以我删除了docker内的镜像和容器,重新pull了一个7.8.0版本的elasticsearch。
2.注意引入的maven包内的版本不一致的时候,在springboot的配置文件pom的properties标签内新增统一版本的elasticsearch.version属性:

 
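<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-boot.version>2.2.5.RELEASE</spring-boot.version>
    <elasticsearch.version>7.2.0</elasticsearch.version><!--新增-->
</properties>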
  

二、配置
1.pom文件配置


    
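<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>7.2.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-client</artifactId>
  <version>7.2.0</version>
</dependency>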
    

2.application-dev.yml内的参数配置–下面会使用

elasticsearch1:
  address: 192.175.1.196:9200

3.ElasticSearch的config配置

package com.client.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.stream.Collectors;


@Slf4j
@Configuration
public class ElasticSearchConfig {

    @Value("${elasticsearch1.address}")
    private String address;

    
    private HttpHost esHttpHost(){
        String[] addresses = address.split(":");
        int port = Integer.valueOf(addresses[1]);
        return new HttpHost(addresses[0],port);
    }

    
    @Bean
    public RestClientBuilder restClientBuilder(){
        return RestClient.builder(esHttpHost());
    }

    
    @Bean(name = "highLevelClient")
    public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder){
        return new RestHighLevelClient(restClientBuilder);
    }

    
    @Bean(name = "bulkProcessor")
    public BulkProcessor bulkProcessor(@Qualifier("highLevelClient") @Autowired RestHighLevelClient restHighLevelClient){
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                log.info("1.【before builprocessor】批次携带{}请求数量{}",l,bulkRequest.numberOfActions());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                if(!bulkResponse.hasFailures()){
                    log.info("2.【after bulk-成功】bulkprocessor - 批量【{}】完成在{}秒!",l,bulkResponse.getIngestTookInMillis()/1000);
                }else{
                    BulkItemResponse[] response = bulkResponse.getItems();
                    for (BulkItemResponse r:response) {
                        if(r.isFailed()){
                            log.info("2.【after bulk-失败】bulkprocessor - 批量【{}】失败,原因是{}!",l,r.getFailureMessage());
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                List> requestList = bulkRequest.requests();
                List esIds = requestList.stream().map(DocWriteRequest::id).collect(Collectors.toList());
                log.error("3.【afterbulk-failed失败】es执行失败,失败的esid为:{}",esIds,throwable);
            }
        };
        BulkProcessor.Builder builder = BulkProcessor.builder(((bulkreq,bulkresponselistener)->
                restHighLevelClient.bulkAsync(bulkreq, RequestOptions.DEFAULT,bulkresponselistener)),listener);
        //到达10000时候刷新
        builder.setBulkActions(10000);
        //内存达到8m时候刷新
        builder.setBulkSize(new ByteSizevalue(8L, ByteSizeUnit.MB));
        //设置时间间隔为10秒
        builder.setFlushInterval(Timevalue.timevalueSeconds(10));
        //设置允许的并发请求数目
        builder.setConcurrentRequests(8);
        //设置重试策略
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(Timevalue.timevalueSeconds(1),3));
//        builder.setGlobalType("");
        return builder.build();
    }

}


4.具体操作

package com.client.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.client.entity.User;
import com.client.entity.response.Response;
import com.client.entity.response.ResponseLwLog;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;


@Slf4j
@RequestMapping("/testEs")
@RestController
public class ElasticSearchController {


    private final RestHighLevelClient restHighLevelClient;
    private final BulkProcessor bulkProcessor;

    @Autowired
    public ElasticSearchController(@Qualifier("highLevelClient") RestHighLevelClient restHighLevelClient,
                                   @Qualifier("bulkProcessor") BulkProcessor bulkProcessor) {
        this.restHighLevelClient = restHighLevelClient;
        this.bulkProcessor = bulkProcessor;
    }

    
    @GetMapping("/createEs")
    public ResponseLwLog createEs(){
        //构建-indexRequest
        IndexRequest indexRequest = new IndexRequest("people","test");

        String json = JSON.toJSONString(new User(10,"小蓝",95));
        //设置数据格式
        indexRequest.source(json, XContentType.JSON);
        indexRequest.id("people-1");
        //1秒
        indexRequest.timeout(Timevalue.timevalueSeconds(1));
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        //存在相同id时是覆盖还是报错
        indexRequest.create(true);
        indexRequest.id("12");
        try {
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return Response.success("success");
    }

    @GetMapping("/esInsertAsync")
    public void esInsertAsync(){
        IndexRequest indexRequest = new IndexRequest("people");
        indexRequest.source(JSON.toJSONString(new User(10,"小蓝",95)),XContentType.JSON);
        indexRequest.timeout(Timevalue.timevalueSeconds(1));
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        //数据存数而不是更新//存在相同id时是覆盖还是报错
        indexRequest.create(false);
        indexRequest.id("19");
        restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if(shardInfo.getFailed() > 0){
                    for (ReplicationResponse.ShardInfo.Failure sharef:shardInfo.getFailures()) {
                        log.error("将id为:{}存入ES时发生失败,原因是:{}",indexRequest.id(),sharef.getCause());
                    }
                }
            }

            @Override
            public void onFailure(Exception e) {
                log.error("数据存入ES时发生异常:{},原因是:{}",indexRequest.id(),e);
            }
        });
    }

    
    @GetMapping("/getEsData")
    public void saveListEs(){
        List userList = Arrays.asList(new User(1,"小明",93),
                new User(3,"小老",91), new User(7,"小子",91),
                new User(119,"小路",92), new User(102,"小蓝",95),
                new User(115,"小刚",96),new User(125,"小紫",196),
        new User(5,"小小",960),new User(15,"李红",66));
        //设置构建
        List requestList = new ArrayList<>();
        userList.forEach(e->{
            IndexRequest indexRequest = new IndexRequest("people");
            //id
            indexRequest.id(e.getId()+"");
            indexRequest.source(JSON.toJSONString(e),XContentType.JSON);
            indexRequest.opType(DocWriteRequest.OpType.CREATE);
            requestList.add(indexRequest);
        });
        requestList.forEach(bulkProcessor::add);
    }

    
    @GetMapping("/updateEs")
    public void updateDataEs() throws IOException {
        UpdateRequest updateRequest = new UpdateRequest("people","1");
        //通过map-属性-值进行更新
        Map map = new HashMap<>();
        map.put("name","小明明");
        updateRequest.doc(JSON.toJSONString(map),XContentType.JSON);
        updateRequest.timeout(Timevalue.timevalueSeconds(3));
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        //数据存储不是更新
        restHighLevelClient.update(updateRequest,RequestOptions.DEFAULT);
    }

    
    @GetMapping("/condUpdateEs")
    public void updateEsCond() throws IOException {
        UpdateByQueryRequest update = new UpdateByQueryRequest();
        update.indices("people");
        //根据ID进行 *** 作
        update.setQuery(new TermQueryBuilder("id","115"));
        //设置
        update.setscript(new script(scriptType.INLINE,
                "painless",
                "ctx._source.scope=100",
                Collections.emptyMap()));
        restHighLevelClient.updateByQuery(update,RequestOptions.DEFAULT);
    }

    
    @GetMapping("/attachEsUpdate")
    public void attachUpdate(){
        List requestList = new ArrayList<>();
        //更新的数据
        List updateUser = Arrays.asList(new User(3,"小老",91)
                , new User(7,"小子",91));
        // *** 作
        updateUser.forEach(u->{
            UpdateRequest updateRequest = new UpdateRequest();
            updateRequest.index("people");
            //id
            updateRequest.id(u.getId()+"");
            //更新后的数据
            Map map = new HashMap<>();
            map.put("scope","150");

            updateRequest.doc(JSON.toJSONString(map),XContentType.JSON);
            requestList.add(updateRequest);
        });
        //更新
        requestList.forEach(bulkProcessor::add);
    }

    
    @PostMapping("/delEs")
    public void delEs() throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest();
        deleteRequest.index("people");
        deleteRequest.id("102");
        restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);
    }

    
    @PostMapping("/delEsCond")
    public void selEsCond() throws IOException {
        DeleteByQueryRequest delete = new DeleteByQueryRequest();
        delete.indices("people");
        delete.setQuery(new TermQueryBuilder("id",5));
        //分词是删除
        restHighLevelClient.deleteByQuery(delete,RequestOptions.DEFAULT);
    }

    
    @GetMapping("/selEsInfo")
    public ResponseLwLog> queryEsList(){
        //构建查询参数
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //返回字段
        String[] include = {"name","scope","id"};
        String[] exclude = {};
        searchSourceBuilder.fetchSource(include,exclude);
        //构建查询条件
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//        searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.termQuery("id",7)));
        //时间查询范围
//        searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("create_time").from(LocalDateTime.of(2021,11,22,12,12,30))));
//        searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("create_time").to(LocalDateTime.now())));
        //分页
        Integer size = 50;
        Integer start = 0;
        if(start + size > 10000){
            throw new RuntimeException("查询数量超过10000");
        }
        searchSourceBuilder.size(size);
        searchSourceBuilder.from(start);
        searchSourceBuilder.query(boolQueryBuilder);
        //构建请求
        SearchRequest request = new SearchRequest();
        request.source(searchSourceBuilder);
        //发起请求
        SearchResponse response = null;

        try {
            response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //解析返回数据
//        long totalhit = response.getHits().getTotalHits().value;
        log.info("shuju:{}",JSON.toJSONString(response));
        List> mapList = Lists.newArrayListWithCapacity(response.getHits().getHits().length);
        Arrays.stream(response.getHits().getHits()).forEach(his->mapList.add(his.getSourceAsMap()));
        List userList = new ArrayList<>();
        if(CollectionUtils.isNotEmpty(mapList)){
            userList = JSONArray.parseArray(JSON.toJSONString(mapList),User.class);
        }
        return Response.success(userList);
    }



}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5612279.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存