SpringBoot连接普通集群

SpringBoot连接普通集群,第1张

SpringBoot连接普通集群

使用Docker搭建好一个三个节点组成的Rabbitmq集群,接下来用SpringBoot项目连接集群。

依赖:

 
            org.springframework.boot
            spring-boot-starter-amqp
        

配置:

server:
  port: 8080
spring:
  rabbitmq:
#   单机连接配置host port即可
#    host: 192.168.50.134
#    port: 5672

#   集群连接信息
    addresses: 192.168.50.134:5673,192.168.50.134:5672,192.168.50.134:5674

    virtual-host: /dev
    username: tech
    password: tech
    # 开启发送确认机制,感知消息是否到达交换机
    publisher-/confirm/i-type: correlated

    # 开启消息从交换机到队列的确认机制,感知消息是否到达队列
    publisher-returns: true
    # true表示交换机转发消息到队列失败,将消息返给发送者
    template:
      mandatory: true

    #开启消费者手动确认
    listener:
      simple:
        acknowledge-mode: manual

address是集群地址配置,经过多次实验,服务会先使用配置中集群节点地址的第一个地址与RabbitMQ服务器建立连接,这里应该是连接端口号为5673的RabbitMQ。

队列配置:

package com.tech.rabbitmq.spring;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME="exchange_order";
    public static final String QUEUE="order_queue";

    
    @Bean
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }

    
    @Bean
    public Binding orderBinding(Queue queue,Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

消息发送者(send接口)

package com.tech.rabbitmq.spring;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
public class MsgSendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private int c=0;

    @GetMapping("send")
    String send() {
//        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单 "+(++c));
//        }
        return "ok";
    }

    
    @GetMapping("/confirm/i")
    String confirm() {

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("correlationData={}", correlationData);
                log.info("ack={}", ack);
                log.info("cause={}", cause);
                if (ack) {
                    log.info("消息发送成功");
                } else {
                    log.info("消息发送失败");
                }
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单");
        //模拟发送失败,使用一个不存在的交换机
        //rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"A","order.new","新订单");
        log.info("ok");
        return "ok";
    }

    @GetMapping("return")
    String ret(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.info("code={}",returned.getReplyCode());
                log.info("returned={}",returned);
            }
        });
//        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单 ReturnsCallback");
//        模拟交换机转发消息到队列失败
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "A"+"order.new", "新订单 ReturnsCallback");
        log.info("ok");
        return "ok";
    }
}

消息消费者:

package com.tech.rabbitmq.spring;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+deliveryTag);
        System.out.println("message="+message.toString());
        System.out.println("body="+body);

        //进行手动确认
        //消息投递序号 是否批量
        channel.basicAck(deliveryTag,false);
//        if(body.contains("2")){
//            channel.basicAck(deliveryTag,false);
//        }

        //拒收消息
        //消息投递序号  是否批量  是否将消息回退到队列
//        channel.basicNack(deliveryTag,false,true);

        //拒收消息 (不支持批量拒收)
        //消息投递序号 是否将消息回退到队列
//        channel.basicReject(deliveryTag,true);
        System.out.println("*****************************");
    }
}

启动服务后,调用send接口,会发现队列是由服务连接的这个RabbitMQ来创建,这里是5673的端口host地址是rabbit_host2,也就是第二个节点。

 假如第二个节点宕机

 此时如果还是使用order_queue队列发送消息或者消费消息将会报错。因为队列是由rabbitmq2这个节点创建,消息只能存储在rabbitmq2上,而rabbitmq2处于宕机状态,所以无法使用这个队列发送或者消费消息。

启动rabbitmq2,重启服务可以正常使用这个队列了。

 如果出问题的是其他节点,将不影响消息的收发。因为消息只是储存在创建队列的节点上,服务在启动时,先连接的哪个节点就是在哪个节点上创建队列(如果不存在),将来路由到这个队列的消息也是存储在这个节点上,其他节点只是同步元数据,并不会同步消息。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5075015.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-16
下一篇 2022-11-16

发表评论

登录后才能评论

评论列表(0条)

保存