Spring Boot Integration with Kafka


Configuration

spring:
  kafka:
    bootstrap-servers: 101.34.177.108:9092 # separate multiple servers with commas
    producer:
      retries: 3 # defaults to 0; number of retries after a failed send
      batch-size: 100 # batch size in bytes for batched sends
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    listener:
      ack-mode: MANUAL_IMMEDIATE # manual offset commit
      concurrency: 5 # number of listener threads; if greater than the topic's partition count, the partition count wins
      poll-timeout: 5000 # timeout for a single poll, in milliseconds
      missing-topics-fatal: false # do not fail startup if a topic does not exist yet
    consumer:
      enable-auto-commit: false # disable auto offset commit; auto-commit failures are hard to deal with
      auto-offset-reset: earliest
      max-poll-records: 100 # maximum number of records per poll
      group-id: item # consumer group; run multiple instances under the same group, but no more instances than the topic has partitions
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
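
Since missing-topics-fatal is false, the application starts even if the topic does not exist yet. To make sure the topic exists with a partition count that matches the listener concurrency of 5, you can declare it as a bean. A minimal sketch; the topic name "item-claim" and the replica count are illustrative assumptions, not taken from the original project:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // "item-claim" is an assumed topic name; 5 partitions match listener.concurrency
    @Bean
    public NewTopic itemClaimTopic() {
        return TopicBuilder.name("item-claim")
                .partitions(5)
                .replicas(1)
                .build();
    }
}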

POM dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Utility classes

import lombok.Data;
import lombok.ToString;

import java.util.Date;

@Data
@ToString
public class MessageObject {

    private Long id;

    private Object msg;

    private Date sendTime;
}

@Data
@ToString
public class Message {

    private Long id;

    private String msg;

    private Date sendTime;
}
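
For reference, a quick sketch of what a Message looks like once it is serialized with Gson, as KafkaSender below does. The exact date formatting depends on Gson's defaults and the JVM locale:

Message message = new Message();
message.setId(1L);
message.setMsg("hello kafka");
message.setSendTime(new Date());

// produces something like:
// {"id":1,"msg":"hello kafka","sendTime":"Dec 17, 2022 10:00:00 AM"}
String json = new GsonBuilder().create().toJson(message);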
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;
import java.util.Date;

@Component
@Slf4j
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private final Gson gson = new GsonBuilder().create();

    public Boolean send(String topic, String msg) {
        Message message = new Message();
        try {
            message.setId(System.currentTimeMillis());
            message.setMsg(msg);
            message.setSendTime(new Date());

            // wrap the payload in a Message and send it as JSON
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(topic, gson.toJson(message));

            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    // just log here; transient errors are already retried via the producer's retries setting
                    log.error(topic + " - producer failed to send message: " + message, throwable);
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info(topic + " - producer sent message successfully: " + message);
                }
            });
        } catch (Exception e) {
            log.error(topic + " - producer failed to send message: " + message, e);
            return false;
        }
        return true;
    }
}
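
A hypothetical caller, for example a controller that pushes a payload onto a topic. The class name, endpoint, and topic name are illustrative, not part of the original project:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class DemoSendController {

    @Resource
    private KafkaSender kafkaSender;

    // "item-claim" is an assumed topic name for illustration
    @GetMapping("/demo/send")
    public String send() {
        kafkaSender.send("item-claim", "hello kafka");
        return "ok";
    }
}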

Consumer class

package com.menglar.soap.item.common.lazada_kafaka_consumer;

import com.alibaba.fastjson.JSON;
import com.menglar.soap.item.service.LazadaItemService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;


@Component
@Slf4j
public class ItemCategoryConsumer {

    @Autowired
    private LazadaItemService lazadaItemService;

    @Qualifier("threadPoolTaskExecutor")
    @Autowired
    private ThreadPoolTaskExecutor poolTaskExecutor;

    // KafkaConstants.ITEM_CLAIM is a project-defined topic constant; import it from wherever it lives in your project
    @KafkaListener(topics = KafkaConstants.ITEM_CLAIM)
    public void setItemRecommendCategory(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info(record.topic() + " - consumer started consuming: " + record.value());
        try {
            // business handling for the batch category-setting topic:
            // only process when the consumed payload is non-null
            if (record.value() != null) {
                KafkaVO kafkaVO = JSON.parseObject(record.value(), KafkaVO.class);
                Long userId = kafkaVO.getUserId();
                List<Long> itemList = kafkaVO.getItemList();
                // hand each item off to the thread pool for concurrent processing
                for (Long itemId : itemList) {
                    CompletableFuture.runAsync(() -> lazadaItemService.setItemCategory(userId, itemId), poolTaskExecutor);
                }
            }
            // manually acknowledge the offset (ack-mode: MANUAL_IMMEDIATE)
            ack.acknowledge();
            log.info(record.topic() + " - consumer consumed message successfully: " + record.value());
        } catch (Exception e) {
            log.error(record.topic() + " - consumer failed to consume message: " + record.value(), e);
        }
    }
}
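
The listener references two types that are not shown above. Based on how it is used, KafkaVO is a simple payload carrying the user id and the item ids; a minimal sketch inferred from the getters the listener calls:

import lombok.Data;

import java.util.List;

@Data
public class KafkaVO {

    private Long userId;

    private List<Long> itemList;
}

The threadPoolTaskExecutor bean injected via @Qualifier also has to be defined somewhere. A minimal sketch; the pool sizes and thread name prefix are illustrative assumptions:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolConfig {

    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);      // illustrative sizing
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("item-consumer-");
        return executor;
    }
}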
