- 1. 创建SpringBoot项目,引入相关依赖
- 2. 为了减少重复代码书写,编写连接工厂创建信道的工具类
- 3. 编写生产者代码
- 4. 编写消费者代码
- 5. 运行代码测试
2. 为了减少重复代码书写,编写连接工厂创建信道的工具类org.springframework.boot spring-boot-starter-amqpcommons-io commons-io2.6 org.springframework.boot spring-boot-starter-testtest org.junit.vintage junit-vintage-engineorg.springframework.amqp spring-rabbit-testtest
public class RabbitMqUtils { public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("120.24.192.216"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }3. 编写生产者代码
public class Task01 { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台中接收信息发送 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } }4. 编写消费者代码
public class Worker01 { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //消息的接收 DeliverCallback deliverCallback = (consumerTag,message) ->{ System.out.println("接收到的消息:"+new String(message.getBody())); }; //消息接收被取消时执行的内容 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息者取消消息消费接口回调逻辑"); }; System.out.println("C2等待接收消息......"); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }5. 运行代码测试
多进程运行两个消费者代码,首先设置idea可以多进程运行,选中Allow multiple instances
运行生产者代码,并手动输入需要发送的消息
观察消费者接收消息的情况
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)