(自用)RabbitMQ简单复习-2021-10-19

(自用)RabbitMQ简单复习-2021-10-19,第1张

(自用)RabbitMQ简单复习-2021-10-19

RabbitMQ简单复习:

一.架构:

1. Publisher连接->  Broker(Server)-> VHost->Connection-> channel->Exchange->Queu<-Exchange<-channel<- VHost<- Broker<- Consumer

(1) RPC: remote process call远程调用
(2) 依赖: amqp-client
(3) JAVA客户端口: 5672;  图形化界面客户端: 15672
(4) 生产者发布消息-> 交换机路由到队列-> 被消费者监听
(5) Virtual Host: 虚拟主机
(6) Admin: add user-> /test-> administrator-> virtual host /test

2.通讯方式: 五种

(1) Hello-World模式: 1个生产者+1个消费者+1个默认交换机+1个队列
    public class Publisher{//生产者
        @Test
        public void publish() throws IOException{
            //连接
            Connection connection=RabbitMQClient.getConnection();
            //信道
            Channel channel=connection.createChannel();
            //默认交换机, 路由规则, 消息属性, 消息体byte[]
            channel.basicPublish("exchange", "routingKey", BasicProperties, msg.gtBytes());
            channel.close(); //底层io流
            connection.close();
        }
    }
(5) public class Consumer{ //消费者
        @Test
        public void consume() throws IOException{
            Connection connection=RabbitMQClient.getConnection();
            Channel channel=connection.createChannel();
            //队列, 持久化(重启后删除), 排外只能一个消费者, autoDelete, arguments其他信息
            channel.queueDeclare("queue", durable, false, false, null);
            com.rabbitmq.client.Consumer consumer=new DefaultConsumer(channel){//开启监听
                @override //处理消息
                public void handleDelivery(String consumerTag, Envelop envelope, AMQP.BasicPuroperties properties, byte[] body) throws IOException{
                    System.out.println(new String(body, charsetName="utf-8"));
                }
            };
            channel.basicConsume("queue", true, consumer);
            channel.close(); //底层io流
            connection.close();
        }
    }
(2) Work模式: 分摊
     1个生产者+2个消费者+1个默认交换机+2个队列
    Publisher.java + Consumer1.java + Consumer2.java

(3) Publish/Subscribe广播模式: 群发
    1个生产者+2个消费者+1个指定交换机+2个指定队列
    channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
    channel.queueBind("pubsub-queue","pubsub-exchange");

(4) Router路由模式: 匹配路由键
    1个生产者+2个消费者+1个指定交换机+2个指定队列
    channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.DIRECT);
    channel.queueBind("routing-queue","routing-exchange", "Error");
    channel.basicPublish("routing-exchange", "ERROR", null, "ERROR".getBytes());

(5) Topic话题模式: 通配路由键 *匹配一个单词, #匹配多个
    1个生产者+2个消费者+2个指定队列+2个指定交换机
    channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
    channel.queueBind("topic-queue","topic-exchange", "*.read.#");
    channel.basicPublish("topic-exchange", "fast.red.A", "msg".getBytes());

3. Spring-rabbitTemplate:  

(1) 创建springboot工程, 勾选Spring-Web依赖 spring-boot-starter-web
(2) 依赖: spring-boot-starter-amqp  //starter启动器, 起步配置
(3) yml配置:
    spring:
      rabbitmq:
        host: 10.20.159.25
        port: 5672
        username: test
        password: test
        virtual-host: /test
(4) 配置类 RabbitMQConfig.java
@Configuration   //表明是配置类, 配置文件
public class RabbitMQConfig {
    //1. 创建exchange - topic
    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("boot-topic-exchange",true,false);
    }
    //2. 创建queue
    @Bean
    public Queue getQueue(){
        return new Queue("boot-queue",true,false,false,null);
    }
    //3. 绑定在一起
    @Bean
    public Binding getBinding(TopicExchange topicExchange, Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    }
}  

(5) SpringBoot整合MQ: Consumer.java
@Component
public class Consumer {
    @RabbitListener(queues = "boot-queue")  //监听配置注解
    public void getMessage(Object message){
        System.out.println("接收到消息:" + message);
    }
}

(6) 单元测试 SpringBootRabbitmqApplicationTest.java
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
    }
}

(7) 父类交换机AbstractExchange:
    1) CustomExchange: Hello-world自定义模式
    ?) Work分摊模式: 也属于自定义
    2) FanoutExchange: publish/subscribe广播模式
    3) DirectExchange: Routing路由模式
    3) TopicExchange: Topic话题模式

