RabbitMQ入门与五种工作模式

RabbitMQ入门与五种工作模式,第1张

RabbitMQ入门与五种工作模式 一、MQ 的基本概念

1.1 MQ概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息进行持久化的容器。多用于分布式系统之间进行通信,比如在分布式系统中,多个项目就可以使用MQ来进行通信,A项目的数据发送MQ容器中,B项目在通过MQ容器拿到消息

1.2 应用解耦

上图可以看出库存、支付、物流、都依赖于订单系统,如果库存系统挂了势必会影响订单系统,如果订单系统也挂了,库存、支付、物流都会挂掉,如果我增加一个X系统也依赖于订单系统,也会挂掉,耦合性太高了,下面我们用MQ来解耦


上图使用MQ解耦后,就好多嘞,订单系统的数据发送给MQ,库存、支付、物流系统直接依赖于MQ,库存挂了不影响其他系统了,因为订单、支付、物流都和MQ打交到了,哪怕我再增加一个X系统,也和MQ打交道了,库存系统恢复了后,直接也和MQ建立关系,继续运作。

1.3 异步提速

上图可以看出订单系统去DB里查询数据需要20ms,库存、支付、物流三个再去查询订单各自都需要300ms,每个系统得到结果都需要320ms。

上图使用MQ解耦后,就快多了,订单数据在DB查询出来传递给MQ,库存、支付、物流三个在DB中拿取只有5秒,每个系统得到数据只有25ms。

1.4 削峰填谷

上图比如A系统的某个接口,一次最多可以接收到1000个请求,高并发情况下每秒达到了5000个请求,这样系统肯定会崩溃的,使用MQ进行削峰填谷

上图可以看出加上了MQ后,一次5000个请求全部到达MQ里面去了,但是这种情况MQ会积压很多消息,这就叫做削峰,等到高峰期过了之后,A系统设置每秒从MQ拉取1000次,慢慢的消费完MQ里的消息,叫做填谷。

1.5 常见的 MQ 产品

1.6 RabbitMQ 简介
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

上图就是RabbitMq中的简介
Producer:生产者,消息的发起者
Consumer:消费者,消息的接受者
Connection:生产者和消费者与MQ服务器建立的长连接
channel:相当于轻量级的Connection,每次发送接收消息都建立一个Connection长连接太浪费资源了,就在Connection内部建立逻辑连接channel。
Broker:就是MQ服务器
Virtual host:虚拟机,相当于一个MQ服务器的一个分组,可以有多个
Exchange:交换机,消息到达MQ的第一站,消息由交换机根据routing key转发给不同的队列。
Queue:队列,消息最终由交换机到达队列等待消费者来拿取
Binding:绑定交换机和队列之间的关系

1.7 RabbitMQ 提供了 5种工作模式
RabbitMQ 提供了 5 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

下面就来熟悉5种模式,先看看项目结构

首先需要创建一个maven项目,这里不再讲述如何创建maven项目了

二、五种队列模式

创建maven项目后,需要先创建RabbitMQ所需要的工具类

在com.wdp.rabbitmq.utils包下创建RabbitUtils工具类,该工具类是存放RabbitMQ账号密码虚拟机连接等

package com.wdp.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
    static {
        connectionFactory.setHost("39.107.90.1");//服务器地址
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
        connectionFactory.setUsername("wdp123"); //mq的账号
        connectionFactory.setPassword("wdp123"); //mq的密码
        connectionFactory.setVirtualHost("wdp"); //mq的虚拟机
    }

    
    public static Connection getConnection(){
        Connection conn = null;
        try {
            conn = connectionFactory.newConnection();
            return conn;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

在com.wdp.rabbitmq.utils包下创建RabbitConstant类定义交换机和队列的名称

package com.wdp.rabbitmq.utils;

public class RabbitConstant {
    public static final String QUEUE_HELLOWORLD = "helloworld";
    public static final String QUEUE_SMS = "sms";
    public static final String EXCHANGE_WEATHER = "weather";
    public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_SINA = "sina";
    public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}

2.1、简单队列

P:消费者
C:消费者
红框:队列
简单队列只有一个生产者一个消费者,发送消息过后,消费者去队列拿取消息消费

在com.wdp.rabbitmq.hellword包下创建生产者

package com.wdp.rabbitmq.hellword;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取mq建立的长连接
        Connection connection = RabbitUtils.getConnection();
        //创建通信通道,生产者从通道发送数据到mq服务器
        Channel channel = connection.createChannel();
        //创建队列,声明并创建一个队列,如果队列已经存在直接使用
        //第一个参数:队列名称
        //第二个参数:数据是否持久化,false对应不持久化的数据,如果数据没有被消费者消费,消费者MQ停掉数据就会丢失
        //第三个参数:队列是否私有化,false代表所有消费者都可以访问,true代表只有第一次使用它的消费者才可以访问
        //第四个参数:是否自动删除,false代表停掉后不自动删除这个队列
        //其他额外的参数,null
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);

        String message = "hello word";
        //第一个参数:交换机,简单队列暂时不需要
        //第二个参数:队列的名称
        //第三个参数:额外的设置属性
        //第四个参数:传递消息的字节数组
        channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORLD,null,message.getBytes());
        channel.close();
        connection.close();
        System.out.println("发送成功");

    }
}

