优先级任务调度中间件(基于Kafka-RabbitMQ进行任务调度)

优先级任务调度中间件(基于Kafka-RabbitMQ进行任务调度),第1张

优先级任务调度中间件(基于Kafka-RabbitMQ进行任务调度)

        最近单位由于业务整改,需要重新设计一套任务调度中间件,便于各分系统根据任务优先级进行集中调度,详细需求如下:

  • 高并发

  • 低延迟

  • 高可靠

  • 根据任务优先级进行转发

        由于工作业务需要,各分系统统一将任务消息发送到Kafka中,所以设计的任务调度中间件业务逻辑如下图所示:

        代码如下所示:

  • xml文件中添加Kafka和RabbitMQ依赖包


    org.apache.kafka
    kafka-clients
    2.5.1




    org.springframework.boot
    spring-boot-starter-amqp
  • 在application.yaml文件中添加RabbitMQ配置参数
spring:
  rabbitmq:
    publisher-/confirm/i-type: correlated  #发送到交换器后触发回调,属性有三种确认类型,none是禁用发布确认
    publisher-returns: true  #是否开启消息发送到队列(Queue)后触发回调
    template:
      retry:     # 消息发送失败重试相关配置
        enabled: true
        initial-interval: 3000
        max-attempts: 3
        max-interval: 10000
        multiplier: 1
    host: localhost
    port: 5672
    listener:
      simple:
        acknowledge-mode: auto # 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)
#        concurrency: 10  # 最小线程数量
#        max-concurrency: 20   # 最大线程数量
        prefetch: 1         # 每个消费者可能未完成的最大未确认消息数量,消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。
  • 构建RabbitMQ配置文件
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE = "priority-exchange";

    public static final String QUEUE = "priority-queue";

    private static final String ROUTING_KEY = "priority.queue.#";

    
    @Bean
    Queue queue() {
        Map args= new HashMap<>();
        args.put("x-max-priority", 100);
        return new Queue(QUEUE, false, false, false, args);
    }

    
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
  • RabbitMq生产者

public void sendPriorityMessage(String content, Integer priority) {
    rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
            message -> {
                message.getMessageProperties().setPriority(priority);
                return message;
            });
}
public Consumer createConsumer() {
        Properties props = new Properties();
        // 指定Kafka服务的ip地址及端口
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
        // 指定group.id,Kafka中的消费者需要在消费者组里
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 是否开启自动提交
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 消息key的反序列化器
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        // 消息value的反序列化器
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
//        // 指定每次poll方法返回的记录数量,该方法仅针对手动提交有效
//        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        // 设定session.timeout.ms超时时间防止出现rebalance
        props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
//        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        return new KafkaConsumer<>(props);
    }
  • 构建数据通道将Kafka中的数据发送到RabbitMQ优先级队列中

public void KafkaMonitor() {
    Consumer consumer = createConsumer();
    kafkaSwitch.setSwitchOn(-1);
    Long pollIntervalInt = Long.valueOf(pollInterval);
    Collection collection = new ArrayList();
    collection.add(topic1);
    consumer.subscribe(collection);
    int count = 0;
    try {
        while (true){
            System.out.println("准备进行kafka消费,获取开关变量="+ kafkaSwitch.getSwitchOn());
            log.info("准备进行kafka消费,获取开关变量="+ kafkaSwitch.getSwitchOn());
            if (kafkaSwitch.getSwitchOn() == 1){
                log.info("开始结束消费" + consumer.toString());
                consumer.close();
                log.info("消费已结束");
                System.out.println("消费已结束");
                kafkaSwitch.setSwitchOn(-1);
            }else {
                // 从Topic中拉取数据
                ConsumerRecords records = consumer.poll(Duration.ofMillis(pollIntervalInt));
                for (ConsumerRecord record : records){
                    count ++;
                    JSonObject jsonObject = JSONObject.parseObject(record.value());
                    if (jsonObject.containsKey("priority")){
                        sendPriorityMessage(record.value(), jsonObject.getInteger("priority"));
                        log.info("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = " + jsonObject.getString("priority"));
                        System.out.println("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = " + jsonObject.getString("priority"));
                    }else{
                        sendPriorityMessage(record.value(), 0);
                        log.info("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = 0" );
                        System.out.println("消费数据:" + "partition = " + record.partition() + ", offset = " + record.offset() + ", key = " + record.key() + " , value = " + record.value() + ", priority = 0");
                    }
                }
                //控制往RabbitMq传输数据的速度,避免在RM中堆积过多数据
                if (count >= Integer.getInteger(buffer)){
                    Thread.sleep(20000);  //该数值应小于sessionTimeout数值,避免频繁rebalance
                }
                count = 0;
            }
        }
    }catch (WakeupException e){
        //消费停止
    }catch (Exception e){
        //消费异常,记录信息
        System.out.println(e.toString());
        log.error(e.toString());
    }finally {
        System.out.println("关闭消费者");
        log.info("关闭消费者");
    }

}

