rabbitMq入门教程

rabbitMq入门教程,第1张

rabbitMq入门教程

一、下载安装
1.rabbitMq的运行需要erlang环境。
erlang官网下载:http://erlang.org/download/otp_win64_24.1.exe (24.1为版本号)。
一直点下一步即可,最好不要修改安装目录,安装后,添加环境变量Path:C:Program Fileserl-24.1bin
验证:在cmd输入 erl -v 可以查看erlang版本

2.rabbitMq官网下载:https://www.rabbitmq.com/install-windows.html
也是一直点下一步安装,可以修改安装目录,比如我的安装目录是 D:ToolsrabbitMqinstallrabbitmq_server-3.9.10。
cmd 进入sbin目录 cd /d D:ToolsrabbitMqinstallrabbitmq_server-3.9.10sbin,
然后启动插件rabbitmq-plugins enable rabbitmq_management。
登录:http://localhost:15672/ guest/guest。可以看到登录成功后的界面。

在刚刚的cmd界面,添加admin用户:rabbitmqctl add_user admin admin
设置管理员权限:rabbitmqctl set_user_tags admin administrator
再次登录:http://localhost:15672/ admin/admin 点击admin,
右侧选择Visual Hosts,添加当前用户的visual hosts:v_host1,后面会用到

二、在java中使用rabbitMq
使用rabbitMq前一般要先了解下常见的queue(也可以先学习rabbitMq,后面再了解),可参考:https://blog.csdn.net/tttalk/article/details/121947982
1.pom.xml引入相关配置
如果你是springBoot项目,可以用下面的配置:


    org.springframework.boot
     spring-boot-starter-amqp
 

如果不是就用这个:


     com.rabbitmq
      amqp-client
      4.10.0
  

2.创建连接文件ConnectionUtil.java

package com.example.springb_web.utils.rabbitMq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //默认端口,若需要修改,需要配置文件rabbitmq.config(需要自己创建,路径为etc/rabbitmq/rabbitmq.cnfig)
        factory.setPort(5672);
        //默认用户guest,其下的默认vhost为/,登录admin后,添加v_host1
        factory.setVirtualHost("v_host1");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        return  connection;
    }
}

3.生产者测试主类,含简单模式、订阅模式和发布订阅模式,里面注释写的满详细的。

