stream主要负责与消息相关的功能,如与消息中间件rabbitMQ的集成等。这里先示例一个广播功能。生产者 发布消息后,所有的消费者都可以消费到消息。
1,引入依赖2,创建Topicorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-actuatororg.springframework.cloud spring-cloud-starter-stream-rabbit
public interface MyTopic { String INPUT= "myTopic-consumer"; String OUTPUT ="myTopic-producer"; //input是消费者那一方(接收消息的) //output是生产者那一方(推送消息的) //SubscribaleChannel意味可被订阅的channel @Input(INPUT) SubscribableChannel input(); //两个topic被注入到channel的时候,还会被作为一个bean注入进来,如果input和output名字重复了,就会报bean重复异常。 //因此input和output需要不同的名字,为此还需要在配置文件中将两个名字绑定 @Output(OUTPUT) MessageChannel output(); }3,配置文件
在配置文件中将两个channel绑定到一个exchange(名为braodcast)上。这样,这个项目就可以同时作为生产者和消费者。rabbitmq会自动为我们创建需要的queue,名字格式为exchangeName.queueName,如
配置如下:
#绑定channel到broadcast spring.cloud.stream.bindings.myTopic-consumer.destination=broadcast spring.cloud.stream.bindings.myTopic-producer.destination=broadcast
配置到此为止,下面可以进行测试了。
生产者:
@RestController @Slf4j public class Controller { @Autowired private MyTopic producer; @PostMapping("send") public void sendMessage(@RequestParam String body){ producer.output().send(MessageBuilder.withPayload(body).build()); } }
消费者:
@Slf4j @EnableBinding(value = { MyTopic.class } ) public class StreamConsumer { @StreamListener(MyTopic.INPUT) public void consumeMyMessage(Object payLoad) { log.info("My message consumed successfully, payLoad={}", payLoad); } }
如此,可以在调用后,在mq和日志中发现消息消费的痕迹。这种exchange为topic类型,由于没有routing key,会将消息广播出去。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)