最近单位由于业务整改,需要重新设计一套任务调度中间件,便于各分系统根据任务优先级进行集中调度,详细需求如下:
-
高并发
-
低延迟
-
高可靠
-
根据任务优先级进行转发
由于工作业务需要,各分系统统一将任务消息发送到Kafka中,所以设计的任务调度中间件业务逻辑如下图所示:
代码如下所示:
-
xml文件中添加Kafka和RabbitMQ依赖包
org.apache.kafka kafka-clients2.5.1 org.springframework.boot spring-boot-starter-amqp
- 在application.yaml文件中添加RabbitMQ配置参数
spring: rabbitmq: publisher-/confirm/i-type: correlated #发送到交换器后触发回调,属性有三种确认类型,none是禁用发布确认 publisher-returns: true #是否开启消息发送到队列(Queue)后触发回调 template: retry: # 消息发送失败重试相关配置 enabled: true initial-interval: 3000 max-attempts: 3 max-interval: 10000 multiplier: 1 host: localhost port: 5672 listener: simple: acknowledge-mode: auto # 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认) # concurrency: 10 # 最小线程数量 # max-concurrency: 20 # 最大线程数量 prefetch: 1 # 每个消费者可能未完成的最大未确认消息数量,消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。
- 构建RabbitMQ配置文件
@Configuration public class RabbitMQConfig { private static final String EXCHANGE = "priority-exchange"; public static final String QUEUE = "priority-queue"; private static final String ROUTING_KEY = "priority.queue.#"; @Bean Queue queue() { Mapargs= new HashMap<>(); args.put("x-max-priority", 100); return new Queue(QUEUE, false, false, false, args); } @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } }
-
RabbitMq生产者
public void sendPriorityMessage(String content, Integer priority) { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content, message -> { message.getMessageProperties().setPriority(priority); return message; }); }
-
构建Kafka消费者
public ConsumercreateConsumer() { Properties props = new Properties(); // 指定Kafka服务的ip地址及端口 props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress); // 指定group.id,Kafka中的消费者需要在消费者组里 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否开启自动提交 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 消息key的反序列化器 props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer); // 消息value的反序列化器 props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); // // 指定每次poll方法返回的记录数量,该方法仅针对手动提交有效 // props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // 设定session.timeout.ms超时时间防止出现rebalance props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); // props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); return new KafkaConsumer<>(props); }
-
构建数据通道将Kafka中的数据发送到RabbitMQ优先级队列中
public void KafkaMonitor() { Consumerconsumer = createConsumer(); kafkaSwitch.setSwitchOn(-1); Long pollIntervalInt = Long.valueOf(pollInterval); Collection collection = new ArrayList (); collection.add(topic1); consumer.subscribe(collection); int count = 0; try { while (true){ System.out.println("准备进行kafka消费,获取开关变量="+ kafkaSwitch.getSwitchOn()); log.info("准备进行kafka消费,获取开关变量="+ kafkaSwitch.getSwitchOn()); if (kafkaSwitch.getSwitchOn() == 1){ log.info("开始结束消费" + consumer.toString()); consumer.close(); log.info("消费已结束"); System.out.println("消费已结束"); kafkaSwitch.setSwitchOn(-1); }else { // 从Topic中拉取数据 ConsumerRecords records = consumer.poll(Duration.ofMillis(pollIntervalInt)); for (ConsumerRecord record : records){ count ++; JSonObject jsonObject = JSONObject.parseObject(record.value()); if (jsonObject.containsKey("priority")){ sendPriorityMessage(record.value(), jsonObject.getInteger("priority")); log.info("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = " + jsonObject.getString("priority")); System.out.println("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = " + jsonObject.getString("priority")); }else{ sendPriorityMessage(record.value(), 0); log.info("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = 0" ); System.out.println("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = 0"); } } //控制往RabbitMq传输数据的速度,避免在RM中堆积过多数据 if (count >= Integer.getInteger(buffer)){ Thread.sleep(20000); //该数值应小于sessionTimeout数值,避免频繁rebalance } count = 0; } } }catch (WakeupException e){ //消费停止 }catch (Exception e){ //消费异常,记录信息 System.out.println(e.toString()); log.error(e.toString()); }finally { System.out.println("关闭消费者"); log.info("关闭消费者"); } }
ps:
-
由于Kafka消费速度与RabbitMQ消费速度存在较大差异,为避免数据在RabbitMQ中过多堆积,应当适当控制Kafka消费速度,每当Kafka消费者往RabbitMQ中写入一定数量(Buffer)的数据时,设置Kafka消费者睡眠一段时间,需要注意的是,要保持Kafka消费者睡眠时间小于Kafka消费者SessionTimeOut时间,从而避免Kafka消费者群组频繁rebalance而造成kafka消费速度下降。
-
应当添加Kafka消费者控制器,控制打开/关闭消费者,在上述代码中设置kafkaSwitch对象,该对象代码如下所示,当kafkaSwitch.getSwitchOn() == 1时Kafka消费结束
public class KafkaSwitch { private int switchOn = -1; public int getSwitchOn() { return switchOn; } public void setSwitchOn(int switchOn) { this.switchOn = switchOn; } }
-
RabbitMQ消费者
@RabbitListener(id = "taskschedule", queues = "priority-queue", autoStartup = "false", concurrency = "20-50") public void listen(String message) { try{ jobSubmitService.PgsService(message); System.out.println("从rabbitmq获取信息" + message); } catch (Exception e){ log.error(e.getMessage()); } }
ps:
-
为了提高RabbitMQ优先级队列消费速度,可以启动多个消费者进行消费,可以在@RabbitListener注解中进行配置,例如concurrency = "20-50",也可以在application.yaml文件中进行配置,如上述配置文件中concurrency、max-concurrency参数所示。
-
应当添加RabbitMQ消费者控制器,控制打开/关闭消费者,如下述代码所示
public void rabbitlistener(String consumerId, String startflag){ MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer(consumerId); if ("true".equals(startflag)){ consumer.start(); }else { consumer.stop(); } }
-
Feign+Hystrix熔断服务
@FeignClient(name = "name", fallback = JobSubmitServiceHystrixImpl.class) public interface JobSubmitService { @RequestMapping(value = "/JobSubmit", method = RequestMethod.POST) public ResponseEntity
public ResponseEntity
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)