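This sample uses Kafka as the message queue. For installing the required software (Kafka and ZooKeeper), see Part 1 of this series (installation).
Note: newer versions of Spring Cloud Stream no longer use the Sink and Source bindings (the annotations are deprecated). Beans of type Supplier and Consumer are used instead.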
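Demo architecture
Left branch: on startup, the publisher publishes messages for one topic to stream-kafka, and the subscriber (the same service) reads that topic's messages from the queue and prints them.
Right branch: a request to http://localhost:7888/request publishes a message for a second topic to stream-kafka, and Subscriber A (a separate service) reads that topic's messages from the queue and prints them.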
1. Publisher
pom.xml
Add spring-cloud-stream and spring-cloud-stream-binder-kafka as dependencies.
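<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
    </parent>
    <groupId>com.abcd</groupId>
    <artifactId>MessageStreamPublisher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>MessageStreamPublisher</name>
    <description>MessageStreamPublisher</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>2021.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>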
application.yml
server:
  port: 7888
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
spring:
  application:
    name: msg_producer
  cloud:
    stream:
      function:
        definition: testSupplier;testConsumer;publishMsg
      bindings:
        testSupplier-out-0:
          destination: topic-hello
          content-type: text/plain
        testConsumer-in-0:
          destination: topic-hello
        publishMsg-out-0:
          destination: topic-user-request
          content-type: text/plain
      kafka:
        binder:
          brokers: localhost:9092
          #zk-nodes: localhost:2181
          auto-create-topics: true
          required-acks: 1
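The binding keys in this configuration follow Spring Cloud Stream's functional naming convention: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The sketch below only illustrates that convention in plain Java. `BindingNames` and its two methods are hypothetical helpers written for this article and are not part of Spring.

```java
// Hypothetical helper (not a Spring API): it only mirrors the naming
// convention used for the binding keys in application.yml.
final class BindingNames {
    private BindingNames() {}

    // Input binding for a Consumer or Function bean, e.g. "testConsumer-in-0".
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding for a Supplier or Function bean, e.g. "testSupplier-out-0".
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(output("testSupplier", 0));
        System.out.println(input("testConsumer", 0));
        System.out.println(output("publishMsg", 0));
    }
}
```

The bean method names (testSupplier, testConsumer, publishMsg) are therefore what ties the Java code to the destinations listed under bindings.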
Java
The Application class hosts both the supplier and the consumer, so the Publisher and the Subscriber share the same URL and port.
Important: the bean methods return Supplier and Consumer (java.util.function.Supplier/Consumer).
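Application code:

import java.util.function.Consumer;
import java.util.function.Supplier;

// Assumption: the Logger is SLF4J, which Spring Boot ships by default.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.context.annotation.Bean;

@EnableEurekaClient
@SpringBootApplication
public class MessageStreamPublisherApplication {

    private static final Logger log = LoggerFactory.getLogger(MessageStreamPublisherApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(MessageStreamPublisherApplication.class, args);
    }

    // Bound to testSupplier-out-0 (topic-hello).
    @Bean
    public Supplier<String> testSupplier() {
        return () -> {
            String message = "www.test.com";
            log.info("Sending value: " + message);
            return message;
        };
    }

    // Bound to testConsumer-in-0 (topic-hello).
    @Bean
    public Consumer<String> testConsumer() {
        return message -> {
            log.info("Received message: " + message);
        };
    }
}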
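Conceptually, the framework calls the Supplier repeatedly, publishes each result to topic-hello, and hands each record from that topic to the Consumer. The following is a rough in-memory sketch of that loop with no Kafka and no Spring involved. `LoopbackSketch`, `pollOnce`, `deliverAll`, and the in-memory queue standing in for the topic are all hypothetical names used for illustration, not framework APIs.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class LoopbackSketch {
    // In-memory stand-in for the Kafka topic "topic-hello".
    private final Queue<String> topic = new ArrayDeque<>();

    // One poll of the supplier; in this sketch a null result publishes nothing.
    void pollOnce(Supplier<String> supplier) {
        String message = supplier.get();
        if (message != null) {
            topic.add(message);
        }
    }

    // Hand every pending message to the consumer, in arrival order.
    void deliverAll(Consumer<String> consumer) {
        String message;
        while ((message = topic.poll()) != null) {
            consumer.accept(message);
        }
    }

    public static void main(String[] args) {
        LoopbackSketch sketch = new LoopbackSketch();
        Supplier<String> testSupplier = () -> "www.test.com";
        Consumer<String> testConsumer =
                message -> System.out.println("Received message: " + message);

        sketch.pollOnce(testSupplier);
        sketch.pollOnce(testSupplier);
        sketch.deliverAll(testConsumer);
    }
}
```

In the real application the two halves are decoupled by the broker, which is why the same Supplier and Consumer beans also work when they live in different services.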
The service and controller below cover the case where the supplier and the consumer live in different places (separate microservices).
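Publisher controller code:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PublishController {

    @Autowired
    public PublishService publishService;

    // GET http://localhost:7888/request queues one message for publishing.
    @GetMapping("request")
    public void log() {
        publishService.log("userRequest");
    }
}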
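Publisher service code:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

@Service
public class PublishService {

    // Buffers requests from the controller until the supplier is polled.
    private BlockingQueue<String> mRequestList = new LinkedBlockingDeque<>();

    public void log(String userRequest) {
        mRequestList.offer(userRequest);
    }

    // Bound to publishMsg-out-0 (topic-user-request).
    @Bean
    public Supplier<String> publishMsg() {
        return () -> {
            // poll() does not block; it returns null when the queue is empty.
            String request = mRequestList.poll();
            System.out.println("publish msg: " + request);
            return request;
        };
    }
}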
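The publisher service hands messages from the HTTP thread to the supplier through a queue. A minimal sketch of that hand-off without any Spring dependency is shown here. `RequestBufferSketch` is a hypothetical stand-in for PublishService, and the point it illustrates is that the supplier returns null whenever nothing has been queued.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Supplier;

// Hypothetical stand-in for PublishService, without Spring.
final class RequestBufferSketch {
    private final BlockingQueue<String> requests = new LinkedBlockingDeque<>();

    // Same role as PublishService.log(): buffer a request for a later poll.
    void log(String userRequest) {
        requests.offer(userRequest);
    }

    // Same shape as the publishMsg bean: each get() drains at most one entry.
    Supplier<String> publishMsg() {
        return () -> requests.poll(); // non-blocking; null when the queue is empty
    }

    public static void main(String[] args) {
        RequestBufferSketch sketch = new RequestBufferSketch();
        Supplier<String> supplier = sketch.publishMsg();

        System.out.println(supplier.get()); // nothing queued yet
        sketch.log("userRequest");
        System.out.println(supplier.get()); // the buffered request
        System.out.println(supplier.get()); // queue is empty again
    }
}
```

Because the supplier is called far more often than the endpoint is hit, most calls see an empty queue, which is why the service prints "publish msg: null" between requests.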
2. Subscriber A
pom.xml
Add spring-cloud-stream and spring-cloud-stream-binder-kafka as dependencies.
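<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
    </parent>
    <groupId>com.abcd</groupId>
    <artifactId>MessageStreamSubscriberA</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>MessageStreamSubscriberA</name>
    <description>MessageStreamSubscriberA</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>2021.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>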
application.yml
server:
  port: 7889
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
spring:
  application:
    name: msg_consumer_a
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          #zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
        receiveMsg-in-0:
          destination: topic-user-request
      function:
        definition: receiveMsg
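Java
Application code:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MessageStreamSubscriberAApplication {

    public static void main(String[] args) {
        SpringApplication.run(MessageStreamSubscriberAApplication.class, args);
    }
}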
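Service code:

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

@Service
public class SubscriberAService {

    // Bound to receiveMsg-in-0 (topic-user-request).
    @Bean
    public Consumer<String> receiveMsg() {
        return request -> {
            System.out.println("SubscriberA receive msg: " + request);
        };
    }
}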