ElasticSearch 7.15.2 使用java canal 接入实现灵活化增量数据准实时同步

ElasticSearch 7.15.2 使用java canal 接入实现灵活化增量数据准实时同步,第1张

ElasticSearch 7.15.2 使用java canal 接入实现灵活化增量数据准实时同步

前言:
①canal.adapter-1.1.5 支持一对一单表的增量数据同步ElasticSearch 7;
②对于多表聚合场景的SQL满足不了我们的业务需求。
③采用java canal 接入,可以实现灵活化增量数据准实时同步

文章目录
          • 一、java canal 接入
            • 1. 依赖导入
            • 2. 增加配置
            • 3. canal 客户端
            • 4. 消息消费/处理模型
            • 5. 重建关联索引
          • 二、效果验证
            • 2.1. 关闭adapter
            • 2.2. 修改数据
            • 2.3. 数据监控
            • 2.4. 索引查询
            • 2.5. 关联数据修改
            • 2.6. 数据监控
            • 2.7. 索引查询

一、java canal 接入

前提:由于咱们是做增量数据同步ElasticSearch 7.15.2,因此项目中需要提前整合好ElasticSearch 7.15.2

1. 依赖导入
 
        
            com.alibaba.otter
            canal.client
            1.1.5
        

        
        
            com.alibaba.otter
            canal.protocol
            1.1.5
        
        
        
            com.alibaba.otter
            canal.common
            1.1.5
        
2. 增加配置

application.properties

# canal服务端ip
canal.alone-ip=192.168.159.134
# destination
canal.destination=example
canal.username=canal
canal.passwoed=canal
canal.port=11111
3. canal 客户端
package com.imooc.dianping.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;


@Component
public class CanalClient implements DisposableBean {
    private CanalConnector canalConnector;

    @Value("${canal.alone-ip}")
    private String CANALIP;
    @Value("${canal.destination}")
    private String DESTINATION;
    @Value("${canal.username}")
    private String USERNAME;
    @Value("${canal.passwoed}")
    private String PASSWOED;
    @Value("${canal.port}")
    private int PORT;

    @Bean
    public CanalConnector getCanalConnector() {
        //canal实例化
        canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(
                new InetSocketAddress(CANALIP, PORT)), DESTINATION, USERNAME, PASSWOED);
        //连接canal
        canalConnector.connect();
        // 自定filter 格式{database}.{table}
        canalConnector.subscribe();
        // 回滚寻找上次中断的位置
        canalConnector.rollback();
        //返回连接
        return canalConnector;
    }

    @Override
    public void destroy() throws Exception {
        if (canalConnector != null) {
            //防止canal泄露
            canalConnector.disconnect();
        }
    }
}

客户端有了,接下来,解决消息消费的问题?
消息消费的过程,就是轮训跑批的过程。可以简单理解为我们对应消息的消费,类似于canal客户端不断地从canal.deployer当中不断拉取mysql数据库中bin_log同步过来的消息,而消息消费完成之后,去告知canal.deployer,这条消息已经ack确认消费过了,之后,就不用推送给我了。使用这种方式,来完成消息消费的动作。

4. 消息消费/处理模型

接入消息消费模型CanalScheduling

package com.imooc.dianping.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.imooc.dianping.dal.ShopModelMapper;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


@Component
public class CanalScheduling implements Runnable, ApplicationContextAware {

    //记录日志
    private final static Logger logger = LoggerFactory.getLogger(CanalScheduling.class);
    private ApplicationContext applicationContext;