3. ACK机制 : Message-Acknowledged机制

(1) Msg和Message:
    String msg: 字符串
    Message message: 对象封装方法        
    message.getMessageProperties().getDeliveryTag(),false); 

(2) 手动ACK:消费者做完后, 如果自动ACK容易造成消息丢失
(3) 配置yml: 
    listener:
        simple:
            acknowledge-mode: manual
(4) 监听方法: Consumer.java
    @Component
    public class Consumer {
        @RabbitListener(queues = "boot-queue")
        public void getMessage(String msg, Channel channel, Message message) throws IOException {
            System.out.println("接收到消息:" + msg);
            //int i = 1 / 0;
            //手动ack
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

4. 生产者确认机制: Publisher-/confirm/i

(1) MQ宕机-> 消息丢失-> 持久化Queue

(2) RabbitMQ提供事务和/confirm/i机制:
    1)普通/confirm/i: 信道发送UID返回publisher
        //3.1 开启confirm
        channel.confirmSelect();
        //3.2 发送消息
        String msg = "Hello-World!";
        channel.basicPublish("","HelloWorld",null,msg.getBytes());
        //3.3 判断消息发送是否成功
        if(channel.waitForConfirms()){
            System.out.println("消息发送成功");
        }else{
            System.out.println("发送消息失败");
        } 
               
    2)批量/confirm/i:
         //3.1 开启confirm
        channel.confirmSelect();
        //3.2 批量发送消息
        for (int i = 0; i < 1000; i++) {
            String msg = "Hello-World!" + i;
            channel.basicPublish("","HelloWorld",null,msg.getBytes());
        }
        //3.3 确定批量操作是否成功
        channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException

    3)异步confirm
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
    String msg = "Hello-World!" + i;
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
    }
});

5.  Return机制来监听消息是否从exchange送到了指定的queue中

// 开启return机制
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 当消息没有送达到queue时,才会回调执行。
        System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
    }
});
// 在发送消息时,换另外一个,方法重载,指定mandatory参数为true
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());

6. SpringBoot整合MQ:

7. 避免消息重复消费: 

    (1) 消费者message-acknowledge-> 没有走到手动ACK代码-> 消息被重复消费

    (2) 幂等性: 多次 *** 作和一次 *** 作效果一致, 如数据库删除, 网页提交按钮

    (3) 非幂等: *** 作不一致. 如数据库添加

    (4) 解决: 发送消息前-> 消息id放入Redis(key=0正在执行, key=1执行成功)-> 发送消息ACK失败-> MQ发送消息给其他消费者-> 先执行setnx-> 如key存在,则获取值-->如key=0则不消费> 如key=1则ack

(1) 生产者,发送消息时,指定messageId
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    .deliveryMode(1)     //指定消息是否需要持久化 1 - 需要持久化  2 - 不需要持久化
    .messageId(UUID.randomUUID().toString())
    .build();

String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());

(2)消费者,在消费消息时,根据具体业务逻辑去 *** 作redis
DefaultConsumer consume = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        Jedis jedis = new Jedis("10.20.159.25",6379);//记得修改为自己的Linux服务器ip
        String messageId = properties.getMessageId();
        
        //1. setnx到Redis中,默认指定value-0
        String result = jedis.set(messageId, "0", "NX", "EX", 10);
        
        if(result != null && result.equalsIgnoreCase("OK")) {
            System.out.println("接收到消息:" + new String(body, "UTF-8"));
            //消费消息了,打印消息
            
            //2. 消费成功,set messageId 1
            jedis.set(messageId,"1");
            channel.basicAck(envelope.getDeliveryTag(),false);
        }else {
            //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1则手动ack
            String s = jedis.get(messageId);
            if("1".equalsIgnoreCase(s)){
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        }
    }
};

8.  SpringBoot实现代码-> 避免重复消费

(1) 生产者:
@Test
void contextLoads() throws IOException {
    CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//
    
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
    System.in.read();
}


(2) 消费者:
@Autowired
private StringRedisTemplate redisTemplate;

@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
    //0. 获取MessageId
    String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    //1. 设置key到Redis
    if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
        //2. 消费消息
        System.out.println("接收到消息:" + msg);

        //3. 设置key的value为1
        redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
        //4.  手动ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }else {
        //5. 获取Redis中的value即可 如果是1,手动ack
        if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4686402.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存