RabbitMQ(编程不良人)

RabbitMQ(编程不良人),第1张

RabbitMQ(编程不良人) 1.MQ引言 1.1什么是MQ

MQ(Message Quene]:翻译为 消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,可以轻松的实现系统间解耦。别名为消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2MQ有哪些

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

1.3不同MQ特点
  • ActiveMQ:ActiveNo 是Apache出品,最流行的,能力强劲的开源消息总线,它是一个完全支持JMS规范的的消息中间件,丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中同件,在中小型企业鼓受欢迎!
  • Kafka:Kafka是linkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,0.8版本开始支持复制,不支持事务,对消息的重复,丢失、错误没有严格要求。适合产生大量数据的互联网服务的数据收集业务。
  • RocketMQ:RocketNo是阿里开源的消息中间件,它是纯Java开发,具有高看吐量,高可用性,适合大规模分布式系统应用的特点,RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,自前在阿里集团被广泛应用于交易、充值、流计算、消息推送,日志流式处理。binglng分发等场景。
  • RabbitMQ:RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMOP协议来实理,AMQP的主要特征是面向消息,队列,路由(包括点对点和发布/订阅),可重性,安全,AHQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
2.RabbitMQ

引入依赖


     com.rabbitmq
     amqp-client
     5.7.2
 

在管理界面设置虚拟主机

添加新用户

将用户连接上虚拟主机

2.1第一种模型(直连)

  • p:生产者,也就是发送消息的程序
  • c:消费者,消息的接收者,会一直等待消息到来
  • queue:消息队列,红色部分,类似一个邮箱,可以缓存消息

开发生产者

package helloword;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {

    //生效消息
    @Test
    public void test1() throws IOException, TimeoutException {

        Connection connection = RabbitMqUtils.getConnection();

        //获取链接通道
        Channel channel = connection.createChannel();

        //通道绑定消息队列
        //参数1:队列名称,如果不存在则自动创建
        //参数2:是否持久化,为false的话rabbitmq重启时队列会丢失
        //参数3:是否独占队列
        //参数4:是否在消费完成后自动删除队列
        //参数5:额外附加参数
        channel.queueDeclare("hello",false,false,false,null);

        //发布消息
        //参数1:交换机名称  参数2:队列名称  参数3:传递消息额外设置(比如设置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN)  参数4:消息具体内容
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());

        RabbitMqUtils.closeConnection(channel,connection);
    }
}


开发消费者

package helloword;
import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello",false,false,false,null);

        //消费消息
        //参数1:队列名称  参数2:开启消息的自动确认机制  参数3:消费时的回调接口  参数4:消息具体内容
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
        //RabbitMqUtils.closeConnection(channel,connection);
    }
}


链接工具类

package utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {

    //因为这是一个重量级资源,在类加载的时候创建一次就行了
    private static ConnectionFactory connectionFactory;

    static {
        //创建连接mq的连接工厂对象
        connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq的主机
        connectionFactory.setHost("47.115.203.69");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接那个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");
    }

    //定义提供链接对象的方法
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和链接工具
    public static void closeConnection(Channel channel,Connection connection){
        try {
            if (channel!=null){
                channel.close();
            }
            if (connection!=null){
                connection.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
第二种模型(work quene)

work quene,也被称为任务模型,当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度,长此以往,消息就会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定到一个队列。队列中的消息一旦消费就会消失,因此任务是不会被重复执行的。

  • p:生产者,任务的发布者
  • c1:消费者1,假设完成速度较慢
  • c2:消费者2,假设完成速度较快

生产者

package workquene;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMqUtils;
import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channe1 = connection.createChannel();

        channe1.queueDeclare("work",true,false,false,null);

        for (int i = 0; i < 10; i++) {
            //生产消息
            channe1.basicPublish("","work",null,(i+"queue").getBytes());
        }
        RabbitMqUtils.closeConnection(channe1,connection);
    }
}

消费者1

package workquene;

import com.rabbitmq.client.*;
import utils.RabbitMqUtils;
import java.io.IOException;

public class consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work",true,false,false,null);

        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1:"+new String(body));
            }
        });
    }
}

消费者2

package workquene;

import com.rabbitmq.client.*;
import utils.RabbitMqUtils;
import java.io.IOException;

public class consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work",true,false,false,null);

        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-2:"+new String(body));
            }
        });
    }
}

