消息中间件学习笔记

消息中间件学习笔记,第1张

消息中间件学习笔记

本文参考自江南一点雨的Spring Boot+Vue系列视频教程第 12 章,详情参加【Spring Boot+Vue系列视频教程】

本文的代码地址为SpringBoot学习案例中的jms和amqp

消息中间件学习笔记
  • 一、springboot中使用activemq
    • 1、启动activemq
    • 2、用Spring Initiaizre新建activemq项目
  • 二、springboot中使用rabbitmq
    • 1、启动rabbitmq
    • 2、用Spring Initiaizre新建rabbitmq项目
    • 3、演示rabbitmq`中有四种交换机模式`
      • ①:演示`Direct`交换机模式(路由模式)
      • ②:演示`Fanout`交换机模式(发布订阅模式)
      • ③:演示`Topics`交换机模式
      • ④:演示`Header`交换机模式

一、springboot中使用activemq

此处用active mq写一个收发消息的简单案例

1、启动activemq

访问activemq官网,会发现有ActiveMQ "Classic"与ActiveMQ Artemis两个版本供使用,后者是未来版本,此处下载ActiveMQ "Classic"即可,本文下载的是Windows下的ActiveMQ 5.15.15 (Apr 28th, 2021)版本(若用浏览器下载慢,可以复制下载链接用迅雷下载)。
下载完成之后得到压缩文件,解压该文件,进入到bin目录下,双击activemq启动:

通过网址http://localhost:8161/访问,默认名和密码都是admin,出现下方页面代表启动成功。

2、用Spring Initiaizre新建activemq项目

①:新建项目名称设置为jms,导入三个依赖:

②:在application.yml文件中配置activemq

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616 #通信端口
    packages:
      trust-all: true #信任所有包
    user: admin
    password: admin
server:
  port: 7723

③:在启动类中定义消息队列

@SpringBootApplication
public class JmsApplication {

    public static void main(String[] args) {
        SpringApplication.run(JmsApplication.class, args);
    }

    //消息队列
    @Bean
    Queue queue() {
        //咱们定义的消息队列
        return new ActiveMQQueue("t2-queue");
    }
}

④:创建配置类JmsComponent

@Component
public class JmsComponent {
    //消息发送模板
    @Autowired
    JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    Queue queue;

    public void send(Message message) {
        //参数一:发送目的地 参数二:消息内容 参数三:
        jmsMessagingTemplate.convertAndSend(queue, message);
    }

    //指明接收哪儿的消息
    @JmsListener(destination = "t2-queue")
    public void receive(Message msg) {
        System.out.println("msg = " + msg);
    }
}

⑤:创建传输信息实体类(用Lombok的@Data省略getters and setters和toString等方法)

//传输需要序列化
@Data
public class Message implements Serializable {
    private String content;
    private Date date;
}

⑥:启动JmsApplication,在测试类中新建并发送信息:

@SpringBootTest
class JmsApplicationTests {

    @Autowired
    JmsComponent jmsComponent;
    @Test
    void contextLoads() {
        Message message = new Message();
        message.setContent("hello world");
        message.setDate(new Date());
        jmsComponent.send(message);

    }

}

⑦:启动测试类,查看JmsApplication项目的控制台,信息发送与接收成功

二、springboot中使用rabbitmq 1、启动rabbitmq

在linux上直接安装rabbitmq坑比较多,建议直接用docker安装,此处不做展开讲述,在docker上安装好rabbitmq后,使用docker ps -a指令查看所有容器,此处rabbitmq的容器名为myrabbit(在安装容器的时候指定)
执行docker start myrabbit指令启动rabbitmq。

2、用Spring Initiaizre新建rabbitmq项目

①:新建项目名为amqp,导入下面三个依赖:

②:在application.yml文件中配置rabbitmq,host根据rabbitmq地实际运行地址进行设置:

