[会写代码的健身爱好者成长史]之阿里canal数据实时同步-附kafka和Elasticsearch实现

[会写代码的健身爱好者成长史]之阿里canal数据实时同步-附kafka和Elasticsearch实现,第1张

[会写代码的健身爱好者成长史]之阿里canal数据实时同步-附kafka和Elasticsearch实现

目录

 

1.什么是canal

2.canal工作原理

3.下载安装

 4.连接配置kafka

1.打开config目录下的canal.properties文件

 2.修改相关配置

 3.mysql相关配置

3.1验证binary log日志是否开启

4.测试kafka

5.canal实现同步Elasticsearch

1.添加依赖

2.编写代码


1.什么是canal

canal是阿里开源针对mysql的一个数据实时同步的一个框架,可以实现无代码,简单配置即可完成数据库数据的同步到kafka,rabbitMQ等消息,增加,修改,删除

2.canal工作原理

canal的工作原理:其实就是mysql的主从复制,只不过canal不是真正的从节点,而是伪装成mysql的slave从节点,模拟mysql从节点的交互方式,给主节点发送dump请求,主节点接收到了之后,会把binary log日志推送给canal,然后canal进行解析,从而发送到存储目的地

3.下载安装

下载canal.deployer服务端

canal下载地址,下载1.14,1.15都可以,但是1.15才能支持es7.x以上的版本注意一下

这里以1.15为例,下载解压后

bin目录下有个startup.bat 双击即可打开

 4.连接配置kafka

首先要修改几个配置

1.打开config目录下的canal.properties文件

 2.修改相关配置

2.1 往下拉会看见关于kafka相关的配置

2.2 在config目录下有个example文件,点击后会看见instance.properties文件,打开配置一下kafka的主题,以下 *** 作都是在instance.properties文件里面

 2.3如果数据库不在本地,记得修改下这里

 2.4如果没有其他的设置,这里的username/password要修改为数据库的账户和密码

 2.5这里的slaveId要打开,值不要和mysql配置的一样就好

 

 3.mysql相关配置

这里以mysql5.7.20解压版本为例

解压版本目录下是没有my.ini文件,自行创建一个就好,具体解压版要如何安装自己百度看下,这里只说一下canal相关的配置

 因为mysql默认是不开binary log日志的,所以第一步要开启binary log日志,在my.ini配置一下就好,打开my.ini,务必在[mysqld]下面添加这四行,不在mysqld下面添加不生效,server-id=1如果有可以不加

log-bin=mysql-bin # 开启Binlog 一般只需要修改这一行即可
binlog-format=ROW # 设置格式 此行可以不加 命令设置即可 详见下方拓展
binlog-do-db=test # 监控mysql中test这个库下面的所有表
server_id=1  # 配置serverID 
3.1验证binary log日志是否开启

要重启mysql不然配置的不生效,再mysql的bin路径下

D:mysql-5.7.20-winx64bin  输入如下命令,要用管理员的方式进cmd命令

首先应该先关闭mysql服务

net stop mysql

然后再开启mysql服务

net start mysql

然后登录mysql看下日志是否开启

mysql -uroot -p
show variables like 'log_bin';

 开启之后就可以开始测试了!

4.测试kafka

首先要下载安装kafka,然后打开kafka,进入之前在canal配置文件里面配置的topic主题监控

需要在kafka的路径下输入,D:kafka_2.11-2.4.1

打开zookeeper
.binwindowszookeeper-server-start.bat configzookeeper.properties
打开kafka
.binwindowskafka-server-start.bat .configserver.properties
进入topic
.binwindowskafka-console-consumer.bat --bootstrap-server 10.88.40.156:9092 --topic topic2 --from-beginning

在数据库添加一条数据

 去topic看下是否发送到了

 ok结束,canal数据同步到kafka结束


5.canal实现同步Elasticsearch

canal1.15才支持es7.x以上

记得修改canal的config目录下的canal.properties文件

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp  #如果是es修改为tcp模式

