一、概念二、编码API以及常用注解三、消息的生产方
3.1 依赖3.2 配置文件3.3 业务类3.4 结果 四、消息的消费方
4.1 依赖4.2 配置文件4.3 业务类 五、消息分组
5.1 情景5.2 消息分组
80028003
一、概念- 统一的消息模型,屏蔽差异性,方便去处理多种消息中间件示例图:
3.2 配置文件org.springframework.cloud spring-cloud-starter-stream-rabbitorg.springframework.cloud spring-cloud-starter-netflix-eureka-client
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 output: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: send-8001.com # 访问路径可以显示IP地址 prefer-ip-address: true # Eureka客户端向服务端发送心跳的间隔时间,单位为s(默认30s),这里设置为1s lease-renewal-interval-in-seconds: 1 # Eureka服务端在收到最后一次心跳后等待时间上限,单位为s(默认90s),超时将剔除服务 lease-expiration-duration-in-seconds: 23.3 业务类
@EnableBinding(Source.class) public class IMessageProviderImpl implements IMessageProvider { @Resource(name = "output") private MessageChannel messageChannel; @Override public String sendMessage() { // 因为构建了message对象 把它传给管道Source,然后再给MQ的,不是用返回值 String serial = UUID.randomUUID().toString(); messageChannel.send(MessageBuilder.withPayload(serial).build()); System.out.println("********serial:" + serial); return null; } }3.4 结果 四、消息的消费方 4.1 依赖
4.2 配置文件org.springframework.cloud spring-cloud-starter-stream-rabbitorg.springframework.cloud spring-cloud-starter-netflix-eureka-client
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 input: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit # 消息分组:防止重复消费 group: GroupA eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: consumer-8002.com4.3 业务类
@Component @EnableBinding(Sink.class) public class ReceiverMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message五、消息分组 5.1 情景message) { System.out.println("消费者1号,-------------->收到的消息是:" + message.getPayload() + "t port:" + serverPort); } }
就会造成如下结果:
- 不同组:重复消费相同组:竞争消息
# 消息分组:防止重复消费 group: GroupA
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 input: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit # 消息分组:防止重复消费 group: GroupA eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: consumer-8002.com8003
# 消息分组:防止重复消费 group: GroupA
server: port: 8803 spring: application: name: cloud-stream-consumer cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称 input: # 使用的Exchange名称 destination: studyExchange # 设置消息类型,本次为Json,文本使用 text/plain content-type: application/json # 设置要绑定的消息服务的具体设置 把binder 改为default-binder就可以了 binder: defaultRabbit # 消息分组:防止重复消费 group: GroupA eureka: client: service-url: # 设置与eureka sever交互的地址与查询服务、注册服务 defaultZone: http://localhost:7001/eureka instance: # 修改实例id instance-id: consumer-8003.com
生产者发送3次请求,8002 以及 8003 共同去共同竞争
生产者发送了两条消息给A组
此刻,消费者1和消费者2宕机了
过会,A组的消费者1和消费2重启;这两条消息会被A组成员竞争.
chapter21:Bus消息总线实现动态刷新chapter23:Spring Cloud Sleuth 和 Zipkin 的简单使用
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)