RabbitMQ快速入门及六大模式

RabbitMQ快速入门及六大模式,第1张

RabbitMQ快速入门及六大模式

目录

核心组件

运行原理

实现步骤

构建Gradle项目

入门案例(简单模式)

        生产者(代码)

         消费者(代码)

绑定交换机和队列

发布/订阅模式(Publish/Subscribe)

        生产者

        消费者

路由模式(routing)

        生产者

        消费者

主题模式(topics)

        生产者

        消费者

工作模式(Work queue)

        轮询模式

        公平分配模式


核心组件

 - Server:又称Broker接受客户端的连接,实现AMQP实体服务。安装rabbitmq-server Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手

- Channel:网络信道,几乎所有的 *** 作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各 Channel,每个Channel代表一个会话任务。

- Message:消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

- VirtualHost虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和 Queueu,同一个虚拟王机里面不能有相同名字的Exchange

- Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)

- Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.

- Routingkey:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。

- Queue:队列:也成为MessageQueue,消息队列,保存消息并将它们转发给消费者。

运行原理

实现步骤

        1、JDK11
        2、构建一个gradle项目
        3、导入RabbitMQ的相关依赖
        4、启动rabbitmq-server服务
        5、定义生产者
        6、定义消费者
        7、查看消息在rabbitmq-server服务中的过程

构建Gradle项目

        Maven 同样引入相应的依赖

implementation group: 'com.rabbitmq', name: 'amqp-client', version: '5.10.0'
入门案例(简单模式)

        生产者(代码)
package com.any.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();
            String queueName = "queue1";
            
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            channel.queueDeclare(queueName,false,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            channel.basicPublish("",queueName,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
         消费者(代码)
package com.any.rabbitmq.simple;

import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer {
    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();
            String queueName = "queue1";
            channel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到的消息:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("消息接受失败");
                }
            });
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
绑定交换机和队列

        该例子以生产者为例

package com.any.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;


public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取信道Channel
            channel = connection.createChannel();

            // 定义交换机名称
            String exchangeName = "direct-message-exchange";

            // 定义交换机类型
            String exchangeType = "direct";

            // 定义 路由key
            String routingKey = "message";

            
            channel.exchangeDeclare(exchangeName,exchangeType,true);
            

            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);

            
            channel.queueBind("queue5",exchangeName,routingKey);
            channel.queueBind("queue6",exchangeName,routingKey);
            channel.queueBind("queue7",exchangeName,"email");

            // 5、准备消息
            String message = "hello,message-exchange";
            // 6、发送消息给 queue
            
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }

}
发布/订阅模式(Publish/Subscribe)

 

        生产者
package com.any.rabbitmq.routing.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;


public class Producer {
    // 发布订阅模式
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();
            // 定义 队列名称
            String queueName = "queue1";
            // 定义交换机名称
            String exchangeName = "fanout-exchange";
            // 定义交换机类型
            String exchangeType = "fanout";
            // 定义 路由key
            String routingKey = "";
            
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            //channel.queueDeclare(queueName,true,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }

}
        消费者
package com.any.rabbitmq.routing.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Consumer {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}
路由模式(routing)

        生产者
package com.any.rabbitmq.routing.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;


public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();



            // 定义 队列名称
            String queueName = "queue1";


            // 定义交换机名称
            String exchangeName = "direct-exchange";

            // 定义交换机类型
            String exchangeType = "direct";

            // 定义 路由key
            String routingKey = "email";
            
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            //channel.queueDeclare(queueName,true,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}
        消费者
package com.any.rabbitmq.routing.direct;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Consumer {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}
主题模式(topics)

        生产者
package com.any.rabbitmq.routing.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;


public class Producer {

    public static void main(String[] args) {

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();

            // 定义 队列名称
            String queueName = "queue1";

            // 定义交换机名称
            String exchangeName = "topic-exchange";

            // 定义交换机类型
            String exchangeType = "topic";
            // 定义 路由key
            String routingKey = "com.course.order";
            
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            //channel.queueDeclare(queueName,true,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}
        消费者
package com.any.rabbitmq.routing.topics;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Consumer {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}
工作模式(Work queue)

 

        轮询模式

        生产者

package com.any.rabbitmq.work.polling;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;


public class Producer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();

            // 定义 队列名称
            String queueName = "queue8";

            // 4、发送消息给 queue
            for (int i = 0; i < 20; i++) {
                String message = "hello,work-rabbitmq - " + i;
                
                channel.basicPublish("",queueName,null,message.getBytes());
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}

       work1

package com.any.rabbitmq.work.polling;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Work1 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}

         work2

package com.any.rabbitmq.work.polling;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Work2 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                //channel.
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}
        公平分配模式
package com.any.rabbitmq.work.fair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;


public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();

            // 定义 队列名称
            String queueName = "queue8";

            // 4、发送消息给 queue
            for (int i = 0; i < 20; i++) {
                String message = "hello,work-rabbitmq - " + i;
                
                channel.basicPublish("",queueName,null,message.getBytes());
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}

        work1

package com.any.rabbitmq.work.fair;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Work1 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                Channel finalChannel = channel;
                // 确认指标,一次性处理多少条消息
                finalChannel.basicQos(1);

                finalChannel.basicConsume( queueName , false, new DeliverCallback() {
                    // 成功处理
                    @SneakyThrows
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(1000);
                        // 设置手动应答
                        finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);

                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}

         work2

package com.any.rabbitmq.work.fair;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Work2 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                Channel finalChannel = channel;
                finalChannel.basicQos(1);

                finalChannel.basicConsume( queueName , false, new DeliverCallback() {
                    // 成功处理
                    @SneakyThrows
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                        // 设置手动应答
                        finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);

                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5698782.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存