学习链接
不了解的点- @PostConstruct注解
spring会自动执行@PostConstructspringboot执行流程是什么?
扫描创建实例 --> 自动注入 --> @PostConstruct --> 后续流程(启动消费者)rabbitmq 中队列参数的意思
RabbitAutoConfiguration 自动配置类,会根据这里的设置的参数,在服务器上创建队列创建消费者的注解是什么?
@RabbitListener
@RabbitListener 创建一个消费者,启动一个消费者线程开始接收消息
每个@RabbitListener都会创建一个消费者
1)自动创建实例
2)自动注册成为消费者
3)自动开始接收消息
4)自动调用消息处理方法
当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中
当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统
我们需要应对流量峰值,让流量曲线变得平缓,如下图
为了进行流量削峰,引入rabbitmq消息队列,当购物系统产生订单后,可以把订单数据发送到消息队列;
而订单消费者应用从消息队列接收消息,并把订单保存到数据库
这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存
spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便
1.1.2 application.ymlorg.springframework.boot spring-boot-starter-amqp
添加Rabbitmq的连接消息
在主程序中添加下面的方法创建Queue实例
当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,
代码在RabbitAdmin.declareQueues()方法中
端口修改成81
package com.pd; import com.pd.pojo.PdOrder; import com.pd.service.OrderService; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component //通过@RabbitListener注解配置来接收消息 ,需要指定队列名 ,spring封装后的rabbitmq API @RabbitListener(queues = "orderQueue") public class OrderConsumer { //注入业务 @Autowired private OrderService orderService; //指定处理消息的方法,在同一个类中只能设置一次 @RabbitHandler public void receive(PdOrder pdOrder) throws Exception { orderService.saveOrder(pdOrder); } }1.2.4 修改OrderServiceImpl的saveOrder() 方法 1.3 测试 (注意 *** 作顺序)
1.注册登录
2.添加地址
3.下订单
import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import javax.annotation.PostConstruct; @SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } //配置helloworld队列参数 @Bean public Queue helloworldQueue(){ return new Queue("helloworld",false,false,false); } @Autowired private Producer p; @PostConstruct public void test() { //在新的线程中执行阻塞 *** 作,避免影响spring主线程的执行 new Thread(()->{ try { Thread.sleep(3000);//等待消费者启动后再发消息 } catch (InterruptedException e) { // e.printStackTrace(); } p.send(); }).start(); } }2.1.2 Producer
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Producer { @Autowired private AmqpTemplate t; public void send(){ t.convertAndSend("helloworld","Hello world!"); } }2.1.3 Consumer
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { @RabbitListener(queues = "helloworld") public void receive (String msg){ System.out.println("收到:"+msg); } }2.2 工作模式 生产者 创建两个消费者测试
- 手动Ack
spring集成rabbitmq,默认就是手动Ack,spring会自动发送回执qos=1
yml中添加 prefech参数 预抓取,spring设置的默认值是250 ,需要设置成1
队列持久化
消息的持久化 ,spring默认已添加了持久参数
fanout交换机
@SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } //新建交换机 @Bean public FanoutExchange logs(){ //非持久,不自动删除 return new FanoutExchange("logs",false,false); } @Autowired private Producer p; @PostConstruct public void test() { //在新的线程中执行阻塞 *** 作,避免影响spring主线程的执行 new Thread(()->{ while (true){ System.out.println("输入消息:"); String s=new Scanner(System.in).nextLine(); p.send(s); } }).start(); } }
@Component public class Producer { @Autowired private AmqpTemplate t; public void send(String msg){ //广播模式第二个参数无效 t.convertAndSend("logs","",msg.getBytes()); } }
//创建自己的队列,与交换机绑定 //随机命名,非持久,独占 @Component public class Consumer { @RabbitListener(bindings = @QueueBinding( //设置 队列,spring会随机命名,非持久,独占,自动删除 value = @Queue,//(name = "",durable = "",autoDelete = ""), //交换机 declare = "false" 不在这里自动创建交换机 exchange = @Exchange(name = "logs",declare = "false") )) public void receive1 (String msg){ System.out.println("消费者1收到:"+msg); } @RabbitListener(bindings = @QueueBinding( //设置 队列,spring会随机命名,非持久,独占,自动删除 value = @Queue,//(name = "",durable = "",autoDelete = ""), //交换机 declare = "false" 不在这里自动创建交换机 exchange = @Exchange(name = "logs",declare = "false") )) public void receive2 (String msg){ System.out.println("消费者2收到:"+msg); } }2.4 路由模式 (关键字)
direct 直连模式
Producer Main Consumer
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)