ps:

  1. 由于Kafka消费速度与RabbitMQ消费速度存在较大差异,为避免数据在RabbitMQ中过多堆积,应当适当控制Kafka消费速度,每当Kafka消费者往RabbitMQ中写入一定数量(Buffer)的数据时,设置Kafka消费者睡眠一段时间,需要注意的是,要保持Kafka消费者睡眠时间小于Kafka消费者SessionTimeOut时间,从而避免Kafka消费者群组频繁rebalance而造成kafka消费速度下降。

  2. 应当添加Kafka消费者控制器,控制打开/关闭消费者,在上述代码中设置kafkaSwitch对象,该对象代码如下所示,当kafkaSwitch.getSwitchOn() == 1时Kafka消费结束

public class KafkaSwitch {
    private int switchOn = -1;

    public int getSwitchOn() {
        return switchOn;
    }

    public void setSwitchOn(int switchOn) {
        this.switchOn = switchOn;
    }
}
  • RabbitMQ消费者

@RabbitListener(id = "taskschedule", queues = "priority-queue", autoStartup = "false", concurrency = "20-50")
    public void listen(String message) {
        try{
            jobSubmitService.PgsService(message);
            System.out.println("从rabbitmq获取信息" + message);
        } catch (Exception e){
            log.error(e.getMessage());
        }
    }

ps:

  1. 为了提高RabbitMQ优先级队列消费速度,可以启动多个消费者进行消费,可以在@RabbitListener注解中进行配置,例如concurrency = "20-50",也可以在application.yaml文件中进行配置,如上述配置文件中concurrency、max-concurrency参数所示。

  2. 应当添加RabbitMQ消费者控制器,控制打开/关闭消费者,如下述代码所示

public void rabbitlistener(String consumerId, String startflag){
    MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer(consumerId);
    if ("true".equals(startflag)){
        consumer.start();
    }else {
        consumer.stop();
    }
}
  • Feign+Hystrix熔断服务

@FeignClient(name = "name", fallback = JobSubmitServiceHystrixImpl.class)
public interface JobSubmitService {

    @RequestMapping(value = "/JobSubmit", method = RequestMethod.POST)
    public ResponseEntity> PgsService(@RequestBody String params);
}
public ResponseEntity> PgsService(@RequestBody String params){
    Map errorInfo = new HashMap<>();
    try {
        //由于消费者服务熔断,关闭Kafka消费者
        kafkaErrorProcess.closeConsumer();
        pipeLineService.closeConsumer();
        //将数据发送到kafka中ErrorTopic
        kafkaProducerService.producerAsyncSend(errorTopic, params);
        System.out.println("消费者服务熔断,请检查消费者服务");
        log.error("消费者服务熔断,请检查消费者服务");
        Thread.sleep(Long.getLong(hystrixSleeping));
    }catch (Exception e){
        log.error(e.getMessage());
    }
    return new ResponseEntity<>(errorInfo, HttpStatus.INTERNAL_SERVER_ERROR);
}

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678257.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存