上篇文章,我们讲解了config + bus 实现所有服务配制自动刷新的功能,本篇文章继续讲解下消息驱动Stream
上篇文章地址:https://blog.csdn.net/qq_43692950/article/details/122025347
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。
为什么使用stream
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
stream 中的注解
我们stream 基于 RabbitMQ 上使用,所以在搭建之前确保已经安装好RabbitMQ 。
创建SpringBoot module 引入下面依赖:
org.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-actuatororg.springframework.cloud spring-cloud-starter-stream-rabbit
配制文件修改:
server: port: 7071 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.168.40.1 port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: msgExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
创建服务提供service
接口:
public interface IMessageProvider { public String send(String msg); }
实现:
@EnableBinding(Source.class) //定义消息的推送管道 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 消息发送管道 @Override public String send(String msg) { return output.send(MessageBuilder.withPayload(msg).build()) ? "发送成功" : "发送失败!"; } }
创建测试接口:
@RestController public class SendMessageController { @Resource private IMessageProvider messageProvider; @GetMapping("/sendMessage/{msg}") public ResponseTemplate sendMessage(@PathVariable String msg) { String result = messageProvider.send(msg); return ResSuccessTemplate.builder().data(result).build(); } }
启动消息提供者服务,查看RabbitMQ的控制台:
我们写的交换机已经注册到RabbitMQ中了。
下面继续搭建消息的消费者。
三、搭建消息消费者再创建一个SpringBoot 的 module,同样引入下面依赖:
org.springframework.boot spring-boot-starter-weborg.springframework.cloud spring-cloud-starter-stream-rabbitorg.springframework.boot spring-boot-starter-actuator
修改配制文件:
server: port: 6062 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: www.bixc.net port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: msgExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
注意:这里的destination 要和 提供者的相同,才表示链接同一个交换机。
编写消息监听:
@Slf4j @Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Messagemessage) { log.info("接受到的消息: {} , 当前 port: {} ", message.getPayload(), serverPort); } }
启动服务消费者,下面开始测试:
浏览器访问:http://localhost:7071/sendMessage/哈哈哈
已经发送成功了,再看下消费者端的打印日志:
同样也接受到了消息。
现在我们修改下消费者端的端口,再次启动一个相同的消费者,端口为6061。
再次发送消息,看下消费者端的请求:
两个消费者都接受到了消息,如果只想一个消费者接收消息,只需放在同一个组下面接口,上面配制中没有指定组,会自动生成一个组:
那现在我们加上分组,修改消费端的配制:
spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: www.bixc.net port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: msgExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: groupA
添加上group: groupA指定了组名,再来看RabbitMQ中的信息:
下面再发送下消息:
只有一个消费者收到了消息。
这里还有一个注意的是,如果不配置分组,每次启动都会随机分配一个分组,如果接受消息突然中断服务,再次启动就会造成消息的丢失,所以一定要指定组信息。
喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)