- 下载与安装 Kafka
- 8.4.1 SpringCloud Sleuth 整合 Zipkin 实现分布式链路跟踪、收集
Maven 依赖
4.0.0 com.edcode.ecommerce edcode-study-scacommerce1.0-SNAPSHOT jar org.springframework.boot spring-boot-starter-parent2.3.1.RELEASE edcode-study-scacommerce Edcode SpringBoot org.springframework.boot spring-boot-starter-weborg.projectlombok lomboktrue org.springframework.boot spring-boot-starter-actuatororg.springframework.boot spring-boot-starter-testtest org.springframework.cloud spring-cloud-context2.2.6.RELEASE org.springframework.kafka spring-kafka2.5.0.RELEASE org.apache.commons commons-lang33.11 ${artifactId} org.springframework.boot spring-boot-maven-pluginrepackage
resources 配置文件 bootstrap.yml
spring:
  profiles:
    active: dev
  application:
    name: edcode-study-scacommerce

# Expose the actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
resources 配置文件 application-dev.yml
server:
  port: 8001
  servlet:
    context-path: /edcode-study-scacommerce-dev

spring:
  # Spring Boot + Kafka: the minimum configuration is spring.kafka.bootstrap-servers
  kafka:
    bootstrap-servers: 192.168.3.250:9092
#    consumer:
#      # If a consumer does not specify a group-id, the one configured here is used;
#      # if none is configured here either, the framework generates a random one
#      group-id: imooc-study-ecommerce
#      auto-offset-reset: latest
#      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#    producer:
#      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.apache.kafka.common.serialization.StringSerializer
通过代码自定义 Kafka 配置
package com.edcode.ecommerce.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Programmatic Kafka configuration: declares the producer factory, the
 * {@link KafkaTemplate}, the consumer factory and the listener container
 * factory, all using String keys and String values.
 */
@Configuration
public class KafkaConfig {

    /** Broker list, read from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the producer factory with String serializers for key and value.
     *
     * @return producer factory bound to the configured brokers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>(16);
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * Template used by application code to send messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Builds the consumer factory with String deserializers and at most
     * 50 records per poll.
     *
     * @return consumer factory bound to the configured brokers
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Container factory used by {@code @KafkaListener} methods.
     *
     * @return listener container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Concurrency is the number of threads started per consumer instance
        factory.setConcurrency(3);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
13.2.1.1 Kafka 生产者(提示:配置较复杂时使用代码更直观,配置简单时使用 yaml 更方便)
package com.edcode.ecommerce.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.TimeUnit;

/**
 * Kafka producer that sends String messages through {@link KafkaTemplate} and
 * reports the outcome both through an asynchronous callback and through a
 * bounded synchronous wait.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code value} to {@code topic}, with {@code key} when it is not blank.
     *
     * @param key   optional message key; when blank the message is sent without a key
     * @param value message payload, must not be blank
     * @param topic destination topic, must not be blank
     * @throws IllegalArgumentException if {@code value} or {@code topic} is blank
     */
    public void sendMessage(String key, String value, String topic) {
        if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("value 或 topic为null或空");
        }

        ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key)
                ? kafkaTemplate.send(topic, value)
                : kafkaTemplate.send(topic, key, value);

        // Notification via asynchronous callback; the throwable is passed as the
        // last argument so the logger records the cause of the failure
        future.addCallback(
                this::logSendSuccess,
                failure -> log.error("发送 kafka 信息失败: [{}], [{}], [{}]", key, value, topic, failure)
        );

        // Notification via synchronous wait, bounded to 5 seconds
        try {
            logSendSuccess(future.get(5, TimeUnit.SECONDS));
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers up the stack can observe it
            Thread.currentThread().interrupt();
            log.error("发送 kafka 信息失败: [{}], [{}], [{}]", key, value, topic, ex);
        } catch (Exception ex) {
            log.error("发送 kafka 信息失败: [{}], [{}], [{}]", key, value, topic, ex);
        }
    }

    /** Logs topic, partition and offset of a successfully sent record. */
    private void logSendSuccess(SendResult<String, String> result) {
        if (result == null || result.getRecordMetadata() == null) {
            log.warn("发送 kafka 信息成功, 但未返回 RecordMetadata");
            return;
        }
        // Topic the record was sent to
        String sentTopic = result.getRecordMetadata().topic();
        // Partition the record was written to
        int partition = result.getRecordMetadata().partition();
        // Offset of the record inside that partition
        long offset = result.getRecordMetadata().offset();
        log.info("发送 kafka 信息成功: [{}], [{}], [{}]", sentTopic, partition, offset);
    }
}
通过 Kafka 传递的消息对象VO
/**
 * Value object carried as the JSON payload of Kafka messages.
 * Accessors and both constructors are generated by Lombok.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class MessageVo {

    /** Message identifier. */
    private Integer id;

    /** Name of the project the message refers to. */
    private String projectName;
}
Kafka 消费者
package com.edcode.ecommerce.kafka;

