- 项目环境
- 介绍
- Spring Cloud Stream 应用模型
- 入门使用
- 消息生产者
- 消息消费者
- 注解解释
- 自定义消息管道
- 消费分组
- 消息分区
- 设置RoutingKey
1、IDE:idea ,maven
2、 *** 作系统:win10
3、jdk:1.8
4、springboot 2.1.6.RELEASE ,springcloud Greenwich.SR1
在系统开发里面难免用到消息队列,但各个的消息队列又有所区别,SpringCloudStream 的 作用就是屏蔽各种消息队列的区别,对消息队列的 API进行进一步的抽象,使得在springcloud 里面能更加方便的集成各种消息系统。通过使用springcloud Stream ,可以有效简化开发人员对消息中间件的使用复杂程度,让系统开发人员能够有更多精力去关注核心业务逻辑的处理。目前springcloud Stream只支持两大著名的消息中间件,rabbitmq 和 kafka。
Spring Cloud Stream 应用模型Spring Cloud Stream 应用程序由一个中间件中立的核心组成。 应用程序通过在外部代理公开的目标和代码中的输入/输出参数之间建立绑定来与外部世界进行通信。 建立绑定所需的代理特定细节由特定于中间件的 Binder 实现处理。
以下图片来自官方
这里使用rabbit作为消息中间件,自行安装rabbitmq
1、新建一个cloud-stream-provider,添加依赖
org.springframework.cloud spring-cloud-streamorg.springframework.cloud spring-cloud-starter-stream-rabbit
2、创建消息消费者类
@EnableBinding(Sink.class) public class SinkReceiver { @StreamListener(Sink.INPUT) public void receive(Object receivedMessage){ System.out.println("receivedMessage="+receivedMessage); } }
3、启动CloudStreamProviderApplication
启动之后登录rabbitmq后台http://localhost:15672/,使用默认账号密码guest登录
connections一栏能看到我们的连接
点击127.0.0.1:50433可以查看connection详情
再点击127.0.0.1:5672 (1)可以查看Channel详情
点击input.anonymous.leanfVQMRM-6dPfz1XIkCw 就能查看到具体的队列详情
下拉有个Publish message,在这里就可以发布数据
发布完之后就能在控制台看到发布的数据了
怎么知道这个队列input.anonymous.leanfVQMRM-6dPfz1XIkCw一定是启动的那个呢
其实在启动的时候默认就给我们分配了一个,查看控制台信息可以看到
Stream尝鲜完成了,接下来是个简单入门使用
1、创建一个消息生产者类
@EnableBinding(Source.class) public class SourceProvider { @Resource @Qualifier("output") private MessageChannel messageChannel; public void send(Object sendMessage){ messageChannel.send(MessageBuilder.withPayload(sendMessage).build()); } }
2、修改配置文件
server.port=8301 spring.application.name=cloud-stream-provider #消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样 # type:消息中间件类型 spring.cloud.stream.binders.rabbitmq1.type=rabbit spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672 spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.virtual-host=/ # 实例2 #spring.cloud.stream.binders.rabbitmq2.type=rabbit #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.host=localhost #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.username=guest #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.password=guest #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.port=5672 # 要使用的 Exchange 名称 spring.cloud.stream.bindings.output.destination=streamExchange #设置消息类型 spring.cloud.stream.bindings.output.content-type=application/json #要绑定的消息服务的实例名 spring.cloud.stream.bindings.output.binder=rabbitmq1
3、创建测试类用于发送消息
@RunWith(SpringRunner.class) @SpringBootTest(classes = CloudStreamProviderApplication.class) public class CloudStreamProviderTest{ @Autowired private SourceProvider sourceProvider; @Test public void test(){ sourceProvider.send("hello,this is first message"); } }
只有生产者没有消费者是没用的,接下来创建消费者
消息消费者1、新建一个stream-consumer,添加依赖
org.springframework.cloud spring-cloud-streamorg.springframework.cloud spring-cloud-starter-stream-rabbit
2、创建消息接收类SinkReceiver
@EnableBinding(Sink.class) public class SinkReceiver { @StreamListener(Sink.INPUT) public void receive(Message> receivedMessage){ System.out.println("receivedMessage="+receivedMessage.getPayload()); } }
3、修改配置文件
server.port=8401 spring.application.name=cloud-stream-consumer #消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样 # type:消息中间件类型 spring.cloud.stream.binders.rabbitmq1.type=rabbit spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672 # 要使用的 Exchange 名称 ,input是rabbitmq的channel名称,后面可以自定义 spring.cloud.stream.bindings.input.destination=streamExchange #设置消息类型 spring.cloud.stream.bindings.input.content-type=application/json #要绑定的消息服务的实例名 spring.cloud.stream.bindings.input.binder=rabbitmq1
启动CloudStreamConsumerApplication,可以看到rabbitmq首页下方有一个消费者
调用CloudStreamProviderTest#test,查看stream-consumer控制台,会发现收到一条消息
receivedMessage=hello,this is first message注解解释
-
@EnableBinding:
-
@StreamListener
改造cloud-stream-provider、stream-consumer
1、新建IProcessor.java
public interface IProcessor { String OUTPUT = "stream_output"; String INPUT = "stream_input"; @Output(IProcessor.OUTPUT) MessageChannel output(); @Input(IProcessor.INPUT) SubscribableChannel input(); }
2、修改配置文件
修改cloud-stream-provider配置文件,把channel名称从out改为stream_output
# 要使用的 Exchange 名称 ,output是rabbitmq的channel名称,后面可以自定义 spring.cloud.stream.bindings.stream_output.destination=streamExchange #设置消息类型 spring.cloud.stream.bindings.stream_output.content-type=application/json #要绑定的消息服务的实例名 spring.cloud.stream.bindings.stream_output.binder=rabbitmq1
修改stream-consumer配置文件,把channel名称从out改为stream_input
# 要使用的 Exchange 名称 ,input是rabbitmq的channel名称,后面可以自定义 spring.cloud.stream.bindings.stream_input.destination=streamExchange #设置消息类型 spring.cloud.stream.bindings.stream_input.content-type=application/json #要绑定的消息服务的实例名 spring.cloud.stream.bindings.stream_input.binder=rabbitmq1
3、新建类用于测试
3.1 cloud-stream-provider消息生产者端
ProcessorProvider.java
@EnableBinding(IProcessor.class) public class ProcessorProvider { @Resource @Qualifier(IProcessor.OUTPUT) private MessageChannel messageChannel; public void send(Object sendMessage){ messageChannel.send(MessageBuilder.withPayload(sendMessage).build()); } }
修改测试类
@RunWith(SpringRunner.class) @SpringBootTest(classes = CloudStreamProviderApplication.class) public class CloudStreamProviderTest { @Autowired private SourceProvider sourceProvider; @Autowired private ProcessorProvider processorProvider; @Test public void test(){ sourceProvider.send("hello,this is first message"); } @Test public void testProcessor(){ processorProvider.send("hello,this is testProcessor message"); } }
3.2 stream-consumer 消息消费端
消息消费类 ProcessorConsumer.java
@EnableBinding(IProcessor.class) public class ProcessorConsumer { @StreamListener(IProcessor.INPUT) public void receive(Message> receivedMessage){ System.out.println("receivedMessage="+receivedMessage.getPayload()); } }
4、测试
调用CloudStreamProviderTest#testProcessor方法,stream-consumer受到消息,控制台显示
receivedMessage=hello,this is testProcessor message
这样一个简单的自定义消息管道就完成了
消费分组在现实的微服务架构中,我们的每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例。在很多情况下,消息生产者发送消息给某个具体微服务时,只希望被消费一次,当进行集群部署时,虽然他们都属于同一个项目,但是消息会被多次消费,为了解决这个问题,springcloud Stream 提出了消费组的概念。
可以通过 spring.cloud.stream.bindings.input.group 属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正收到消息并进行处理。
测试:
启动两个cloud-stream-consumer消费端,关于springboot 同一个项目在idea多个运行,请看这篇文章
调用CloudStreamProviderTest#testProcessor,会发现cloud-stream-consumer控制台一个会打印收到的消费消息,一个没有
通过多次调用CloudStreamProviderTest#testProcessor,会发现多个cloud-stream-consumer实例的话会轮流收到消息,没有办法控制到底是哪一个实例消费,也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。但是对于一些业务场景,需要对一些具有相同特征的消息设置每次都被同一个消费实例处理,比如,一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身聚合这些数据,那么消息生产者可以为消息增加一个固有的特征 ID 来进行分区,使得拥有这些 ID 的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。而分区概念的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。
修改消费者cloud-stream-consumer端配置文件,增加如下配置
# 是否开启消息分区 Default: 'false' spring.cloud.stream.bindings.stream_input.consumer.partitioned=true # 当前消费者总的实例数量 spring.cloud.stream.instance-count=2 # 当前实例的索引号,最大值为spring.cloud.stream.instance-count 减 1 spring.cloud.stream.instance-index=1
修改生产者端cloud-stream-provider配置文件,增加如下配置
# 分区键的表达式规则,可以根据实际的输出消息规则配置spEL来生成合适的分区键 spring.cloud.stream.bindings.stream_output.producer.partition-key-expression=payload # 消息分区数量 spring.cloud.stream.bindings.stream_output.producer.partition-count=1
修改完成之后,同时启动多个消费者并为每个消费者指定不同的索引,会发现消息被发送到消息组时,只有一个实例在接收和处理消息。
设置RoutingKeyRoutingKey 其 实 是 RabbitMq 的 概 念 , 在 RabbitMq 里 面 其 实 有 好 几 种 Exchange , 在 SpringCloudStream 里面默认就是使用的最通用的 Topic 如果没有配置 RoutingKey,它使用的 RoutingKey 其实就是#,既类似于 fanout 的广播类型 其实也可以使用 RoutingKey 来实现类似于 direct 直连类型
修改消费端配置文件
#绑定routing-key spring.cloud.stream.rabbit.bindings.stream_input.consumer.binding-routing-key=routing_key_a
修改消息生产端配置文件
#绑定routing-key spring.cloud.stream.rabbit.bindings.stream_output.producer.routing-key-expression='routing_key_a'
springcloudstream
springcloud Stream 使用rabbitmq作为消息中间件的部分就简单完成了,有兴趣的也可以使用kafka作为消息中间件。
GitHub地址:
https://github.com/ArronSun/micro-services-practice.git
参考书籍:
《springcloud微服务实战》
能力一般,水平有限,如有错误,请多指出。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)