一、下载安装
1.rabbitMq的运行需要erlang环境。
erlang官网下载:http://erlang.org/download/otp_win64_24.1.exe (24.1为版本号)。
一直点下一步即可,最好不要修改安装目录,安装后,添加环境变量Path:C:Program Fileserl-24.1bin
验证:在cmd输入 erl -v 可以查看erlang版本
2.rabbitMq官网下载:https://www.rabbitmq.com/install-windows.html
也是一直点下一步安装,可以修改安装目录,比如我的安装目录是 D:ToolsrabbitMqinstallrabbitmq_server-3.9.10。
cmd 进入sbin目录 cd /d D:ToolsrabbitMqinstallrabbitmq_server-3.9.10sbin,
然后启动插件rabbitmq-plugins enable rabbitmq_management。
登录:http://localhost:15672/ guest/guest。可以看到登录成功后的界面。
在刚刚的cmd界面,添加admin用户:rabbitmqctl add_user admin admin
设置管理员权限:rabbitmqctl set_user_tags admin administrator
再次登录:http://localhost:15672/ admin/admin 点击admin,
右侧选择Visual Hosts,添加当前用户的visual hosts:v_host1,后面会用到
二、在java中使用rabbitMq
使用rabbitMq前一般要先了解下常见的queue(也可以先学习rabbitMq,后面再了解),可参考:https://blog.csdn.net/tttalk/article/details/121947982
1.pom.xml引入相关配置
如果你是springBoot项目,可以用下面的配置:
org.springframework.boot spring-boot-starter-amqp
如果不是就用这个:
com.rabbitmq amqp-client4.10.0
2.创建连接文件ConnectionUtil.java
package com.example.springb_web.utils.rabbitMq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //默认端口,若需要修改,需要配置文件rabbitmq.config(需要自己创建,路径为etc/rabbitmq/rabbitmq.cnfig) factory.setPort(5672); //默认用户guest,其下的默认vhost为/,登录admin后,添加v_host1 factory.setVirtualHost("v_host1"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); return connection; } }
3.生产者测试主类,含简单模式、订阅模式和发布订阅模式,里面注释写的满详细的。
package com.example.springb_web.utils.rabbitMq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.unbescape.xml.XmlEscape; import java.io.IOException; import java.util.concurrent.TimeoutException; public class MqProducerTest { //简单模式-应用场景:聊天 private final static String QUEUE_NAME_SIMPLE = "queue_test_simple"; //work模式-资源竞争,应用场景:红包、电商订单、资源调度 private final static String QUEUE_NAME_WORK = "queue_test_work"; //发布订阅模式-共享资源,应用场景:邮件群发、群聊、广播 private final static String QUEUE_NAME_SUBSCRIBE = "queue_test_subscribe"; private final static String EXCHANGE_NAME_SUBSCRIBE = "exchange_name_subscribe"; //路由routing模式,路由模式和topic模式都与发布订阅模式类似,主要是交换机类型不一样,fanout改成direct或topic private final static String QUEUE_NAME_ROUTING = "queue_test_routing"; //topic模式-路由模式的一种 private final static String QUEUE_NAME_TOPIC = "queue_test_topic"; //rpc模式 private final static String QUEUE_NAME_RPC = "queue_test_rpc"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = ConnectionUtil.getConnection(); channel = connection.createChannel(); String msg = "thank you,next"; //简单模式 channel.queueDeclare(QUEUE_NAME_SIMPLE,false,false,false,null); channel.basicPublish("",QUEUE_NAME_SIMPLE,null, msg.getBytes()); //work模式 三个参数代表 durable, exclusive, autoDelete //durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 channel.queueDeclare(QUEUE_NAME_WORK,true,false,false,null); for(int i = 0; i < 100; i++) { channel.basicPublish("",QUEUE_NAME_WORK,null, (msg+i).getBytes()); //每0.5s发送一条,设定comsumer1几乎无延迟接受,consumer2每1.5秒接受一次,理论上consumer1接受的消息数是consumer2的三倍 //试过将consumer2间隔设定为1s,结果几乎是平均分配,一条给consumer1,一条给cunsumer2。间隔时间长一些才能看出区别 Thread.sleep(500); } //发布订阅模式 //exchange类型之Fanout:消息会分配到所有队列 channel.exchangeDeclare(EXCHANGE_NAME_SUBSCRIBE, "fanout"); channel.basicPublish(EXCHANGE_NAME_SUBSCRIBE,"",null, msg.getBytes()); channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
4.消费者代码,其中MyConsumerTest、MyConsumerTest2是测试主类,MessageConsumer、MessageConsumer2主要是处理应答的。这里我用的代码版本还是比较新的,网上很多教程都用了过时的方法,在4.0中(QueueingConsumer)已经废弃了,所以我这里用了MessageConsumer/MessageConsumer2来替代。
package com.example.springb_web.utils.rabbitMq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; public class MyConsumerTest { //简单模式-应用场景:聊天 private final static String QUEUE_NAME_SIMPLE = "queue_test_simple"; //work模式-资源竞争,应用场景:红包、电商订单、资源调度 private final static String QUEUE_NAME_WORK = "queue_test_work"; //发布订阅模式-共享资源,应用场景:邮件群发、群聊、广播 private final static String QUEUE_NAME_SUBSCRIBE = "queue_test_subscribe"; private final static String EXCHANGE_NAME_SUBSCRIBE = "exchange_name_subscribe"; //路由routing模式,路由模式和topic模式都与发布订阅模式类似,主要是交换机类型不一样,fanout改成direct或topic private final static String QUEUE_NAME_ROUTING = "queue_test_routing"; //topic模式-路由模式的一种 private final static String QUEUE_NAME_TOPIC = "queue_test_topic"; //rpc模式 private final static String QUEUE_NAME_RPC = "queue_test_rpc"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = ConnectionUtil.getConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME_SIMPLE,false,false,false,null); //自定义消费者类 //简单模式 MessageConsumer simpleConsumer = new MessageConsumer(channel); channel.basicConsume(QUEUE_NAME_SIMPLE, true, simpleConsumer); //work模式 Consumer workConsumer = new MessageConsumer(channel); //使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。 //还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。 channel.basicQos(1); channel.basicConsume(QUEUE_NAME_WORK, false, workConsumer); //发布订阅模式 Consumer subscribeConsumer = new MessageConsumer(channel); // 绑定队列到交换机--需要在v_host1下新建exchange--exchange_name_subscribe和queue--queue_test_subscribe channel.queueBind(QUEUE_NAME_SUBSCRIBE, EXCHANGE_NAME_SUBSCRIBE, ""); channel.basicQos(1); channel.basicConsume(QUEUE_NAME_SUBSCRIBE, false, subscribeConsumer); //老版本(3.0)代码,QueueingConsumer(4.0已废除)内部用linkedBlockingQueue来存放消息的内容,而linkedBlockingQueue:一个由链表结构组成的有界队列 , // 照先进先出的顺序进行排序,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE,那么问题来了,如果生产者的速度远远大于消费者的速度, // 也许没等到队列阻塞的条件产生(长度达到Integer.MAX_VALUE)内存就完蛋了, // 在老的版本你可以通过设置 rabbitmq的prefetch属性channel.basicQos(prefetch)来处理这个问题如果不设置可能出现内存问题 } catch (Exception e) { e.printStackTrace(); } } }
package com.example.springb_web.utils.rabbitMq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; public class MyConsumerTest2 { //简单模式-应用场景:聊天 private final static String QUEUE_NAME_SIMPLE = "queue_test_simple"; //work模式-资源竞争,应用场景:红包、电商订单、资源调度 private final static String QUEUE_NAME_WORK = "queue_test_work"; //发布订阅模式-共享资源,生产者没有将消息直接发送到队列,而是发送到了交换机,应用场景:邮件群发、群聊、广播 private final static String QUEUE_NAME_SUBSCRIBE2 = "queue_test_subscribe2"; private final static String EXCHANGE_NAME_SUBSCRIBE = "exchange_name_subscribe"; //路由routing模式,路由模式和topic模式都与发布订阅模式类似,主要是交换机类型不一样,fanout改成direct或topic private final static String QUEUE_NAME_ROUTING = "queue_test_routing"; //topic模式-路由模式的一种 private final static String QUEUE_NAME_TOPIC = "queue_test_topic"; //rpc模式 private final static String QUEUE_NAME_RPC = "queue_test_rpc"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = ConnectionUtil.getConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME_SIMPLE,false,false,false,null); //自定义消费者类 //简单模式 MessageConsumer2 simpleConsumer = new MessageConsumer2(channel); //监听队列,false表示手动返回完成状态,true表示自动 channel.basicConsume(QUEUE_NAME_SIMPLE, true, simpleConsumer); //work模式,若不限制basicQos和关闭自动应答,则会消息会平均分配给两个consumer MessageConsumer2 workConsumer = new MessageConsumer2(channel); //使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。 //还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。 channel.basicQos(1); channel.basicConsume(QUEUE_NAME_WORK, false, workConsumer); //发布订阅模式 Consumer subscribeConsumer = new MessageConsumer(channel); // 绑定队列到交换机,!!!注意这里队列不能跟consumer1一样,不然同一队列只有一个能消费。 // 绑定队列到交换机--需要在v_host1下新建exchange--exchange_name_subscribe和queue--queue_test_subscribe2 channel.queueBind(QUEUE_NAME_SUBSCRIBE2, EXCHANGE_NAME_SUBSCRIBE, ""); channel.basicQos(1); channel.basicConsume(QUEUE_NAME_SUBSCRIBE2, false, subscribeConsumer); //路由模式和topic模式都与发布订阅模式类似,生产者中fanout改成direct或topic //老版本(3.0)代码,QueueingConsumer(4.0已废除)内部用linkedBlockingQueue来存放消息的内容,而linkedBlockingQueue:一个由链表结构组成的有界队列 , // 照先进先出的顺序进行排序,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE,那么问题来了,如果生产者的速度远远大于消费者的速度, // 也许没等到队列阻塞的条件产生(长度达到Integer.MAX_VALUE)内存就完蛋了, // 在老的版本你可以通过设置 rabbitmq的prefetch属性channel.basicQos(prefetch)来处理这个问题如果不设置可能出现内存问题 } catch (Exception e) { e.printStackTrace(); } } }
package com.example.springb_web.utils.rabbitMq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; public class MessageConsumer extends DefaultConsumer { Channel sendACKChannel; public MessageConsumer(Channel channel){ super(channel); sendACKChannel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //TODO sometingn String msg = new String(body); // 返回确认状态,自动确认模式时需要注释掉 sendACKChannel.basicAck(envelope.getDeliveryTag(),false); System.out.println(" [x] Received '" + msg + "'"); } }
package com.example.springb_web.utils.rabbitMq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; public class MessageConsumer2 extends DefaultConsumer { //应答时会用到 Channel sendACKChannel; public MessageConsumer2(Channel channel){ super(channel); sendACKChannel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //TODO sometingn try { String msg = new String(body); //为了区分和cosumer1的消费速率 Thread.sleep(1500); // 返回确认状态,自动确认模式时需要注释掉 sendACKChannel.basicAck(envelope.getDeliveryTag(),false); System.out.println(" [x] Received '" + msg + "'"); } catch (InterruptedException e) { e.printStackTrace(); } } }
5.测试步骤
(1)简单模式,应用场景:聊天
在此之前先新建一些队列,用于接收数据。我这里在v_host1下建立了queue_test_simple、queue_test_work、queue_test_subscribe、queue_test_subscribe2这四个,和代码中的要能对于上即可。
如图所示,调试的时候把MyConsumerTest和MqProducerTest的work模式以下的代码都注释掉,这样方便调试。
先开启MyConsumerTest,右键MyConsumerTest.java,选择debug运行
同理再开启MqProducerTest,可以看到consumer输出如下
(2)work模式-资源竞争,应用场景:红包、电商订单、资源调度
同理也是注释掉简单模式和发布订阅模式的代码,然后按顺序重新启动消费者1,消费者2和生产者。
可以看到输出如下:
能看到消费者1和消费者2竞争资源,且平均消费者2消费一条,消费者1可以消费3条,这是因为给消费者设置的消费速率不同,生产者每0.5s发送一条,设定comsumer1几乎无延迟接受,consumer2每1.5秒接受一次,理论上consumer1接受的消息数是consumer2的三倍
下图是消费者2设置的延迟
(3)订阅模式-共享资源,应用场景:邮件群发、群聊、广播,生产者没有将消息直接发送到队列,而是发送到了交换机。
这里注意要绑定队列到交换机,需要在v_host1下新建exchange–exchange_name_subscribe(交换机名称,生产者将消息发送到这里),并绑定queue–queue_test_subscribe/queue_test_subscribe2。
然后点击刚刚的交换机,进入绑定界面,绑定上queue_test_subscribe和queue_test_subscribe2
同理也是注释掉简单模式和work模式的代码,然后按顺序重新启动消费者1,消费者2和生产者。
结果如下:
可以看到两个消费者都消费到了同一条消息。
三、集成SpringBoot
1.添加相关的exchange和queue,exhcange包括DirectExchange、TopicExchange,queue包括DirectQueue、topic.qqMail,并绑定用DirectExchange绑定DirectQueue,TopicExchange绑定topic.qqMail、topic.qq。
2.添加相关配置
application.yml:
application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin #虚拟host 可以不设置,使用server默认host virtual-host: v_host1
3.java代码,其中SendMsgController 充当生产者,XXXReceiver为消费者,XXXMqConfig为配置文件。
package com.example.springb_web.utils.SpringBootRabbitMq; import org.apache.kafka.common.Uuid; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; @RestController public class SendMsgController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendDirectMsg") public String sendDirectMsg(){ String msgId = String.valueOf(Uuid.randomUuid()); String msg = "Let'go Bradon!"; String CreateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Mapmap = new HashMap<>(); map.put("msgId",msgId); map.put("msg",msg); map.put("CreateTime",CreateTime); rabbitTemplate.convertAndSend("DirectExchange","DirectRouting",map); return "sendDirectMsg"; } @GetMapping("/sendQqTopicMsg") public String sendQqTopicMsg(){ String msgId = String.valueOf(Uuid.randomUuid()); String msg = "Let'go Bradon! -- qq"; String CreateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map map = new HashMap<>(); map.put("msgId",msgId); map.put("msg",msg); map.put("CreateTime",CreateTime); rabbitTemplate.convertAndSend("TopicExchange","topic.qq",map); return "sendQqTopicMsg"; } @GetMapping("/sendQqMailTopicMsg") public String sendQqMailTopicMsg(){ String msgId = String.valueOf(Uuid.randomUuid()); String msg = "Let'go Bradon! --qqMail"; String CreateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map map = new HashMap<>(); map.put("msgId",msgId); map.put("msg",msg); map.put("CreateTime",CreateTime); rabbitTemplate.convertAndSend("TopicExchange","topic.qqMail",map); return "sendQqMailTopicMsg"; } }
package com.example.springb_web.utils.SpringBootRabbitMq; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectExchangeMqConfig { @Bean public Queue GetDirectQueue(){ //durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 return new Queue("DirectQueue",true); } @Bean public DirectExchange GetDirectExchange(){ return new DirectExchange("DirectExchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:DirectRouting @Bean public Binding DirectBinding(){ return BindingBuilder.bind(GetDirectQueue()).to(GetDirectExchange()).with("DirectRouting"); } }
package com.example.springb_web.utils.SpringBootRabbitMq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "DirectQueue")//监听的队列名称 TestDirectQueue public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString()); } }
package com.example.springb_web.utils.SpringBootRabbitMq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicExchangeMqConfig { //绑定键 public final static String qqMail = "topic.qqMail"; public final static String qq = "topic.qq"; @Bean public Queue GetQqMailQueue(){ //durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 return new Queue(qqMail,true); } @Bean public Queue GetQqQueue(){ return new Queue(qq,true); } @Bean public TopicExchange GetTopicExchange(){ return new TopicExchange("TopicExchange",true,false); } //绑定 @Bean public Binding qqMailTopicBinding(){ return BindingBuilder.bind(GetQqMailQueue()).to(GetTopicExchange()).with(qqMail); } //只要topic.qq开头的,不管topic.qq或者topic.qqMail,都会分发到该队列 @Bean public Binding qqTopicBinding(){ return BindingBuilder.bind(GetQqQueue()).to(GetTopicExchange()).with("topic.qq#"); } }
package com.example.springb_web.utils.SpringBootRabbitMq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "topic.qqMail") public class TopicQqMailReceiver { @RabbitHandler public void process(Map testMessage){ System.out.println("TopicQqMailReceiver消费者收到消息 : " + testMessage.toString()); } }
package com.example.springb_web.utils.SpringBootRabbitMq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "topic.qq") public class TopicQqReceiver { @RabbitHandler public void process(Map testMessage){ System.out.println("TopicQqReceiver消费者收到消息 : " + testMessage.toString()); } }
4.测试
启动SpringbWebApplication
(1)直连模式:
浏览器进入http://localhost:8080/sendDirectMsg,可以看到结果如下
(2)主题模式
浏览器进入http://localhost:8080/sendQqTopicMsg,可以看到结果如下(记得清空一下控制台)
浏览器进入http://localhost:8080/sendQqMailTopicMsg,可以看到结果如下
可以看到qqMail的消息qq也接收到了,但qq的消息qqMail没有接收到。
这是因为在TopicExchangeMqConfig中绑定时,一个是固定值,一个是通配符。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)