文章持续更新中,欢迎关注!
一、概述 1. 是什么?在实际应用中有很多消息中间件,比如现在企业里常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,学习所有这些消息中间件无疑需要大量时间经历成本,那有没有一种技术,使我们不再需要关注具体的消息中间件的细节,而只需要用一种适配绑定的方式,自动的在各种消息中间件内切换呢?消息驱动就是这样的技术,它能屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
Spring Cloud Stream 是一个构建消息微服务驱动的框架。可以屏蔽底层消息中间件的差异,降低版本切换成本,统一消息的编程模型,目前仅支持 RabbitMQ 和 Kafka。
2. 设计思想 标准 MQ 的设计思想
生产者 / 消费者之间靠消息媒介传递信息内容,Message
消息必须走特定的通道,MessageChannel
消息通道里的消息如何被消费呢,谁负责收发处理?消息通道MessageChannel的子接口SubscribableChannel,由消息处理器MessageHandler所订阅
Spring Cloud Stream 的设计思想如果我们的项目中用到了 RabbitMQ 和 Kafka 两种消息中间件,由于它们的架构不同,对实际开发造成了一定困扰;或者用到了一种消息中间件,随着后面的业务需求需要向另一种消息队列迁移,这无疑是灾难性的,会造成一大堆的改动,因为它们与系统耦合了,这时候 Spring Cloud Stream 就可以为我们提供一种解耦的方式。
Spring Cloud Stream 提供的解决方案是:通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种消息中间件的实现。
inputs 对应消费者,outputs 对应生产者
Stream中的消息通信方式遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)
3. 工作流程-
Binder:绑定器,很方便的连接中间件,屏蔽差异
-
Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储与转发的媒介,通过 Channel 对队列进行配置
-
Source 和 Sink:简单理解就是参照物是 Spring Cloud Stream 本身,从 Stream 发布消息就是输出,接收消息就是输入
新建三个子模块分别对应于消息的生产者和消费者:
3.application.ymlspringcloud com.dyh.springcloud 1.0-SNAPSHOT 4.0.0 springcloud-stream-rabbitmq-provider8801org.springframework.cloud spring-cloud-starter-stream-rabbitorg.springframework.cloud spring-cloud-starter-netflix-eureka-clientorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-actuatororg.projectlombok lomboktrue org.springframework.boot spring-boot-starter-testtest
server: port: 8801 spring: application: name: springcloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称,OUTPUT表示这是消息的发送方 destination: testExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/ instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址4.主启动类 Provider8801Application
package com.dyh.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; @EnableEurekaClient @SpringBootApplication public class Provider8801Application { public static void main(String[] args) { SpringApplication.run(Provider8801Application.class, args); } }5.业务类
在业务来中分别要编写发送消息接口及其实现类,并在发送接口消息的实现类中添加@EnableBinding注解用来绑定消息的推送管道,消息生产者绑定的消息推送管道包为org.springframework.cloud.stream.messaging.Source
发送消息接口 - IMessageProviderpackage com.dyh.springcloud.service; public interface IMessageProvider { public String send(); }发送消息接口的实现类 - IMessageProviderImpl
package com.dyh.springcloud.service.impl; import com.dyh.springcloud.service.IMessageProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import java.util.UUID; @EnableBinding(Source.class) //定义消息的推送管道 public class IMessageProviderImpl implements IMessageProvider { @Autowired private MessageChannel output; //消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build());//发送消息 System.out.println("========serial:" + serial); return null; } }
注意我们在service的实现类中不再需要@Service注解,因为这个service不再是传统意义上的和Controller、DAO数据等进行交互的service,而是要绑定绑定器打交道的service。
SendMessageControllerpackage com.dyh.springcloud.controller; import com.dyh.springcloud.service.IMessageProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class SendMessageController { @Autowired private IMessageProvider messageProvider; @GetMapping("/sendMessage") public String sendMessage() { return messageProvider.send(); } }6.测试
启动服务注册中心(7001,7002),启动RabbitMQ,启动消息生产者微服务8801,我们在RabbitMQ的控制面板中可以看见多出了一个名为testExchange的交换机,这个交换机恰恰就是我们之前在配置文件中配置的交换机名字:
然后我们访问 http://localhost:8801/sendMessage 使用消息生产者微服务发送消息,在其微服务后台我们看到了打印的消息:
在RabbitMQ的控制面板中我们也看到了确实发送了消息。
8802,8803 POM文件一致
3.application.ymlorg.springframework.cloud spring-cloud-starter-stream-rabbitorg.springframework.cloud spring-cloud-starter-netflix-eureka-clientorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-actuatororg.projectlombok lomboktrue org.springframework.boot spring-boot-starter-testtest
application.yml的书写和消息生产者的几乎一致,特别需要注意的是,消息生产者微服务用到的通道为OUTPUT,而消息消费者微服务用到的通道为INPUT,其他的配置文件信息就只需要注意端口号、注册服务名的区别即可:
跟生产者主启动类一致主要2个注解
@EnableEurekaClient
@SpringBootApplication
由于是消费者,所以只需要编写其Controller即可,在其Controller上同样需要添加
@EnableBinding注解用来绑定消息的推送管道,消息消费者绑定的消息推送管道包为org.springframework.cloud.stream.messaging.Sink,在接收消息的方法中需要使用@StreamListner注解来监听其绑定的消息推送管道
package com.dyh.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) public class ReceiveMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message6.测试message) { System.out.println("消费者" + serverPort + "号,收到消息:" + message.getPayload()); } }
然后启动消费者服务(8802,8803),用生产者发送消息,我们可以发现在消费者端可以成功接收到消息:
生产者发送消息
消费者接收消息
当生产者发送消息后,此时的我们的消费者都接受了消息并进行了消费,也就是说同一条消息被多个消息消费者所消费,为了避免这种情况,我们可以使用 Stream 中的消息分组来解决。
在 Stream 中,处于同一个组的多个消费者是竞争关系,就可以保证消息只被一个服务消费一次,而不同组是可以重复消费的。现在默认分组就是不同的,组流水号不一样。
3.分组解决重复消费问题将不想产生重复消费的服务分为同一个组即可
在8802/8803微服务中的配置文件中分别添加组名属性:
spring: cloud: stream: bindings: input: group: A
再次重启两个消息消费方微服务,此时我们可以看到在分组A下已经有了两个消费者:
再用生产者发送5条消息,我们发现8802/8803分别消费了2条和3条不同的消息,而没有出现重复消费的问题:
如果我们的消费者因为种种原因宕机了,生产者此时发送了消息,没有配置 group 属性的消费者重新上线后无法接收到之前的消息,而配置了 group 的消费者仍会接收到消息,这就是持久化的效果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)