结果(会平均分发)

如果希望不是平均分发,而是能者多劳

消费者1

package workquene;

import com.rabbitmq.client.*;
import utils.RabbitMqUtils;

import java.io.IOException;
public class consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.basicQos(1);//每次只能消费一个消息
        channel.queueDeclare("work",true,false,false,null);
        //参数2:自动确认机制 true 消费者自动小rabbitmq确认消费信息;为false时不会自动确认
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("消费者-1:"+new String(body));
                //参数1:确认队列中那个具体消息  参数2:是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者2

package workquene;

import com.rabbitmq.client.*;
import utils.RabbitMqUtils;
import java.io.IOException;

public class consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        final Channel channe = connection.createChannel();

        channe.basicQos(1);//每次只能消费一个消息
        channe.queueDeclare("work",true,false,false,null);
        //参数2:自动确认机制 true 消费者自动小rabbitmq确认消费信息;为false时不会自动确认
        channe.basicConsume("work",false,new DefaultConsumer(channe){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-2:"+new String(body));
                channe.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}


第三种模型(fanout)

fanout 扇出,也被称为广播

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费 1

生产者

package fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMqUtils;
import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        //将通道声明交换机
        //参数1:交换机名字
        //参数2:交换机类型  fanout 广播模式
        channel.exchangeDeclare("logs","fanout");

        //发送消息  第二个参数为路由,暂时无用
        channel.basicPublish("logs","",null,"fanout hello".getBytes());

        RabbitMqUtils.closeConnection(channel,connection);
    }
}

消费者

package fanout;
import com.rabbitmq.client.*;
import utils.RabbitMqUtils;
import java.io.IOException;

public class consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");

        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定交换机和队列  第三个参数为路由,在此处暂时没有用处
        channel.queueBind(queue,"logs","");

        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}

再多复制几个消费者可以看出,每一个消费者都能拿到交换机中的消息

第四种模型(Routing)

1.Routing之订阅模型-Direct(直连):
再fanout模式中,一条消息会被所有订阅的队列都消费,但有时在某些场景下,希望不同的消息被不同的队列消费,这时需要用到Direct类型的Exchange

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由Key)
  • 消息的发送方向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不能再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才能接收到消息
  • p:生产者,向Exchange发送消息,指定一个RoutingKey
  • x:Exchange(交换机),接收生产者的消息,然后把消息递交给与RoutingKey完全匹配的队列
  • c1:消费者,其所在队列指定了需要RoutingKey为error的消息
  • c2:消费者,其所在队列指定了需要RoutingKey为info,error,warning的消息

生产者

package direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMqUtils;
import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        //参数1:交换机名称
        //参数2:direct  路由模式
        channel.exchangeDeclare("logs_direct","direct");

        //发送消息
        String routingKey = "info";
        channel.basicPublish("logs_direct",routingKey,null,("direct模型"+routingKey).getBytes());
    }
}

消费者1

package direct;
import com.rabbitmq.client.*;
import utils.RabbitMqUtils;
import java.io.IOException;

public class consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");

        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //基于routeKey绑定交换机和dl
        channel.queueBind(queue,"logs_direct","error");

        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}

消费者2

package direct;

import com.rabbitmq.client.*;
import utils.RabbitMqUtils;
import java.io.IOException;

public class consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");

        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //基于routeKey绑定交换机和dl
        channel.queueBind(queue,"logs_direct","info");
        channel.queueBind(queue,"logs_direct","error");
        channel.queueBind(queue,"logs_direct","warning");

        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });
    }
}



结论:可以看出,当路由为info时,只有消费者2才能拿到信息;如果路由为error,则两个消费者都能拿到信息

第五种模型(Routing)

1.Routing之订阅模型-Topic:
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型的Exchange可以让队列绑定RoutingKey的时候使用通配符!这种模型RoutingKey一般都是由一个或多个单词组成,多个单词字间以“.”分割,例如:item.insert

  • *:表示匹配一个词
  • #:表示匹配一个或多个词

生产者

package Topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMqUtils;
import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        //参数1:交换机名称
        //参数2:topic  路由模式
        channel.exchangeDeclare("topics","topic");

        //发送消息
        String routingKey = "user.save";
        channel.basicPublish("topics",routingKey,null,("topic"+routingKey).getBytes());

        RabbitMqUtils.closeConnection(channel,connection);
    }
}

消费者

