RabbitMQ消息中间件

RabbitMQ消息中间件,第1张


首先要先部署好RabbitMQ

在spring中使用RabbitMQ使用的时AMQP统一规范,需要引入依赖

<!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

一、基本使用
推送端yml配置,virtual-host是配置rabbitmq的虚拟主机,类似于不同的服务器,这样当rabbitmq部署多个通道队列的时候,可以有效的进行隔离

spring:
  rabbitmq:
    host: 121.***.***.***
    port: 5672
    virtual-host: /
    username: test
    password: 123456

消费端yml配置,注意这里配置listener.simple.prefetch=1,意味着消费端每次只预取一个消息,消费过之后才能再次拉取消息

spring:
  rabbitmq:
    host: 121.***.***.***
    port: 5672
    virtual-host: /
    username: test
    password: 123456
    listener:
      simple:
        prefetch: 1

(一)、消息队列方式一

1.推送端
推送端主要使用的是RabbitTemplate

	@Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMsg")
    public String sendMsg(){
        String queueName = "simple.queue";
        String message = "hello world";
        rabbitTemplate.convertAndSend(queueName,message);
        return "send success";
    }

2.消费端

@Component
public class MyRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listeneQueue1(String msg) throws InterruptedException {
        System.out.println("收到msg:"+msg);
    }
}

(二)、消息队列方式二

1.推送端

	@Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMsg")
    public String sendMsg(){
        String queueName = "simple.queue";
        String message = "hello world";
        rabbitTemplate.convertAndSend(queueName,message);
        return "send success";
    }

2.消费端

@Component
public class MyRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listeneQueue1(String msg) throws InterruptedException {
        System.out.println("Listener111收到msg:"+msg);
        Thread.sleep(1000);
    }

    @RabbitListener(queues = "simple.queue")
    public void listeneQueue21(String msg) throws InterruptedException {
        System.out.println("Listener222收到msg:"+msg);
        Thread.sleep(20);
    }
}

(三)、消息队列方式三(采用fanoutexchange交换机)

1.推送端
首先需要配置交换机并绑定队列

@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
	
    @Bean
    public Binding fanoutBinding(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
	//这里是绑定第二个队列,和上一个重复
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

之后再编写消息推送代码

	@RequestMapping("/sendMsg2")
    public String sendMsg2(){
        String fanoutExchange = "itcast.fanout";
        String message = "fanout test";
        rabbitTemplate.convertAndSend(fanoutExchange,"aaa",message);
        return "send success";
    }

2消费端

@Component
public class MyRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listeneQueue3(String msg) throws InterruptedException {
        System.out.println("Listener333收到msg:"+msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listeneQueue4(String msg) throws InterruptedException {
        System.out.println("Listener444收到msg:"+msg);
    }
}

(四)、消息队列方式四(采用directexchange交换机)
其实就是在上一个基础之上加入了bingkey,使得在推送数据的时候,交换机会根据bingkey添加到还有相同key的消息队列

1.推送端

	@Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/sendMsg3")
    public String sendMsg2(){
        String directExchange = "itcast.direct";
        String message = "direct test";
        Map<String,String> map = new HashMap<>();
        map.put("女生","柳岩");
        map.put("男生","蔡徐坤222");
        rabbitTemplate.convertAndSend(directExchange,"red",map);
        return "send success";
    }

2.消费端
由于上一个方式的绑定需要写非常多的bean,所以这里可以采用注解的方式进行交换机和队列的绑定

@Component
public class MyRabbitListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listeneQueue1(String msg) throws InterruptedException {
        System.out.println("directListener111收到msg:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listeneQueue2(Map msg) throws InterruptedException {
        System.out.println("directListener222收到msg:"+msg);
    }
}

(五)、消息队列方式五(采用topicexchange交换机)

通配符用#表示,比如china.#或#.news

二、消息转换器
当我们使用AMQP发送Object对象时,默认会采取序列化的方法把对象转化为字节数据,这样一是数据庞大,二是不便于查阅,所以我们一般采用json的方式进行序列化
解决方法:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/871227.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存