spring:
  rabbitmq:
    username: guest
    password: guest
    host: 192.168.42.155
    port: 5672

server:
  port: 7724
3、演示rabbitmq中有四种交换机模式

在rabbitmq中有四种交换机模式,分别是Direct、Fanout、Topoc、Headers,下面针对这四种模式进行演示。

①:演示Direct交换机模式(路由模式)

新建DirectConfig配置类:

@Configuration
public class DirectConfig {
    //队列1 路由键命名为 t2-queue1
    @Bean
    Queue directQueue() {
        return new Queue("t2-queue1");
    }
	//队列2 路由键命名为 t2-queue2
    @Bean
    Queue directQueue2() {
        return new Queue("t2-queue2");
    }
    // 如果用来direct模式,下面两个bean可以省略。
    //交换机
    @Bean
    DirectExchange directExchange() {
        //参数一:交换机名称 参数二:重启之后队列是否依然有效 参数三:长期未使用的时候是否自动删除
        return new DirectExchange("t2-direct",true,false);
    }

    //粘合剂,作用:将队列和交换机绑定到一起,路由键命名为 direct1和direct2
    @Bean
    Binding directBinding() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct1");
    }

    @Bean
    Binding directBinding2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
    }
}

新建DirectReceiver消息处理类:

@Component
public class DirectReceiver {
	//监听路由键值为 t2-queue1 和 t2-queue1 的消息队列
    @RabbitListener(queues = "t2-queue1")
    public void handler(String msg) {
        System.out.println("msg1 = " + msg);
    }

    @RabbitListener(queues = "t2-queue2")
    public void handler2(String msg) {
        System.out.println("msg2 = " + msg);
    }
}

启动项目:
启动过程中若遇到AMQP protocol version mismatch; we are version 0-9-1, server sent signature 3,1,0,0报错是由于activemq已经在本机开启了,关闭即可。

在测试类中进行消息发送:

@SpringBootTest
class AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
   		 //参数一:消息队列路由键值 参数二:内容
        rabbitTemplate.convertAndSend("t2-queue1","hello direct1");
        rabbitTemplate.convertAndSend("t2-queue2","hello direct2");
    }

}

测试结果:

从图形上梳理,P代表生产者(测试类),紫色X代表交换机,红色块列代表队列(在DirectConfig配置类中定义),C代表消费者(在DirectReceiver消息处理类中定义),direct(路由)模式的特点:
生产者P发送数据的时候,需要声明与交换机绑定的队列的路由键,每次新增队列都需要在P(生产者)指定新增队列的路由键。

②:演示Fanout交换机模式(发布订阅模式)

新建配置类FanoutConfig:

@Configuration
public class FanoutConfig {

    @Bean
    Queue queueOne() {
        return new Queue("queue-one");
    }

    @Bean
    Queue queueTwo() {
        return new Queue("queue-two");
    }

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("t2-fanout",true,false);
    }

    //两个队列与同一个交换机绑定起来
    @Bean
    Binding bindingOne() {
        return BindingBuilder.bind(queueOne()).to(fanoutExchange());
    }

    @Bean
    Binding bindingTwo() {
        return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
    }
}

新建消息处理类FanoutReceiver:

@Component
public class FanoutReceiver {
    @RabbitListener(queues = "queue-one")
    public void handler1(String msg) {
        System.out.println("handler1:msg = " + msg);
    }

    @RabbitListener(queues = "queue-two")
    public void handler2(String msg) {
        System.out.println("handler2:msg = " + msg);
    }

}

启动项目,在测试类中进行消息发送:

@SpringBootTest
class AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
		//参数一:交换机名称 参数二:routingKey(此处不需要)参数三:内容
        rabbitTemplate.convertAndSend("t2-fanout",null,"hello fanout");

    }

}

测试结果:

从图形上梳理,在P(生产者)中,只需要指明交换机名称以及发布内容,就能调用多个消息队列(前提是这些消息队列已与交换机绑定)

