一.概述
消息对于现代软件项目来说,占有很重要的地位;同时市场上也发展处ActiveMq、RabbitMQ、Kafka、RocketMQ、Pulsar等众多优秀的框架;这些优秀的框架都由自身的特点和擅长的业务领域,在大数据领域中Kafka目前是使用较多的框架,Pulsar是一个后起之秀,目前处于一个快速发展的状态,有望能够成为下一代中间件的黑马。在本案例中我们选择使用Kafka作为内部消息通知的框架,以适应项目中大数据量的高吞吐、实时流计算等功能实现。
二.约束定义
Topic命名约束
Topic分为单类和混合类消息,不同类的消息命名约束如下:
单类:xxx.topic.[自定义名称].sigle混合类:xxx.topic.[自定义名称].bus
三.各类介绍
KafkaProducerConfig自动配置Kafka生产者KafkaConsumerConfig自动配置Kafka消费者RetryErrorHandler实现消费者处理消息失败后重新发送到消息队列KafkaMessage实现对发送的消息包装,提供重试次数、分类等信息KafkaSender实现消息的统一发送入口功能KafkaTopicConfig自动装载topic名称信息KafkaListener提供自动注册消息消费监听接口类KafkaListenerFactory提供启动时自动注册实现了KafkaListener的消息消费者
四.实现
- 配置文件
Kafka功能有独立的配置文件,放置在srcmainresourceskafka.properties,相关的值在maven_*.properties中配置。
# kafka config kafka.hosts=localhost:9092 kafka.group=xxx.${profiles.name}.${spring.application.name} # 单消息通道,需要以sigle结尾 kafka.topic.admin-test=${kafka.topic.admin-test}
- KafkaMessage
创建类com.xxx.common.kafka.KafkaMessage。KafkaMessage是一个抽象类包含记录当前消息重发处理的次数retry、消息类型type、第一次创建消息的时间time信息。
public abstract class KafkaMessage{ // 尝试次数 @Getter int retry; // 生成时间 @Getter long time = System.currentTimeMillis(); // 消息类型 String type; // 消息实体数据 @Setter @Getter T data; public KafkaMessage(){} public KafkaMessage(T data){ this.data = data; } public void addRetry(){ this.retry++; } // 获取消息类型 protected abstract String getType(); }
- KafkaListener
创建类com.xxx.common.kafka.KafkaListener。KafkaListener是一个接口,继承ConsumerAwareMessageListener(提供Consumer信息和自动提交offsets功能)接口。
topic方法用于返回监听器监听的topic名称factory方法用于指定监听器容器的创建工厂group方法用于指定监听器的groupid
public interface KafkaListenerextends ConsumerAwareMessageListener { String topic(); default String factory(){ return "defaultKafkaListenerContainerFactory"; } default String group(){ return "default";} }
- KafkaTopicConfig
创建类:com.xxx.common.kafka.KafkaTopicConfig。KafkaTopicConfig用于自动装入kafka.properties文件中的kafka.topic.*信息
@Data @Configuration @ConfigurationProperties(prefix="kafka.topic") @PropertySource("classpath:kafka.properties") public class KafkaTopicConfig { String userLogin; String userLogout; String userRefresh; String userRegister; String hotArticle; }
- KafkaProducerConfig
创建类com.xxx.common.kafka.KafkaProducerConfig。KafkaProducerConfig类是自动化配置类,定义了默认的Producer工厂,以及KafkaTemplate,并约束了消息的类型为String,大小不超过16M。
@Data @Configuration @EnableKafka @ConfigurationProperties(prefix="kafka") @PropertySource("classpath:kafka.properties") public class KafkaProducerConfig { private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024; private String hosts; @Autowired(required = false) private ProducerListenerproducerListener; @Bean public DefaultKafkaProducerFactory producerFactory() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.getHosts()); props.put(ProducerConfig.RETRIES_CONFIG, 10); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 5_000); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,3*MAX_MESSAGE_SIZE); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,3*MAX_MESSAGE_SIZE); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 256 * 1024); return new DefaultKafkaProducerFactory<>( props); } @Bean public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { KafkaTemplate t = new KafkaTemplate<> (producerFactory); if (this.producerListener != null) { t.setProducerListener(this.producerListener); } return t; } }
- KafkaSender
创建类com.xxx.common.kafka.KafkaSender。KafkaSender类是所有发送消息的方法统一管理器,其实是通过kafkaTemplate发送。
@Component public class KafkaSender { Logger logger = LoggerFactory.getLogger(KafkaSender.class); @Autowired KafkaTemplatekafkaTemplate; @Autowired ObjectMapper mapper; @Autowired KafkaTopicConfig kafkaTopicConfig; public void sendMesssage(String topic, String key, KafkaMessage> message){ try { this.kafkaTemplate.send(topic, key, mapper.writevalueAsString(message)); }catch (Exception e){ logger.error("send message to [{}] error:",topic,e); } } public void sendMesssageNoWrap(String topic, String key, String message){ try { this.kafkaTemplate.send(topic, key, message); }catch (Exception e){ logger.error("send message to [{}] error:",topic,e); } } }
- RetryErrorHandler
创建类com.xxx.common.kafka.RetryErrorHandler。RetryErrorHandler类用于在消费者解析消息出现错误时,重新放回消息到队列中,并设置超过一个小时或者超过10次处理错误的消息丢弃,避免消息无限滚动;然后这类消息可以通过日志搜索查找出数据补偿重试。
@Component public class RetryErrorHandler extends LoggingErrorHandler { private static Logger logger = LoggerFactory.getLogger(RetryErrorHandler.class); private static final int RETRY_COUNT = 10; private static final int TIME_OUT = 3_600_000;//1个小时超时 @Autowired KafkaSender sender; @Autowired ObjectMapper mapper; @Override public void handle(Exception thrownException, ConsumerRecord, ?> record) { super.handle(thrownException, record); if (record != null) { try{ KafkaMessage> message = mapper.readValue((String)record.value(),KafkaMessage.class); message.addRetry(); long time = System.currentTimeMillis()-message.getTime(); if(message.getRetry()>RETRY_COUNT||time>TIME_OUT){ logger.info("超时或者尝试{}次后,抛弃消息[topic:{}][{}]",RETRY_COUNT,record.topic(),record.value()); }else{ this.sender.sendMesssage(record.topic(), (String)record.key(), message); logger.info("处理失败重新回滚到队列[retry:{}][topic:{}][key:{}]", message.getRetry(), record.topic(), record.key()); } }catch (Exception e){ sender.sendMesssageNoWrap(record.topic(), (String) record.key(), (String) record.value()); } } } }
- KafkaProducerConfig
创建类com.xxx.common.kafka.KafkaProducerConfig。KafkaProducerConfig主要配置消费者监听器,配置重试器、错误处理器等信息,同时设置group消费者。
@Data @Configuration @EnableKafka @ConfigurationProperties(prefix="kafka") @PropertySource("classpath:kafka.properties") public class KafkaConsumerConfig { private static final int ConCURRENCY = 8; public final static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); String hosts; String group; @Bean("defaultKafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory(RetryErrorHandler retryErrorHandler) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setRetryTemplate(this.buildRetryTemplate()); factory.setErrorHandler(retryErrorHandler); factory.getContainerProperties().setAckOnError(false); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<> (buildComsumerConfig())); factory.setConcurrency(KafkaConsumerConfig.CONCURRENCY); return factory; } protected Map buildComsumerConfig() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.getHosts()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, this.group); propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8 * 1024 * 1024); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 90_000); return propsMap; } private RetryTemplate buildRetryTemplate() { RetryTemplate t = new RetryTemplate(); ExponentialBackOffPolicy backOff = new ExponentialRandomBackOffPolicy(); backOff.setInitialInterval(1000L); t.setBackOffPolicy(backOff); t.setRetryPolicy(new SimpleRetryPolicy(5)); t.registerListener(new RetryListenerSupport() { @Override public void onError(RetryContext context, RetryCallback callback, Throwable throwable) { KafkaConsumerConfig.LOGGER.warn("Retry processing Kafka message" + context.getRetryCount() + " times", throwable); } }); return t; } }
- KafkaListenerFactory
创建类com.xxx.common.kafka.KafkaListenerFactory。KafkaListenerFactory类实现在构造之后扫描实现了的KafkaListener接口的Bean,并自动注册成消费者监听器。
@Component public class KafkaListenerFactory implements InitializingBean { Logger logger = LoggerFactory.getLogger(KafkaListenerFactory.class); @Autowired DefaultListableBeanFactory defaultListableBeanFactory; @Override public void afterPropertiesSet() { Mapmap = defaultListableBeanFactory.getBeansOfType(KafkaListener.class); for (String key : map.keySet()) { KafkaListener k = map.get(key); AbstractKafkaListenerContainerFactory factory = (AbstractKafkaListenerContainerFactory)defaultListableBeanFactory.getBean(k.factory()); AbstractMessageListenerContainer container = factory.createContainer(k.topic()); container.setupMessageListener(k); String beanName = k.getClass().getSimpleName()+"AutoListener" ; defaultListableBeanFactory.registerSingleton(beanName,container); logger.info("add auto listener [{}]",beanName); } } }
- MessagesRegister
@Log4j2 @Component public class MessagesRegister implements InitializingBean { Mapmessages = Maps.newConcurrentMap(); @Override public void afterPropertiesSet() throws Exception { Reflections reflections = new Reflections("com.xxx"); Set > ms = reflections.getSubTypesOf(KafkaMessage.class); if(ms!=null){ ms.forEach(cla->{ try { Constructor>[] cs = cla.getConstructors(); KafkaMessage mess = null; if (cs != null && cs.length > 0) { Class[] temp = cs[0].getParameterTypes(); Object[] parms = new Object[temp.length]; for (int i = 0; i < temp.length; i++) { if(temp[i].isPrimitive()){ if(temp[i].getName().contains("boolean")){ parms[i]=false; }else { parms[i] = 0; } }else{ parms[i]=null; } } mess = (KafkaMessage) cs[0].newInstance(parms); } else { mess = (KafkaMessage) cla.newInstance(); } String type = mess.getType(); messages.put(type,cla); }catch (Exception e){ System.out.println(cla+"====================:"+cla.getConstructors()[0].getParameterCount()); e.printStackTrace(); } }); } log.info("================================================="); log.info("scan kafka message resultt[{}]",messages); log.info("================================================="); } public Class extends KafkaMessage> findClassByType(String type){ return this.messages.get(type); } }
五.消息生产者
@SpringBootTest @RunWith(SpringRunner.class) public class KafkaTest { @Autowired KafkaTemplatekafkaTemplate; @Test public void test(){ try { this.kafkaTemplate.send("topic.test", "123key","123value"); System.out.println("================================="); Thread.sleep(500000);// 休眠等待消费者接收消息 } catch (Exception e) { e.printStackTrace(); } } }
六.消息消费者
@Component public class TestKafkaListener implements KafkaListener{ @Override public String topic () { return "topic.test"; } @Override public void onMessage (ConsumerRecord< String, String > data, Consumer< ?, ? > consumer){ System.out.println("===========receive test message:" + data); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)