【spring cloud】stream集成rabbitMQ,实现Topic订阅发布

【spring cloud】stream集成rabbitMQ,实现Topic订阅发布,第1张

【spring cloud】stream集成rabbitMQ,实现Topic订阅发布

stream主要负责与消息相关的功能,如与消息中间件rabbitMQ的集成等。这里先示例一个广播功能。生产者 发布消息后,所有的消费者都可以消费到消息。

1,引入依赖
    
        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
        
            org.springframework.boot
            spring-boot-starter-actuator
        

        
        
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        
    
2,创建Topic
public interface MyTopic {

    String INPUT= "myTopic-consumer";

    String OUTPUT ="myTopic-producer";

    //input是消费者那一方(接收消息的)
    //output是生产者那一方(推送消息的)
    //SubscribaleChannel意味可被订阅的channel
    @Input(INPUT)
    SubscribableChannel input();

    //两个topic被注入到channel的时候,还会被作为一个bean注入进来,如果input和output名字重复了,就会报bean重复异常。
    //因此input和output需要不同的名字,为此还需要在配置文件中将两个名字绑定
    @Output(OUTPUT)
    MessageChannel output();
}
3,配置文件

在配置文件中将两个channel绑定到一个exchange(名为braodcast)上。这样,这个项目就可以同时作为生产者和消费者。rabbitmq会自动为我们创建需要的queue,名字格式为exchangeName.queueName,如

配置如下:

#绑定channel到broadcast
spring.cloud.stream.bindings.myTopic-consumer.destination=broadcast
spring.cloud.stream.bindings.myTopic-producer.destination=broadcast

配置到此为止,下面可以进行测试了。
生产者:

@RestController
@Slf4j
public class Controller {

    @Autowired
    private MyTopic producer;
    
    @PostMapping("send")
    public void sendMessage(@RequestParam String body){
        producer.output().send(MessageBuilder.withPayload(body).build());
    }
   }

消费者:

@Slf4j
@EnableBinding(value = {
        MyTopic.class
}
)
public class StreamConsumer {

    @StreamListener(MyTopic.INPUT)
    public void consumeMyMessage(Object payLoad) {
        log.info("My message consumed successfully, payLoad={}", payLoad);
    }
}

如此,可以在调用后,在mq和日志中发现消息消费的痕迹。这种exchange为topic类型,由于没有routing key,会将消息广播出去。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5671552.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存