RabbitMQ的相关 *** 作2--轮训分发消息

RabbitMQ的相关 *** 作2--轮训分发消息,第1张

RabbitMQ的相关 *** 作2--轮训分发消息

目录
  • 1. 创建SpringBoot项目,引入相关依赖
  • 2. 为了减少重复代码书写,编写连接工厂创建信道的工具类
  • 3. 编写生产者代码
  • 4. 编写消费者代码
  • 5. 运行代码测试

1. 创建SpringBoot项目,引入相关依赖
    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            commons-io
            commons-io
            2.6
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        
        
            org.springframework.amqp
            spring-rabbit-test
            test
        
    
2. 为了减少重复代码书写,编写连接工厂创建信道的工具类
public class RabbitMqUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("120.24.192.216");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
3. 编写生产者代码
public class Task01 {
    public static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台中接收信息发送
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送消息完成:"+message);
        }
    }
}

4. 编写消费者代码
public class Worker01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //消息的接收
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println("接收到的消息:"+new String(message.getBody()));
        };
        //消息接收被取消时执行的内容
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息者取消消息消费接口回调逻辑");
        };
        System.out.println("C2等待接收消息......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
5. 运行代码测试

多进程运行两个消费者代码,首先设置idea可以多进程运行,选中Allow multiple instances

运行生产者代码,并手动输入需要发送的消息

观察消费者接收消息的情况

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5618717.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存