初学RabbitMQ(一)

初学RabbitMQ(一),第1张

初学RabbitMQ(一) 应用场景:

​ 异步处理

​ 用户注册后,需要发送邮箱和手机验证码:

​ 传统的需要一个步骤一个步骤的完成

​ 使用消息队列后可以在发送邮箱的同时发送验证码

应用解耦

​ 将模块与模块之间的调用相互分离,不在是模块之间的直接调用

流量削峰

​ 场景:秒杀,抢购业务

​ 因为流量过大,导致服务宕机,为解决这个问题,在前端加入消息队列

​ 用户发起秒杀请求,将请求加入消息队列中,超过队列长度就抛弃,抛出秒杀失败页面。

RabbitMQ和Kafka相比:RabbitMQ更可靠 ,相比Kafka性能低。

Kafka性能高(高的离谱,压RabbitMQ一头)但是相对于来说没有RabbitMQ可靠。根据场景选择消息队列。

RabbitMQ各组件功能

RabbitMQ安装与使用

想要安装RabbitMQ,必须先安装erlang语言环境,类似安装tomcat,必须先安装JDK

查看RabbitMQ匹配的Erlang版本:https://www.rabbitmq.com/which-erlang.html

RabbitMQ版本 支持的Erlang最低版本 支持的Erlang最高版本

RabbitMQ的安装:

RabbitMQ高性能消息队列要安装在Linux上而不是Windows上。

erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang

socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads

