Notes on a pitfall when integrating Spring Boot 2.x with Canal + Kafka


After our ops team got the services up, every piece of Canal data we consumed from Kafka turned out to be garbled bytes, and the program couldn't proceed at all. (Screenshot of the garbled output not preserved in this copy.)

I searched around online without finding a working fix. Eventually a colleague who had done this before pointed me in the right direction: it was a deserialization problem. Straight to the code:

import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Message>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setAutoStartup(false);

        // auto-acknowledge the records that the filter discards
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, Message>() {
            @Override
            public boolean filter(ConsumerRecord<String, Message> consumerRecord) {
                long length = consumerRecord.toString().getBytes().length;
                long maxLength = 5 * 1024 * 1024; // larger than 5 MB
                if (length < maxLength) {
                    return false;
                }
                // returning true discards the record
                return true;
            }
        });
        return factory;
    }

    public ConsumerFactory<String, Message> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // [Key point] the value must be deserialized with canal's own deserializer
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

}
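
One thing to watch: the factory above sets autoStartup(false), so the listener container will not start by itself. A minimal sketch of starting it once the application is ready (the listener id is the one passed to @KafkaListener further down; this starter class is my own assumption, not part of the original project):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class CanalListenerStarter {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // start the paused canal listener once the application context is fully up
    @EventListener(ApplicationReadyEvent.class)
    public void startCanalListener() {
        MessageListenerContainer container = registry.getListenerContainer(CanalConstants.CANAL_KAFKA_ID);
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}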
The matching application.properties entries (these are custom keys bound through @Value above, not Spring Boot's spring.kafka.* auto-configuration):

## Kafka config
kafka.consumer.servers=192.168.3.2:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=1000
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.topic=canal-wp-baseline
kafka.consumer.group.id=canal-wp-baseline
kafka.consumer.concurrency=20

With this in place, the deserialized messages come out correctly.
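
To see why the payload looked like mojibake in the first place: canal publishes its Message objects to Kafka as protobuf bytes, so a plain StringDeserializer simply renders raw protobuf as text. A minimal sketch of decoding one record's value by hand, assuming you already hold the raw bytes (the class and topic name here are illustrative):

import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import com.alibaba.otter.canal.protocol.Message;

public class CanalDecodeDemo {

    // rawBytes: the untouched value bytes of one Kafka record from the canal topic
    static Message decode(byte[] rawBytes) {
        try (MessageDeserializer deserializer = new MessageDeserializer()) {
            // the topic argument is only there to satisfy Kafka's Deserializer contract
            return deserializer.deserialize("canal-wp-baseline", rawBytes);
        }
    }
}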

A sketch of the downstream processing code:

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@Component
public class BaselineCanalKafkaListener {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    // table-to-ES mapping config; loaded elsewhere in the project (shape assumed here)
    private List<EsData> mysqlEsMappings;

    @KafkaListener(id = CanalConstants.CANAL_KAFKA_ID, topics = {"canal-wp-baseline"})
    public void listen(ConsumerRecord<String, Message> record) {
        if (!(record.value() instanceof Message)) {
            return;
        }
        Message message = record.value();
        int size = message.getEntries().size();
        if (message.getId() == -1 || size == 0) {
            return;
        }
        List<EsData> infos = createEventInfos(message.getEntries());
        if (CollectionUtils.isEmpty(infos)) {
            return;
        }
        // key: handlerBean name, value: the EsData objects for that handler
        Map<String, List<EsData>> listMap = infos.stream().collect(Collectors.groupingBy(EsData::getHandlerBean));
        for (Map.Entry<String, List<EsData>> entry : listMap.entrySet()) {
            // ThreadPoolUtils and SpringContextUtils are project utilities:
            // a shared executor wrapper and a by-name bean lookup, respectively
            ThreadPoolUtils.submit(() -> {
                try {
                    SpringContextUtils.getBean(entry.getKey(), EsDataHandler.class).handle(entry.getValue());
                } catch (Exception exception) {
                    log.error("EsDataHandler failed:", exception);
                }
            });
        }
    }


    
    // turn canal entries into EsData events, one per (row, mapping) pair
    private List<EsData> createEventInfos(List<CanalEntry.Entry> entries) {
        // collected event info
        List<EsData> eventInfoList = new ArrayList<>(entries.size());
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                log.error("failed to parse canal entry into a RowChange, data: " + entry.toString(), e);
                continue;
            }
            // event type (INSERT/UPDATE/DELETE/...)
            CanalEntry.EventType eventType = rowChange.getEventType();
            // table name
            String tableName = entry.getHeader().getTableName();
            log.info("===========binlog[{} : {}] , tableName[{}] , eventType : {}",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), tableName, eventType);

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List<CanalEntry.Column> columns = getChangeColumns(eventType, rowData);
                Set<EsData> dataSet = mysqlEsMappings.stream().filter(e -> e.getTableName().equals(tableName)).collect(Collectors.toSet());
                if (CollectionUtils.isEmpty(dataSet)) {
                    log.error("no mapping configured for table [{}]", tableName);
                    continue;
                }
                for (EsData esData : dataSet) {
                    String esDataIdName = esData.getEsDataIdName();
                    Optional<CanalEntry.Column> optional = columns.stream().filter(e -> e.getName().equals(esDataIdName)).findAny();
                    if (!optional.isPresent()) {
                        log.error("table [{}]: dataId column [{}] not found, raw data: {}", tableName, esDataIdName, columns.toString());
                        continue;
                    }
                    eventInfoList.add(new EsData(tableName, esDataIdName, optional.get().getValue(), esData.getHandlerBean(), eventType, columns));
                }
            }
        }
        return eventInfoList;
    }

    
    // pick the columns that describe the row after (or, for DELETE, before) the change
    private List<CanalEntry.Column> getChangeColumns(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType.equals(CanalEntry.EventType.INSERT)) {
            return rowData.getAfterColumnsList();
        } else if (eventType.equals(CanalEntry.EventType.UPDATE)) {
            return rowData.getAfterColumnsList();
        } else if (eventType.equals(CanalEntry.EventType.DELETE)) {
            return rowData.getBeforeColumnsList();
        }
        log.error("unsupported event type: {}", eventType);
        return Collections.emptyList();
    }

}
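
The listener resolves each group's handler bean by name and calls handle on it. The EsDataHandler interface itself isn't shown in the post; a minimal sketch of the assumed contract (name and signature inferred from the call site above):

import java.util.List;

// assumed contract: one implementation per handlerBean name,
// invoked with all EsData events grouped under that bean
public interface EsDataHandler {

    void handle(List<EsData> dataList);
}

And the EsData carrier class: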
import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Getter
@Setter
public class EsData {

    public EsData(String tableName, String esDataIdName, Object esDataIdValue, String handlerBean, CanalEntry.EventType eventType, List<CanalEntry.Column> changeColumns) {
        this.esDataIdName = esDataIdName;
        this.esDataIdValue = esDataIdValue;
        this.tableName = tableName;
        this.handlerBean = handlerBean;
        this.eventType = eventType;
        this.changeColumns = changeColumns;
    }

    // name of the column used as the ES document id
    private String esDataIdName;

    // value of that column for this row
    private Object esDataIdValue;

    // source table name
    private String tableName;

    // bean name of the EsDataHandler that processes this event
    private String handlerBean;

    // binlog event type (INSERT/UPDATE/DELETE)
    private CanalEntry.EventType eventType;

    // the changed columns of this row
    private List<CanalEntry.Column> changeColumns;


    // equality is based on (tableName, handlerBean) only, so collecting
    // the mappings into a Set in createEventInfos deduplicates per handler
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        EsData esData = (EsData) o;

        if (tableName != null ? !tableName.equals(esData.tableName) : esData.tableName != null) return false;
        return handlerBean != null ? handlerBean.equals(esData.handlerBean) : esData.handlerBean == null;
    }

    @Override
    public int hashCode() {
        int result = tableName != null ? tableName.hashCode() : 0;
        result = 31 * result + (handlerBean != null ? handlerBean.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "EsData{" +
                "esDataIdName='" + esDataIdName + ''' +
                ", esDataIdValue=" + esDataIdValue +
                ", tableName='" + tableName + ''' +
                ", handlerBean='" + handlerBean + ''' +
                ", eventType=" + eventType +
                '}';
    }
}
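
For completeness, a hypothetical handler implementation; the bean name has to match the handlerBean value in the mapping config so that the lookup by name succeeds (the class name and the ES logic are illustrative only):

import java.util.List;
import org.springframework.stereotype.Component;

@Component("userEsDataHandler")
public class UserEsDataHandler implements EsDataHandler {

    @Override
    public void handle(List<EsData> dataList) {
        for (EsData data : dataList) {
            // index, update, or delete the Elasticsearch document keyed by
            // data.getEsDataIdValue(), depending on data.getEventType()
        }
    }
}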
