Preface:

① canal.adapter 1.1.5 supports one-to-one, single-table incremental data sync into Elasticsearch 7.
② For multi-table aggregation scenarios, its SQL mapping cannot meet our business needs.
③ By integrating canal directly from Java, we can implement flexible, near-real-time incremental data sync.
Table of contents

- 一、Integrating the Java canal client
- 1. Importing dependencies
- 2. Adding configuration
- 3. The canal client
- 4. The message consumption/processing model
- 5. Rebuilding associated indexes
- 二、Verifying the results
- 2.1. Stop the adapter
- 2.2. Modify the data
- 2.3. Data monitoring
- 2.4. Query the index
- 2.5. Modify associated data
- 2.6. Data monitoring
- 2.7. Query the index
一、Integrating the Java canal client

1. Importing dependencies

Prerequisite: since we are syncing incremental data into Elasticsearch 7.15.2, the project must already have Elasticsearch 7.15.2 integrated.
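The scheduler shown later autowires a RestHighLevelClient, so the project needs a client bean along these lines. This is a minimal sketch, assuming elasticsearch-rest-high-level-client 7.15.2 is already on the classpath; the config class name, host, and port are placeholders, not from the original post:

```java
package com.imooc.dianping.config; // hypothetical package, for illustration only

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

    // Builds the high-level REST client that CanalScheduling autowires later.
    // Replace host and port with your actual ES node; both are placeholders here.
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
}
```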
Add the canal client dependencies to the pom:

```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.common</artifactId>
    <version>1.1.5</version>
</dependency>
```

2. Adding configuration
application.properties
```properties
# canal server IP
canal.alone-ip=192.168.159.134
# destination
canal.destination=example
canal.username=canal
canal.password=canal
canal.port=11111
```

3. The canal client
```java
package com.imooc.dianping.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

@Component
public class CanalClient implements DisposableBean {

    private CanalConnector canalConnector;

    @Value("${canal.alone-ip}")
    private String CANALIP;

    @Value("${canal.destination}")
    private String DESTINATION;

    @Value("${canal.username}")
    private String USERNAME;

    @Value("${canal.password}")
    private String PASSWORD;

    @Value("${canal.port}")
    private int PORT;

    @Bean
    public CanalConnector getCanalConnector() {
        // Instantiate the canal connector.
        canalConnector = CanalConnectors.newClusterConnector(
                Lists.newArrayList(new InetSocketAddress(CANALIP, PORT)),
                DESTINATION, USERNAME, PASSWORD);
        // Connect to the canal server.
        canalConnector.connect();
        // Subscribe; a custom filter uses the format {database}.{table}.
        canalConnector.subscribe();
        // Roll back to where consumption was last interrupted.
        canalConnector.rollback();
        // Expose the connector as a bean.
        return canalConnector;
    }

    @Override
    public void destroy() throws Exception {
        if (canalConnector != null) {
            // Disconnect to avoid leaking the connection.
            canalConnector.disconnect();
        }
    }
}
```
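A note on the design choice: CanalConnectors.newClusterConnector takes a list of server addresses and fails over between them, so passing a single address as done here behaves like a plain direct connection. If you definitely run a single canal server, CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), destination, username, password) is the simpler choice; treat this as a general observation about the client API rather than something from the original post.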
With the client in place, the next question is how to consume the messages.
Message consumption is a polling, batch-driven process: the canal client keeps pulling, from canal.deployer, the messages that were replicated out of the MySQL binlog; once a batch has been processed, the client acks it back to canal.deployer so that batch will not be delivered again.
4. The message consumption/processing model

Wire in the consumption model, CanalScheduling (make sure scheduling is enabled, e.g. with @EnableScheduling on a configuration class, so the fixed-delay task actually fires):
```java
package com.imooc.dianping.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.imooc.dianping.dal.ShopModelMapper;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class CanalScheduling implements Runnable, ApplicationContextAware {

    private final static Logger logger = LoggerFactory.getLogger(CanalScheduling.class);

    private ApplicationContext applicationContext;

    @Autowired
    private ShopModelMapper shopModelMapper;

    @Resource
    private CanalConnector canalConnector;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    // Wake the thread every 100 ms to execute run().
    @Override
    @Scheduled(fixedDelay = 100)
    public void run() {
        long batchId = -1;
        try {
            // Pull up to 1000 entries per batch, without auto-ack.
            int batchSize = 1000;
            Message message = canalConnector.getWithoutAck(batchSize);
            // The batch id; -1 means there is no pending message.
            batchId = message.getId();
            List<CanalEntry.Entry> entries = message.getEntries();
            // batchId != -1 and a non-empty entry list mean the MySQL binlog
            // recorded that many row changes.
            if (batchId != -1 && entries.size() > 0) {
                entries.forEach(entry -> {
                    // Only handle binlog events written in ROW format.
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        // Parse and process the message.
                        publishCanalEvent(entry);
                    }
                });
            }
            // After processing, ack the batch so it is not delivered again.
            canalConnector.ack(batchId);
        } catch (Exception e) {
            e.printStackTrace();
            // Roll the batch back so it is redelivered on the next poll.
            canalConnector.rollback(batchId);
        }
    }

    private void publishCanalEvent(CanalEntry.Entry entry) {
        // Event type: we only care about INSERT, UPDATE and DELETE.
        CanalEntry.EventType eventType = entry.getHeader().getEventType();
        // Database and table in which the change happened.
        String database = entry.getHeader().getSchemaName();
        String table = entry.getHeader().getTableName();
        CanalEntry.RowChange change;
        try {
            // Parse the row-level changes carried by this entry.
            change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return;
        }
        change.getRowDatasList().forEach(rowData -> {
            List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
            // Locate the primary key column ("id").
            String primaryKey = "id";
            CanalEntry.Column idColumn = columns.stream()
                    .filter(column -> column.getIsKey() && primaryKey.equals(column.getName()))
                    .findFirst().orElse(null);
            // Convert the column list into a name/value map.
            Map<String, String> dataMap = parseColumnsToMap(columns);
            try {
                indexES(dataMap, database, table);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    Map<String, String> parseColumnsToMap(List<CanalEntry.Column> columns) {
        Map<String, String> jsonMap = new HashMap<>();
        columns.forEach(column -> {
            if (column == null) {
                return;
            }
            jsonMap.put(column.getName(), column.getValue());
        });
        return jsonMap;
    }

    private void indexES(Map<String, String> dataMap, String database, String table) throws IOException {
        logger.info("发生变化的行数据:{} ", dataMap);
        logger.info("发生变化的数据库:{} ", database);
        logger.info("发生变化的表:{} ", table);
        // Limit the scope: only process messages from the dianpingdb database.
        if (!StringUtils.equals("dianpingdb", database)) {
            return;
        }
        // NOTE: the remainder of this method broke off in the source; the table
        // dispatch below is a reconstruction based on the ShopModelMapper that
        // follows in the next section.
        List<Map<String, Object>> result = new ArrayList<>();
        if (StringUtils.equals("shop", table)) {
            result = shopModelMapper.buildESQuery(null, null, new BigDecimal(dataMap.get("id")));
        } else if (StringUtils.equals("category", table)) {
            result = shopModelMapper.buildESQuery(null, new BigDecimal(dataMap.get("id")), null);
        } else if (StringUtils.equals("seller", table)) {
            result = shopModelMapper.buildESQuery(new BigDecimal(dataMap.get("id")), null, null);
        } else {
            return;
        }
        // Re-index every shop document affected by the change.
        for (Map<String, Object> map : result) {
            IndexRequest indexRequest = new IndexRequest("shop");
            indexRequest.id(String.valueOf(map.get("id")));
            indexRequest.source(map);
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Required by the ApplicationContextAware contract.
        this.applicationContext = applicationContext;
    }
}
```

Because a failed batch is rolled back and redelivered on the next poll, processing is at-least-once; re-indexing stays safe because each document is written under a fixed id and is simply overwritten on redelivery.

5. Rebuilding associated indexes

The rebuild is delegated to ShopModelMapper: whichever of the three IDs changed, only that ID is passed in, and every index document associated with it is rebuilt.
```java
package com.imooc.dianping.dal;

import com.imooc.dianping.model.ShopModel;
import org.apache.ibatis.annotations.Param;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

public interface ShopModelMapper {

    // When any one of the three IDs changes, pass in only the changed ID and
    // rebuild the index documents associated with it.
    // The signature below is reconstructed from the SQL that follows; the
    // original method name was truncated, so "buildESQuery" is an assumed name.
    List<Map<String, Object>> buildESQuery(@Param("sellerId") BigDecimal sellerId,
                                           @Param("categoryId") BigDecimal categoryId,
                                           @Param("shopId") BigDecimal shopId);
}
```

The query it executes:
```sql
select a.id, a.name, a.tags, concat(a.latitude, ',', a.longitude) as location,
       a.remark_score, a.price_per_man, a.category_id, b.name as category_name,
       a.seller_id, c.remark_score as seller_remark_score, c.disabled_flag as seller_disabled_flag
from shop a
inner join category b on a.category_id = b.id
inner join seller c on c.id = a.seller_id
    and c.id = #{sellerId}
    and b.id = #{categoryId}
    and a.id = #{shopId}
```
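In MyBatis, this statement would normally live in an XML mapper bound to the interface above. Below is a minimal sketch under two assumptions flagged earlier: the statement id buildESQuery is reconstructed, and <if> guards are added so a single ID can be passed while the other two stay null:

```xml
<!-- ShopModelMapper.xml (sketch; the statement id and the <if> guards are assumptions) -->
<select id="buildESQuery" resultType="java.util.Map">
    select a.id, a.name, a.tags, concat(a.latitude, ',', a.longitude) as location,
           a.remark_score, a.price_per_man, a.category_id, b.name as category_name,
           a.seller_id, c.remark_score as seller_remark_score, c.disabled_flag as seller_disabled_flag
    from shop a
    inner join category b on a.category_id = b.id
    inner join seller c on c.id = a.seller_id
    <if test="sellerId != null">and c.id = #{sellerId}</if>
    <if test="categoryId != null">and b.id = #{categoryId}</if>
    <if test="shopId != null">and a.id = #{shopId}</if>
</select>
```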
二、Verifying the results

2.1. Stop the adapter

First stop canal.adapter, so that only our Java client performs the sync:

```bash
cd /app/canal/canal.adapter
bin/stop.sh
```

2.2. Modify the data
Modify the value of the name column for the row with id = 1 in the shop table of the dianpingdb database.
Change 陕西面馆(北京亦庄) to gblfy.com陕西面馆(北京亦庄) and commit the transaction.
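For reference, the change boils down to one UPDATE plus a commit (a sketch; the values are taken from the log output in the next step):

```sql
UPDATE shop SET name = 'gblfy.com陕西面馆(北京亦庄)' WHERE id = 1;
COMMIT;
```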
2.3. Data monitoring

The scheduler log shows the change being picked up:

```
2021-11-23 16:19:21.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/shopcover/xchg.jpg, address=船厂路36号, latitude=31.195341, end_time=22:00, created_at=2021-11-19 15:53:52, tags=新开业 人气爆棚, start_time=10:00, updated_at=2021-12-22 15:53:52, category_id=1, name=gblfy.com陕西面馆(北京亦庄), remark_score=4.9, price_per_man=156, id=1, seller_id=1, longitude=120.915855}
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:shop
```

2.4. Query the index
```
# Query the shop index
GET /shop/_search
{
  "query": {
    "match": {
      "name": "陕西面馆"
    }
  }
}
```

2.5. Modify associated data
Single-table incremental sync is supported by canal.adapter out of the box; near-real-time incremental sync when an associated (joined) table changes is not. So we next modify data in an associated table and verify again.
Modify the value of the name column for the row with id = 1 in the category table of the dianpingdb database.

Change 美食5 to 美食我的最爱666 and commit the transaction.
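As before, the change is a single UPDATE plus a commit (a sketch; the values match the log below):

```sql
UPDATE category SET name = '美食我的最爱666' WHERE id = 1;
COMMIT;
```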
(Before and after screenshots of the category row are omitted here.)
2.6. Data monitoring

```
2021-11-23 16:25:16.325  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/firstpage/food_u.png, updated_at=2019-06-10 15:33:37, name=美食我的最爱666, created_at=2019-06-10 15:33:37, id=1, sort=99}
2021-11-23 16:25:16.334  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb
2021-11-23 16:25:16.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:category
```

2.7. Query the index
```
# Query the shop index by the new category name
GET /shop/_search
{
  "query": {
    "term": {
      "category_name": "美食我的最爱666"
    }
  }
}
```

Querying by shop name again shows the document now carries the updated category_name:

```
# Query the shop index
GET /shop/_search
{
  "query": {
    "match": {
      "name": "陕西面馆"
    }
  }
}
```
With that, we have used Java code to drive flexible, ID-based, near-real-time incremental updates of the corresponding ES index. Keep at it, folks!