在项目中,我们经常会用到消息中间件来达到解耦或者削峰的目的。常用的消息中间件有RabbitMQ、Kafka等。虽然这些消息中间件之间的原理可能类似,但它们的使用方法却是大相径庭的,那如果我们要在项目中换一种消息中间件来实现原有的功能,那么我们需要花费大量的精力去修改原有的代码。
Spring Cloud Stream是一个构建消息驱动的微服务应用框架,它使用Binder和消息中间件建立联系,我们在使用的时候不需要关心我们到底是使用的是RabbitMQ还是Kafka,因此我们可以在消息中间件中随意切换。
1.实践环境Spring Cloud Stream Rabbit 3.2.1
org.springframework.cloud spring-cloud-starter-stream-rabbit3.2.1
Spring Cloud Stream从 3.1 开始支持函数式编程模型,因此我们的配置也是使用函数式编程模型来实现的。
2.生产者 2.1 yml配置文件spring: rabbitmq: host: 192.168.1.2 port: 5672 username: guest password: guest virtual-host: /test cloud: stream: bindings: sms-out-0: # 自定义生产者通道名称 destination: sms-exchange# 自定义Exchange交换机名称,生产者和消费者需要配置同一个Exchange2.2 生产者实现
@Component public class SmsProducer{ @Resource private StreamBridge streamBridge; public void send(String message) { // 生产者的通道名称须与yml中配置的属性一致 String SMS_OUTPUT = "sms-out-0"; streamBridge.send(SMS_OUTPUT , message); } }3 .消费者
3.1 yml配置文件消费者使用函数式编程模型实现
spring: rabbitmq: host: 192.168.1.2 port: 5672 username: guest password: guest virtual-host: /test cloud: stream: bindings: sms-in-0: # 自定义消费者通道名称 destination: sms-exchange# 自定义Exchange交换机名称,生产者和消费者需要配置同一个Exchange3.2 消费者实现
@Component public class SmsConsumer { @Bean public Consumer4.测试sms() { return System.out::println; } }
现在我们开始启动项目,会在RabbitMQ中创建一个Exchange以及与其绑定的一个临时队列(该队列会在应用停止运行时消失)
生成的Exchange
生成的临时队列
@SpringBootTest(classes = StreamApplication.class) public class StreamApplicationTest { @Resource private SmsProducer smsProducer; @Test public void testFunctionSend() { smsProducer.send("测试函数式编程发送消息"); } }
最后,欢迎关注微信公众号一起交流
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)