场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种
1.串行的方式
2.并行的方式
串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已 经提高的处理时间,但是,前面说过,邮件和短信对正常的使用网站没有任何影响,客户端没有必要等着其 发送完成才显示注册成功,应该是写入数据库后就返回。引入消息队列后,把发送邮件,短信不是必须的业 务逻辑的 *** 作进行异步处理。
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽 略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
2.5.2) 应用解耦场景:双11是购物狂欢节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。
这种做法有一个缺点:当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合
2.5.3) 流量削峰订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库存 *** 作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阈值的订单直接丢弃
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
2.6 SpringBoot整合RabbitMQ1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。
2.秒杀业务根据消息队列中的请求信息,再做后续处理。
在springboot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
- application.yml文件配置RabbitMQ相关信息
- 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
- 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
- application.yml文件配置RabbitMQ相关信息
- 创建消息处理类,用于接收队列中的消息并进行处理
1)创建项目
创建出生产者(Springboot-rabbitmq-producer)和消费者模块(springboot-rabbitmq-consumer)
2)配置application.yml文件
spring: rabbitmq: host:192.168.81.10 port:5672 virtual-host:/springboot_mq username:admin password:admin
注意:生产者和消费者项目配置文件内容一样
3)在pom.xml中导入起步依赖
spring-boot-parent org.springframework.boot 2.1.7.RELEASE org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-test
4)创建项目引导类
生产者模块项目引导类
@SpringBootApplication public class RabbitmqProducerApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqProducerApplication.class, args); } }
消费者模块项目引导类
@SpringbootApplication public class RabbitmqConsumerApplication{ public static void main(String []args){ SpringApplication.run(RabbitmqConsumerApplication.class,args); } }2.6.2 五种消息模型
消费端核心配置:
spring: rabbitmq: host:192.168.81.10 port:5672 virtual-host:/springboot_mq username:admin password:admin listener: simple: acknowledge-mode:manual #设置手动ack concurrency:1 #设置消费端监听个数 max-concurrency:10 #最大个数
其他要点:
- 首先配置手工确认模式,用ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列,根据业务记录日志等处理。
- 可以设置小段的监听个数和最大个数,用于控制消费端的并发情况。
@RabbitListener注解的使用
- 消费监听@RabbitMQListener注解,这个对于在实际工作中非常的好用。
- @RabbitMQListener是一个组合注解,里面可以注解配置
- @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。
(1)简单消息模型
当然消费者也可以设置队列持久化或者自动删除、手动ack等,如下图:
注意:springboot中如果设置了手动ack确认,那么当网络出现异常、或者程序异常、服务器异常情况,消息会重回队列。
(2)work消息模型
WorkConsumer.java
@Component public class WorkConsumer{ @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message,Channel channel) throws Exception{ //设置每个消费者同时只能处理一条信息 //channel.basicQos(1); //模拟性能差,0.1秒处理一个 Thread.sleep(10); System.out.println(" [work] 消费者1 :"+ message +"!"); } @RabbitListener(queueToDeclare = @queue("work")) public void receive2(String message,Channel channel) throws Exception{ //设置每个消费者同时只能处理一条消息 //channel.basicQos(1); System.out.println(" [work] 消费者2:"+ message +"!"); } }
生产者测试类
@Runwith(SpringRunner.class) @SpringBootTest(classes = RabbitmqProducerApplication.class) public class ProducerTest{ @Autowired RabbitTemplate rabbitTemplate; @Test public void test2() throws InterruputedException{ for(int i=1;i<=100;i++){ //消息内容 String message = "task .." +i; rabbitTemplate.convertAndSend("work",message); System.out.println(" [work] 生产者 '" +message +"'"); } } }
(3)订阅模型-Fanout
订阅模型-Fanout也称为广播模式
可以有多个消费者
每个消费者有自己的队列
每个队列都要绑定到Exchange(交换机)
生产者发送的消息,只能发送到交换机,交换机来决定要给哪个队列,生产者无法决定。
交换机把消息发送给绑定过的所有队列
队列的消费者都能拿到消息,实现一条消息被多个消费者消费。
FanoutConsumer.java
@Component public class FanoutConsumer{ @RabbitListener( bindings = @QueueBinding( value=@Queue(value="fanout_queue1_test",durable="true"), exchange = @Exchange( value = "fanout_ex_test", type = ExchangeTypes.FANOUT, durable = "true", ignoreDeclarationExceptions = "true" )) ) public void receive1(String message){ System.out.println("fanout消费者1:"+message); } @RabbitListener( bindings = @QueueBinging( value=@Queue(value = "fanout_queue2_test",durable = "true"), exchange = @Exchange( value = "fanout_extest", durable = "true", ignoreDeclarationExceptions = "true", type = ExchangeTypes.FANOUT )) ) public void receive2(String message){ System.out.println("fanout消费者2:"+message); } }
生产者测试类
@Test public void test3() throws InterruptedException{ String msg = "hello,everyone"; for(int i=0;i<10;i++){ //这里注意细节,第二个参数需要写,否则第一个参数就变成routingkey了 //rabbitTemplate.convertAndSend("fanout_ex_test","",msg); } Thread.sleep(1000); }
(4)订阅模型-Direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用到Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在 向 Exchange 发送消息时,也必须指定消息的RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing key 进行判断,只有队列的Routingkey与消息的Routingkey完全一致,才会接收到消息。
DirectConsumer.java
@Component public class DirectConsumer { @RabbitListener( bindings = @QueueBinding( value = @Queue(name = "direct_queue1_test",durable = "true"), exchange = @Exchange( name = "direct_ex_test", durable = "true", type = ExchangeTypes.DIRECT ), key = "select" ) ) public void receive1(String message){ System.out.println("direct消费者1:"+message); } @RabbitListener( bindings = @QueueBinding( value = @Queue(name="direct_queue2_test",durable = "true"), exchange = @Exchange(name = "direct_ex_test",durable = "true",type = ExchangeTypes.DIRECT), key = {"update","delete","insert"} ) ) public void receive2(String message){ System.out.println("direct消费者2:"+message); } }
生产者测试类
@Test public void test4(){ rabbitTemplate.convertAndSend("direct_ex_test","select","id = 1000"); }
(5)订阅模型-Topic
除去上面的注解用法,还可以写配置类,配置队列,交换机,以及队列和交换机绑定
在消费者模块开发配置类
@Configuration public class RabbitConfig { //交换机名称 public static final String TOPIC_EXCHANGE_NAME = "topic_ex_test"; //声明交换机 @Bean("topic_exchange") public Exchange directExchange(){ return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build(); } //声明队列 @Bean("topic_queue1") public Queue directQueue1(){ return QueueBuilder.durable("topic_queue1").build(); } //交换机与队列绑定 @Bean public Binding QueueExchange1(@Qualifier("topic_queue1"))Queue queue,@Qualifier("topic_exchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("product.#".noargs()); } }
消费者监听类
@Component public class TopicConsumber { @RabbitListener(queues = {"topic_queue1"}) public void receive1(String message){ System.out.println("direct消费者1:"+message); } @RabbitListener(queues = {"topic_queue2"}) public void receive2(String message){ System.out.println("direct消费者2:"+message); } }
生产者测试类
@Test public void test4(){ rabbitTemplate.convertAndSend("topic_ex_test","product.select","id=1000"); }
由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
2.7 RabbitMQ的集群配置一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单例部署就可以了,但是出于MQ中间件本身的可靠性,并发性,吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。
参考官方文档:Clustering Guide — RabbitMQ
2.7.1)集群方案1(主备模式)架构图:
首先确保RabbitMQ运行没有问题
[root@localhost ~]# rabbitmqctl status Status of node rabbit@localhost ... [{pid,29322}, {running_applications, [{rabbitmq_management,"RabbitMQ Management Console","3.7.18"}, {amqp_client,"RabbitMQ AMQP Client","3.7.18"}, {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.18"}, {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.18"}, {rabbit,"RabbitMQ","3.7.18"}, {rabbit_common, "Modules shared by rabbitmq-server and rabbitmq-erlang-client", "3.7.18"}, {credentials_obfuscation, "Helper library that obfuscates sensitive values in process state", "1.1.0"}, {cowboy,"Small, fast, modern HTTP server.","2.6.1"}, {ranch,"Socket acceptor pool for TCP protocols.","1.7.1"}, {ssl,"Erlang/OTP SSL application","9.3.5"}, {public_key,"Public key infrastructure","1.6.7"}, {asn1,"The Erlang ASN1 compiler version 5.0.9","5.0.9"}, {stdout_formatter, "Tools to format paragraphs, lists and tables as plain text", "0.2.2"}, {mnesia,"MNESIA CXC 138 12","4.16"}, {cowlib,"Support library for manipulating Web protocols.","2.7.0"}, {xmerl,"XML parser","1.3.21"}, {sysmon_handler,"Rate-limiting system_monitor event handler","1.1.0"}, {tools,"DEVTOOLS CXC 138 16","3.2"}, {os_mon,"CPO CXC 138 46","2.5"}, {inets,"INETS CXC 138 49","7.0.9"}, {crypto,"CRYPTO","4.5.1"}, {jsx,"a streaming, evented json parsing toolkit","2.9.0"}, {observer_cli,"Visualize Erlang Nodes On The Command Line","1.5.2"}, {recon,"Diagnostic tools for production use","2.5.0"}, {lager,"Erlang logging framework","3.8.0"}, {goldrush,"Erlang event stream processor","0.1.9"}, {compiler,"ERTS CXC 138 10","7.4.4"}, {syntax_tools,"Syntax tools","2.2"}, {sasl,"SASL CXC 138 11","3.4"}, {stdlib,"ERTS CXC 138 10","3.9.2"}, {kernel,"ERTS CXC 138 10","6.4.1"}]}, {os,{unix,linux}}, {erlang_version, "Erlang/OTP 22 [erts-10.4.4] [source] [64-bit] [smp:1:1] [ds:1:1:10] [asyncthreads:64] [hipe]n"}, {memory, [{connection_readers,0}, {connection_writers,0}, {connection_channels,0}, {connection_other,29388}, {queue_procs,238984}, {queue_slave_procs,0}, {plugins,405792}, {other_proc,27586784}, {metrics,230748}, {mgmt_db,139096}, {mnesia,124128}, {other_ets,2895496}, {binary,132328}, {msg_index,89856}, {code,24437941}, {atom,1435801}, {other_system,10934946}, {allocated_unused,7233976}, {reserved_unallocated,7024640}, {strategy,rss}, {total,[{erlang,68681288},{rss,82939904},{allocated,75915264}]}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{http,15672,"::"}]}, {vm_memory_calculation_strategy,rss}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,1583318630}, {disk_free_limit,50000000}, {disk_free,24159039488}, {file_descriptors, [{total_limit,32668}, {total_used,16}, {sockets_limit,29399}, {sockets_used,0}]}, {processes,[{limit,1048576},{used,433}]}, {run_queue,1}, {uptime,3}, {kernel,{net_ticktime,60}}]
停止rabbitmq服务
[root@localhost ~]# service rabbitmq-server stop Redirecting to /bin/systemctl stop rabbitmq-server.service
启动第一个节点:
[root@localhost ~]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start ## ## ## ## RabbitMQ 3.7.18. Copyright (C) 2007-2019 Pivotal Software, Inc. ########## Licensed under the MPL. See https://www.rabbitmq.com/ ###### ## ########## Logs: /var/log/rabbitmq/rabbit1.log /var/log/rabbitmq/rabbit1_upgrade.log Starting broker... completed with 3 plugins.
启动第二个节点:
[root@localhost ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="- rabbitmq_manageme nt listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start ## ## ## ## RabbitMQ 3.7.18. Copyright (C) 2007-2019 Pivotal Software, Inc. ########## Licensed under the MPL. See https://www.rabbitmq.com/ ###### ## ########## Logs: /var/log/rabbitmq/rabbit2.log /var/log/rabbitmq/rabbit2_upgrade.log Starting broker... completed with 3 plugins.
rabbit1 *** 作作为主节点:
[root@localhost ~]# rabbitmqctl -n rabbit1 stop_app Stopping rabbit application on node rabbit1@localhost ... [root@localhost ~]# rabbitmqctl -n rabbit1 reset Resetting node rabbit1@localhost ... [root@localhost ~]# rabbitmqctl -n rabbit1 start_app Starting node rabbit1@localhost ... completed with 3 plugins.
rabbit2 *** 作为从节点:
[root@localhost ~]# rabbitmqctl -n rabbit2 stop_app Stopping rabbit application on node rabbit2@localhost ... [root@localhost ~]# rabbitmqctl -n rabbit2 reset Resetting node rabbit2@localhost ... [root@localhost ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'localhost' #localhost主机名换成自己的 Clustering node rabbit2@localhost with rabbit1@localhost [root@localhost ~]# rabbitmqctl -n rabbit2 start_app Starting node rabbit2@localhost ... completed with 3 plugins.
查看集群状态:
[root@localhost ~]# rabbitmqctl cluster_status -n rabbit1 Cluster status of node rabbit1@localhost ... [{nodes,[{disc,[rabbit1@localhost,rabbit2@localhost]}]}, {running_nodes,[rabbit2@localhost,rabbit1@localhost]}, {cluster_name,<<"rabbit1@localhost">>}, {partitions,[]}, {alarms,[{rabbit2@localhost,[]},{rabbit1@localhost,[]}]}]
web管控台:
结束命令:
rabbitmqctl -n rabbit1 stop rabbitmqctl -n rabbit2 stop
### 集群管理相关命令 //将节点加入到指定集群中,在这个命令执行前需要停止RabbitMQ应用并重置节点 rabbitmqctl join_cluster {cluster_node}[-ram] //显示集群状态 rabbitmqctl cluster_status //修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用 rabbitmqctl change_cluster_node_type{disc|ram} //将节点从集群中删除,允许离线执行 rabbitmqctl forget_cluster_nodes{clusternode} //在集群中的节点应用启动前咨询cluster节点的最新消息,并更新相应的集群信息。 rabbitmqctl update_cluster_nodes{clusternode} 注意:这个和join_cluster不同,他不加入集群,考虑这样一种情况,节点A和节点B都在集群中, 当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候, 它会尝试联系节点B,但是这样会失败,因为B节点已经不在集群中了。 //取消队列queue同步镜像的 *** 作 rabbitmqctl cancel_sync_queue [-p vhost] {queue} //设置集群名称集群名称在客户端连接时通报给客户端。 rabbitmqctl set_cluster_name {name} Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称, 通过这个命令可以重新设置。2.7.2)集群方案2(镜像集群)
前面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机绑定这些可以复制到集群里的 任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列 无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个 节点,必须要创建镜像队列。 镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像 队列,我们就以上面的集群接着做。
设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。
rabbitmqctl set_policy my_ha "^" '{"ha-mode":"all"}'
- Name:策略名称
- Pattern:匹配的规则(正则表达式),如果是匹配所有的队列,是^。
- Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号是链接帮助文档。
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
- all:表示在集群中所有的节点进行镜像
- exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
- nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各个节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering.这使得RabbitMQ本身不需要ActiveMQ、Kafka那样通过Zookeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量的能力的目的。
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
1) 安装HAProxy
[root@localhost ~]# cd /usr/local/soft/ [root@localhost soft]# ll total 159104 -rw-r--r--. 1 root root 2527523 Jun 7 07:40 haproxy-2.0.0.tar.gz -rw-r--r--. 1 root root 159019376 May 24 00:16 jdk-8u11-linux-x64.tar.gz -rw-r--r--. 1 root root 1364993 May 24 07:50 redis-3.0.4.tar.gz drwxr-xr-x. 2 root root 174 Jun 3 20:50 soft_rabbitmq [root@localhost soft]# tar -zxvf haproxy-2.0.0.tar.gz -C /usr/local/ [root@localhost soft]# cd /usr/local/haproxy-2.0.0/ [root@localhost haproxy-2.0.0]# make TARGET=linux31 PREFIX=/usr/local/haproxy [root@localhost haproxy-2.0.0]# make install PREFIX=/usr/local/haproxy [root@localhost haproxy-2.0.0]# mkdir /etc/haproxy [root@localhost haproxy-2.0.0]# groupadd -r -g 149 haproxy [root@localhost haproxy-2.0.0]# useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy //创建haproxy配置文件 mkdir /etc/haproxy vim /etc/haproxy/haproxy.cfg
2) 配置HAProxy
配置文件路径:/etc/haproxy/haproxy.cfg
#logging options global log 127.0.0.1 local0 info maxconn 5120 chroot /usr/local/haproxy uid 99 gid 99 daemon quiet nbproc 20 pidfile /var/run/haproxy.pid defaults log global mode tcp option tcplog option dontlognull retries 3 option redispatch maxconn 2000 contimeout 5s clitimeout 60s srvtimeout 15s #front-end IP for consumers and producters listen rabbitmq_cluster bind 0.0.0.0:5672 mode tcp #balance url_param userid #balance url_param session_id check_post 64 #balance hdr(User-Agent) #balance hdr(host) #balance hdr(Host) use_domain_only #balance rdp-cookie #balance leastconn #balance source //ip balance roundrobin server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2 server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2 listen stats bind 192.168.81.10:8100 mode http option httplog stats enable stats uri /rabbitmq-stats stats refresh 5s
启动HAproxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg //查看haproxy进程状态 ps -ef | grep haproxy 访问如下地址对mq节点进行监控 http://192.168.81.10:8100/rabbitmq-stats
代码中访问mq集群地址,则变为访问haproxy地址:5672
注:以上为学习笔记,仅供学习交流
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)