【SpringCloud学习笔记】SpringCloudStream

【SpringCloud学习笔记】SpringCloudStream,第1张

【SpringCloud学习笔记】SpringCloudStream SpringCloudStream笔记

文章目录
  • SpringCloudStream笔记
    • 一、SpringCloudStream简介
    • 二、SpringCloudStream架构
      • 1、发射器
      • 2、通道
      • 3、绑定器
      • 4、接收器
    • 三、SpringCloudStream相关注解
    • 四、SpringCloudStream和Kafka
      • 1、构建消息生产者
        • Ⅰ、引入依赖
        • Ⅱ、添加注解
        • Ⅲ、编写配置
        • Ⅳ、编写代码
      • 2、构建消息消费者
        • Ⅰ、引入依赖
        • Ⅱ、添加注解
        • Ⅲ、编写配置
        • Ⅳ、编写代码
    • 五、SpringCloudStream和RocketMQ
      • 1、构建消息生产者
        • Ⅰ、添加依赖
        • Ⅱ、添加注解
        • Ⅲ、编写配置
        • Ⅳ、编写代码
      • 2、构建消息消费者
        • Ⅰ、添加依赖
        • Ⅱ、添加注解
        • Ⅲ、编写配置
        • Ⅳ、编写代码

一、SpringCloudStream简介

​ SpringCloud可以轻松的将消息传递集成到基于Spring的微服务中,他是通过SpringCloudStream子项目来实现这一点的。SpringCloudStream是一个由注解驱动的框架,它允许开发人员在Spring应用程序中轻松构建消息发布者和消息消费者。SpringCloudStream还允许开发人员抽象出正在使用的消息传递平台的实现细节。SpirngCloudStream可以使用多个消息平台(包括Apache Kafka项目和RabbitMQ),而平台的具体实现细节则被排除在应用程序代码之外。在应用程序中实现消息发布和消费是通过平台无关的Spring接口实现的。

​ 为什么有SpringCloudStream:

​ 市面上的消息中间件种类繁多,共同掌握学习成本高,SpringCloudStream编程模型统一了中间件适中的方式,降低了学习的成本(类似与JDBC一样)。

二、SpringCloudStream架构

架构图如下:

​ 随着SpringCloud中消息的发布和消费,有4个组件涉及发布消息和消费消息,它们是:

  • 发射器(source)
  • 通道(channel)
  • 绑定器(binder)
  • 接收器(sink)
1、发射器

​ 序列化对象,并将消息发布到通道

2、通道

​ 通道是对队列的一个抽象,它将在消息生产者发布消息或者消息消费者消费消息后保留该消息。通道的名称始终与目标队列名称相互关联。在代码中使用通道名称,队列名称不会直接公开给代码。

3、绑定器

​ 绑定器是与特定的消息平台对话的Spring代码。

4、接收器

​ 服务通过接收器从队列中接收消息。接收器监听传入消息的通道,并将消息反序列化为POJO。

三、SpringCloudStream相关注解 注解名称注解作用@Input注解标识输入通道 通过输入通道接收的消息进入应用程序@Output注解标识输出通道 发布的消息将通过该通道离开应用程序@StreamLIstener监听队列 用于消费者的队列的消息接收@EnableBindings指信道channel和topic绑定在一起 四、SpringCloudStream和Kafka 1、构建消息生产者 Ⅰ、引入依赖

    org.springframework.cloud
    spring-cloud-starter-stream-kafka

Ⅱ、添加注解
@SpringBootApplication
@EnableResourceServer
@EnableBinding(Source.class)
public class ResourceServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(ResourceServiceApplication.class, args);
    }
}

​ 该注解告诉应用程序,它将绑定到SpringCloudStream消息代理。Source.class告诉SpringCloudStream使用默认的通道和发射器与消息代理进行通信。

@EnableBinding(Source.class)
Ⅲ、编写配置
server:
  port: 8881
security:
  oauth2:
    resource:
      user-info-uri: http://localhost:9991/user
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orgChangeTopic
          content-type: application/json
      kafka:
        binder:
          zkNodes: 127.0.0.1
          brokers: 127.0.0.1

​ SpringCloudStream的配置从spring.cloud.stream.bindings开始。

# 将Source.output()通道和指定的topic绑定 要将消息送到哪里
spring.cloud.stream.bindings.output.destination: orgChangeTopic
# 消息要转换为什么类型
spring.cloud.stream.bindings.output.content-type: application/json

