前言
上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更 *** 作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。
架构设计
通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
Kafka&Zookeeper搭建
首先在 官网 下载Kafka:
下载后解压文件夹,可以看到以下几个文件:
Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。
通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:
Canal搭建
canal搭建具体可以参考上文,这里只讲解具体的参数配置:
找到/conf目录下的canal.properties配置文件:
然后配置instance,找到/conf/example/instance.properties配置文件:
经过上述配置后,就可以启动canal了。
测试
环境搭建完成后,就可以编写代码进行测试。
1、引入pom依赖
2、封装Redis工具类
在application.yml文件增加以下配置:
封装一个 *** 作Redis的工具类:
3、创建MQ消费者进行同步
创建一个CanalBean对象进行接收:
最后就可以创建一个消费者CanalConsumer进行消费:
测试Mysql与Redis同步
mysql对应的表结构如下:
启动项目后,新增一条数据:
可以在控制台看到以下输出:
如果更新呢?试一下Update语句:
同样可以在控制台看到以下输出:
经过测试完全么有问题。
总结
既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等; 尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。
Doris官网定义 mysql原始表结构 1.doris中关联mysql外表 结果如下: 2.doris中关联kafka导入数据 查看作业 State为RUNNING,表示已经成功。 停止作业 3.通过flink导入mysql数据到doris 方法1:通过mysql-cdc写入kafka,kafka关联doris表。 方法2:通过阿里云DTS->datahub,然后通过Flink写入kafka,再关联到doris外表 如何处理delete数据?对于方法1,需要手动的删除doris中的数据;对于方法2,可以通过dts_operation_flag字段来标示,dts_operation_flag可以为I/U/D,分别表示添加、更新和删除。那我们就只需要在doris表中添加一个dts_operation_flag字段来标示就可以了,查询数据的时候就不再查询等于D的值。 如何处理脏数据?delete doris中的数据,然后insert正确的值;还有个方法是将关联一个外表(这个是正确的值),然后再将doris中的表和外表中的值diff,将diff的值insert到doris中。在开发 Spark Streaming 的公共组件过程中,需要将 binlog 的数据(Array[Byte])转换为 Json 格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果如果本文有讲述不详细,或者错误指出,肯请指出,谢谢对于 binlog 数据,每一次 *** 作(INSERT/UPDATE/DELETE 等)都会作为一条记录写入 binlog 文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)在数据库中,有 id, name 两个字段,其中 id 为主键,name 随意, age 随意。有两行数据如下idnameage1john30
2john40
那么你进行 *** 作
update table set age = 50 where name = john的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog 记录中,这一点会在后面的实现中有体现。
下面,我们给出具体的代码,然后对代码进行分析def desirializeByte(b: (String, Array[Byte])) : (String, String) = {val binlogEntry = BinlogEntryUtil.serializeToBean(b._2) //将 Array[Byte] 数据转换成 com.meituan.data.binlog.BinlogEntry 类,相关类定义参考附录val pkeys = binlogEntry.getPrimaryKeys.asScala //获取主键,这里的 asScala 将 Java 的 List 转换为 Scala 的 Listval rowDatas : List[BinlogRow] = binlogEntry.getRowDatas.asScala.toList //获取具体的信息val strRowDatas = rowDatas.map(a =>{//将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(K1:V1,K2:V2...Kn:Vn)] 的形式,方面后面进行 Json 化val b = a.getBeforeColumns.asScala//获取 beforColumnsval c = a.getAfterColumns.asScala//获取 afterColumnsval mb = b.map(d =>(d._1, d._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值val mc = c.map(c =>(c._1, c._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值(mb, mc) //返回转换后的 beforeColumns 和 afterColumns})
//下面利用 json4s 进行 Json 化
(binlogEntry.getEventType, compact("rowdata" ->strRowDatas.map{w =>List("row_data" ->("before" ->w._1.toMap) ~ ("after" ->w._2.toMap)) //这里的两个 toMap 是必要的,不然里层会变成 List,这个地方比较疑惑的是,//w._1 按理是 Map类型,为什么还需要强制转换成 Map//而且用 strRowDatas.foreach(x =>println(s"${x._1} ${x._2}")打印的结果表名是 Map}))
desirializeByte 函数传入 topic 中的一条记录,返回参数自己确定,我这里为了测试,返回一个 (String, String) 的 Tuple,第一个字段表示该条记录的 EventType(Insert/Update/Delete 等),第二个字段为 Json 化后的数据。
BinlogEntryUtil.serilizeToBean 是一个辅助类,将 binlog 数据转化为一个 Java bean 类。
第 4 行,我们得到表对应的主键,第 5 行获得具体的数据第 6 行到第 12 行是 Json 化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。
第 14, 15 行就是具体的 Json 工作了(使用了 json4s 包进行 Json 化)这个过程中有一点需要注意的是,在 Json 化的时候,记得为 w._1 和 w._2 加 toMap *** 作,不然会变成 List(很奇怪,我将 w._1 和 w._2 打印出来看,都是 Map 类型)或者你可以在第 7,8 行的末尾加上 .toMap *** 作。这个我查了 API,进行了实验,暂时怀疑是在和 json4s 组合的时候,出现了问题,有待验证。
利用上述代码,我们可以得到下面这样 Json 化之后的字符串(我进行了排版,程序返回的 Json 串是不换行的){"rowdata":
[{"row_data":
{"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}
}
}]
}"
到这里,基本就完成了一种将 binlog 数据 Json 化的代码。
附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。
public static BinlogEntryserializeToBean(byte[] input) {BinlogEntrybinlogEntry = null
Entryentry = deserializeFromProtoBuf(input)//从 protobuf 反序列化if(entry != null) {
binlogEntry = serializeToBean(entry)
}
return binlogEntry
}
public static EntrydeserializeFromProtoBuf(byte[] input) {Entryentry = null
try {
entry = Entry.parseFrom(input)
//com.alibaba.otter.canal.protocol.CanalEntry#Entry 类的方法,由 protobuf 生成} catch (InvalidProtocolBufferExceptionvar3) {logger.error("Exception:" + var3)
}
return entry
}
//将 Entry 解析为一个 bean 类
public static BinlogEntryserializeToBean(Entryentry) {RowChangerowChange = null
try {
rowChange = RowChange.parseFrom(entry.getStoreValue())} catch (Exceptionvar8) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8)}
BinlogEntrybinlogEntry = new BinlogEntry()String[] logFileNames = entry.getHeader().getLogfileName().split("\\.")String logFileNo = "000000"
if(logFileNames.length >1) {
logFileNo = logFileNames[1]
}
binlogEntry.setBinlogFileName(logFileNo)binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset())binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime())binlogEntry.setTableName(entry.getHeader().getTableName())binlogEntry.setEventType(entry.getHeader().getEventType().toString())IteratorprimaryKeysList = rowChange.getRowDatasList().iterator()while(primaryKeysList.hasNext()) {
RowDatarowData = (RowData)primaryKeysList.next()BinlogRowrow = new BinlogRow(binlogEntry.getEventType())row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()))row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()))binlogEntry.addRowData(row)
}
if(binlogEntry.getRowDatas().size() >= 1) {BinlogRowprimaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0)binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1))} else {
ArrayListprimaryKeysList2 = new ArrayList()binlogEntry.setPrimaryKeys(primaryKeysList2)}
return binlogEntry
}
public class BinlogEntry implements Serializable {private String binlogFileName
private long binlogOffset
private long executeTime
private String tableName
private String eventType
private List<String>primaryKeys
private List<BinlogRow>rowDatas = new ArrayList()}
public class BinlogRow implements Serializable {public static final String EVENT_TYPE_INSERT = "INSERT"public static final String EVENT_TYPE_UPDATE = "UPDATE"public static final String EVENT_TYPE_DELETE = "DELETE"private String eventType
private Map<String, BinlogColumn>beforeColumnsprivate Map<String, BinlogColumn>afterColumns}
public class BinlogColumn implements Serializable {private int index
private String mysqlType
private String name
private boolean isKey
private boolean updated
private boolean isNull
private String value
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)