在com.wdp.rabbitmq.hellword包下创建消费者

package com.wdp.rabbitmq.hellword;

import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws IOException {
        //获取mq建立的长连接
        Connection connection = RabbitUtils.getConnection();
        //创建通信通道,生产者从通道发送数据到mq服务器
        Channel channel = connection.createChannel();
        //创建队列,声明并创建一个队列,如果队列已经存在直接使用
        //第一个参数:队列名称
        //第二个参数:数据是否持久化,false对应不持久化的数据,如果数据没有被消费者消费,消费者MQ停掉数据就会丢失
        //第三个参数:队列是否私有化,false代表所有消费者都可以访问,true代表只有第一次使用它的消费者才可以访问
        //第四个参数:是否自动删除,false代表停掉后不自动删除这个队列
        //其他额外的参数,null
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);


        //从MQ服务器中获取数据
        //第一个参数:队列名
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类,手动进行确认消息
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false,new Reciver(channel));
    }
}

class  Reciver extends DefaultConsumer{



    private Channel channel;
    //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("消费者接收到的消息:"+message);

        System.out.println("消息的TagId:"+envelope.getDeliveryTag());
        //false只确认签收当前的信息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

注意:简单队列是不需要交换机的,会走默认的交换机

2.2 Work queues 工作队列模式
在这里插入图片描述

P:消费者
C1:消费者
C2:消费者
红框:队列
工作队列:是生产者投递到了队列,消费者会有多个同时去队列去。

应用场景:多个消费者同时绑定一个队列,比如一条短信要让所有人收到,就可以使用工作队列模式

在com.wdp.rabbitmq.workqueue包下创建一个SMS类用于发送消息

package com.wdp.rabbitmq.workqueue;


public class SMS {

    private String name;
    private String mobile;
    private String content;

    public SMS(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

在com.wdp.rabbitmq.workqueue下创建OrderSystem类

package com.wdp.rabbitmq.workqueue;

import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class OrderSystem {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        for(int i = 1 ; i <= 100 ; i++) {
            SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
            String jsonSMS = new Gson().toJson(sms);
            channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
        }
        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    }
}

在com.wdp.rabbitmq.workqueue包下创建SMSSender1消费者

package com.wdp.rabbitmq.workqueue;


import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;


public class SMSSender1 {

    public static void main(String[] args) throws IOException {


        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender1-短信发送成功:" + jsonSMS);

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }


}

在com.wdp.rabbitmq.workqueue包下创建SMSSender2消费者

package com.wdp.rabbitmq.workqueue;


import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;


public class SMSSender2 {

    public static void main(String[] args) throws IOException {


        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender2-短信发送成功:" + jsonSMS);

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

在com.wdp.rabbitmq.workqueue包下创建SMSSender3消费者

package com.wdp.rabbitmq.workqueue;


import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;


public class SMSSender3 {


    public static void main(String[] args) throws IOException {


        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender3-短信发送成功:" + jsonSMS);

                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

注意:
channel.basicQos(1);这句代码,这句代码的意思是一个消息必须处理签收完毕在能开始下一个消息,思想就是能者多劳,多个消费者有的执行的快多消费,有的执行的慢就慢消费,工作队列是一对多,必须多个消费者绑定一个队列。

2.3 Pub/Sub 订阅模式

P:生产者
X:交换机
C1:消费者
C2:消费者
红框:队列

发布订阅模式分为
1、生产者发送消息需要指定一个交换机,先发送到交换机里
2、消费者需要建立队列和交换机的绑定关系
3、交换机将消息推送给绑定了该交换机的队列

交换机有三种模式:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给通过#,*匹配的(路由模式)队列

发布订阅模式的使用场景:比如北京市的天气预报,新浪和百度都需要北京市的天气预报,生产者将天气的消息投递给交换机,百度的队列和新浪的队列都绑定了这个交换机,百度和新浪都可以拿到北京市的天气预报,发布订阅模式使用的是广播模式

在com.wdp.rabbitmq.pubsub包下创建生产者WeatherBureau

package com.wdp.rabbitmq.pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        String input = new Scanner(System.in).next();

        Channel channel = connection.createChannel();

        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());

        channel.close();
        connection.close();
    }
}

在com.wdp.rabbitmq.pubsub包下创建Sina

package com.wdp.rabbitmq.pubsub;

import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

public class Sina {
    public static void main(String[] args) throws IOException {
        //获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        //获取虚拟连接
        final Channel channel = connection.createChannel();
        //声明队列信息
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);