1.添加依赖
        
        
            org.springframework.boot
            spring-boot-starter-data-elasticsearch
        
        
        
            com.alibaba
            fastjson
            1.2.76
        
        
        
            org.jsoup
            jsoup
            1.13.1
        
        
        
            com.alibaba.otter
            canal.client
            1.1.4
        
2.编写代码
canal实现数据库发生增加,修改和删除就会被获取到
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mybatisplus.entity.Content;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

@Component
public class CanalClient  {
    //@PostConstruct项目启动是就会加载这个方法,用这个注解可以实现此方法一直属于运行的状态,只要数据库发生增加,修改和删除就会被获取到
    @PostConstruct
    public  List canal() throws InvalidProtocolBufferException {
        List contentList = new ArrayList<>();
        //获取连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111),
                "example","","");
        //连接
        canalConnector.connect();
        //订阅数据库
        canalConnector.subscribe("test.*");
        //获取数据,一次抓取100条,没有100条就全部抓完,有多少抓多少,并不会堵塞
        Message message = canalConnector.get(100);
        //获取Entry集合
        List entries = message.getEntries();
        //如果集合为空,就等待几秒
        if (!entries.isEmpty()){
            //遍历entries逐个解析
            for (CanalEntry.Entry entry : entries) {
                //1.获取表名
                String tableName = entry.getHeader().getTableName();
                //2.获取获取类型
                CanalEntry.EntryType entryType = entry.getEntryType();
                //3.获取序列化后的数据
                ByteString storevalue = entry.getStorevalue();
                //4.判断当前的数据类型是否为rowData类型
                if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                    //反序列化数据
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue);
                    //获取当前事件的类型
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    //获取数据集
                    List rowDataList = rowChange.getRowDatasList();
                    //遍历rowDataList数据集并打印
                    for (CanalEntry.RowData rowdata: rowDataList) {
                        //修改之后的数据
                        JSonObject afterData = new JSonObject();
                        List afterColumnsList = rowData.getAfterColumnsList();
                        for (CanalEntry.Column column : afterColumnsList) {
                            //name字段名,value是字段名对应的值
                            afterData.put(column.getName(),column.getValue());
                            //将JSONObject类型的数据转化为content对象
                            Content content = afterData.toJavaObject(Content.class);
                            contentList.add(content);
                        }
                        System.out.println("表名:"+tableName+",类型"+eventType+",after:"+contentList);
                    }
                }
            }
        }
        return contentList;
    }
}

连接Elasticsearch,写个config类

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ElasticsearchConfig {

    @Bean
    public RestHighLevelClient restHighLevelClient(){
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost",9200,"http"))
        );
        return restHighLevelClient;
    }
}

 解析网页数据,拿到数据放到数据库,从而实现数据库变化并实时同步到es索引库中

import com.mybatisplus.entity.Content;
import org.jsoup.Jsoup;
import org.jsoup.nodes.document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;


@Component
public class HtmlParseUtil {

    public List parseJD(String keyword) throws IOException {
        List contentList = new ArrayList<>();
        //获得请求
        String url = "https://search.xxxxx.com/Search?keyword="+keyword;
        //解析地址,如果30s内解析不到就会报错
        // jsoup返回的 document 对象就是 js浏览器里面的document对象
        document parse = Jsoup.parse(new URL(url), 30000);
        //先获取id=J_goodsList的大的一个商品div
        Element elementById = parse.getElementById("J_goodsList");
        //再获取包含一个个商品的li标签
        Elements li = elementById.getElementsByTag("li");
        //循环这个li,这里的li包含的就是一个个商品,商品有价格,名称,店铺名,图片地址等等
        for (Element element : li) {
            //为了用户体验,一般大厂的网页图片都懒加载,刚开始只会加载一个默认图片,正在的图片地址在这个data-lazy-img 里面,而不是在src里面
            String img = element.getElementsByTag("img").eq(0).attr("data-lazy-img");
            String price = element.getElementsByClass("p-price").eq(0).text();
            String name = element.getElementsByClass("p-name").eq(0).text();
            String shop = element.getElementsByClass("p-shopnum").eq(0).text();
            Content content = new Content();
            content.setImg(img);
            content.setPrice(price);
            content.setTitle(name);
            content.setShop(shop);
            contentList.add(content);
        }
        return contentList;
    }

