CanalClient —— code that monitors the single table order_info
package com.zyj.gmall.canal

import java.net.InetSocketAddress
import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowChange}
import com.zyj.gmall.common.Constant

import scala.collection.JavaConversions._

object CanalClient {

  def main(args: Array[String]): Unit = {
    // 1. Connect to canal
    val address = new InetSocketAddress("hadoop103", 11111)
    val connector = CanalConnectors.newSingleConnector(address, "example", "", "")
    connector.connect()
    // 1.1 Subscribe to the data: "gmall2.*" means every table in the gmall2 database
    connector.subscribe("gmall2.*")

    // 2. Read and parse the data
    while (true) {
      // 2.1 Keep pulling from canal in a loop
      // 2.2 Each pull fetches the changes caused by at most 100 SQL statements
      val msg = connector.get(100)
      // 2.3 Each entry wraps the change produced by one SQL statement; guard against empty pulls
      val entriesOption = if (msg != null) Some(msg.getEntries) else None
      if (entriesOption.isDefined && entriesOption.get.nonEmpty) {
        val entries = entriesOption.get
        for (entry <- entries) {
          // 2.4 Get the storeValue from each entry
          val storeValue = entry.getStoreValue
          // 2.5 Parse the storeValue into a RowChange
          val rowChange = RowChange.parseFrom(storeValue)
          // 2.6 One storeValue holds multiple RowData; each RowData is one changed row
          val rowDatas = rowChange.getRowDatasList
          // 2.7 Parse every column of every row in rowDatas
          handleData(entry.getHeader.getTableName, rowDatas, rowChange.getEventType)
        }
      } else {
        println("No data pulled, retrying in 2 seconds...")
        Thread.sleep(2000)
      }
    }
  }

  // Handle the RowData list
  def handleData(tableName: String, rowDatas: util.List[CanalEntry.RowData], eventType: CanalEntry.EventType): Unit = {
    if ("order_info" == tableName && eventType == EventType.INSERT && rowDatas != null && rowDatas.nonEmpty) {
      for (rowData <- rowDatas) {
        val result = new JSONObject()
        // 1. All columns of the row, with their post-change values
        val columnsList = rowData.getAfterColumnsList
        // 2. One row should land in Kafka as one record: pack the columns into a single JSON string
        for (column <- columnsList) {
          val key = column.getName    // column name
          val value = column.getValue // column value
          result.put(key, value)
        }
        // 3. Write the row to Kafka as a JSON string: {column: value, column: value, ...}
        val content = result.toJSONString
        println(content)
        MyKafkaUtil.send(Constant.TOPIC_ORDER_INFO, content)
      }
    }
  }
}
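Note that connector.get(100) acknowledges every batch to the canal server automatically, so rows that were pulled but not yet written to Kafka are lost if the client dies in between. The canal client also exposes getWithoutAck / ack / rollback; a minimal sketch of what the loop body would look like with manual acknowledgment, assuming the same connector setup as above:

// Sketch only: replaces the connector.get(100) call in the loop above
val msg = connector.getWithoutAck(100)    // pull without auto-acknowledging
val batchId = msg.getId
try {
  if (batchId != -1 && msg.getEntries.nonEmpty) {
    // ... parse the entries and send to Kafka exactly as above ...
  }
  connector.ack(batchId)        // acknowledge only after successful processing
} catch {
  case e: Exception =>
    connector.rollback(batchId) // the batch is redelivered on the next pull
}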
CanalClient —— code that monitors both order_info and order_detail, refactored to share the Kafka-writing logic

package com.zyj.gmall.canal

import java.net.InetSocketAddress
import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowChange}
import com.zyj.gmall.common.Constant

import scala.collection.JavaConversions._

object CanalClient {

  def main(args: Array[String]): Unit = {
    // 1. Connect to canal
    val address = new InetSocketAddress("hadoop103", 11111)
    val connector = CanalConnectors.newSingleConnector(address, "example", "", "")
    connector.connect()
    // 1.1 Subscribe to the data: "gmall2.*" means every table in the gmall2 database
    connector.subscribe("gmall2.*")

    // 2. Read and parse the data
    while (true) {
      // 2.1 / 2.2 Keep pulling; at most 100 SQL statements' worth of changes per pull
      val msg = connector.get(100)
      // 2.3 Each entry wraps the change produced by one SQL statement; guard against empty pulls
      val entriesOption = if (msg != null) Some(msg.getEntries) else None
      if (entriesOption.isDefined && entriesOption.get.nonEmpty) {
        for (entry <- entriesOption.get) {
          // 2.4 Get the storeValue from each entry; 2.5 parse it into a RowChange
          val rowChange = RowChange.parseFrom(entry.getStoreValue)
          // 2.6 One storeValue holds multiple RowData; each RowData is one changed row
          // 2.7 Parse every column of every row in rowDatas
          handleData(entry.getHeader.getTableName, rowChange.getRowDatasList, rowChange.getEventType)
        }
      } else {
        println("No data pulled, retrying in 2 seconds...")
        Thread.sleep(2000)
      }
    }
  }

  // Route each table's inserts to its own topic
  def handleData(tableName: String, rowDatas: util.List[CanalEntry.RowData], eventType: CanalEntry.EventType): Unit = {
    if ("order_info" == tableName && eventType == EventType.INSERT && rowDatas != null && rowDatas.nonEmpty) {
      sendToKafka(Constant.TOPIC_ORDER_INFO, rowDatas)
    } else if ("order_detail" == tableName && eventType == EventType.INSERT && rowDatas != null && rowDatas.nonEmpty) {
      sendToKafka(Constant.TOPIC_ORDER_DETAIL, rowDatas)
    }
  }

  // Send the rows to Kafka
  private def sendToKafka(topic: String, rowDatas: util.List[CanalEntry.RowData]): Unit = {
    for (rowData <- rowDatas) {
      val result = new JSONObject()
      // 1. All columns of the row, with their post-change values
      val columnsList = rowData.getAfterColumnsList
      // 2. One row should land in Kafka as one record: pack the columns into a single JSON string
      for (column <- columnsList) {
        result.put(column.getName, column.getValue) // column name -> column value
      }
      // 3. Write the row to Kafka as a JSON string: {column: value, column: value, ...}
      val content = result.toJSONString
      println(content)
      MyKafkaUtil.send(topic, content)
    }
  }
}
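Both versions reference a Constant class from the gmall-common module that this post never shows. A minimal sketch of what it would need to contain — the field names come from the code above, but the topic-name values are assumptions:

package com.zyj.gmall.common

object Constant {
  // Assumed topic names; the real values live in the gmall-common module
  val TOPIC_ORDER_INFO = "order_info"
  val TOPIC_ORDER_DETAIL = "order_detail"
}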
MyKafkaUtil

package com.zyj.gmall.canal

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object MyKafkaUtil {

  val prop = new Properties()
  prop.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
  prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  // One shared producer for the whole client
  val producer = new KafkaProducer[String, String](prop)

  def send(topic: String, content: String): Unit = {
    producer.send(new ProducerRecord[String, String](topic, content))
  }
}
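One caveat: KafkaProducer buffers records in memory, and MyKafkaUtil never closes the producer, so whatever is still buffered when the JVM exits can be lost. A small optional addition inside the MyKafkaUtil object:

// Flush buffered records and release the producer on JVM shutdown
sys.addShutdownHook {
  producer.close()
}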
pom (fastjson is inherited from the parent module's dependencies)

<parent>
    <artifactId>gmall1015</artifactId>
    <groupId>com.zyj.gmall</groupId>
    <version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>gmall-canal</artifactId>

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.zyj.gmall</groupId>
        <artifactId>gmall-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
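As the heading says, fastjson itself is not declared here: it comes in through the parent module gmall1015. A sketch of the declaration that would sit in the parent pom (the version shown is illustrative, not taken from the project):

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- illustrative version; use whatever the parent pom actually declares -->
    <version>1.2.47</version>
</dependency>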