配置
kafka: bootstrap-servers: 101.34.177.108:9092 # 多个用英文逗号隔开 producer: retries: 3 # 默认为 0,发送主题失败后重试的次数 batch-size: 100 # 默认为 0,批处理发送主题大小 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer listener: ack-mode: MANUAL_IMMEDIATE # 手动提交模式 concurrency: 5 # 消费监听线程数,当配置值大于 Kafka 分区数,按分区数执行 poll-timeout: 5000 # 单次拉取消息的超时(毫秒) missing-topics-fatal: false consumer: enable-auto-commit: false # 建议关闭自动提交 Offset,不然报错很难处理 auto-offset-reset: earliest max-poll-records: 100 # 单次拉取最大记录数 group-id: item # 消费组,消费者多实例的情况下,配置同一个消费组,实例数不能超过 Topic 的分区数 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
pom依赖
org.springframework.kafka spring-kafka
相关工具类
@Data @ToString public class MessageObject { private Long id; private Object msg; private Date sendTime; }
@Data @ToString public class Message { private Long id; private String msg; private Date sendTime; }
@Component @Slf4j public class KafkaSender { @Resource private KafkaTemplatekafkaTemplate; private Gson gson = new GsonBuilder().create(); public Boolean send(String topic , String msg) { Message message = new Message(); try { message.setId(System.currentTimeMillis()); message.setMsg(msg); message.setSendTime(new Date()); ListenableFuture > future = kafkaTemplate.send(topic, msg); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable throwable) { // 给发Kafaka主题,发送消息 kafkaTemplate.send(topic,gson.toJson(message)); log.info(topic+" - 生产者 发送消息失败:" + message.toString()); } @Override public void onSuccess(SendResult stringObjectSendResult) { // 给发Kafaka主题,发送消息 log.info(topic+" - 生产者 发送消息成功:" + message.toString()); } }); } catch (Exception e) { e.printStackTrace(); log.info(topic+" - 生产者 发送消息失败:", message.toString()); } return true; } }
消费者类
package com.menglar.soap.item.common.lazada_kafaka_consumer; import com.alibaba.fastjson.JSON; import com.menglar.soap.item.service.LazadaItemService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.support.Acknowledgment; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.CompletableFuture; @Component @Slf4j public class ItemCategoryConsumer { @Autowired private LazadaItemService lazadaItemService; @Qualifier("threadPoolTaskExecutor") @Autowired private ThreadPoolTaskExecutor poolTaskExecutor; //@KafkaListener(topics = KafkaConstants.ITEM_CLAIM) public void setItemRecommendCategory(ConsumerRecord, ?> record, Acknowledgment ack) { log.info(record.topic()+" - 消费者 开始消费:" + record.value().toString()); try { // 消费到批量设置类目的kafak主题数据处理业务,判断消费到的数据是否为空,不为空进行业务处理ok if (record.value()!= null) { Object value = record.value(); KafkaVO kafkaVO = JSON.parseObject((String) value, KafkaVO.class); Long userId = kafkaVO.getUserId(); ListitemList = kafkaVO.getItemList(); //开启多线程消费 for (Long itemId : itemList) { CompletableFuture.runAsync(()-> lazadaItemService.setItemCategory(userId, itemId),poolTaskExecutor); } } // 回调ack确认 ack.acknowledge(); log.info(record.topic()+" - 消费者 消费消息成功:" + record.value().toString()); } catch (Exception e) { log.error(record.topic()+" - 消费者 消费消息失败:" + record.value().toString(),e.getStackTrace()); } } // // @KafkaListener(topics = KafkaConstants.ITEM_CLAIM) // public void operateMethod(ConsumerRecord, ?> record, Acknowledgment ack){ // log.info("开始监听"); // Object value = record.value(); // KafkaVO kafkaVO = JSON.parseObject((String) value, KafkaVO.class); // Long userId = kafkaVO.getUserId(); // log.info("消费成功"); // } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)