- Canal通过Kafka实现MySQL与Redis同步
- Zookeeper 安装
- Kafka 安装
- 修改 Canal 文件配置
- 编写实体类和 Kafka 消费者
- 验证
Canal通过Kafka实现MySQL与Redis同步
Docker 环境安装、MySQL 安装、Redis 安装、Canal 安装、MySQL文件配置和 Canal 文件配置请移步 Canal通过TCP实现MySQL与Redis同步 查看。
Zookeeper 安装- 下载 Zookeeper3.7.0 的 docker 镜像:
docker pull zookeeper:3.7.0
- 使用如下命令启动 Zookeeper 服务:
docker run --name zookeeper -p 2181:2181 --restart always -d zookeeper:3.7.0
Kafka 安装–restart always 的设置可以使 docker 启动时同时启动 Zookeeper。
- 下载 kafka2.13-2.8.1 的 docker 镜像:
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.4:2181 -e
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.4:9092 -e
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.13-2.8.1
- 进入 kafka 容器:
docker exec -it kafka /bin/sh
- 进入 Kafka 安装目录下:
cd opt/kafka_2.13-2.8.1
- 创建 topic
kafka-topics.bat --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic mall
修改 Canal 文件配置
- 进入 Canal 容器:
docker exec -it canal /bin/bash
- 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/instance.properties
- 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/example/instance.properties
编写实体类和 Kafka 消费者
- 引入 Kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 在 application.yml 文件增加 Kafka 配置:
kafka:
bootstrap-servers: 10.0.0.4:9092
consumer:
group-id: mall-master
- 创建 CanalBean 对象接收 Kafka 消息:
package com.macro.mall.canal;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @ClassName CanalBean
* @Description TODO
* @Author 听秋
* @Date 2022/5/9 19:46
* @Version 1.0
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalBean {
/** 数据 */
private List<UmsAdmin> data;
/** 数据库名称 */
private String database;
private long es;
/** 递增,从1开始 */
private int id;
/** 是否是DDL语句 */
private boolean isDdl;
/** 表结构的字段类型 */
private MysqlType mysqlType;
/** UPDATE语句,旧数据 */
private String old;
/** 主键名称 */
private List<String> pkNames;
/** sql语句 */
private String sql;
private SqlType sqlType;
/** 表名 */
private String table;
private long ts;
/** (新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 */
private String type;
}
package com.macro.mall.canal;
/**
* @ClassName MysqlType
* @Description TODO
* @Author 听秋
* @Date 2022/5/9 19:48
* @Version 1.0
**/
public class MysqlType {
private String id;
private String username;
private String password;
private String icon;
private String email;
private String nickName;
private String note;
private String createTime;
private String loginTime;
private String status;
}
package com.macro.mall.canal;
/**
* @ClassName SqlType
* @Description TODO
* @Author 听秋
* @Date 2022/5/9 19:49
* @Version 1.0
**/
public class SqlType {
private int id;
private int username;
private int password;
private int icon;
private int email;
private int nickName;
private int note;
private int createTime;
private int loginTime;
private int status;
}
- 创建 Kafka 消费者:
package com.macro.mall.canal;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @ClassName CanalConsumer
* @Description TODO
* @Author 听秋
* @Date 2022/5/9 19:49
* @Version 1.0
**/
@Component
@Slf4j
public class CanalConsumer {
@Autowired
private RedisTemplate redisTemplate;
private static String REDIS_DATABASE = "mall";
private static String REDIS_KEY_ADMIN = "ums:admin";
private static String insert = "INSERT";
private static String update = "UPDATE";
@KafkaListener(topics = "mall")
public void receive(ConsumerRecord<?, ?> consumer) {
String value = (String) consumer.value();
log.info("topic名称:{}, key:{}, 分区位置:{}, 下标:{}, value:{}", consumer.topic(), consumer.key(),
consumer.partition(), consumer.offset(), value);
// 转换为javaBean
CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
// 获取类型
String type = canalBean.getType();
// 获取是否是DDL语句
boolean isDdl = canalBean.isDdl();
// 不是DDL语句
if (!isDdl) {
List<UmsAdmin> UmsAdmins = canalBean.getData();
if (insert.equals(type) || update.equals(type)) {
//新增或更新语句
for (UmsAdmin umsAdmin : UmsAdmins) {
//新增到redis中
redisTemplate.opsForValue().set(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername(),
JSONObject.toJSONString(umsAdmin));
}
} else {
// 删除语句
for (UmsAdmin umsAdmin : UmsAdmins) {
//从redis中删除
redisTemplate.delete(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername());
}
}
}
}
}
验证
- 修改前
MySQL
Redis
- 修改后
日志
MySQL
Redis
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)