chapter22:Spring Cloud Stream的简单使用

chapter22:Spring Cloud Stream的简单使用,第1张

chapter22:Spring Cloud Stream的简单使用

目录

一、概念二、编码API以及常用注解三、消息的生产方

3.1 依赖3.2 配置文件3.3 业务类3.4 结果 四、消息的消费方

4.1 依赖4.2 配置文件4.3 业务类 五、消息分组

5.1 情景5.2 消息分组

80028003

一、概念
    统一的消息模型,屏蔽差异性,方便去处理多种消息中间件示例图:
二、编码API以及常用注解

三、消息的生产方 3.1 依赖
    
        
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        
    
3.2 配置文件
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      # 配置要绑定的rabbitmq的服务信息
      binders:
        # 定义的名称,用户Binding整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          # 设置rabbitmq相关的环境配置
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 信道的名称
        output:
          # 使用的Exchange名称
          destination: studyExchange
          # 设置消息类型,本次为Json,文本使用 text/plain
          content-type: application/json
          # 设置要绑定的消息服务的具体设置  把binder 改为default-binder就可以了
          binder: defaultRabbit

eureka:
  client:
    service-url:
      # 设置与eureka sever交互的地址与查询服务、注册服务
      defaultZone: http://localhost:7001/eureka
  instance:
    # 修改实例id
    instance-id: send-8001.com
    # 访问路径可以显示IP地址
    prefer-ip-address: true
    # Eureka客户端向服务端发送心跳的间隔时间,单位为s(默认30s),这里设置为1s
    lease-renewal-interval-in-seconds: 1
    # Eureka服务端在收到最后一次心跳后等待时间上限,单位为s(默认90s),超时将剔除服务
    lease-expiration-duration-in-seconds: 2
3.3 业务类
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {

    
    @Resource(name = "output")
    private MessageChannel messageChannel;

    @Override
    public String sendMessage() {
        // 因为构建了message对象  把它传给管道Source,然后再给MQ的,不是用返回值
        String serial = UUID.randomUUID().toString();
        messageChannel.send(MessageBuilder.withPayload(serial).build());
        System.out.println("********serial:" + serial);
        return null;
    }
}
3.4 结果

四、消息的消费方 4.1 依赖
        
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        
4.2 配置文件
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      # 配置要绑定的rabbitmq的服务信息
      binders:
        # 定义的名称,用户Binding整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          # 设置rabbitmq相关的环境配置
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 信道的名称
        input:
          # 使用的Exchange名称
          destination: studyExchange
          # 设置消息类型,本次为Json,文本使用 text/plain
          content-type: application/json
          # 设置要绑定的消息服务的具体设置  把binder 改为default-binder就可以了
          binder: defaultRabbit
          # 消息分组:防止重复消费
          group: GroupA

eureka:
  client:
    service-url:
      # 设置与eureka sever交互的地址与查询服务、注册服务
      defaultZone: http://localhost:7001/eureka
  instance:
    # 修改实例id
    instance-id: consumer-8002.com
4.3 业务类
@Component
@EnableBinding(Sink.class)
public class ReceiverMessageController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message message) {
        System.out.println("消费者1号,-------------->收到的消息是:" + message.getPayload() + "t port:" + serverPort);
    }
}

五、消息分组 5.1 情景


就会造成如下结果:

5.2 消息分组
    不同组:重复消费相同组:竞争消息
8002

# 消息分组:防止重复消费 group: GroupA

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      # 配置要绑定的rabbitmq的服务信息
      binders:
        # 定义的名称,用户Binding整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          # 设置rabbitmq相关的环境配置
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 信道的名称
        input:
          # 使用的Exchange名称
          destination: studyExchange
          # 设置消息类型,本次为Json,文本使用 text/plain
          content-type: application/json
          # 设置要绑定的消息服务的具体设置  把binder 改为default-binder就可以了
          binder: defaultRabbit
          # 消息分组:防止重复消费
          group: GroupA

eureka:
  client:
    service-url:
      # 设置与eureka sever交互的地址与查询服务、注册服务
      defaultZone: http://localhost:7001/eureka
  instance:
    # 修改实例id
    instance-id: consumer-8002.com
8003

# 消息分组:防止重复消费 group: GroupA

server:
  port: 8803

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      # 配置要绑定的rabbitmq的服务信息
      binders:
        # 定义的名称,用户Binding整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          # 设置rabbitmq相关的环境配置
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 信道的名称
        input:
          # 使用的Exchange名称
          destination: studyExchange
          # 设置消息类型,本次为Json,文本使用 text/plain
          content-type: application/json
          # 设置要绑定的消息服务的具体设置  把binder 改为default-binder就可以了
          binder: defaultRabbit
          # 消息分组:防止重复消费
          group: GroupA

eureka:
  client:
    service-url:
      # 设置与eureka sever交互的地址与查询服务、注册服务
      defaultZone: http://localhost:7001/eureka
  instance:
    # 修改实例id
    instance-id: consumer-8003.com

生产者发送3次请求,8002 以及 8003 共同去共同竞争

生产者发送了两条消息给A组
此刻,消费者1和消费者2宕机了
过会,A组的消费者1和消费2重启;这两条消息会被A组成员竞争.

chapter21:Bus消息总线实现动态刷新chapter23:Spring Cloud Sleuth 和 Zipkin 的简单使用

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709666.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存