        //queueBind用于交换机和队列的绑定
        //参数1: 队列名 参数2:交互机名  参数3:路由key(暂时用不到)
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER,"");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪天气收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

在com.wdp.rabbitmq.pubsub包下创建BiaDu

package com.wdp.rabbitmq.pubsub;

import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

public class BiaDu {
    public static void main(String[] args) throws IOException {
        //获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        //获取虚拟连接
        final Channel channel = connection.createChannel();
        //声明队列信息
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);

        //queueBind用于交换机和队列的绑定
        //参数1: 队列名 参数2:交互机名  参数3:路由key(暂时用不到)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER,"");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度天气收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

2.4 Routing 路由模式

P:生产者
X:交换机
C1:消费者
C2:消费者
红框:队列
Routing 路由模式分为3个步骤
1、生产者发送消息到交换机的时候会绑定一个Routing key
2、消费者去队列取消息的时候,消费者建立队列和交换机和Routing key的关系
3、交换机会根据队列的Routing key进行分发消息

上图的图解:
生产者发送error、info、warning为Routing key的消息到交换机,交换机将Routing key为error的消息发送给c1消费者所在的队列,因为c1消费者指定队列消息的Routing key为error,交换机将Routing key为error、info、warning发送给c2消费者所在的队列,因为c2消费者指定队列消息的Routing key为error、info、warning。

应用场景:还是用天气举例,比如北京市和上海市,两个不同的城市天气也不一样,就需要交换机发送不同的天气信息给不同的城市

在com.wdp.rabbitmq.routing包下创建生产者WeatherBureau

package com.wdp.rabbitmq.routing;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.util.Iterator;
import java.util.linkedHashMap;
import java.util.Map;


public class WeatherBureau {


    public static void main(String[] args) throws Exception {

        Map area = new linkedHashMap();
        area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
        area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
        area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
        area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");

        area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
        area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
        area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
        area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");


        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();


        Iterator> itr = area.entrySet().iterator();
        while (itr.hasNext()){
            Map.Entry me = itr.next();
            //第一个参数交换机名字  第二个参数作为消息的routing key
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey(),null,me.getValue().getBytes());
        }

        channel.close();
        connection.close();
    }
}

在com.wdp.rabbitmq.routing包下创建Sina消费者

package com.wdp.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

public class Sina {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数3:路由key
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.zhuzhou.20201127");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪天气收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }

}

在com.wdp.rabbitmq.routing包下创建BaiDu消费者

package com.wdp.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

public class BaiDu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数3:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.changsha.20201127");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度天气收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

2.5 Topics 通配符模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

应用场景:还是天气的例子,比如说,一个城市下有很多个区县,区县的天气会不一样,这点类似模糊查询,我需要所有东边区县的天气,就需要通过通配符匹配

在com.wdp.rabbitmq.routing包下创建WeatherBureau

package com.wdp.rabbitmq.routing;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.util.Iterator;
import java.util.linkedHashMap;
import java.util.Map;


public class WeatherBureau {


    public static void main(String[] args) throws Exception {

        Map area = new linkedHashMap();
        area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
        area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
        area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
        area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");

        area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
        area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
        area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
        area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");


        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();


        Iterator> itr = area.entrySet().iterator();
        while (itr.hasNext()){
            Map.Entry me = itr.next();
            //第一个参数交换机名字  第二个参数作为消息的routing key
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey(),null,me.getValue().getBytes());
        }

        channel.close();
        connection.close();
    }
}

在com.wdp.rabbitmq.routing包下创建Sina

package com.wdp.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

public class Sina {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数3:路由key
        channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.zhuzhou.20201127");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪天气收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }

}

在com.wdp.rabbitmq.routing包下创建BaiDu

package com.wdp.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;

import java.io.IOException;

public class BaiDu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数3:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.changsha.20201127");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度天气收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5135978.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存