目录
1.什么是canal
2.canal工作原理
3.下载安装
4.连接配置kafka
1.打开config目录下的canal.properties文件
2.修改相关配置
3.mysql相关配置
3.1验证binary log日志是否开启
4.测试kafka
5.canal实现同步Elasticsearch
1.添加依赖
2.编写代码
1.什么是canal
2.canal工作原理canal是阿里开源针对mysql的一个数据实时同步的一个框架,可以实现无代码,简单配置即可完成数据库数据的同步到kafka,rabbitMQ等消息,增加,修改,删除
3.下载安装canal的工作原理:其实就是mysql的主从复制,只不过canal不是真正的从节点,而是伪装成mysql的slave从节点,模拟mysql从节点的交互方式,给主节点发送dump请求,主节点接收到了之后,会把binary log日志推送给canal,然后canal进行解析,从而发送到存储目的地
下载canal.deployer服务端
canal下载地址,下载1.14,1.15都可以,但是1.15才能支持es7.x以上的版本注意一下
这里以1.15为例,下载解压后
bin目录下有个startup.bat 双击即可打开
4.连接配置kafka首先要修改几个配置
1.打开config目录下的canal.properties文件 2.修改相关配置2.1 往下拉会看见关于kafka相关的配置
2.2 在config目录下有个example文件,点击后会看见instance.properties文件,打开配置一下kafka的主题,以下 *** 作都是在instance.properties文件里面
2.3如果数据库不在本地,记得修改下这里
2.4如果没有其他的设置,这里的username/password要修改为数据库的账户和密码
2.5这里的slaveId要打开,值不要和mysql配置的一样就好
3.mysql相关配置
这里以mysql5.7.20解压版本为例
解压版本目录下是没有my.ini文件,自行创建一个就好,具体解压版要如何安装自己百度看下,这里只说一下canal相关的配置
因为mysql默认是不开binary log日志的,所以第一步要开启binary log日志,在my.ini配置一下就好,打开my.ini,务必在[mysqld]下面添加这四行,不在mysqld下面添加不生效,server-id=1如果有可以不加
log-bin=mysql-bin # 开启Binlog 一般只需要修改这一行即可 binlog-format=ROW # 设置格式 此行可以不加 命令设置即可 详见下方拓展 binlog-do-db=test # 监控mysql中test这个库下面的所有表 server_id=1 # 配置serverID3.1验证binary log日志是否开启
要重启mysql不然配置的不生效,再mysql的bin路径下
D:mysql-5.7.20-winx64bin 输入如下命令,要用管理员的方式进cmd命令
首先应该先关闭mysql服务
net stop mysql
然后再开启mysql服务
net start mysql
然后登录mysql看下日志是否开启
mysql -uroot -p show variables like 'log_bin';
开启之后就可以开始测试了!
4.测试kafka首先要下载安装kafka,然后打开kafka,进入之前在canal配置文件里面配置的topic主题监控
需要在kafka的路径下输入,D:kafka_2.11-2.4.1
打开zookeeper .binwindowszookeeper-server-start.bat configzookeeper.properties 打开kafka .binwindowskafka-server-start.bat .configserver.properties 进入topic .binwindowskafka-console-consumer.bat --bootstrap-server 10.88.40.156:9092 --topic topic2 --from-beginning
在数据库添加一条数据
去topic看下是否发送到了
ok结束,canal数据同步到kafka结束
5.canal实现同步Elasticsearch
1.添加依赖canal1.15才支持es7.x以上
记得修改canal的config目录下的canal.properties文件
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp #如果是es修改为tcp模式
2.编写代码org.springframework.boot spring-boot-starter-data-elasticsearchcom.alibaba fastjson1.2.76 org.jsoup jsoup1.13.1 com.alibaba.otter canal.client1.1.4
canal实现数据库发生增加,修改和删除就会被获取到
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.mybatisplus.entity.Content; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @Component public class CanalClient { //@PostConstruct项目启动是就会加载这个方法,用这个注解可以实现此方法一直属于运行的状态,只要数据库发生增加,修改和删除就会被获取到 @PostConstruct public Listcanal() throws InvalidProtocolBufferException { List contentList = new ArrayList<>(); //获取连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111), "example","",""); //连接 canalConnector.connect(); //订阅数据库 canalConnector.subscribe("test.*"); //获取数据,一次抓取100条,没有100条就全部抓完,有多少抓多少,并不会堵塞 Message message = canalConnector.get(100); //获取Entry集合 List entries = message.getEntries(); //如果集合为空,就等待几秒 if (!entries.isEmpty()){ //遍历entries逐个解析 for (CanalEntry.Entry entry : entries) { //1.获取表名 String tableName = entry.getHeader().getTableName(); //2.获取获取类型 CanalEntry.EntryType entryType = entry.getEntryType(); //3.获取序列化后的数据 ByteString storevalue = entry.getStorevalue(); //4.判断当前的数据类型是否为rowData类型 if (CanalEntry.EntryType.ROWDATA.equals(entryType)){ //反序列化数据 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue); //获取当前事件的类型 CanalEntry.EventType eventType = rowChange.getEventType(); //获取数据集 List rowDataList = rowChange.getRowDatasList(); //遍历rowDataList数据集并打印 for (CanalEntry.RowData rowdata: rowDataList) { //修改之后的数据 JSonObject afterData = new JSonObject(); List afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { //name字段名,value是字段名对应的值 afterData.put(column.getName(),column.getValue()); //将JSONObject类型的数据转化为content对象 Content content = afterData.toJavaObject(Content.class); contentList.add(content); } System.out.println("表名:"+tableName+",类型"+eventType+",after:"+contentList); } } } } return contentList; } }
连接Elasticsearch,写个config类
import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ElasticsearchConfig { @Bean public RestHighLevelClient restHighLevelClient(){ RestHighLevelClient restHighLevelClient = new RestHighLevelClient( RestClient.builder(new HttpHost("localhost",9200,"http")) ); return restHighLevelClient; } }
解析网页数据,拿到数据放到数据库,从而实现数据库变化并实时同步到es索引库中
import com.mybatisplus.entity.Content; import org.jsoup.Jsoup; import org.jsoup.nodes.document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import org.springframework.stereotype.Component; import java.io.IOException; import java.net.URL; import java.util.ArrayList; import java.util.List; @Component public class HtmlParseUtil { public ListparseJD(String keyword) throws IOException { List contentList = new ArrayList<>(); //获得请求 String url = "https://search.xxxxx.com/Search?keyword="+keyword; //解析地址,如果30s内解析不到就会报错 // jsoup返回的 document 对象就是 js浏览器里面的document对象 document parse = Jsoup.parse(new URL(url), 30000); //先获取id=J_goodsList的大的一个商品div Element elementById = parse.getElementById("J_goodsList"); //再获取包含一个个商品的li标签 Elements li = elementById.getElementsByTag("li"); //循环这个li,这里的li包含的就是一个个商品,商品有价格,名称,店铺名,图片地址等等 for (Element element : li) { //为了用户体验,一般大厂的网页图片都懒加载,刚开始只会加载一个默认图片,正在的图片地址在这个data-lazy-img 里面,而不是在src里面 String img = element.getElementsByTag("img").eq(0).attr("data-lazy-img"); String price = element.getElementsByClass("p-price").eq(0).text(); String name = element.getElementsByClass("p-name").eq(0).text(); String shop = element.getElementsByClass("p-shopnum").eq(0).text(); Content content = new Content(); content.setImg(img); content.setPrice(price); content.setTitle(name); content.setShop(shop); contentList.add(content); } return contentList; }
实体类
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import lombok.Data; @Data public class Content { //myBatis-plus 的id自增 @TableId(type = IdType.AUTO) private Integer id; private String title; private String price; private String img; private String shop; }
mapper层代码
import com.baomidou.mybatisplus.core.mapper.baseMapper; import com.mybatisplus.entity.Content; import org.springframework.stereotype.Repository; @Repository public interface ContentMapper extends baseMapper{ }
service代码
import com.alibaba.fastjson.JSON; import com.mybatisplus.canal.CanalClient; import com.mybatisplus.entity.Content; import com.mybatisplus.mapper.ContentMapper; import com.mybatisplus.until.HtmlParseUtil; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.core.Timevalue; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xcontent.XContentType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @Service public class ContentService { @Autowired private RestHighLevelClient client; @Autowired private ContentMapper contentMapper; @Autowired private HtmlParseUtil htmlParseUtil; @Autowired private CanalClient canalClient; //将解析的网页数据保存到数据库中 public void insertData(String keyword) throws IOException { ListcontentList = htmlParseUtil.parseJD(keyword); for (Content content : contentList) { contentMapper.insert(content); } } //将数据库中的数据更新到es索引库中 public boolean saveES(String keyword) throws IOException { //网页解析存到数据库 insertData(keyword); //通过canal拿到新增的数据 List canal = canalClient.canal(); //数据库批量插入到es索引库中 BulkRequest bulkRequest = new BulkRequest("jd","jd_goods"); bulkRequest.timeout(); for (Content content : canal) { bulkRequest.add(new IndexRequest("jd").id(String.valueOf(content.getId())).source(JSON.toJSonString(content), XContentType.JSON)); } BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT); //批量插入成功返回false,所以加个非! return !bulk.hasFailures(); } //查询数据,分页 public List
controller层
import com.mybatisplus.service.ContentService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; import java.util.List; import java.util.Map; @RestController public class ContentController { @Autowired private ContentService contentService; //添加数据 @GetMapping("/add/{keyword}") public boolean add(@PathVariable("keyword") String keyword) throws IOException { return contentService.saveES(keyword); } //查询数据 @GetMapping("/search/{keyword}/{pageNo}/{pageSize}") public List> search(@PathVariable("keyword") String keyword, @PathVariable("pageNo") Integer pageNo, @PathVariable("pageSize") Integer pageSize) throws IOException { return contentService.search(keyword, pageNo, pageSize); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)