- SpringCloudStream笔记
- 一、SpringCloudStream简介
- 二、SpringCloudStream架构
- 1、发射器
- 2、通道
- 3、绑定器
- 4、接收器
- 三、SpringCloudStream相关注解
- 四、SpringCloudStream和Kafka
- 1、构建消息生产者
- Ⅰ、引入依赖
- Ⅱ、添加注解
- Ⅲ、编写配置
- Ⅳ、编写代码
- 2、构建消息消费者
- Ⅰ、引入依赖
- Ⅱ、添加注解
- Ⅲ、编写配置
- Ⅳ、编写代码
- 五、SpringCloudStream和RocketMQ
- 1、构建消息生产者
- Ⅰ、添加依赖
- Ⅱ、添加注解
- Ⅲ、编写配置
- Ⅳ、编写代码
- 2、构建消息消费者
- Ⅰ、添加依赖
- Ⅱ、添加注解
- Ⅲ、编写配置
- Ⅳ、编写代码
SpringCloud可以轻松的将消息传递集成到基于Spring的微服务中,他是通过SpringCloudStream子项目来实现这一点的。SpringCloudStream是一个由注解驱动的框架,它允许开发人员在Spring应用程序中轻松构建消息发布者和消息消费者。SpringCloudStream还允许开发人员抽象出正在使用的消息传递平台的实现细节。SpirngCloudStream可以使用多个消息平台(包括Apache Kafka项目和RabbitMQ),而平台的具体实现细节则被排除在应用程序代码之外。在应用程序中实现消息发布和消费是通过平台无关的Spring接口实现的。
为什么有SpringCloudStream:
市面上的消息中间件种类繁多,共同掌握学习成本高,SpringCloudStream编程模型统一了中间件适中的方式,降低了学习的成本(类似与JDBC一样)。
二、SpringCloudStream架构架构图如下:
随着SpringCloud中消息的发布和消费,有4个组件涉及发布消息和消费消息,它们是:
- 发射器(source)
- 通道(channel)
- 绑定器(binder)
- 接收器(sink)
序列化对象,并将消息发布到通道。
2、通道 通道是对队列的一个抽象,它将在消息生产者发布消息或者消息消费者消费消息后保留该消息。通道的名称始终与目标队列名称相互关联。在代码中使用通道名称,队列名称不会直接公开给代码。
3、绑定器 绑定器是与特定的消息平台对话的Spring代码。
4、接收器 服务通过接收器从队列中接收消息。接收器监听传入消息的通道,并将消息反序列化为POJO。
三、SpringCloudStream相关注解Ⅱ、添加注解org.springframework.cloud spring-cloud-starter-stream-kafka
@SpringBootApplication @EnableResourceServer @EnableBinding(Source.class) public class ResourceServiceApplication { public static void main(String[] args) { SpringApplication.run(ResourceServiceApplication.class, args); } }
该注解告诉应用程序,它将绑定到SpringCloudStream消息代理。Source.class告诉SpringCloudStream使用默认的通道和发射器与消息代理进行通信。
@EnableBinding(Source.class)Ⅲ、编写配置
server: port: 8881 security: oauth2: resource: user-info-uri: http://localhost:9991/user spring: cloud: stream: bindings: output: destination: orgChangeTopic content-type: application/json kafka: binder: zkNodes: 127.0.0.1 brokers: 127.0.0.1
SpringCloudStream的配置从spring.cloud.stream.bindings开始。
# 将Source.output()通道和指定的topic绑定 要将消息送到哪里 spring.cloud.stream.bindings.output.destination: orgChangeTopic # 消息要转换为什么类型 spring.cloud.stream.bindings.output.content-type: application/json # 该配置项指定了绑定器要绑定的消息队列的实际ip zkNodes是zookeeper的实际ip(kafka依赖zk) spring.cloud.stream.kafka.binder.zkNodes: 127.0.0.1 # 该配置项指定的是kafka的实际ip spring.cloud.stream.kafka.binder.brokers: 127.0.0.1Ⅳ、编写代码
@Service public class TestMessageService { @Autowired private Source source; public void sendMessage(Person person){ source.output().send( MessageBuilder.withPayload(person).build() ); System.out.println("发送成功"); } }
将默认的Source(发射器)注入到Service类中,source.output()方法开启通道,send()方法发送消息。send()方法接收一个Message>类作为参数,可以使用MessageBuilder类来构建消息。
2、构建消息消费者 Ⅰ、引入依赖Ⅱ、添加注解org.springframework.cloud spring-cloud-starter-stream-kafka
@SpringBootApplication @EnableBinding(Sink.class) public class ResourceServiceReceiveApplication { public static void main(String[] args) { SpringApplication.run(ResourceServiceReceiveApplication.class, args); } }
同理,告诉应用程序要使用SpringCloudStream消息代理,使用默认的通道和接收器。
@EnableBinding(Sink.class)Ⅲ、编写配置
server: port: 9999 spring: cloud: stream: bindings: input: destination: orgChangeTopic content-type: application/json group: one kafka: binder: zkNode: 127.0.0.1 brokers: 127.0.0.1
同理如上,SpringCloudStream的配置从spring.cloud.stream.bindings开始。
# 将Source.output()通道和指定的topic绑定 要将消息送到哪里 spring.cloud.stream.bindings.input.destination: orgChangeTopic # 消息要转换为什么类型 spring.cloud.stream.bindings.input.content-type: application/json # 消息消费者需要指定分组 同一个分组中的一个实例接收并处理消息 保证消息只被同一组的一个消费 避免重复消费 spring.cloud.stream.bindings.input.group: one # 该配置项指定了绑定器要绑定的消息队列的实际ip zkNodes是zookeeper的实际ip(kafka依赖zk) spring.cloud.stream.kafka.binder.zkNodes: 127.0.0.1 # 该配置项指定的是kafka的实际ip spring.cloud.stream.kafka.binder.brokers: 127.0.0.1Ⅳ、编写代码
@Service public class ReceiveService { @Autowired private Sink sink; @StreamListener(Sink.INPUT) public void receive(Person person){ System.out.println("接收到的消息为:" + person); } }
这里使用了注解@StreamListener(Sink.INPUT)来监听消息,其中指定了接收消息的通道是INPUT。
五、SpringCloudStream和RocketMQ 1、构建消息生产者 Ⅰ、添加依赖Ⅱ、添加注解com.alibaba.cloud spring-cloud-starter-stream-rocketmq
@SpringBootApplication @EnableBinding(Source.class) public class CloudProviderApplication { public static void main(String[] args) { SpringApplication.run(CloudProviderApplication.class, args); } }Ⅲ、编写配置
spring: cloud: stream: bindings: output: destination: stream-test-topic content-type: application/json rocketmq: binder: # 配置name-server服务器的地址 name-server: localhost:9876Ⅳ、编写代码
@GetMapping("/mq/{msg}") public String testMQ(@PathVariable("msg") String msg){ source.output().send( MessageBuilder.withPayload(msg).build() ); return "发送成功,消息内容为: " + msg; }2、构建消息消费者 Ⅰ、添加依赖
Ⅱ、添加注解com.alibaba.cloud spring-cloud-starter-stream-rocketmq
@SpringBootApplication @EnableFeignClients @EnableBinding(Sink.class) public class CloudConsumerApplication { public static void main(String[] args) { SpringApplication.run(CloudConsumerApplication.class, args); } }Ⅲ、编写配置
spring: cloud: stream: bindings: input: destination: stream-test-topic content-type: application/json group: one rocketmq: binder: # 配置name-server服务器的地址 name-server: 127.0.0.1:9876Ⅳ、编写代码
@Service public class StreamConsumer { @StreamListener(Sink.INPUT) public void receive(String message){ System.out.println(message); } }
可以看到和使用Kafka大同小异,差异本身是因为俩者实现上的区别,RocketMQ使用NameServer而Kafka使用Zookeeper。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)