import com.edcode.ecommerce.vo.MessageVo;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Two listeners on the same topic in different consumer groups; each parses
 * the record value as a {@link MessageVo} and logs it.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {

    private final ObjectMapper mapper;

    /**
     * Consumes records with typed String key and value.
     *
     * @param consumerRecord the record received from the topic
     * @throws JsonProcessingException if the value is not valid {@link MessageVo} JSON
     */
    @KafkaListener(topics = { "edcode-springboot" }, groupId = "edcode-springboot-kafka")
    public void listener01(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {
        String key = consumerRecord.key();
        String value = consumerRecord.value();

        MessageVo kafkaMessage = mapper.readValue(value, MessageVo.class);
        log.info("监听器01 消费 kafka 信息: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage));
    }

    /**
     * Consumes records with untyped key and value; records without a value are skipped.
     *
     * @param consumerRecord the record received from the topic
     * @throws JsonProcessingException if the value is not valid {@link MessageVo} JSON
     */
    @KafkaListener(topics = { "edcode-springboot" }, groupId = "edcode-springboot-kafka-1")
    public void listener02(ConsumerRecord<?, ?> consumerRecord) throws JsonProcessingException {
        // The key is only logged, so keep it as Object instead of casting the wildcard type
        Object key = consumerRecord.key();
        Object message = consumerRecord.value();
        if (message == null) {
            return;
        }

        MessageVo messageVo = mapper.readValue(message.toString(), MessageVo.class);
        log.info("监听器02 消费 kafka 信息: [{}], [{}]", key, mapper.writeValueAsString(messageVo));
    }
}
SpringBoot 集成 kafka 发送消息
package com.edcode.ecommerce.controller; import com.edcode.ecommerce.kafka.KafkaProducer; import com.edcode.ecommerce.vo.MessageVo; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController @RequestMapping("/kafka") @RequiredArgsConstructor public class KafkaController { private final ObjectMapper mapper; private final KafkaProducer kafkaProducer; @GetMapping("/send-message") public void sendMessage(@RequestParam(required = false) String key, @RequestParam String topic) throws Exception { MessageVo message = new MessageVo( 1, "EdCode-Study-ScaCommerce" ); kafkaProducer.sendMessage(key, mapper.writevalueAsString(message), topic); } }
kafka-controller.http
### kafka-send-message
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/kafka/send-message?key=edcode&topic=edcode-springboot
Content-Type: application/json

### kafka-send-message
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/kafka/send-message?topic=edcode-springboot
Content-Type: application/json

###
日志打印
2021-12-14 11:52:46.908 INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2021-12-14 11:52:46.908 INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2021-12-14 11:52:46.908 INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1639453966907
2021-12-14 11:52:46.934 INFO 20556 --- [ad | producer-1] org.apache.kafka.clients.metadata : [Producer clientId=producer-1] Cluster ID: gliuLj3bTNSg2ht1Cn32Dg
2021-12-14 11:52:46.973 INFO 20556 --- [ad | producer-1] c.edcode.ecommerce.kafka.KafkaProducer : 发送 kafka 信息成功: [edcode-springboot], [0], [1]
2021-12-14 11:52:46.973 INFO 20556 --- [nio-8001-exec-2] c.edcode.ecommerce.kafka.KafkaProducer : 发送 kafka 信息成功: [edcode-springboot], [0], [1]
2021-12-14 11:52:47.005 INFO 20556 --- [ntainer#0-0-C-1] c.edcode.ecommerce.kafka.KafkaConsumer : 监听器02 消费 kafka 信息: [edcode], [{"id":1,"projectName":"EdCode-Study-ScaCommerce"}]
2021-12-14 11:52:47.005 INFO 20556 --- [ntainer#1-0-C-1] c.edcode.ecommerce.kafka.KafkaConsumer : 监听器01 消费 kafka 信息: [edcode], [{"id":1,"projectName":"EdCode-Study-ScaCommerce"}]

13.4.1 SpringBoot集成RocketMQ构建消息驱动微服务
- 下载与安装RocketMQ
- 下载 RocketMQ:https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-source-release.zip
- RocketMQ 快速开始:https://rocketmq.apache.org/docs/quick-start/
- 下载以 bin-release 结尾的 zip 包解压即可完成安装
构建二进制
# Build the binary distribution from the source release (run from /opt)
unzip rocketmq-all-4.9.2-source-release.zip
cd rocketmq-all-4.9.2/
mvn -Prelease-all -DskipTests clean install -U

# cd distribution/target/rocketmq-4.9.2/rocketmq-4.9.2
# Link the built distribution to a short path, then switch to it.
# The absolute path is required because the working directory is now rocketmq-all-4.9.2/.
ln -s /opt/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2 /opt/rocketmq-4.9.2
cd /opt/rocketmq-4.9.2/
启动 NameServer
nohup sh bin/mqnamesrv >/dev/null 2>&1 &
runbroker.sh 和 runserver.sh 中可以调整 JVM 参数;默认内存配置比较大,个人测试时可以适当调小
启动 Broker
nohup sh bin/mqbroker -n 192.168.3.250:9876 >/dev/null 2>&1 &
测试发送
# Point the example tools at the NameServer, then run the quick-start producer and consumer
export NAMESRV_ADDR=192.168.3.250:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭
[root@zk1 rocketmq-4.9.2]# sh bin/mqshutdown broker
The mqbroker(10613) is running...
Send shutdown request to mqbroker(10613) OK
[root@zk1 rocketmq-4.9.2]# sh bin/mqshutdown namesrv
The mqnamesrv(10271) is running...
Send shutdown request to mqnamesrv(10271) OK

13.4.1.2 通过 RocketMQ 生产者
package com.edcode.ecommerce.rocketmq; import com.alibaba.fastjson.JSON; import com.edcode.ecommerce.vo.MessageVo; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Slf4j @Component @RequiredArgsConstructor public class RocketMQProducer { private static final String TOPIC = "edcode-study-rocketmq"; private final RocketMQTemplate rocketMQTemplate; public void sendMessageWithValue(String value) { // 随机选择一个 Topic 的 Message Queue 发送消息 SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value); log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(sendResult)); SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly(TOPIC, value, "Eddie"); log.info("sendMessageWithValue orderly result: [{}]", JSON.toJSONString(sendResultOrderly)); } public void sendMessageWithKey(String key, String value) { Message13.4.1.3 RocketMQ 消费者方式汇总message = MessageBuilder.withPayload(value).setHeader(RocketMQHeaders.KEYS, key).build(); // 异步发送消息, 并设定回调 rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("sendMessageWithKey success result: [{}]", JSON.toJSONString(sendResult)); } @Override public void onException(Throwable e) { log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e); } }); } public void sendMessageWithTag(String tag, String value) { MessageVo qinyiMessage = JSON.parseObject(value, MessageVo.class); SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), qinyiMessage); log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult)); 
} public void sendMessageWithAll(String key, String tag, String value) { Message message = MessageBuilder.withPayload(value).setHeader(RocketMQHeaders.KEYS, key).build(); SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), message); log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult)); } }
使用同步的方式发送消息, 不指定 key 和 tag
/**
 * Consumes every message of the topic as a String, parses it as a
 * {@code MessageVo} and logs it.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "edcode-study-rocketmq",
        consumerGroup = "edcode-springboot-rocketmq-string"
)
public class RocketMQConsumerString implements RocketMQListener<String> {

    /**
     * Handles one message.
     *
     * @param message JSON text of a {@code MessageVo}
     */
    @Override
    public void onMessage(String message) {
        MessageVo rocketMessage = JSON.parseObject(message, MessageVo.class);
        log.info("consume message in RocketMQConsumerString: [{}]", JSON.toJSONString(rocketMessage));
    }
}
指定了消费带有 tag 的消息
@Slf4j @Component @RocketMQMessageListener( topic = "edcode-study-rocketmq", consumerGroup = "edcode-springboot-rocketmq-tag-string", selectorexpression = "edcode" // 根据 tag 过滤 ) public class RocketMQConsumerTagString implements RocketMQListener{ @Override public void onMessage(String message) { MessageVo rocketMessage = JSON.parseObject(message, MessageVo.class); log.info("consume message in RocketMQConsumerTagString: [{}]", JSON.toJSONString(rocketMessage)); } }
指定消费带有 tag 的消息, 且消费的是 Java Pojo
@Slf4j @Component @RocketMQMessageListener( topic = "edcode-study-rocketmq", consumerGroup = "edcode-springboot-rocketmq-tag-object", selectorexpression = "edcode" // 根据 tag 做过滤 ) public class RocketMQConsumerObject implements RocketMQListener{ @Override public void onMessage(MessageVo message) { log.info("consume message in RocketMQConsumerObject: [{}]", JSON.toJSONString(message)); // so something } }
扩展 MessageExt, 可以获取 Keys 之类字段
@Slf4j @Component @RocketMQMessageListener( topic = "edcode-study-rocketmq", consumerGroup = "edcode-springboot-rocketmq-message-ext" ) public class RocketMQConsumerMessageExt implements RocketMQListener13.6.1.1 RocketMQ Producer Api 发送消息{ @Override public void onMessage(MessageExt message) { String value = new String(message.getBody()); log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]", message.getKeys(), value); log.info("MessageExt: [{}]", JSON.toJSONString(message)); // 会慢一些 } }
@Slf4j @RestController @RequestMapping("/rocket-mq") @RequiredArgsConstructor public class RocketMQController { private static final MessageVo RocketMQMessage = new MessageVo(1, "Edcode-Study-RocketMQ-In-SpringBoot"); private final RocketMQProducer rocketMQProducer; @GetMapping("/message-with-value") public void sendMessageWithValue() { rocketMQProducer.sendMessageWithValue(JSON.toJSONString(RocketMQMessage)); } @GetMapping("/message-with-key") public void sendMessageWithKey() { rocketMQProducer.sendMessageWithKey("Edcode", JSON.toJSONString(RocketMQMessage)); } @GetMapping("/message-with-tag") public void sendMessageWithTag() { rocketMQProducer.sendMessageWithTag("edcode", JSON.toJSONString(RocketMQMessage)); } @GetMapping("/message-with-all") public void sendMessageWithAll() { rocketMQProducer.sendMessageWithAll("Edcode", "edcode", JSON.toJSONString(RocketMQMessage)); } }13.7.1.1 rocket-mq-controller.http
### message-with-value
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-value
Content-Type: application/json

### message-with-key
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-key
Content-Type: application/json

### message-with-tag
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-tag
Content-Type: application/json

### message-with-all
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-all
Content-Type: application/json

###

13.8.1 SpringCloud Stream 消息驱动组件概览
- 为什么要有 SpringCloud Stream
- 如果没有 SpringCloud Stream ,那么我们怎么玩消息驱动?
- SpringCloud Stream 应用模型
- SpringCloud Stream 中的核心概念
- 负责与中间件交互的抽象绑定器:Binder
- 发送消息与接收消息的应用通信信道:Input、Output
- SpringCloud Stream 中的核心概念
- SpringCloud Stream 应用模型
- 经典的 SpringCloud Stream 发布-订阅模型
- Topic 可以认为就是 Kafka 中的 Topic 概念
- Producer 通过 Output 信道发布消息到 Topic 上
- Consumer 通过 Input 信道消费 Topic 上的消息
- 经典的 SpringCloud Stream 发布-订阅模型
Maven 依赖
sca-commerce com.edcode.commerce 1.0-SNAPSHOT 4.0.0 sca-commerce-stream-client1.0-SNAPSHOT jar sca-commerce-stream-client Stream Client com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discoverycom.edcode.commerce sca-commerce-mvc-config1.0-SNAPSHOT org.springframework.cloud spring-cloud-starter-zipkinorg.springframework.kafka spring-kafka2.5.0.RELEASE org.springframework.cloud spring-cloud-streamorg.springframework.cloud spring-cloud-stream-binder-kafka${artifactId} org.springframework.boot spring-boot-maven-pluginrepackage
bootstrap.yml
server:
  port: 8006
  servlet:
    context-path: /scacommerce-stream-client

spring:
  application:
    name: sca-commerce-stream-service
  cloud:
    nacos:
      # Service registration and discovery
      discovery:
        enabled: true # set to false to disable Nacos service registration and discovery
        #server-addr: ${NACOS_ADDR:127.0.0.1}:8848
        server-addr: ${NACOS_ADDR_1:127.0.0.1}:8848,${NACOS_ADDR_2:127.0.0.1}:8858,${NACOS_ADDR_3:127.0.0.1}:8868 # Nacos server addresses
        namespace: ${NAMESPACE_ID:1adcfdd8-5763-4768-9a15-9c7157988950}
        metadata:
          management:
            context-path: ${server.servlet.context-path}/actuator
    stream:
      # SpringCloud Stream + RocketMQ
#      rocketmq:
#        binder:
#          name-server: ${ROCKETMQ_SERVER:127.0.0.1}:${ROCKETMQ_PORT:9876}
      kafka:
        binder:
          brokers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
          auto-create-topics: true # when false, topics are not created automatically and must exist beforehand; false is recommended in production
      bindings:
        # Default sender
        output: # the default output channel provided by Stream
          destination: scacommerce-stream-client-default # where messages are sent; in Kafka this is the topic
          content-type: text/plain # message format; required on the sending side, not on the receiving side
        # Default receiver (input and output would normally live in different projects)
        input: # the default input channel provided by Stream
          destination: scacommerce-stream-client-default
        # Edcode sender
        edcodeOutput:
          destination: scacommerce-stream-client-edcode
          content-type: text/plain
        # Edcode receiver
        edcodeInput:
          destination: scacommerce-stream-client-edcode
  kafka:
    bootstrap-servers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
    producer:
      retries: 3
    consumer:
      auto-offset-reset: latest
  zipkin:
    sender:
      type: ${ZIPKIN_KAFKA_SENDER:web} # defaults to web
    base-url: http://${ZIPKIN_URL:localhost}:${ZIPKIN_PORT:9411}/
  sleuth:
    sampler:
      # RateLimitingSampler strategy: when rate limiting is set, spring.sleuth.sampler.probability has no effect
      rate: 100 # number of traces accepted per second
      # Probability sampling strategy
      probability: 1.0 # sampling ratio, 1.0 means 100%; default: 0.1
  redis:
    database: 0
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    timeout: 5000

# Expose the actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
创建 SpringBoot 启动类
/**
 * Spring Boot entry point of the stream client service, with service
 * discovery enabled.
 */
@SpringBootApplication
@EnableDiscoveryClient
public class StreamClientApplication {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(StreamClientApplication.class, args);
    }
}
消息传递对象: SpringCloud Stream + Kafka/RocketMQ
/**
 * Message object passed through SpringCloud Stream (Kafka / RocketMQ).
 * Accessors and both constructors are generated by Lombok.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class MessageVo {

    private Integer id;
    private String projectName;
    private String org;
    private String author;
    private String version;

    /**
     * Creates the sample message used by the demo endpoints.
     *
     * @return a new instance populated with the fixed demo values
     */
    public static MessageVo defaultMessage() {
        final MessageVo sample = new MessageVo();
        sample.setId(1);
        sample.setProjectName("sca-commerce-stream-client");
        sample.setOrg("blog.eddilee.cn");
        sample.setAuthor("Eddie");
        sample.setVersion("1.0");
        return sample;
    }
}
使用默认的通信信道实现消息的发送
package com.edcode.commerce.stream; import com.alibaba.fastjson.JSON; import com.edcode.commerce.vo.MessageVo; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; @Slf4j @EnableBinding(Source.class) @RequiredArgsConstructor public class DefaultSendService { private final Source source; public void sendMessage(MessageVo message) { String jsonString = JSON.toJSONString(message); log.info("在 DefaultSendService 中发送消息: [{}]", jsonString); // Spring Messaging, 统一消息的编程模型, 是 Stream 组件的重要组成部分之一 source.output().send(MessageBuilder.withPayload(jsonString).build()); } }
使用默认的信道实现消息的接收
package com.edcode.commerce.stream; import com.alibaba.fastjson.JSON; import com.edcode.commerce.vo.MessageVo; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @Slf4j @EnableBinding(Sink.class) public class DefaultReceiveService { @StreamListener(Sink.INPUT) public void receiveMessage(Object payload) { log.info("在 DefaultReceiveService 消费消息中启动 "); MessageVo edcodeMessage = JSON.parseObject(payload.toString(), MessageVo.class); // 消费消息 log.info("在 DefaultReceiveService 中使用消息成功: [{}]", JSON.toJSONString(edcodeMessage)); } }13.10.1 自定义Stream消息通信信道实现定制分发 13.10.1.1 构建消息驱动
@Slf4j @RestController @RequestMapping("/message") @RequiredArgsConstructor public class MessageController { private final DefaultSendService defaultSendService; private final EdcodeSendService edcodeSendService; @GetMapping("/default") public void defaultSend() { defaultSendService.sendMessage(MessageVo.defaultMessage()); } @GetMapping("/edcode") public void qinyiSend() { edcodeSendService.sendMessage(MessageVo.defaultMessage()); } }13.10.1.2 message.http
### 发送默认信道消息
GET http://127.0.0.1:8006/scacommerce-stream-client/message/default
Content-Type: application/json

### 发送自定义信道消息
GET http://127.0.0.1:8006/scacommerce-stream-client/message/edcode
Content-Type: application/json

###

13.11.1 SpringCloud Stream消息分组和消费分区的配置与说明
13.11.1.1 SpringCloud Stream消息分组
- SpringCloud Stream消息分组模型
- 应用的不同实例放在一个消费者组中,每一条消息只会被一个实例消费
- 消费者组的思想是通过多实例扩展服务吞吐量,且不会造成消息的重复消费
- SpringCloud Stream消息分区
- 消费分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理
本章完整配置文件 bootstrap.yml
server:
  port: 8006
  servlet:
    context-path: /scacommerce-stream-client

spring:
  application:
    name: sca-commerce-stream-service
  cloud:
    nacos:
      # Service registration and discovery
      discovery:
        enabled: true # set to false to disable Nacos service registration and discovery
        #server-addr: ${NACOS_ADDR:127.0.0.1}:8848
        server-addr: ${NACOS_ADDR_1:127.0.0.1}:8848,${NACOS_ADDR_2:127.0.0.1}:8858,${NACOS_ADDR_3:127.0.0.1}:8868 # Nacos server addresses
        namespace: ${NAMESPACE_ID:1adcfdd8-5763-4768-9a15-9c7157988950}
        metadata:
          management:
            context-path: ${server.servlet.context-path}/actuator
    stream:
      # SpringCloud Stream + RocketMQ
#      rocketmq:
#        binder:
#          name-server: ${ROCKETMQ_SERVER:127.0.0.1}:${ROCKETMQ_PORT:9876}
      # Enable stream partitioning support
      instanceCount: 1 # total number of consumers
      instanceIndex: 0 # index of the current consumer
      kafka:
        binder:
          brokers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
          auto-create-topics: true # when false, topics are not created automatically and must exist beforehand; false is recommended in production
      bindings:
        # Default sender
        output: # the default output channel provided by Stream
          destination: scacommerce-stream-client-default # where messages are sent; in Kafka this is the topic
          content-type: text/plain # message format; required on the sending side, not on the receiving side
          # Message partitioning
          producer:
            # partitionKeyExpression: payload.author # partition key; payload is the object being sent, author is one of its properties
            partitionCount: 1 # number of partitions
            # Custom partitioning strategy; keep partitionKeyExpression commented out when using it
            partitionKeyExtractorName: edcodePartitionKeyExtractorStrategy # com.edcode.commerce.partition.EdcodePartitionKeyExtractorStrategy
            partitionSelectorName: edcodePartitionSelectorStrategy # com.edcode.commerce.partition.EdcodePartitionSelectorStrategy
        # Default receiver (input and output would normally live in different projects)
        input: # the default input channel provided by Stream
          destination: scacommerce-stream-client-default
          # Consumer group
          group: sca-commerce-edcode-default
          # Enable partitioning support on the consumer
          consumer:
            partitioned: true
        # Edcode sender
        edcodeOutput:
          destination: scacommerce-stream-client-edcode
          content-type: text/plain
        # Edcode receiver
        edcodeInput:
          destination: scacommerce-stream-client-edcode
          # Consumer group
          group: sca-commerce-edcode-edcode
  kafka:
    bootstrap-servers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
    producer:
      retries: 3
    consumer:
      auto-offset-reset: latest
  zipkin:
    sender:
      type: ${ZIPKIN_KAFKA_SENDER:web} # defaults to web
    base-url: http://${ZIPKIN_URL:localhost}:${ZIPKIN_PORT:9411}/
  sleuth:
    sampler:
      # RateLimitingSampler strategy: when rate limiting is set, spring.sleuth.sampler.probability has no effect
      rate: 100 # number of traces accepted per second
      # Probability sampling strategy
      probability: 1.0 # sampling ratio, 1.0 means 100%; default: 0.1
  redis:
    database: 0
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    timeout: 5000

# Expose the actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
分区部分
自定义从 Message 中提取 partition key 的策略
@Slf4j @Component public class EdcodePartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy { @Override public Object extractKey(Message> message) { MessageVo messageVo = JSON.parseObject(message.getPayload().toString(), MessageVo.class); // 自定义提取 key String key = messageVo.getProjectName(); log.info("SpringCloud Stream EdCode Partition Key: [{}]", key); return key; } }
决定 message 发送到哪个分区的策略
/**
 * Custom strategy that decides which partition a message is sent to, based
 * on the hash of the key's string form.
 */
@Slf4j
@Component
public class EdcodePartitionSelectorStrategy implements PartitionSelectorStrategy {

    /**
     * Selects the target partition.
     *
     * @param key            partition key extracted from the message
     * @param partitionCount number of partitions to choose from
     * @return a partition index in the range {@code [0, partitionCount)}
     */
    @Override
    public int selectPartition(Object key, int partitionCount) {
        // String.hashCode() can be negative and '%' keeps the sign of the dividend,
        // so floorMod is used to keep the result inside [0, partitionCount)
        int partition = Math.floorMod(key.toString().hashCode(), partitionCount);
        log.info("SpringCloud Stream EdCode Selector info: [{}], [{}], [{}]",
                key.toString(), partitionCount, partition);
        return partition;
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)