废话不说 先上pom依赖
<这个是统一的依赖管理>
<下面是pom依赖>
elasticsearch-rest-high-level-client
elasticsearch
elasticsearch-rest-client
elasticsearch
elasticsearch-rest-client
commons-logging
fastjson
我这边用的是多模块SpringBoot 这个是service的es依赖
相信码友们不需要其他的依赖了
依赖弄完之后我们开始Service的代码了
首先思路 是先到Mysql中查询到我们想要的数据
然后将查询到的数据转换成json数据然后es就可以进行存储了
代码如下
package com.tm.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class config {
public static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
COMMON_OPTIONS = builder.build();
}
@Bean
public static RestHighLevelClient esRestClient(){
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
//集群配置法
new HttpHost("192.168.206.133",19200,"http")));
return client;
}
}
上面的是连接es的配置
package com.tm.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.tm.config.config;
import com.tm.mapper.EsSyncGoodsSpuMapper;
import com.tm.model.entity.EsSyncGoodsSpuEntity;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
//这三个注解是定时器的
@EnableScheduling
@Component
@Configuration
public class EsSyncGoodsSpuServiceImpl {
//注入一下
@Resource
private EsSyncGoodsSpuMapper esSyncGoodsSpuMapper;
//这个定时器注解就是每过去5秒执行一下这个aaa的方法
@Scheduled(cron = "0/5 * * * * ?")
public void aaa() {
//这边是查询mysql的数据
List
//调用高层对象
RestHighLevelClient restHighLevelClient = config.esRestClient();
//然后我这边使用forEach循环将数据添加到es中
list.forEach(a -> {
//创建一个索引请求(这里面写的是我们想要添加的索引)
IndexRequest index = new IndexRequest("goods_spu");
//这边是获取到我们查询得到的数据将这个查询的id当成我们es中_id(不要es自带的)
index.id(a.getSpuId().toString());
//创建批量 *** 作对象
BulkRequest request = new BulkRequest();
//这里我将查询到的数据循环转换成json
index.source(JSONObject.toJSonString(a), XContentType.JSON);
//将转换成json的数据添加到我们创建的对象中去
request.add(index);
try {
//将数据通过bulk *** 作进入es。。
restHighLevelClient.bulk(request, config.COMMON_OPTIONS);
} catch (Exception e) {
e.printStackTrace();
}
//打印......
System.out.println(list);
});
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)