③:演示Topics交换机模式

新建TopicConfig配置类:
新建三个队列,分别命名为xiaomi、huawei和phone,将这三个队列与交换机t2-topic绑定,名称前后的#用于做模糊匹配:
关于模糊匹配:

  • (star) can substitute for exactly one word. 用于匹配一个单词
  • # (hash) can substitute for zero or more words. 用于匹配零个或多个单词
@Configuration
public class TopicConfig {

    @Bean
    Queue xiaomi() {
        return new Queue("xiaomi");
    }

    @Bean
    Queue huawei() {
        return new Queue("huawei");
    }

    @Bean
    Queue phone() {
        return new Queue("phone");
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("t2-topic",true,false);
    }

    @Bean
    Binding xiaomiBinding() {
        return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
    }

    @Bean
    Binding huaweiBinding() {
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
    }

    @Bean
    Binding phoneBinding() {
        return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");
    }
}

新建消息处理类TopicReceiver:

@Component
public class TopicReceiver {
    @RabbitListener(queues = "phone")
    public void handler1(String msg) {
        System.out.println("phone-msg = " + msg);
    }

    @RabbitListener(queues = "xiaomi")
    public void handler2(String msg) {
        System.out.println("xiaomi-msg = " + msg);
    }

    @RabbitListener(queues = "huawei")
    public void handler3(String msg) {
        System.out.println("huawei-msg = " + msg);
    }
}

启动项目,在测试类中发送消息:

@SpringBootTest
class AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("t2-topic","xiaomi.new","小米新闻");
    }

}

测试结果:

xiaomi队列的消费者接受到了消息。
再次测试:

        rabbitTemplate.convertAndSend("t2-topic","xiaomi.new","小米新闻");
        rabbitTemplate.convertAndSend("t2-topic","huawei.watch","华为手表");
        rabbitTemplate.convertAndSend("t2-topic","apple.phone","苹果手机");

测试结果:

从图形上梳理,相比于Direct模式的直来直去,Topics模式通过routingKey模糊匹配,在业务上更加灵活。

④:演示Header交换机模式

这种交换机模式用得比较少,它的策略主要是根据消息的Header将消息路由到不同的队列上面去,和routingKey没有关系,纵观四中交换机模式,会发现只有Topics模式和routingKey的关系比较密切。
新建HeaderConfig配置类:

@Configuration
public class HeaderConfig {
    @Bean
    Queue queueAge() {
        return new Queue("queue-age");
    }

    @Bean
    Queue queueName() {
        return new Queue("queue-name");
    }

    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange("t2-header",true,false);
    }

    @Bean
    Binding bindingAge() {
        Map map = new HashMap<>();
        map.put("age",99);
        //消息头里面需要有age,并且值需要为99才路由到这个队列
        return BindingBuilder.bind(queueAge()).to(headersExchange()).whereAny(map).match();
    }

    @Bean
    Binding bindingName() {
    	//消息头里面存在name属性就路由到这个队列
        return BindingBuilder.bind(queueName()).to(headersExchange()).where("name").exists();
    }
}

新建消息处理类HeaderReceiver

@Component
public class HeaderReceiver {
    @RabbitListener(queues = "queue-age")
    public void handler1(String msg) {
        System.out.println("queue-age:msg = " + msg);
    }

    @RabbitListener(queues = "queue-name")
    public void handler2(String msg) {
        System.out.println("queue-name:msg = " + msg);
    }
}

启动项目,在测试类中发送消息:

@SpringBootTest
class AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        Message nameMsg = MessageBuilder.withBody("hello you".getBytes()).setHeader("name","t2-header").build();
        Message ageMsg = MessageBuilder.withBody("hello you".getBytes()).setHeader("age",99).build();
        rabbitTemplate.send("t2-header",null,nameMsg);
        rabbitTemplate.send("t2-header",null,ageMsg);
    }
}

测试结果:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678691.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存