下载完成后 发送到Linux上。在Linux上进行安装,下方安装命令按照顺序执行。

 [root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm

 [root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

 [root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm

RabbitMQ自带有一个丰富的后台管理页面,需要我们自行启动

[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management

开启RabbitMQ

[root@localhost opt]#	systemctl start rabbitmq-server.service

查看RabbitMQ状态

[root@localhost opt]#	systemctl status rabbitmq-server.service

重启RabbitMQ

[root@localhost opt]#	systemctl restart rabbitmq-server.service

停止运行RabbitMQ

[root@localhost opt]#	systemctl stop rabbitmq-server.service

查看RabbitMQ相关进程

[root@localhost opt]#	ps -ef | grep rabbitmq

访问RabbitMQ自带的后台页面,因为本地安装的Centos自启动有防火墙,我们要访问Linux需要先关闭防火墙或者开放端口。

[root@localhost opt]#	systemctl stop firewalld

访问页面,在网址舒服Linux所在IP地址+端口号15672。

默认账号密码是guest ,guest。默认账号密码是不支持远程访问的,所以我们要新建一个用户

添加账号命令

[root@localhost opt]#	rabbitmqctl add_user 自定义用户名 密码

分配超级管理员角色

[root@localhost opt]#	rabbitmqctl set_user_tags 用户名 administrator

设置角色权限

[root@localhost opt]#	rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"

查看角色列表

[root@localhost opt]#	rabbitmqctl list_users

修改密码命令

[root@localhost opt]#	rabbitmqctl change_password 用户名  要改成的密码

访问成功页面

RabbitMQ带有三个端口

5672:RabbitMQ提供给编程语言客户端链接的端口。

15672:RabbitMQ后台管理界面端口号。

25672:RabibtMQ集群用的端口号。

Java与RabbitMQ连接使用

引入依赖



    com.rabbitmq
    amqp-client
    5.7.3


    org.slf4j
    slf4j-log4j12
    1.7.25
    compile


    org.apache.commons
    commons-lang3
    3.9


配置Log4j日志文件

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file

在IDEA中创建Java与RabbitMQ连接的工具类,因为在创建连接的时候需要选择VirtuaHost虚拟主机 刚开始安装的RabbitMQ只有一个虚拟主机 **/**我们可以自定义创建虚拟主机,可以在后台页面中创建

创建ConnectionUtls类获取连接

public static Connection getConnection() throws IOException, TimeoutException {
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //设置连接端口号
    factory.setPort(5672);
    //设置主机地址
    factory.setHost("192.168.152.130");
    //设置连接RabbitMQ的哪个虚拟主机
    factory.setVirtualHost("/lyux");
    //设置账号密码
    factory.setUsername("lyx");
    factory.setPassword("123456");
    //创建连接
    Connection connection = factory.newConnection();
    return connection;
}
RabbitMQ模式

RabbitMQ一共有6个模式,先学习前五个模式,第六个模式是RPC远程调用。

前五个模式可以分为两大类:点对点模式、发布订阅模式。

点对点模式:PSP(point to point)模式包含三个角色

发布者、消息队列、接收者

队列中保存消息,直到消息被消费或者超时

每个消息发送到消息队列中进行保存,接收者从消息队列中获取消息

特点:

​ 每个消息只有一个消费者,被消费后就不在消息队列了

​ 发送者与接收者没有依赖关系,发送者发送的消息不管接收者接受与否都会放在消息队列中。

接收者接受消息后,需要向发送对象发送应答成功

如果需要保证每条消息都被接受就需要用到P2P模式

发布订阅模式(publish/subscribe)包含三个角色

发布者、交换机、订阅者

多个发布者将消息发送到交换机,交换机将消息分别发送到多个订阅者

特点

每个消息可以有多个订阅者

发布者与订阅者有时间上的依赖,也就是对于某一个订阅者来说,只有订阅了发布者,才能消费发布者的消息

为了消费发布者信息,订阅者需要保持运行状态

如果希望发送的一个消息被多人使用可以采用这个模式。

点对点模式之简单模式

创建消息发送方

ppublic class Sender{
    public static void  main(String[] args){
        //利用创建好的ConnectionUtls类获取连接
       Connection connection =  ConnectionUtils.getConnection();
        //创建信道 
        Channel channel = connection.createChannel();
      
//参数1 – 队列的名称
//参数2 - 如果我们声明一个持久队列,则为真(该队列将在服务器重启后继续存在)
//参数3 – 如果我们声明独占队列(仅限于此连接),则为 true
//参数4 – 如果我们声明一个自动删除队列,则为 true(服务器将在不再使用时将其删除)
//参数5  - 队列的其他属性(构造参数)
          //创建队列
 		channel.queueDeclare("queue1",false,false,false,null);
        
//参数1 - 将消息发布到的交换机
//参数2 – 队列名
//参数3 - 如果要设置“强制”标志,则为 true
//参数4 - 消息的其他属性 - 路由标头等
//参数5 - 消息正文
        //发送信息
        channel.basicPublish("","queue1",null,msg.getBytes(StandardCharsets.UTF_8));
        //关闭连接
        channel.close();
        connection.close();
    }
}

创建接收方

public class Accept{
    public static void main(String[] args){
       //持续保持连接,不然无法接受消息
        while (true) {
        //获取链接
         	Connection connection = ConnectionUtils.getConnection();
            //创建信道
         	Channel channel = connection.createChannel();
        	//创建获取队列后回调的对象 
        	DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                    //开启手动确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
        //获取队列内容
         	channel.basicConsume("queue1", false, consumer); 
        } 
    }
}

初次创建 一定要先启动 发送方,因为先启动接收方的话 没有找到相应的队列会报错。如果想要先启动接收方的话,可以用下方的方法。

public class Accept{
    public static void main(String[] args){
       //持续保持连接,不然无法接受消息
        while (true) {
        //获取链接
         	Connection connection = ConnectionUtils.getConnection();
            //创建信道
         	Channel channel = connection.createChannel();
            //如果队列没有被创建,就创建队列,如果已经被创建,就获取队列
             channel.queueDeclare("queue1",false,false,false,null);
        	//创建获取队列后回调的对象 
        	DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                    //开启手动确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
        //获取队列内容
         	channel.basicConsume("queue1", false, consumer); 
        } 
    }
}
点对点模式之工作队列模式

​ 工作队列模式就是发送方产生消息过快,而接收方接收消息慢,使消息不能及时使用,这时我们就可以多有几个接收方。

1、创建两个接收方代码相同

public class Accept{
        public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        channel.queueDeclare("queue1", false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                try {
                    //延迟时间不同,达到劳动能力不同的效果
                    //时间小的就是干得快的,时间慢的就是干得慢的。
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("queue1", false, consumer);
    }
}

先运行两个接收方,在运行发送方。运行后我们发现并没有达到干得快就干得多的效果。

官方原话如下

为了解决这个问题我们可以在接受方加入

int prefetchCount = 1;
channel.basicQos(prefetchCount);

成品如下

public class Accept {
        public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        channel.queueDeclare("queue1", false, false, false, null);
        	//告诉RabbitMQ 不要向接收方一次发送多条消息,只有确认一条后才发送另外一条。
            channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("queue1", false, consumer);
    }
}

此时我们就达到了能者多干的效果。

面试题:怎么避免消息堆积?

​ 可以采用多个工作方监控发送方的消息​ 通过线程池 ,异步消费。 发布订阅模式

简单来说 就相当于抖音快手粉丝订阅主播,主播发送视频时,每一个粉丝都能收到,而不是只有一个人能收到消息。

相对于点对点模式来说 多了一个交换机(路由)。

创建发送方。

public class Sender {
    public static void main(String[] args) throws IOException, TimeoutException {

        String msg=  "你好,世界";
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建路由(路由名字,路由类型)
        //路由类型可以在后台管理页面中查看
        //fanout
        channel.exchangeDeclare("yangmi","fanout");
        //发送信息
        channel.basicPublish("yangmi","",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}

创建两个接收方,代码基本相同,队列名字不同

public class Accept {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare("fensi2",false,false,false,null);
        //绑定交换机
        channel.queueBind("fensi2","yangmi","");
        //创建接收消息后的回调类
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //创建消费者
        channel.basicConsume("fensi2",false,consumer);
    }
}

发送方发送消息,接收方接受消息,我们发现两个接收方能接受的是同一个消息。

同一个消息能被多个消费者消费。这就是发布订阅模式

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5717441.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存