 实体类

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;


@Data
public class Content {
    //myBatis-plus 的id自增
    @TableId(type = IdType.AUTO)
    private Integer id;
    private String title;
    private String price;
    private String img;
    private String shop;
}

 mapper层代码

import com.baomidou.mybatisplus.core.mapper.baseMapper;
import com.mybatisplus.entity.Content;
import org.springframework.stereotype.Repository;


@Repository
public interface ContentMapper extends baseMapper {

}

service代码

import com.alibaba.fastjson.JSON;
import com.mybatisplus.canal.CanalClient;
import com.mybatisplus.entity.Content;
import com.mybatisplus.mapper.ContentMapper;
import com.mybatisplus.until.HtmlParseUtil;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.Timevalue;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


@Service
public class ContentService {

    @Autowired
    private RestHighLevelClient client;

    @Autowired
    private ContentMapper contentMapper;

    @Autowired
    private HtmlParseUtil htmlParseUtil;

    @Autowired
    private CanalClient canalClient;

    //将解析的网页数据保存到数据库中
    public void insertData(String keyword) throws IOException {
        List contentList = htmlParseUtil.parseJD(keyword);
        for (Content content : contentList) {
            contentMapper.insert(content);
        }
    }

    //将数据库中的数据更新到es索引库中
    public boolean saveES(String keyword) throws IOException {
        //网页解析存到数据库
        insertData(keyword);
        //通过canal拿到新增的数据
        List canal = canalClient.canal();
        //数据库批量插入到es索引库中
        BulkRequest bulkRequest = new BulkRequest("jd","jd_goods");
        bulkRequest.timeout();
        for (Content content : canal) {
            bulkRequest.add(new IndexRequest("jd").id(String.valueOf(content.getId())).source(JSON.toJSonString(content), XContentType.JSON));
        }
        BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        //批量插入成功返回false,所以加个非!
        return !bulk.hasFailures();
    }

    //查询数据,分页
    public List> search(String keyword,Integer pageNo,Integer pageSize) throws IOException {
        List> mapList = new ArrayList<>();
        //构建查询请求
        SearchRequest searchRequest = new SearchRequest();
        SearchSourceBuilder builder = new SearchSourceBuilder();
        //模糊查询
        if (keyword.matches("^[A-Za-z0-9]+$")){
            //模糊查询,主要针对于英文,如果你想搜索java,matchQuery可能查询出来,但是如果你只输入jav,那么你就没办法搜索到java
            //这时候需要用模糊查询fuzzyQuery
            builder.query(QueryBuilders.fuzzyQuery("title",keyword).fuzziness(Fuzziness.TWO));
        }else {
            //正常中英文查询
            builder.query(QueryBuilders.matchQuery("title",keyword));
        }
        //分页设置
        builder.from(pageNo);
        builder.size(pageSize);
        builder.timeout(new Timevalue(60, TimeUnit.SECONDS));
        searchRequest.source(builder);
        SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
        for (SearchHit hit : search.getHits().getHits()) {
            mapList.add(hit.getSourceAsMap());
        }
        return mapList;
    }

}

 controller层

import com.mybatisplus.service.ContentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.List;
import java.util.Map;


@RestController
public class ContentController {

    @Autowired
    private ContentService contentService;

    //添加数据
    @GetMapping("/add/{keyword}")
    public boolean add(@PathVariable("keyword") String keyword) throws IOException {
        return contentService.saveES(keyword);
    }

    //查询数据
    @GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
    public List> search(@PathVariable("keyword") String keyword,
                                           @PathVariable("pageNo") Integer pageNo,
                                           @PathVariable("pageSize") Integer pageSize) throws IOException {
       return contentService.search(keyword, pageNo, pageSize);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709508.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存