Springcloud Stream

Springcloud Stream,第1张

Springcloud Stream

 

文章目录
  • 前言
  • 一、说明
  • 二、使用
    • 1.引入库
    • 2.解读
    • 3.配置文件
  • 总结



前言

屏蔽底层消息中间件的差异,降低切换成本,同一消息的变成模型

目前只支持Rabbitmq和kafka

中文文档


一、springcloud stream是什么

其实就类似于jdbc一样,统一了对消息处理的细节,让我们不在于注重某一个消息队列是怎样实现的,都有一样的 *** 作api


二、使用
1.引入库

       当前项目要依赖的坐标
       
            org.springframework.cloud
            spring-cloud-starter-stream-ribbon
        

它所依赖的版本库

父工程


        
            
            
                org.springframework.boot
                spring-boot-dependencies
                2.2.2.RELEASE
                pom
                import
            

            
            
                org.springframework.cloud
                spring-cloud-dependencies
                Hoxton.SR1
                pom
                import
            

            
            
                com.alibaba.cloud
                spring-cloud-alibaba-dependencies
                2.1.0.RELEASE
                pom
                import
            

              ........
        

2.解读
Binder: 是应用与消息中间剑的封装,目前实行了kafka和Rabbitmq的binder,
         通过Binder可以很方便的连接中间剑,可以动态的改变消息类型(对应kafka的topic,
         Rabbitmq的exchange),这些可以通过配置文件来实现。


@Input 注解标识输入通道,通过该输入通道接收的消息进入应用程序

@Output  注解标识输出通道。发布的消息将通过该通道离开应用程序

@StreamListener 监听队列,用于消费的队列的消息接收

@EnableBinding 指通道channel和exchange绑定在一起。


3.配置文件
server:
  port: 8004

spring:
  application:
    name: producer-8002
  profiles:
    active: dev
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit  #消息组件类型
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
      bindings: #服务的整合处理
        output: #通道名称    消息发送需要配置
          destination: exchange #交换机名称
          content-type: application/json  #消息类型
          binder: defaultRabbit  #设置要绑定的消息服务的具体设置
        input:   # 消息接收需要配置
          destination: exchange #交换机名称
          content-type: application/json  #消息类型
          binder: defaultRabbit  #设置要绑定的消息服务的具体设置

配置文件中的 output和intput一个是发送和接收的配置,我们可以配置在一起,或者分开(因为有的服务是专门发送消息和接收消息的)

4.消息生产者

我们不需要去关注我们的Rabbitmq的细节

定义接口:

public interface ProducerService {

   String send();
}



接口实现类:

import com.sz.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;

@EnableBinding(Source.class)   //定义消息的推送管道
public class ProducerServiceImpl implements ProducerService{

    @Resource
    private MessageChannel output;  //消息发送管道

    @Override
    public String send() {
        boolean flag = output.send(MessageBuilder.withPayload("消息内容").build());
        System.out.println(flag);
        return "";
    }
}

//注意,导包的时候不要导错了

5.消息消费者
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;


@EnableBinding(Sink.class)
public class ConsumerListener {

    @StreamListener(Sink.INPUT)
    public void  input(Message message){
        System.out.println("消费消息:"+message.getPayload());
    }
}


//注意,导包的时候不要导错了

此时我们写一个接口去发送消息:

然后控制台就会有消费接收的消息(因为我们打印了)

6.问题 1.重复消费
就比如我们上面的消费者代码,我们在开一个服务,写上一样的代码去监听这个交换机,
那么两个监听的方法都可以获取到消息。


这个时候我们就需要使用分组来解决。




注意:在stream中处于同一个分组中的消费者是竞争关系,就能够保证每个消息只能被同一个组中的其中一个消费者消费。但是不同的组是可以消费的。




说明白点就是,处在同一个组的消费者只有一个能够消费到消息,
消费者分布在不同的组上,则每个组都有一个消费者可以消费到消息。




自定义分组:
      input:
          destination: exchange #交换机名称
          content-type: application/json  #消息类型
          binder: defaultRabbit  #设置要绑定的消息服务的具体设置
          group: groupA   #分组名称


就是在消费者的服务配置文件中添加group: groupA
多个消费者我们就使用相同的分组名称就可以了
2.持久化
其实使用了分组就已经实现了持久化。
但是要注意的一点是:一点要配置分组,

因为配置了分组后,就算宕机了,重新启动后也能够消费到之前的消息,
但是如果把group分组信息删除了,则重启后,是接收不到之前的消息的。

注意注意:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5576228.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存