Canal通过Kafka实现MySQL与Redis同步

Canal通过Kafka实现MySQL与Redis同步,第1张

文章目录
  • Canal通过Kafka实现MySQL与Redis同步
    • Zookeeper 安装
    • Kafka 安装
    • 修改 Canal 文件配置
    • 编写实体类和 Kafka 消费者
    • 验证


Canal通过Kafka实现MySQL与Redis同步

Docker 环境安装、MySQL 安装、Redis 安装、Canal 安装、MySQL文件配置和 Canal 文件配置请移步 Canal通过TCP实现MySQL与Redis同步 查看。

Zookeeper 安装
  • 下载 Zookeeper3.7.0 的 docker 镜像:
docker pull zookeeper:3.7.0
  • 使用如下命令启动 Zookeeper 服务:
docker run --name zookeeper -p 2181:2181 --restart always -d zookeeper:3.7.0

–restart always 的设置可以使 docker 启动时同时启动 Zookeeper。

Kafka 安装
  • 下载 kafka2.13-2.8.1 的 docker 镜像:
docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.4:2181 -e 
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.4:9092 -e 
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.13-2.8.1
  • 进入 kafka 容器:
docker exec -it kafka /bin/sh
  • 进入 Kafka 安装目录下:
cd opt/kafka_2.13-2.8.1
  • 创建 topic
kafka-topics.bat --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic mall
修改 Canal 文件配置
  • 进入 Canal 容器:
docker exec -it canal /bin/bash
  • 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/instance.properties


  • 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/example/instance.properties

编写实体类和 Kafka 消费者
  • 引入 Kafka 依赖:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • 在 application.yml 文件增加 Kafka 配置:
kafka:
    bootstrap-servers: 10.0.0.4:9092
    consumer:
      group-id: mall-master
  • 创建 CanalBean 对象接收 Kafka 消息:
package com.macro.mall.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * @ClassName CanalBean
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:46
 * @Version 1.0
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalBean {

    /** 数据 */
    private List<UmsAdmin> data;

    /** 数据库名称 */
    private String database;

    private long es;

    /** 递增,从1开始 */
    private int id;

    /** 是否是DDL语句 */
    private boolean isDdl;

    /** 表结构的字段类型 */
    private MysqlType mysqlType;

    /** UPDATE语句,旧数据 */
    private String old;

    /** 主键名称 */
    private List<String> pkNames;

    /** sql语句 */
    private String sql;

    private SqlType sqlType;

    /** 表名 */
    private String table;

    private long ts;

    /** (新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 */
    private String type;
}
package com.macro.mall.canal;

/**
 * @ClassName MysqlType
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:48
 * @Version 1.0
 **/

public class MysqlType {
    private String id;
    private String username;
    private String password;
    private String icon;
    private String email;
    private String nickName;
    private String note;
    private String createTime;
    private String loginTime;
    private String status;
}
package com.macro.mall.canal;

/**
 * @ClassName SqlType
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:49
 * @Version 1.0
 **/
public class SqlType {
    private int id;
    private int username;
    private int password;
    private int icon;
    private int email;
    private int nickName;
    private int note;
    private int createTime;
    private int loginTime;
    private int status;
}
  • 创建 Kafka 消费者:
package com.macro.mall.canal;


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @ClassName CanalConsumer
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:49
 * @Version 1.0
 **/
@Component
@Slf4j
public class CanalConsumer {

    @Autowired
    private RedisTemplate redisTemplate;

    private static String REDIS_DATABASE = "mall";
    private static String REDIS_KEY_ADMIN = "ums:admin";

    private static String insert = "INSERT";
    private static String update = "UPDATE";


    @KafkaListener(topics = "mall")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{}, key:{}, 分区位置:{}, 下标:{}, value:{}", consumer.topic(), consumer.key(),
                consumer.partition(), consumer.offset(), value);

        // 转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);

        // 获取类型
        String type = canalBean.getType();

        // 获取是否是DDL语句
        boolean isDdl = canalBean.isDdl();

        // 不是DDL语句
        if (!isDdl) {
            List<UmsAdmin> UmsAdmins = canalBean.getData();

            if (insert.equals(type) || update.equals(type)) {
                //新增或更新语句
                for (UmsAdmin umsAdmin : UmsAdmins) {
                    //新增到redis中
                    redisTemplate.opsForValue().set(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername(),
                            JSONObject.toJSONString(umsAdmin));
                }
            } else {
                // 删除语句
                for (UmsAdmin umsAdmin : UmsAdmins) {
                    //从redis中删除
                    redisTemplate.delete(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername());
                }
            }
        }
    }
}
验证
  • 修改前

MySQL

Redis

  • 修改后

日志

MySQL

Redis

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/905996.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-15
下一篇 2022-05-15

发表评论

登录后才能评论

评论列表(0条)

保存