stage5 day07 拼多商城整合rabbitmq(订单流量削峰),Rabbitmq-spring boot整合

stage5 day07 拼多商城整合rabbitmq(订单流量削峰),Rabbitmq-spring boot整合,第1张

stage5 day07 拼多商城整合rabbitmq(订单流量削峰),Rabbitmq-spring boot整合

学习链接

不了解的点
    @PostConstruct注解
    spring会自动执行@PostConstructspringboot执行流程是什么?
    扫描创建实例 --> 自动注入 --> @PostConstruct --> 后续流程(启动消费者)rabbitmq 中队列参数的意思

    RabbitAutoConfiguration 自动配置类,会根据这里的设置的参数,在服务器上创建队列创建消费者的注解是什么?
    @RabbitListener
    @RabbitListener 创建一个消费者,启动一个消费者线程开始接收消息
    每个@RabbitListener都会创建一个消费者
    1)自动创建实例
    2)自动注册成为消费者
    3)自动开始接收消息
    4)自动调用消息处理方法

pdShop 订单模块

添加空间,使用rabbitmq下自己的空间,需要在服务器上手动创建


1 拼多商城整合 rabbitmq ---- 订单存储的解耦(流量削峰)

当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中
当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统
我们需要应对流量峰值,让流量曲线变得平缓,如下图

为了进行流量削峰,引入rabbitmq消息队列,当购物系统产生订单后,可以把订单数据发送到消息队列;
而订单消费者应用从消息队列接收消息,并把订单保存到数据库


这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存

1.1 生产者-发送订单 1.1.1 pom.xml依赖

spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便


	org.springframework.boot
	spring-boot-starter-amqp

1.1.2 application.yml

添加Rabbitmq的连接消息

1.1.3 修改主程序 RunPdApp

在主程序中添加下面的方法创建Queue实例

当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,
代码在RabbitAdmin.declareQueues()方法中

1.1.4 修改 OrderServiceImpl

1.2 消费者-接收订单,并保存到数据库 1.2.1 复制一份消费者 pdweb ,pd-web项目复制为pd-order-consumer

1.2.2 修改 application.yml

端口修改成81

1.2.3 新建OrderConsumer 消费者类

package com.pd;

import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
//通过@RabbitListener注解配置来接收消息 ,需要指定队列名 ,spring封装后的rabbitmq API

@RabbitListener(queues = "orderQueue")
public class OrderConsumer {
    //注入业务
    @Autowired
    private OrderService orderService;

    //指定处理消息的方法,在同一个类中只能设置一次
    @RabbitHandler
    public void receive(PdOrder pdOrder) throws Exception {
        orderService.saveOrder(pdOrder);
    }
}
1.2.4 修改OrderServiceImpl的saveOrder() 方法

1.3 测试 (注意 *** 作顺序)

1.注册登录
2.添加地址
3.下订单

1 注册登录

2 添加地址


3 下订单


4 启动消费者(pd_web_consumer)

2 rabbitmq-spring整合 2.1 简单模式 2.1.1 Main
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    //配置helloworld队列参数
    @Bean
    public Queue helloworldQueue(){
        return new Queue("helloworld",false,false,false);
    }


    @Autowired
    private Producer p;
    

    @PostConstruct
    public void test() {
        //在新的线程中执行阻塞 *** 作,避免影响spring主线程的执行
        new Thread(()->{
            try {
                Thread.sleep(3000);//等待消费者启动后再发消息
            } catch (InterruptedException e) {
//                e.printStackTrace();
            }
            p.send();
        }).start();


    }
}
2.1.2 Producer
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate t;

    public void send(){
        t.convertAndSend("helloworld","Hello world!");
    }
}

2.1.3 Consumer
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {
    @RabbitListener(queues = "helloworld")
    public void receive (String msg){
        System.out.println("收到:"+msg);
    }
}
2.2 工作模式

生产者

创建两个消费者测试


合理分发 手动Ack,qos=1
    手动Ack
    spring集成rabbitmq,默认就是手动Ack,spring会自动发送回执qos=1
    yml中添加 prefech参数 预抓取,spring设置的默认值是250 ,需要设置成1
消息持久化

    队列持久化

    消息的持久化 ,spring默认已添加了持久参数

2.3 广播模式(群发)

fanout交换机

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    //新建交换机
    @Bean
    public FanoutExchange logs(){
        //非持久,不自动删除
        return new FanoutExchange("logs",false,false);
    }

    @Autowired
    private Producer p;
    

    @PostConstruct
    public void test() {
        //在新的线程中执行阻塞 *** 作,避免影响spring主线程的执行
        new Thread(()->{
            while (true){
                System.out.println("输入消息:");
                String s=new Scanner(System.in).nextLine();
                p.send(s);
            }
        }).start();
    }
}
@Component
public class Producer {
    @Autowired
    private AmqpTemplate t;

    public void send(String msg){
        //广播模式第二个参数无效
        t.convertAndSend("logs","",msg.getBytes());
    }

}
//创建自己的队列,与交换机绑定
//随机命名,非持久,独占

@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            //设置 队列,spring会随机命名,非持久,独占,自动删除
            value = @Queue,//(name = "",durable = "",autoDelete = ""),
            //交换机 declare = "false" 不在这里自动创建交换机
            exchange = @Exchange(name = "logs",declare = "false")
    ))
    public void receive1 (String msg){
        System.out.println("消费者1收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            //设置 队列,spring会随机命名,非持久,独占,自动删除
            value = @Queue,//(name = "",durable = "",autoDelete = ""),
            //交换机 declare = "false" 不在这里自动创建交换机
            exchange = @Exchange(name = "logs",declare = "false")
    ))
    public void receive2 (String msg){
        System.out.println("消费者2收到:"+msg);
    }
}

2.4 路由模式 (关键字)

direct 直连模式

Producer

Main

Consumer


2.5 主题模式 (通配符关键字) Main

Consumer

测试

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5718571.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存