package Topic;
import com.rabbitmq.client.*;
import utils.RabbitMqUtils;
import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare("topics","topic");

        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //基于routeKey绑定交换机和dl
        channel.queueBind(queue,"topics","user.*");

        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });

    }
}

3.springboot整合RabbitMQ

依赖

		
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.springframework.amqp
            spring-rabbit-test
            test
        

配置文件

spring:
  application:
    name: rabbirmq-springboot
  rabbitmq:
    host: 47.115.203.69
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems

测试

package com.baizhi.test;

import com.baizhi.RabbitmqserviceApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqserviceApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //topic 动态路由 订阅模式
    @Test
    public void testTopic(){
        rabbitTemplate.convertAndSend("topics","user.save","user.save路由信息");
    }

    //route 路由
    @Test
    public void testRoute(){
        rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");
    }

    //fanout 广播
    @Test
    public void testFanout(){
        rabbitTemplate.convertAndSend("logs","","Fanout模型发送的信息");
    }

    //work模型
    @Test
    public void testWork(){
        for (int i = 0; i <10 ; i++) {
            rabbitTemplate.convertAndSend("work","work模型"+i);
        }
    }

    //hello word
    @Test
    public void test(){
        rabbitTemplate.convertAndSend("hello","hello word");
    }

}

hello模型

消费者

package com.baizhi.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class helloConsumer {

    @RabbitHandler
    public void receive(String message){
        System.out.println("message:"+message);
    }
}
work模型

消费者

package com.baizhi.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class workConsumer {

	//默认在work方式中为公平调度,如果要实现能者多劳需要额外配置
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive(String message){
        System.out.println("message1:"+message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
        System.out.println("message2:"+message);
    }
}
Fanout模型

消费者

package com.baizhi.fanout;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout")//绑定的交换机
            )
    })
    public void receive(String message){
        System.out.println("message1:"+message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout")//绑定的交换机
            )
    })
    public void receive2(String message){
        System.out.println("message2:"+message);
    }
}
路由模型

消费者

package com.baizhi.route;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RouteConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),//绑定交换机名称和类型
                    key = {"info","error","warn"}
            )
    })
    public void receive(String message){
        System.out.println("message1:"+message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),//绑定交换机名称和类型
                    key = {"error"}
            )
    })
    public void receive2(String message){
        System.out.println("message2:"+message);
    }
}
动态路由模型

消费者

package com.baizhi.topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "topic",type = "topics"),//绑定交换机名称和类型
                    key = {"user.save","user.*"}
            )
    })
    public void receive(String message){
        System.out.println("message1:"+message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "topic",type = "topics"),//绑定交换机名称和类型
                    key = {"order.#","user.*"}
            )
    })
    public void receive2(String message){
        System.out.println("message2:"+message);
    }
}
4.MQ应用场景

1.异步处理
场景说明:用户注册后,需要发注册部件和注册短信,传统的做法有两种1.串行的方式2.并行的方式

  • 串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西
  • 并行方式:将注册信息写入数据库后,发送邮件的同时发送短信,全部完成后返回给客户端
  • 消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等看其发送完成才显示注册成功,应该是写入数据库后就返回,消息队列:引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

2.应用解耦
场景:双11用户下单后,订单系统需要通知库存系统,传统做法就是订单系统调用库存系统的接口

但是当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合,引入消息队列后

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,获取下单消息,进行库 *** 作,就算库存系统出现故障,消息队列也能保证消息的可靠传递,不会导致消息丢失

3.流量削峰
场景:秒杀活动,一般会因为流量过大导致应用挂掉,一般在应用前端加入消息队列

  • 可以控制活动人数,超过一定阈值的订单直接丢弃
  • 可以缓解短时间的高流量压垮应用
  • 1.服务器收到用户的请求之后,首先写入消息队列,当加入消息队列长度超过最大值时,则直接抛弃用户请求或跳转到错误页面
  • 2.秒杀业务根据消息队列中的请求信息再做后续处理
5.RabbitMQ集群架构 普通集群

默认清空下:RabbitMQ代理 *** 作所需的所有数据和状态都将跨所有节点复制。这方面的一个例外是消息队列。默认清空下,消息队列只卫浴一个节点上,尽管它们可以从所有节点看到和访问

解决问题:当集群中某一时刻master节点宕机,可以对队列中的信息进行备份

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5690270.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存