创建生产者Producerjunit junit4.12 com.rabbitmq amqp-client5.14.0 junit junitRELEASE compile
package a_direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; //消息的生产者 public class Producer { @Test public void sendMessage() throws Exception{ //1、创建一个连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2、设置相关的参数 connectionFactory.setHost("39.105.127.232"); connectionFactory.setPort(5672); connectionFactory.setUsername("sxt");//设置用户密码 connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/v-sxt");//虚拟主机 //3、从链接工厂里面创建一个链接 Connection connection = connectionFactory.newConnection(); //4、创建一个通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello",true,false,false,null); channel.basicPublish("","hello",null,"hello rabbitmq".getBytes()); //7、关闭通道和连接 channel.close(); connection.close(); System.out.println("消息发送成功"); } }创建消费者Consumer
package a_direct; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; //消息的生产者 public class Consumer { @Test public void sendMessage() throws Exception{ //1、创建一个连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2、设置相关的参数 connectionFactory.setHost("39.105.127.232"); connectionFactory.setPort(5672); connectionFactory.setUsername("sxt");//设置用户密码 connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/v-sxt");//虚拟主机 //3、从链接工厂里面创建一个链接 Connection connection = connectionFactory.newConnection(); //4、创建一个通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello",true,false,false,null); channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者接收到消息:"+new String(body)); //super.handleDelivery(consumerTag, envelope, properties, body); } }); //不能让程序结束 System.in.read(); // //7、关闭通道和连接 // channel.close(); // connection.close(); // System.out.println("消息发送成功"); } }编写工具类优化代码
项目整体视图
package a_direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import utils.RabbitMQUtils; //消息的生产者 public class Producer { @Test public void sendMessage() throws Exception{ // //1、创建一个连接工厂 // ConnectionFactory connectionFactory=new ConnectionFactory(); // //2、设置相关的参数 // connectionFactory.setHost("39.105.127.232"); // connectionFactory.setPort(5672); // connectionFactory.setUsername("sxt");//设置用户密码 // connectionFactory.setPassword("123456"); // connectionFactory.setVirtualHost("/v-sxt");//虚拟主机 // //3、从链接工厂里面创建一个链接 // Connection connection = connectionFactory.newConnection(); Connection connection = RabbitMQUtils.getConnection(); //4、创建一个通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello",true,false,false,null);要想消息持久化就要设置上面那行的参数 channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes()); //channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());要想消息持久化就要设置上面那行的参数 //7、关闭通道和连接 // channel.close(); // connection.close(); RabbitMQUtils.closeChannelAndConnection(channel,connection); System.out.println("消息发送成功"); } }消费者Consumer
package a_direct; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消息的生产者 public class Consumer { @Test public void sendMessage() throws Exception{ // //1、创建一个连接工厂 // ConnectionFactory connectionFactory=new ConnectionFactory(); // //2、设置相关的参数 // connectionFactory.setHost("39.105.127.232"); // connectionFactory.setPort(5672); // connectionFactory.setUsername("sxt");//设置用户密码 // connectionFactory.setPassword("123456"); // connectionFactory.setVirtualHost("/v-sxt");//虚拟主机 // //3、从链接工厂里面创建一个链接 // Connection connection = RabbitMQUtils.getConnection(); //4、创建一个通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello",true,false,false,null); channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者接收到消息:"+new String(body)); //super.handleDelivery(consumerTag, envelope, properties, body); } }); //不能让程序结束 System.in.read(); // //7、关闭通道和连接 // channel.close(); // connection.close(); // System.out.println("消息发送成功"); } }工具类RabbitMQUtils
package utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //工具类 public class RabbitMQUtils { private static ConnectionFactory connectionFactory; static { connectionFactory=new ConnectionFactory(); //2、设置相关的参数 connectionFactory.setHost("39.105.127.232"); connectionFactory.setPort(5672); connectionFactory.setUsername("sxt");//设置用户密码 connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/v-sxt");//虚拟主机 } public static Connection getConnection(){ try { //从链接工厂里面创建一个链接 Connection connection = connectionFactory.newConnection(); return connection; }catch (Exception e){ System.out.println(e); } return null; } public static void closeChannelAndConnection(Channel channel,Connection connection){ try { if(null!=channel) channel.close(); if (null!=connection) connection.close(); }catch (Exception e){ System.out.println(e); } } }work queue 整体视图
注意:这里的RabbitMQUtils工具类为上种模型所编写
package b_workqueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import utils.RabbitMQUtils; //生产者 public class Producer { @Test public void sendMessage() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("hello",false,false,false,null); //发送消息 for (int i=1;i<=100;i++){ channel.basicPublish("","hello",null,("hello rabbitmq-----workqueue"+i).getBytes()); } //关闭 RabbitMQUtils.closeChannelAndConnection(channel,connection); System.out.println("消息发送成功"); } }创建消费者1
package b_workqueue; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者 public class Consumer1 { @Test public void receiveMessage() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("hello",false,false,false,null); //接收消息 channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【1】接收消息:"+new String(body)); } }); System.in.read(); RabbitMQUtils.closeChannelAndConnection(channel,connection); } }创建消费者2
package b_workqueue; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者 public class Consumer2 { @Test public void receiveMessage() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("hello",false,false,false,null); //接收消息 channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【2】接收消息:"+new String(body)); } }); System.in.read(); RabbitMQUtils.closeChannelAndConnection(channel,connection); } }测试
注意测试的时候要先运行消费者然后再运行生产者,消费者平均消费
消息的自动确认机制
当消费者1的消费能力有限时,此时消费者就不是平均消费,消费者1消费不完的,可由消费者2消费
广播
创建生产者package c_fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import utils.RabbitMQUtils; //生产者 public class Proudcer { @Test public void senMessage() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //向交换机发消息 channel.basicPublish("logs","",null,"我是一个fanout类型的消息".getBytes()); //关闭 RabbitMQUtils.closeChannelAndConnection(channel,connection); System.out.println("消息发送成功"); } }创建消费者1
package c_fanout; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者1 public class Consumer1 { @Test public void Message() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //从通道里面得到一个临时队列 String queue = channel.queueDeclare().getQueue(); //把临时队列和交换机进行绑定 channel.queueBind(queue,"logs",""); //接受消息 channel.basicConsume(queue,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【1】接收到消息是:"+new String(body)); } }); System.out.println("消费者【1】启动成功"); System.in.read(); } }创建消费者2
package c_fanout; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者2 public class Consumer2 { @Test public void Message() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //从通道里面得到一个临时队列 String queue = channel.queueDeclare().getQueue(); //把临时队列和交换机进行绑定 channel.queueBind(queue,"logs",""); //接受消息 channel.basicConsume(queue,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【2】接收到消息是:"+new String(body)); } }); System.out.println("消费者【2】启动成功"); System.in.read(); } }测试
路由-直连
创建生产者package d_routing_direct; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import utils.RabbitMQUtils; //生产者 public class Proudcer { @Test public void senMessage() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT); //向交换机发消息 发送了四条消息,分别指定了路由key channel.basicPublish("logs","info",null,"我是一个routingkey-Direct类型的消息-info".getBytes()); channel.basicPublish("logs","warn",null,"我是一个routingkey-Direct类型的消息-warn".getBytes()); channel.basicPublish("logs","debug",null,"我是一个routingkey-Direct类型的消息-debug".getBytes()); channel.basicPublish("logs","error",null,"我是一个routingkey-Direct类型的消息-error".getBytes()); //关闭 RabbitMQUtils.closeChannelAndConnection(channel,connection); System.out.println("消息发送成功"); } }创建消费者1
package d_routing_direct; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者1 public class Consumer1 { @Test public void Message() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT); //从通道里面得到一个临时队列 String queue = channel.queueDeclare().getQueue(); //把临时队列和交换机进行绑定 channel.queueBind(queue,"logs","info"); channel.queueBind(queue,"logs","warn"); channel.queueBind(queue,"logs","debug"); //接受消息 channel.basicConsume(queue,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【info】接收到消息是:"+new String(body)); } }); System.out.println("消费者【info】启动成功"); System.in.read(); } }创建消费者2
package d_routing_direct; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者2 public class Consumer2 { @Test public void Message() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT); //从通道里面得到一个临时队列 String queue = channel.queueDeclare().getQueue(); //把临时队列和交换机进行绑定 channel.queueBind(queue,"logs","error"); //接受消息 channel.basicConsume(queue,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【error】接收到消息是:"+new String(body)); } }); System.out.println("消费者【error】启动成功"); System.in.read(); } }测试
.*匹配一个
.#匹配0个或多个
package e_routing_topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import utils.RabbitMQUtils; //生产者 public class Proudcer { @Test public void senMessage() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC); //向交换机发消息 发送了四条消息,分别指定了路由key channel.basicPublish("topic","user.insert",null,"我是一个routingkey-Topic类型的消息-user.insert".getBytes()); channel.basicPublish("topic","user.insert.a",null,"我是一个routingkey-Topic类型的消息-user.insert.a".getBytes()); //关闭 RabbitMQUtils.closeChannelAndConnection(channel,connection); System.out.println("消息发送成功"); } }消费者1
package e_routing_topic; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者1 public class Consumer1 { @Test public void Message() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC); //从通道里面得到一个临时队列 String queue = channel.queueDeclare().getQueue(); //把临时队列和交换机进行绑定 channel.queueBind(queue,"topic","user.*"); //接受消息 channel.basicConsume(queue,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【*】接收到消息是:"+new String(body)); } }); System.out.println("消费者【*】启动成功"); System.in.read(); } }消费者2
package e_routing_topic; import com.rabbitmq.client.*; import org.junit.Test; import utils.RabbitMQUtils; import java.io.IOException; //消费者1 public class Consumer2 { @Test public void Message() throws Exception{ Connection connection = RabbitMQUtils.getConnection(); //创建通道 Channel channel = connection.createChannel(); //设置交换机 channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC); //从通道里面得到一个临时队列 String queue = channel.queueDeclare().getQueue(); //把临时队列和交换机进行绑定 channel.queueBind(queue,"topic","user.#"); //接受消息 channel.basicConsume(queue,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者【#】接收到消息是:"+new String(body)); } }); System.out.println("消费者【#】启动成功"); System.in.read(); } }测试
两个spring boot项目
生产者测试类package com.sxt; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RabbitmqSpringbootProducerApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { System.out.println(rabbitTemplate); } @Test void testHello(){ rabbitTemplate.convertAndSend("hello","hello world"); System.out.println("消息发送成功"); } }yml配置文件
server: port: 8001 spring: application: name: producer rabbitmq: host: 39.105.127.232 port: 5672 username: user password: 123456 virtual-host: /v-sxt编写生产者HelloConfig配置
package com.sxt.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class HelloConfig { //创建一个队列 @Bean public Queue hello(){//ctrl + p 显示其他参数 这里可以和hello项目一样设置五个参数 Queue hello = new Queue("hello"); return hello; } }消费者HelloConsumer
package com.sxt.consumer; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queuesToDeclare = {@Queue("hello")}) public class HelloConsumer { @RabbitHandler public void receive(String body){ System.out.println("消费者收到消息,内容为:"+body); } }
消费者与生产者的yml配置文件一样,先启动生产者再启动消费者
测试 集成spring boot-work 生产者测试类@Test void testWork(){ for (int i=1;i<=10;i++){ rabbitTemplate.convertAndSend("work","hello work--"+i); } System.out.println("消息全部发送成功"); }生产者配置类WorkConfig
package com.sxt.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class WorkConfig { //创建一个队列 @Bean public Queue work(){//ctrl + p 显示其他参数 这里可以和hello项目一样设置五个参数 Queue work = new Queue("work"); return work; } }消费者配置类WorkConsumer
package com.sxt.consumer; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkConsumer { @RabbitListener(queuesToDeclare = {@Queue("work")}) public void receive1(String body){ System.out.println("消费者[1]收到消息,内容为:"+body); } @RabbitListener(queuesToDeclare = {@Queue("work")}) public void receive2(String body){ System.out.println("消费者[2]收到消息,内容为:"+body); } }测试
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)