After ops stood up the service and we started consuming the canal data from Kafka, every message came out as garbled bytes, and the program could not get past deserialization at all. (The original post included a screenshot of the garbled payload.)
I searched around online and could not find a working fix. Eventually a colleague who had been through this before pointed me in the right direction: it is a (de)serialization problem. Straight to the code:
```java
import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Message>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setAutoStartup(false);
        // Auto-ack the records that the filter below discards
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, Message>() {
            @Override
            public boolean filter(ConsumerRecord<String, Message> consumerRecord) {
                long length = consumerRecord.toString().getBytes().length;
                long maxLength = 5 * 1024 * 1024; // anything over 5 MB
                if (length < maxLength) {
                    return false;
                }
                // Returning true drops the record
                return true;
            }
        });
        return factory;
    }

    public ConsumerFactory<String, Message> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // [Key point] The value must be deserialized with canal's own deserializer
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}
```
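A note on why this works: canal's Kafka producer can ship either raw protobuf `Message` objects or JSON `FlatMessage` strings, controlled on the canal server by the `canal.mq.flatMessage` switch (in canal 1.1.x). When that switch is off, the payload is serialized protobuf, which is exactly why a plain `StringDeserializer` prints mojibake and why canal's `MessageDeserializer` decodes it cleanly. A minimal sketch of the relevant server-side setting, assuming a 1.1.x deployment like the one in this post:

```properties
# canal.properties on the canal server side (assumption: canal 1.1.x)
# false => publish protobuf-serialized Message objects (needs MessageDeserializer)
# true  => publish JSON FlatMessage strings (a StringDeserializer would suffice)
canal.mq.flatMessage = false
```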
```properties
# Kafka consumer config
kafka.consumer.servers=192.168.3.2:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=1000
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.topic=canal-wp-baseline
kafka.consumer.group.id=canal-wp-baseline
kafka.consumer.concurrency=20
```
With that in place, the messages deserialize correctly.
Here is a rough sketch (pseudocode) of the downstream processing:
```java
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.*;
import java.util.stream.Collectors;

// CanalConstants, ThreadPoolUtils, SpringContextUtils and EsDataHandler
// are project-local helpers not shown in this post.
@Component
public class BaselineCanalKafkaListener {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    // Table-to-handler mapping config, loaded elsewhere in the project
    private List<EsData> mysqlEsMappings;

    @KafkaListener(id = CanalConstants.CANAL_KAFKA_ID, topics = {"canal-wp-baseline"})
    public void listen(ConsumerRecord<?, ?> record) {
        if (!(record.value() instanceof Message)) {
            return;
        }
        Message message = (Message) record.value();
        int size = message.getEntries().size();
        if (message.getId() == -1 || size == 0) {
            return;
        }
        List<EsData> infos = createEventInfos(message.getEntries());
        if (CollectionUtils.isEmpty(infos)) {
            return;
        }
        // key -- handler bean name, value -- the EsData objects for that handler
        Map<String, List<EsData>> listMap =
                infos.stream().collect(Collectors.groupingBy(EsData::getHandlerBean));
        for (Map.Entry<String, List<EsData>> entry : listMap.entrySet()) {
            ThreadPoolUtils.submit(() -> {
                try {
                    SpringContextUtils.getBean(entry.getKey(), EsDataHandler.class).handle(entry.getValue());
                } catch (Exception exception) {
                    log.error("EsDataHandler failed:", exception);
                }
            });
        }
    }

    private List<EsData> createEventInfos(List<CanalEntry.Entry> entries) {
        // Collected change events
        List<EsData> eventInfoList = new ArrayList<>(entries.size());
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                log.error("Failed to parse canal data into a RowChange, entry: " + entry.toString(), e);
                continue;
            }
            // Event type
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Table name
            String tableName = entry.getHeader().getTableName();
            log.info("===========binlog[{} : {}], tableName[{}], eventType: {}",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    tableName, eventType);
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List<CanalEntry.Column> columns = getChangeColumns(eventType, rowData);
                Set<EsData> dataSet = mysqlEsMappings.stream()
                        .filter(e -> e.getTableName().equals(tableName))
                        .collect(Collectors.toSet());
                if (CollectionUtils.isEmpty(dataSet)) {
                    log.error("No handler configured for table [{}]", tableName);
                    continue;
                }
                for (EsData esData : dataSet) {
                    String esDataIdName = esData.getEsDataIdName();
                    Optional<CanalEntry.Column> optional = columns.stream()
                            .filter(e -> e.getName().equals(esDataIdName))
                            .findAny();
                    if (!optional.isPresent()) {
                        log.error("Table [{}]: dataId column [{}] not found, raw columns: {}",
                                tableName, esDataIdName, columns.toString());
                        continue;
                    }
                    eventInfoList.add(new EsData(tableName, esDataIdName, optional.get().getValue(),
                            esData.getHandlerBean(), eventType, columns));
                }
            }
        }
        return eventInfoList;
    }

    private List<CanalEntry.Column> getChangeColumns(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType.equals(CanalEntry.EventType.INSERT)) {
            return rowData.getAfterColumnsList();
        } else if (eventType.equals(CanalEntry.EventType.UPDATE)) {
            return rowData.getAfterColumnsList();
        } else if (eventType.equals(CanalEntry.EventType.DELETE)) {
            return rowData.getBeforeColumnsList();
        }
        log.error("Unsupported event type");
        return Collections.emptyList();
    }
}
```
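One thing worth calling out: `factory.setAutoStartup(false)` in `KafkaConfig` means this `@KafkaListener` will not start consuming on its own; something has to start the container by its id. A minimal sketch of how that could be done with Spring Kafka's `KafkaListenerEndpointRegistry` (the `CanalListenerStarter` class and the start-on-ready trigger are my assumptions, not from the original project):

```java
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

// Hypothetical starter: the container was registered with autoStartup=false,
// so we start it explicitly once the application is fully up.
@Component
public class CanalListenerStarter {

    private final KafkaListenerEndpointRegistry registry;

    public CanalListenerStarter(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void startCanalListener() {
        // CanalConstants.CANAL_KAFKA_ID is the id used in @KafkaListener above
        registry.getListenerContainer(CanalConstants.CANAL_KAFKA_ID).start();
    }
}
```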
```java
import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Getter
@Setter
public class EsData {

    private String esDataIdName;
    private Object esDataIdValue;
    private String tableName;
    private String handlerBean;
    private CanalEntry.EventType eventType;
    private List<CanalEntry.Column> changeColumns;

    public EsData(String tableName, String esDataIdName, Object esDataIdValue, String handlerBean,
                  CanalEntry.EventType eventType, List<CanalEntry.Column> changeColumns) {
        this.esDataIdName = esDataIdName;
        this.esDataIdValue = esDataIdValue;
        this.tableName = tableName;
        this.handlerBean = handlerBean;
        this.eventType = eventType;
        this.changeColumns = changeColumns;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EsData esData = (EsData) o;
        if (tableName != null ? !tableName.equals(esData.tableName) : esData.tableName != null) return false;
        return handlerBean != null ? handlerBean.equals(esData.handlerBean) : esData.handlerBean == null;
    }

    @Override
    public int hashCode() {
        int result = tableName != null ? tableName.hashCode() : 0;
        result = 31 * result + (handlerBean != null ? handlerBean.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "EsData{" +
                "esDataIdName='" + esDataIdName + '\'' +
                ", esDataIdValue=" + esDataIdValue +
                ", tableName='" + tableName + '\'' +
                ", handlerBean='" + handlerBean + '\'' +
                ", eventType=" + eventType +
                '}';
    }
}
```
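For completeness: `SpringContextUtils.getBean(entry.getKey(), EsDataHandler.class)` implies a handler contract that each table-specific bean implements. The original post does not show it; a plausible minimal shape, purely as an assumption, would be:

```java
import java.util.List;

// Hypothetical contract implied by the listener above; the real project's
// interface may well carry more methods or live in a different package.
public interface EsDataHandler {

    // Receives all EsData changes grouped under this handler's bean name
    void handle(List<EsData> dataList);
}
```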