SpringCloud冷知识:Stream还能这样玩?

SpringCloud冷知识:Stream还能这样玩?,第1张

 SpringCloudStream概述

Spring Cloud对Spring Cloud Stream(简称SCS)的定位是用于构建高度可扩展的基于事件驱动的微服务,其目的是简化消息在Spring Cloud应用程序中的开发。同时SCS能够提供一套灵活可扩展的编程模型,在Spring的基础上,支持发布/订阅模型、消费者分组、数据分片等。使用SCS能使微服务基于消息驱动的开发模式更加简单透明。 SCS的架构 SCS可以简单地理解为是对第三方消息中间件的一个概念封装,开发人员可以将关注点从消息中间件的特性配置转移到对消息的配置。 下面是一个简单的架构图。

​● Middleware:消息中间件,如RabbitMQ、Kafka、RocketMQ等。 ● Binder:可以认为是适配器,用来将Stream与中间件连接起来,不同的Binder对应不同的中间件,需要我们配置。 ● Application:由Stream封装的消息机制,很少自定义开发。 ● Inputs:输入,可以自定义开发。 ● Outputs:输出,可以自定义开发。 如果将SCS架构从消息层面做进一步细化,则可以分为三个模块,如下图所示。

SCS的核心模块

● Source 当服务发布消息前的前置业务完成后会通过Source将消息发布出去。Source是一个Spring注解接口,它可以将代表消息主体的POJO对象发布到Channel中,发布之前会把该消息对象序列化(默认使用JSON)。

● Channel Channel(消息通道)是消息队列的进一步抽象,它会保存Producer发布的或者Consumer接收的消息。Channel名称一般与目标队列名称相关联。然而,消息队列的名称不会直接在代码中暴露,相反Channel名称会被用在代码中,所以只能在配置文件中配置,为Channel选取正确的消息队列进行读和写,而不是在代码中体现。

● Binder Binder是SCS框架的一部分,它由SCS实现,用来与特殊的消息平台交互。我们可以在不暴露特殊消息平台的类库和API的情况下实现对消息的发布和消费。通过后面的源码介绍,你将会看到它的强大之处。

● Sink 在SCS中,当从消息队列接收到一条消息后,需要Sink。Sink能监听进入Channel中的消息并将消息反序列化成一个POJO对象。之后,消息就能给业务逻辑使用了。

SCS的接入 我们以RabbitMQ为例(消息队列的环境搭建这里不做过多的介绍,本章以Stream为主),新建两个Maven工程,分别作为消息消费者(Server-Receiver)和消息生产者(Server-Sender),在两个项目中引入Stream依赖和Stream对RabbitMQ的依赖,再为生产者单独添加Web依赖,以便能够通过HTTP调用发送信息。

1.接入Maven依赖

2.消费者启动主类Server-Receiver

​注意: 也 可 以 自 定 义 Sink 接口 , 使 用@EnableBinding(SinkDemo.class)的形式。下面是实现代码,自定义信道的名称为SinkDemo,Stream框架会创建出名为SinkDemo的Channel:

​3.添加消费者配置文件application.yml

具体配置详解说明如下(spring.cloud.stream为前缀):

● bindings配置

○ input : 表 示 channelName , 它 是 启 动 类 中@EnableBinding(Sink.class)注解中配置的Sink接口,该接口中默认定义了channelName,当然我们也可以自己写Sink接口。

○ destination:消息中间件的Topic。

○ binder:当前bindings绑定的对应的适配器,该实例表示适配RabbitMQ,名称默认为defaultRabbit,可以自定义,接着需要配置该名称对应的类型、环境信息等。

● binders配置

○ defaultRabbit : binder 适 配 器 名 称 , 和 spring.cloud.stream.bindings.input.binder值一样。 ○ environment:表示当前binder对应的配置信息。

4.生产者Server-Sender实现 为SenderApplication启动类添加@EnableBinding注解,实现代码如下:

​自定义SenderSource接口,以 org.springframework.cloud.stream.messaging.Source源码为参考将Channel的名称改成和消费者的Sink的Channel名称一样。

​5.编写控制器,通过HTTP发送消息

​6.添加生产者application.yml配置,配置方式和消费者的配置方式一样

7.启动消费者和生产者 首先启动消费者,通过查看日志我们看到程序中声明了一个名称为 demotopic.anonymous.88A97a5vQ9Ox07GnNBlKYQ的队列(SCS为我们建的临时队列名称),并且绑定了mytopic主题,创建了一个连上消息队列的连接,下面是部分关键日志输出:

然后启动生产者Server-Sender,在启动日志中我们也看到应用创建了到对应的消息队列的连接。接下来我们通过HTTP发送信息:

在服务消费者的日志中,监听到了对应的消息:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/920089.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存