    @Autowired
    private ShopModelMapper shopModelMapper;
    @Resource
    private CanalConnector canalConnector;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Override
    //每个100毫秒 唤醒线程执行run方法
    @Scheduled(fixedDelay = 100)
    public void run() {
        //初始化批次ID
        long batchId = -1;
        try {
            //批次/1000条
            int batchSize = 1000;
            Message message = canalConnector.getWithoutAck(batchSize);
            //1000条批次的ID  当获取的batchId=-1代表没有消息
            batchId = message.getId();
            List entries = message.getEntries();

            // batchId != -1 (内部消费有消息的)
            // entries.size() > 0有对应的内容
            //当batchId != -1 并且entries.size() > 0说明mysql bin_log发生了多少条数据的变化
            if (batchId != -1 && entries.size() > 0) {
                //逐条处理
                entries.forEach(entry -> {
                    //处理类型: bin_log已ROW方式处理的消息
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        //消息解析处理
                        publishCanalEvent(entry);
                    }
                });
            }
            //消息解析完毕后,告知批次消息已经消费ack确认完成
            canalConnector.ack(batchId);
        } catch (Exception e) {
            e.printStackTrace();
            //将本次消息回滚,下次继续消息
            canalConnector.rollback(batchId);
        }
    }

    
    private void publishCanalEvent(CanalEntry.Entry entry) {
        //事件类型 只关注INSERT、UPDATe、DELETE
        CanalEntry.EventType eventType = entry.getHeader().getEventType();
        //获取发生变化的数据库
        String database = entry.getHeader().getSchemaName();
        //获取发生变化的数据库中的表
        String table = entry.getHeader().getTableName();
        CanalEntry.RowChange change = null;
        try {
            //记录这条消息发生了那些变化
            change = CanalEntry.RowChange.parseFrom(entry.getStorevalue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return;
        }
        change.getRowDatasList().forEach(rowData -> {
            List columns = rowData.getAfterColumnsList();
            //主键
            String primaryKey = "id";
            CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()
                    && primaryKey.equals(column.getName())).findFirst().orElse(null);

            //将Columns转换成map
            Map dataMap = parseColumnsToMap(columns);
            try {
                indexES(dataMap, database, table);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    
    Map parseColumnsToMap(List columns) {
        Map jsonMap = new HashMap<>();
        columns.forEach(column -> {
            if (column == null) {
                return;
            }
            jsonMap.put(column.getName(), column.getValue());
        });
        return jsonMap;
    }


    private void indexES(Map dataMap, String database, String table) throws IOException {
        logger.info("发生变化的行数据:{} ", dataMap);
        logger.info("发生变化的数据库:{}  ", database);
        logger.info("发生变化的表:{} ", table);

        // 限定处理出具库范围 支处理数据库名称为dianpingdb的消息
        if (!StringUtils.equals("dianpingdb", database)) {
            return;
        }
        
        List> result = new ArrayList<>();
        if (StringUtils.equals("seller", table)) {
            result = shopModelMapper.buildESQuery(new Integer((String) dataMap.get("id")), null, null);
        } else if (StringUtils.equals("category", table)) {
            result = shopModelMapper.buildESQuery(null, new Integer((String) dataMap.get("id")), null);
        } else if (StringUtils.equals("shop", table)) {
            result = shopModelMapper.buildESQuery(null, null, new Integer((String) dataMap.get("id")));
        } else {
            //不关注其他的表
            return;
        }

        for (Map map : result) {
            IndexRequest indexRequest = new IndexRequest("shop");
            indexRequest.id(String.valueOf(map.get("id")));
            indexRequest.source(map);
            //更新索引
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        }


    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

5. 重建关联索引
package com.imooc.dianping.dal;

import com.imooc.dianping.model.ShopModel;
import org.apache.ibatis.annotations.Param;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

public interface ShopModelMapper {

    //当着3个参数中任意一个参数发生变化,我只需要将发生变化的ID传入,重建与此ID关联的索引
    List> buildESQuery(@Param("sellerId")Integer sellerId,@Param("categoryId")Integer categoryId,
                                          @Param("shopId")Integer shopId);
}
    
        select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,
        a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,
        c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag
        from shop a inner join category b on a.category_id = b.id inner join seller c on c.id=a.seller_id
        
            and c.id = #{sellerId}
        
        
            and b.id = #{categoryId}
        
        
            and a.id = #{shopId}
        
    
二、效果验证 2.1. 关闭adapter
cd /app/canal/canal.adapter
 bin/stop.sh
2.2. 修改数据

修改dianpingdb数据库中shop表中ID=1的数据中name的值
将陕西面馆(北京亦庄) 调整为gblfy.com陕西面馆(北京亦庄),提交事务!

2.3. 数据监控

2021-11-23 16:19:21.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/shopcover/xchg.jpg, address=船厂路36号, latitude=31.195341, end_time=22:00, created_at=2021-11-19 15:53:52, tags=新开业 人气爆棚, start_time=10:00, updated_at=2021-12-22 15:53:52, category_id=1, name=gblfy.com陕西面馆(北京亦庄), remark_score=4.9, price_per_man=156, id=1, seller_id=1, longitude=120.915855} 
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb  
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:shop 
2.4. 索引查询
# 查询shop索引
GET /shop/_search
{
  "query": {
    "match": {
      "name": "陕西面馆"
    }
  }
}

2.5. 关联数据修改

单表增量同步,官网本身就支持,关联表数据修改,增量准实时同步,官网是不支持的。因此,咱们需要继续测试修改关联表的数据,再次验证。

修改dianpingdb数据库中category表中ID=1的数据中name的值
将美食5 调整为美食我的最爱666,提交事务!

修改前:

修改后:

2.6. 数据监控

2021-11-23 16:25:16.325  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/firstpage/food_u.png, updated_at=2019-06-10 15:33:37, name=美食我的最爱666, created_at=2019-06-10 15:33:37, id=1, sort=99} 
2021-11-23 16:25:16.334  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb  
2021-11-23 16:25:16.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:category
2.7. 索引查询
GET /shop/_search
{
  "query": {
    "term": {
      "category_name": "美食我的最爱666"
    }
  }
}

# 查询shop索引
GET /shop/_search
{
  "query": {
    "match": {
      "name": "陕西面馆"
    }
  }
}


至此,我们完成了通过java代码的方式,灵活化的根据ID去接入我们对应es增量的准实时更新!小伙伴们,一起加油!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5606821.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存