在springboot下 *** 作rabbitMQ。
1.pom文件配置pom文件配置如下:
2.yml配置org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-configuration-processortrue org.projectlombok lombokcom.alibaba fastjson1.2.78
server: port: 8080 spring: rabbitmq: host: 192.168.161.114 port: 5672 username: root password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual # 手动ack concurrency: 5 #消费端最小并发数 max-concurrency: 10 #消费端最大并发数 prefetch: 5 # 一次请求中预处理的消息数量 cache: channel: size: 50 #缓存的channel数量 #自定义配置 rabbitmq-demo: defaultExchange: amqpExchange queue: queue routeKey: queue_key3.java代码
自定义MQ配置:MQProperties:
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Component @ConfigurationProperties(prefix = "rabbitmq-demo") @Data public class MQProperties { private String defaultExchange; private String routeKey; private String queue; }
RabbitMQ中队列及exchange的配置:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableRabbit public class RabbitMQConfig { @Autowired private MQProperties mqProperties; @Bean public Queue queue() { boolean durable = true; boolean exclusive = false; boolean autoDelete = false; return new Queue(mqProperties.getQueue(),durable,exclusive,autoDelete); } @Bean public DirectExchange defaultExchange() { boolean durable = true; boolean autoDelete = false; return new DirectExchange(mqProperties.getDefaultExchange(), durable, autoDelete); } @Bean public Binding binding() { return BindingBuilder.bind(queue()) .to(defaultExchange()) .with(mqProperties.getRouteKey()); } }
生产者Producer:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Slf4j @Service public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MQProperties mqProperties; public void sendMessage(String msg) { rabbitTemplate.convertAndSend(mqProperties.getDefaultExchange(), mqProperties.getRouteKey(), msg); } }
消费者 Consumer:
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; @Slf4j @Service public class Consumer { @RabbitListener(queues = "${rabbitmq-demo.queue}") public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag){ log.info("消费者获取消息内容:{}",payload); RabbitMQUtils.askMessage(channel, tag); } }
定义一个发送数据的接口 Controller:
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class Controller { @Autowired Producer producer; @RequestMapping("/sendQueue") @ResponseBody public String sendQueue(String msg) { producer.sendMessage(msg); return "success"; } }
定义一个工具类RabbitMQUtils:
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @Slf4j public class RabbitMQUtils { public static void askMessage(Channel channel, long tag) { askMessage(channel, tag, false); } public static void askMessage(Channel channel, long tag, boolean multiple) { try { channel.basicAck(tag, multiple); } catch (IOException e) { log.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage()); } } public static void rejectMessage(Channel channel, long tag) { rejectMessage(channel, tag, false, false); } public static void rejectAndBackMQ(Channel channel, long tag) { rejectMessage(channel, tag, false, true); } public static void rejectMessage(Channel channel, long tag, boolean multiple, boolean request) { try { channel.basicNack(tag, multiple, request); } catch (IOException e) { log.error("RabbitMQ,IO异常,异常原因为:{}", e.getMessage()); } } }4.测试
上述代码在springboot中启动以后,进行测试:
wget http://127.0.0.1:8080/sendQueue?msg=testmsg
测试结果:
2021-11-02 15:36:45.515 INFO 14692 --- [ntContainer#0-4] com.dhb.rabbitmq.demo.Consumer : 消费者获取消息内容:testmsg
在RabbitMQ的后台界面,队列queue的绑定关系如下:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)