文章目录
- 前言
- 一、说明
- 二、使用
- 1.引入库
- 2.解读
- 3.配置文件
- 总结
前言
屏蔽底层消息中间件的差异,降低切换成本,同一消息的变成模型
目前只支持Rabbitmq和kafka
中文文档
一、springcloud stream是什么
其实就类似于jdbc一样,统一了对消息处理的细节,让我们不在于注重某一个消息队列是怎样实现的,都有一样的 *** 作api
二、使用
1.引入库
:
当前项目要依赖的坐标org.springframework.cloud spring-cloud-starter-stream-ribbon
它所依赖的版本库
父工程org.springframework.boot spring-boot-dependencies2.2.2.RELEASE pom import org.springframework.cloud spring-cloud-dependenciesHoxton.SR1 pom import ........ com.alibaba.cloud spring-cloud-alibaba-dependencies2.1.0.RELEASE pom import
2.解读
Binder: 是应用与消息中间剑的封装,目前实行了kafka和Rabbitmq的binder, 通过Binder可以很方便的连接中间剑,可以动态的改变消息类型(对应kafka的topic, Rabbitmq的exchange),这些可以通过配置文件来实现。 @Input 注解标识输入通道,通过该输入通道接收的消息进入应用程序 @Output 注解标识输出通道。发布的消息将通过该通道离开应用程序 @StreamListener 监听队列,用于消费的队列的消息接收 @EnableBinding 指通道channel和exchange绑定在一起。3.配置文件
server: port: 8004 spring: application: name: producer-8002 profiles: active: dev cloud: stream: binders: defaultRabbit: type: rabbit #消息组件类型 environment: spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #通道名称 消息发送需要配置 destination: exchange #交换机名称 content-type: application/json #消息类型 binder: defaultRabbit #设置要绑定的消息服务的具体设置 input: # 消息接收需要配置 destination: exchange #交换机名称 content-type: application/json #消息类型 binder: defaultRabbit #设置要绑定的消息服务的具体设置
配置文件中的 output和intput一个是发送和接收的配置,我们可以配置在一起,或者分开(因为有的服务是专门发送消息和接收消息的)
4.消息生产者我们不需要去关注我们的Rabbitmq的细节
定义接口: public interface ProducerService { String send(); } 接口实现类: import com.sz.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; @EnableBinding(Source.class) //定义消息的推送管道 public class ProducerServiceImpl implements ProducerService{ @Resource private MessageChannel output; //消息发送管道 @Override public String send() { boolean flag = output.send(MessageBuilder.withPayload("消息内容").build()); System.out.println(flag); return ""; } } //注意,导包的时候不要导错了5.消息消费者
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @EnableBinding(Sink.class) public class ConsumerListener { @StreamListener(Sink.INPUT) public void input(Messagemessage){ System.out.println("消费消息:"+message.getPayload()); } } //注意,导包的时候不要导错了
此时我们写一个接口去发送消息:
然后控制台就会有消费接收的消息(因为我们打印了)
6.问题 1.重复消费就比如我们上面的消费者代码,我们在开一个服务,写上一样的代码去监听这个交换机, 那么两个监听的方法都可以获取到消息。 这个时候我们就需要使用分组来解决。 注意:在stream中处于同一个分组中的消费者是竞争关系,就能够保证每个消息只能被同一个组中的其中一个消费者消费。但是不同的组是可以消费的。 说明白点就是,处在同一个组的消费者只有一个能够消费到消息, 消费者分布在不同的组上,则每个组都有一个消费者可以消费到消息。 自定义分组: input: destination: exchange #交换机名称 content-type: application/json #消息类型 binder: defaultRabbit #设置要绑定的消息服务的具体设置 group: groupA #分组名称 就是在消费者的服务配置文件中添加group: groupA 多个消费者我们就使用相同的分组名称就可以了2.持久化
其实使用了分组就已经实现了持久化。 但是要注意的一点是:一点要配置分组, 因为配置了分组后,就算宕机了,重新启动后也能够消费到之前的消息, 但是如果把group分组信息删除了,则重启后,是接收不到之前的消息的。 注意注意:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)