首先要先部署好RabbitMQ
在spring中使用RabbitMQ使用的时AMQP统一规范,需要引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
一、基本使用
推送端yml配置,virtual-host是配置rabbitmq的虚拟主机,类似于不同的服务器,这样当rabbitmq部署多个通道队列的时候,可以有效的进行隔离
spring:
rabbitmq:
host: 121.***.***.***
port: 5672
virtual-host: /
username: test
password: 123456
消费端yml配置,注意这里配置listener.simple.prefetch=1,意味着消费端每次只预取一个消息,消费过之后才能再次拉取消息
spring:
rabbitmq:
host: 121.***.***.***
port: 5672
virtual-host: /
username: test
password: 123456
listener:
simple:
prefetch: 1
(一)、消息队列方式一
1.推送端
推送端主要使用的是RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg")
public String sendMsg(){
String queueName = "simple.queue";
String message = "hello world";
rabbitTemplate.convertAndSend(queueName,message);
return "send success";
}
2.消费端
@Component
public class MyRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listeneQueue1(String msg) throws InterruptedException {
System.out.println("收到msg:"+msg);
}
}
(二)、消息队列方式二
1.推送端
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg")
public String sendMsg(){
String queueName = "simple.queue";
String message = "hello world";
rabbitTemplate.convertAndSend(queueName,message);
return "send success";
}
2.消费端
@Component
public class MyRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listeneQueue1(String msg) throws InterruptedException {
System.out.println("Listener111收到msg:"+msg);
Thread.sleep(1000);
}
@RabbitListener(queues = "simple.queue")
public void listeneQueue21(String msg) throws InterruptedException {
System.out.println("Listener222收到msg:"+msg);
Thread.sleep(20);
}
}
(三)、消息队列方式三(采用fanoutexchange交换机)
1.推送端
首先需要配置交换机并绑定队列
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutBinding(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//这里是绑定第二个队列,和上一个重复
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
之后再编写消息推送代码
@RequestMapping("/sendMsg2")
public String sendMsg2(){
String fanoutExchange = "itcast.fanout";
String message = "fanout test";
rabbitTemplate.convertAndSend(fanoutExchange,"aaa",message);
return "send success";
}
2消费端
@Component
public class MyRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listeneQueue3(String msg) throws InterruptedException {
System.out.println("Listener333收到msg:"+msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listeneQueue4(String msg) throws InterruptedException {
System.out.println("Listener444收到msg:"+msg);
}
}
(四)、消息队列方式四(采用directexchange交换机)
其实就是在上一个基础之上加入了bingkey,使得在推送数据的时候,交换机会根据bingkey添加到还有相同key的消息队列
1.推送端
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg3")
public String sendMsg2(){
String directExchange = "itcast.direct";
String message = "direct test";
Map<String,String> map = new HashMap<>();
map.put("女生","柳岩");
map.put("男生","蔡徐坤222");
rabbitTemplate.convertAndSend(directExchange,"red",map);
return "send success";
}
2.消费端
由于上一个方式的绑定需要写非常多的bean,所以这里可以采用注解的方式进行交换机和队列的绑定
@Component
public class MyRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listeneQueue1(String msg) throws InterruptedException {
System.out.println("directListener111收到msg:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listeneQueue2(Map msg) throws InterruptedException {
System.out.println("directListener222收到msg:"+msg);
}
}
(五)、消息队列方式五(采用topicexchange交换机)
通配符用#表示,比如china.#或#.news
二、消息转换器
当我们使用AMQP发送Object对象时,默认会采取序列化的方法把对象转化为字节数据,这样一是数据庞大,二是不便于查阅,所以我们一般采用json的方式进行序列化
解决方法:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)