package com.example.springb_web.utils.rabbitMq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.unbescape.xml.XmlEscape;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MqProducerTest {

    //简单模式-应用场景:聊天
    private final static String QUEUE_NAME_SIMPLE = "queue_test_simple";
    //work模式-资源竞争,应用场景:红包、电商订单、资源调度
    private final static String QUEUE_NAME_WORK = "queue_test_work";
    //发布订阅模式-共享资源,应用场景:邮件群发、群聊、广播
    private final static String QUEUE_NAME_SUBSCRIBE = "queue_test_subscribe";
    private final static String EXCHANGE_NAME_SUBSCRIBE = "exchange_name_subscribe";
    //路由routing模式,路由模式和topic模式都与发布订阅模式类似,主要是交换机类型不一样,fanout改成direct或topic
    private final static String QUEUE_NAME_ROUTING = "queue_test_routing";
    //topic模式-路由模式的一种
    private final static String QUEUE_NAME_TOPIC = "queue_test_topic";
    //rpc模式
    private final static String QUEUE_NAME_RPC = "queue_test_rpc";

    public static void main(String[] args)  {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtil.getConnection();
            channel = connection.createChannel();

            String msg = "thank you,next";
            //简单模式
            channel.queueDeclare(QUEUE_NAME_SIMPLE,false,false,false,null);
            channel.basicPublish("",QUEUE_NAME_SIMPLE,null, msg.getBytes());
            
            //work模式  三个参数代表 durable, exclusive, autoDelete
            //durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
            channel.queueDeclare(QUEUE_NAME_WORK,true,false,false,null);
            for(int i = 0; i < 100; i++) {
                channel.basicPublish("",QUEUE_NAME_WORK,null, (msg+i).getBytes());
                //每0.5s发送一条,设定comsumer1几乎无延迟接受,consumer2每1.5秒接受一次,理论上consumer1接受的消息数是consumer2的三倍
                //试过将consumer2间隔设定为1s,结果几乎是平均分配,一条给consumer1,一条给cunsumer2。间隔时间长一些才能看出区别
                Thread.sleep(500);
            }
            
            //发布订阅模式
            //exchange类型之Fanout:消息会分配到所有队列
            channel.exchangeDeclare(EXCHANGE_NAME_SUBSCRIBE, "fanout");
            channel.basicPublish(EXCHANGE_NAME_SUBSCRIBE,"",null, msg.getBytes());

            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

4.消费者代码,其中MyConsumerTest、MyConsumerTest2是测试主类,MessageConsumer、MessageConsumer2主要是处理应答的。这里我用的代码版本还是比较新的,网上很多教程都用了过时的方法,在4.0中(QueueingConsumer)已经废弃了,所以我这里用了MessageConsumer/MessageConsumer2来替代。

package com.example.springb_web.utils.rabbitMq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;

public class MyConsumerTest {

    //简单模式-应用场景:聊天
    private final static String QUEUE_NAME_SIMPLE = "queue_test_simple";
    //work模式-资源竞争,应用场景:红包、电商订单、资源调度
    private final static String QUEUE_NAME_WORK = "queue_test_work";
    //发布订阅模式-共享资源,应用场景:邮件群发、群聊、广播
    private final static String QUEUE_NAME_SUBSCRIBE = "queue_test_subscribe";
    private final static String EXCHANGE_NAME_SUBSCRIBE = "exchange_name_subscribe";
    //路由routing模式,路由模式和topic模式都与发布订阅模式类似,主要是交换机类型不一样,fanout改成direct或topic
    private final static String QUEUE_NAME_ROUTING = "queue_test_routing";
    //topic模式-路由模式的一种
    private final static String QUEUE_NAME_TOPIC = "queue_test_topic";
    //rpc模式
    private final static String QUEUE_NAME_RPC = "queue_test_rpc";

    public static void main(String[] args)  {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtil.getConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME_SIMPLE,false,false,false,null);

            //自定义消费者类
            //简单模式
            MessageConsumer simpleConsumer = new MessageConsumer(channel);
            channel.basicConsume(QUEUE_NAME_SIMPLE, true, simpleConsumer);
            
            //work模式
            Consumer workConsumer = new MessageConsumer(channel);
            //使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
            //还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。
            channel.basicQos(1);
            channel.basicConsume(QUEUE_NAME_WORK, false, workConsumer);
            
            //发布订阅模式
            Consumer subscribeConsumer = new MessageConsumer(channel);
            // 绑定队列到交换机--需要在v_host1下新建exchange--exchange_name_subscribe和queue--queue_test_subscribe
            channel.queueBind(QUEUE_NAME_SUBSCRIBE, EXCHANGE_NAME_SUBSCRIBE, "");
            channel.basicQos(1);
            channel.basicConsume(QUEUE_NAME_SUBSCRIBE, false, subscribeConsumer);

            //老版本(3.0)代码,QueueingConsumer(4.0已废除)内部用linkedBlockingQueue来存放消息的内容,而linkedBlockingQueue:一个由链表结构组成的有界队列 ,
            // 照先进先出的顺序进行排序,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE,那么问题来了,如果生产者的速度远远大于消费者的速度,
            // 也许没等到队列阻塞的条件产生(长度达到Integer.MAX_VALUE)内存就完蛋了,
            // 在老的版本你可以通过设置 rabbitmq的prefetch属性channel.basicQos(prefetch)来处理这个问题如果不设置可能出现内存问题
             

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

package com.example.springb_web.utils.rabbitMq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;

public class MyConsumerTest2 {

    //简单模式-应用场景:聊天
    private final static String QUEUE_NAME_SIMPLE = "queue_test_simple";
    //work模式-资源竞争,应用场景:红包、电商订单、资源调度
    private final static String QUEUE_NAME_WORK = "queue_test_work";
    //发布订阅模式-共享资源,生产者没有将消息直接发送到队列,而是发送到了交换机,应用场景:邮件群发、群聊、广播
    private final static String QUEUE_NAME_SUBSCRIBE2 = "queue_test_subscribe2";
    private final static String EXCHANGE_NAME_SUBSCRIBE = "exchange_name_subscribe";
    //路由routing模式,路由模式和topic模式都与发布订阅模式类似,主要是交换机类型不一样,fanout改成direct或topic
    private final static String QUEUE_NAME_ROUTING = "queue_test_routing";
    //topic模式-路由模式的一种
    private final static String QUEUE_NAME_TOPIC = "queue_test_topic";
    //rpc模式
    private final static String QUEUE_NAME_RPC = "queue_test_rpc";

    public static void main(String[] args)  {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtil.getConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME_SIMPLE,false,false,false,null);

            //自定义消费者类
            //简单模式
            MessageConsumer2 simpleConsumer = new MessageConsumer2(channel);
            //监听队列,false表示手动返回完成状态,true表示自动
            channel.basicConsume(QUEUE_NAME_SIMPLE, true, simpleConsumer);
            
            //work模式,若不限制basicQos和关闭自动应答,则会消息会平均分配给两个consumer
            MessageConsumer2 workConsumer = new MessageConsumer2(channel);
            //使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
            //还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。
            channel.basicQos(1);
            channel.basicConsume(QUEUE_NAME_WORK, false, workConsumer);
            
            //发布订阅模式
            Consumer subscribeConsumer = new MessageConsumer(channel);
            // 绑定队列到交换机,!!!注意这里队列不能跟consumer1一样,不然同一队列只有一个能消费。
            // 绑定队列到交换机--需要在v_host1下新建exchange--exchange_name_subscribe和queue--queue_test_subscribe2
            channel.queueBind(QUEUE_NAME_SUBSCRIBE2, EXCHANGE_NAME_SUBSCRIBE, "");
            channel.basicQos(1);
            channel.basicConsume(QUEUE_NAME_SUBSCRIBE2, false, subscribeConsumer);
            //路由模式和topic模式都与发布订阅模式类似,生产者中fanout改成direct或topic
            //老版本(3.0)代码,QueueingConsumer(4.0已废除)内部用linkedBlockingQueue来存放消息的内容,而linkedBlockingQueue:一个由链表结构组成的有界队列 ,
            // 照先进先出的顺序进行排序,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE,那么问题来了,如果生产者的速度远远大于消费者的速度,
            // 也许没等到队列阻塞的条件产生(长度达到Integer.MAX_VALUE)内存就完蛋了,
            // 在老的版本你可以通过设置 rabbitmq的prefetch属性channel.basicQos(prefetch)来处理这个问题如果不设置可能出现内存问题
             

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

package com.example.springb_web.utils.rabbitMq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MessageConsumer extends DefaultConsumer {
    Channel sendACKChannel;
    public MessageConsumer(Channel channel){
        super(channel);
        sendACKChannel = channel;
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //TODO sometingn
        String msg = new String(body);
        // 返回确认状态,自动确认模式时需要注释掉
        sendACKChannel.basicAck(envelope.getDeliveryTag(),false);
        System.out.println(" [x] Received '" + msg + "'");
    }
}

package com.example.springb_web.utils.rabbitMq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MessageConsumer2 extends DefaultConsumer {
    //应答时会用到
    Channel sendACKChannel;
    public MessageConsumer2(Channel channel){
        super(channel);
        sendACKChannel = channel;
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //TODO sometingn

        try {
            String msg = new String(body);

            //为了区分和cosumer1的消费速率
            Thread.sleep(1500);
            // 返回确认状态,自动确认模式时需要注释掉
            sendACKChannel.basicAck(envelope.getDeliveryTag(),false);
            System.out.println(" [x] Received '" + msg + "'");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

5.测试步骤
(1)简单模式,应用场景:聊天
在此之前先新建一些队列,用于接收数据。我这里在v_host1下建立了queue_test_simple、queue_test_work、queue_test_subscribe、queue_test_subscribe2这四个,和代码中的要能对于上即可。

如图所示,调试的时候把MyConsumerTest和MqProducerTest的work模式以下的代码都注释掉,这样方便调试。
先开启MyConsumerTest,右键MyConsumerTest.java,选择debug运行

同理再开启MqProducerTest,可以看到consumer输出如下

(2)work模式-资源竞争,应用场景:红包、电商订单、资源调度
同理也是注释掉简单模式和发布订阅模式的代码,然后按顺序重新启动消费者1,消费者2和生产者。

可以看到输出如下:


能看到消费者1和消费者2竞争资源,且平均消费者2消费一条,消费者1可以消费3条,这是因为给消费者设置的消费速率不同,生产者每0.5s发送一条,设定comsumer1几乎无延迟接受,consumer2每1.5秒接受一次,理论上consumer1接受的消息数是consumer2的三倍
下图是消费者2设置的延迟

(3)订阅模式-共享资源,应用场景:邮件群发、群聊、广播,生产者没有将消息直接发送到队列,而是发送到了交换机。
这里注意要绑定队列到交换机,需要在v_host1下新建exchange–exchange_name_subscribe(交换机名称,生产者将消息发送到这里),并绑定queue–queue_test_subscribe/queue_test_subscribe2。

然后点击刚刚的交换机,进入绑定界面,绑定上queue_test_subscribe和queue_test_subscribe2

同理也是注释掉简单模式和work模式的代码,然后按顺序重新启动消费者1,消费者2和生产者。
结果如下:


可以看到两个消费者都消费到了同一条消息。

三、集成SpringBoot
1.添加相关的exchange和queue,exhcange包括DirectExchange、TopicExchange,queue包括DirectQueue、topic.qqMail,并绑定用DirectExchange绑定DirectQueue,TopicExchange绑定topic.qqMail、topic.qq。

2.添加相关配置
application.yml:

application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    #虚拟host 可以不设置,使用server默认host
    virtual-host: v_host1

3.java代码,其中SendMsgController 充当生产者,XXXReceiver为消费者,XXXMqConfig为配置文件。

package com.example.springb_web.utils.SpringBootRabbitMq;

import org.apache.kafka.common.Uuid;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

@RestController
public class SendMsgController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDirectMsg")
    public String sendDirectMsg(){
        String msgId = String.valueOf(Uuid.randomUuid());
        String msg = "Let'go Bradon!";
        String CreateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map map = new HashMap<>();
        map.put("msgId",msgId);
        map.put("msg",msg);
        map.put("CreateTime",CreateTime);
        rabbitTemplate.convertAndSend("DirectExchange","DirectRouting",map);
        return "sendDirectMsg";
    }

    @GetMapping("/sendQqTopicMsg")
    public String sendQqTopicMsg(){
        String msgId = String.valueOf(Uuid.randomUuid());
        String msg = "Let'go Bradon! -- qq";
        String CreateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map map = new HashMap<>();
        map.put("msgId",msgId);
        map.put("msg",msg);
        map.put("CreateTime",CreateTime);
        rabbitTemplate.convertAndSend("TopicExchange","topic.qq",map);
        return "sendQqTopicMsg";
    }

    @GetMapping("/sendQqMailTopicMsg")
    public String sendQqMailTopicMsg(){
        String msgId = String.valueOf(Uuid.randomUuid());
        String msg = "Let'go Bradon!  --qqMail";
        String CreateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map map = new HashMap<>();
        map.put("msgId",msgId);
        map.put("msg",msg);
        map.put("CreateTime",CreateTime);
        rabbitTemplate.convertAndSend("TopicExchange","topic.qqMail",map);
        return "sendQqMailTopicMsg";
    }

}

package com.example.springb_web.utils.SpringBootRabbitMq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
public class DirectExchangeMqConfig {

    @Bean
    public Queue GetDirectQueue(){
        //durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        return new Queue("DirectQueue",true);
    }

    @Bean
    public DirectExchange GetDirectExchange(){
        return new DirectExchange("DirectExchange",true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:DirectRouting
    @Bean
    public Binding DirectBinding(){
        return BindingBuilder.bind(GetDirectQueue()).to(GetDirectExchange()).with("DirectRouting");
    }

}

package com.example.springb_web.utils.SpringBootRabbitMq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "DirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {
 
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
    }
 
}
package com.example.springb_web.utils.SpringBootRabbitMq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicExchangeMqConfig {

    //绑定键
    public final static String qqMail = "topic.qqMail";
    public final static String qq = "topic.qq";


    @Bean
    public Queue GetQqMailQueue(){
        //durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        return new Queue(qqMail,true);
    }

    @Bean
    public Queue GetQqQueue(){
        return new Queue(qq,true);
    }

    @Bean
    public TopicExchange GetTopicExchange(){
        return new TopicExchange("TopicExchange",true,false);
    }

    //绑定
    @Bean
    public Binding qqMailTopicBinding(){
        return BindingBuilder.bind(GetQqMailQueue()).to(GetTopicExchange()).with(qqMail);
    }
    //只要topic.qq开头的,不管topic.qq或者topic.qqMail,都会分发到该队列
    @Bean
    public Binding qqTopicBinding(){
        return BindingBuilder.bind(GetQqQueue()).to(GetTopicExchange()).with("topic.qq#");
    }

}
package com.example.springb_web.utils.SpringBootRabbitMq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "topic.qqMail")
public class TopicQqMailReceiver {

    @RabbitHandler
    public void process(Map testMessage){
        System.out.println("TopicQqMailReceiver消费者收到消息  : " + testMessage.toString());
    }
}

package com.example.springb_web.utils.SpringBootRabbitMq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "topic.qq")
public class TopicQqReceiver {

    @RabbitHandler
    public void process(Map testMessage){
        System.out.println("TopicQqReceiver消费者收到消息  : " + testMessage.toString());
    }
}

4.测试
启动SpringbWebApplication
(1)直连模式:
浏览器进入http://localhost:8080/sendDirectMsg,可以看到结果如下

(2)主题模式
浏览器进入http://localhost:8080/sendQqTopicMsg,可以看到结果如下(记得清空一下控制台)

浏览器进入http://localhost:8080/sendQqMailTopicMsg,可以看到结果如下

可以看到qqMail的消息qq也接收到了,但qq的消息qqMail没有接收到。
这是因为在TopicExchangeMqConfig中绑定时,一个是固定值,一个是通配符。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678885.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存