# 该配置项指定了绑定器要绑定的消息队列的实际ip zkNodes是zookeeper的实际ip(kafka依赖zk)
spring.cloud.stream.kafka.binder.zkNodes: 127.0.0.1
# 该配置项指定的是kafka的实际ip
spring.cloud.stream.kafka.binder.brokers: 127.0.0.1
Ⅳ、编写代码
@Service
public class TestMessageService {
    @Autowired
	private Source source;

    public void sendMessage(Person person){
        source.output().send(
        	MessageBuilder.withPayload(person).build()
        );
        System.out.println("发送成功");
    }
}

​ 将默认的Source(发射器)注入到Service类中,source.output()方法开启通道,send()方法发送消息。send()方法接收一个Message类作为参数,可以使用MessageBuilder类来构建消息。

2、构建消息消费者 Ⅰ、引入依赖

    org.springframework.cloud
    spring-cloud-starter-stream-kafka

Ⅱ、添加注解
@SpringBootApplication
@EnableBinding(Sink.class)
public class ResourceServiceReceiveApplication {
    public static void main(String[] args) {
        SpringApplication.run(ResourceServiceReceiveApplication.class, args);
    }
}

​ 同理,告诉应用程序要使用SpringCloudStream消息代理,使用默认的通道和接收器。

@EnableBinding(Sink.class)
Ⅲ、编写配置
server:
  port: 9999
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: orgChangeTopic
          content-type: application/json
          group: one
      kafka:
        binder:
          zkNode: 127.0.0.1
          brokers: 127.0.0.1

​ 同理如上,SpringCloudStream的配置从spring.cloud.stream.bindings开始。

# 将Source.output()通道和指定的topic绑定 要将消息送到哪里
spring.cloud.stream.bindings.input.destination: orgChangeTopic
# 消息要转换为什么类型
spring.cloud.stream.bindings.input.content-type: application/json
# 消息消费者需要指定分组 同一个分组中的一个实例接收并处理消息 保证消息只被同一组的一个消费 避免重复消费
spring.cloud.stream.bindings.input.group: one

# 该配置项指定了绑定器要绑定的消息队列的实际ip zkNodes是zookeeper的实际ip(kafka依赖zk)
spring.cloud.stream.kafka.binder.zkNodes: 127.0.0.1
# 该配置项指定的是kafka的实际ip
spring.cloud.stream.kafka.binder.brokers: 127.0.0.1
Ⅳ、编写代码
@Service
public class ReceiveService {
    @Autowired
    private Sink sink;

    @StreamListener(Sink.INPUT)
    public void receive(Person person){
        System.out.println("接收到的消息为:" + person);
    }
}

​ 这里使用了注解@StreamListener(Sink.INPUT)来监听消息,其中指定了接收消息的通道是INPUT。

五、SpringCloudStream和RocketMQ 1、构建消息生产者 Ⅰ、添加依赖

    com.alibaba.cloud
    spring-cloud-starter-stream-rocketmq

Ⅱ、添加注解
@SpringBootApplication
@EnableBinding(Source.class)
public class CloudProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(CloudProviderApplication.class, args);
    }
}
Ⅲ、编写配置
spring:
  cloud:
	stream:
      bindings:
        output:
          destination: stream-test-topic
          content-type: application/json
      rocketmq:
        binder:
          # 配置name-server服务器的地址
          name-server: localhost:9876
Ⅳ、编写代码
@GetMapping("/mq/{msg}")
public String testMQ(@PathVariable("msg") String msg){
    source.output().send(
            MessageBuilder.withPayload(msg).build()
    );
    return "发送成功,消息内容为: " + msg;
}
2、构建消息消费者 Ⅰ、添加依赖

    com.alibaba.cloud
    spring-cloud-starter-stream-rocketmq

Ⅱ、添加注解
@SpringBootApplication
@EnableFeignClients
@EnableBinding(Sink.class)
public class CloudConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(CloudConsumerApplication.class, args);
    }
}
Ⅲ、编写配置
spring:
  cloud:
	stream:
      bindings:
        input:
          destination: stream-test-topic
          content-type: application/json
          group: one
      rocketmq:
        binder:
          # 配置name-server服务器的地址
          name-server: 127.0.0.1:9876
Ⅳ、编写代码
@Service
public class StreamConsumer {
    @StreamListener(Sink.INPUT)
    public void receive(String message){
        System.out.println(message);
    }
}

可以看到和使用Kafka大同小异,差异本身是因为俩者实现上的区别,RocketMQ使用NameServer而Kafka使用Zookeeper。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5654497.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存