【Spring Cloud Alibaba 温故而知新】(十)基于 SpringCloud Stream 构建消息驱动微服务

【Spring Cloud Alibaba 温故而知新】(十)基于 SpringCloud Stream 构建消息驱动微服务,第1张

【Spring Cloud Alibaba 温故而知新】(十)基于 SpringCloud Stream 构建消息驱动微服务 目录 13.1.1 SpringBoot集成Kafka构建消息驱动微服务
  • 下载与安装 Kafka
    • 8.4.1 SpringCloud Sleuth 整合 Zipkin 实现分布式链路跟踪、收集
13.1.1.1 创建新工程 - edcode-study-scacommerce

Maven 依赖



    4.0.0

    com.edcode.ecommerce
    edcode-study-scacommerce
    1.0-SNAPSHOT
    jar


    
        org.springframework.boot
        spring-boot-starter-parent
        2.3.1.RELEASE
    

    edcode-study-scacommerce
    Edcode SpringBoot

    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.projectlombok
            lombok
            true
        
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            org.springframework.cloud
            spring-cloud-context
            2.2.6.RELEASE
        
        
        
            org.springframework.kafka
            spring-kafka
            2.5.0.RELEASE
        
        
            org.apache.commons
            commons-lang3
            3.11
        
    

    
        ${artifactId}
        
            
                org.springframework.boot
                spring-boot-maven-plugin
                
                    
                        
                            repackage
                        
                    
                
            
        
    


resources 配置文件 bootstrap.yml

spring:
  profiles:
    active: dev
  application:
    name: edcode-study-scacommerce

# 暴露端点
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

resources 配置文件 application-dev.yml

server:
  port: 8001
  servlet:
    context-path: /edcode-study-scacommerce-dev

spring:
  # SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
  kafka:
    bootstrap-servers: 192.168.3.250:9092
#    consumer:
      # 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
#      group-id: imooc-study-ecommerce
#      auto-offset-reset: latest
#      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#    producer:
#      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.apache.kafka.common.serialization.StringSerializer

通过代码自定义 Kafka 配置

package com.edcode.ecommerce.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;


/**
 * Programmatic Kafka configuration, as an alternative to the {@code spring.kafka.*} YAML keys.
 *
 * <p>Declares the producer factory, the {@link KafkaTemplate}, the consumer factory and the
 * listener container factory, all typed for {@code String} keys and {@code String} values.
 */
@Configuration
public class KafkaConfig {

	/** Broker address list, read from {@code spring.kafka.bootstrap-servers}. */
	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;

	/**
	 * Builds the producer factory with String serializers for both key and value.
	 *
	 * @return producer factory pointing at {@link #bootstrapServers}
	 */
	@Bean
	public ProducerFactory<String, String> producerFactory() {
		Map<String, Object> configs = new HashMap<>(16);
		configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(configs);
	}

	/**
	 * Exposes the template used by application code to send messages.
	 *
	 * @return template backed by {@link #producerFactory()}
	 */
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}

	/**
	 * Builds the consumer factory with String deserializers and {@code max.poll.records} set to 50.
	 *
	 * @return consumer factory pointing at {@link #bootstrapServers}
	 */
	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> props = new HashMap<>(16);
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(props);
	}

	/**
	 * Builds the container factory used by {@code @KafkaListener} methods.
	 *
	 * @return container factory with concurrency 3, backed by {@link #consumerFactory()}
	 */
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		// Concurrency = how many threads one consumer instance starts (original author's note).
		factory.setConcurrency(3);
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}
}

如果复杂配置使用代码更直观,如果简单配置使用 yaml 更方便

13.2.1.1 kafka 生产者
package com.edcode.ecommerce.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.TimeUnit;


@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {


    private final KafkaTemplate kafkaTemplate;

    
    public void sendMessage(String key, String value, String topic) {

        if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("value 或 topic为null或空");
        }

        ListenableFuture> future = StringUtils.isBlank(key) ?
                kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);

        // 异步回调的方式获取通知
        future.addCallback(
                success -> {
                    assert null != success && null != success.getRecordmetadata();
                    // 发送到 kafka 的 topic
                    String _topic = success.getRecordmetadata().topic();
                    // 消息发送到的分区
                    int partition = success.getRecordmetadata().partition();
                    // 消息在分区内的 offset
                    long offset = success.getRecordmetadata().offset();

                    log.info("发送 kafka 信息成功: [{}], [{}], [{}]",
                            _topic, partition, offset);
                }, failure -> {
                    log.error("发送 kafka 信息失败: [{}], [{}], [{}]",
                            key, value, topic);
                }
        );

        // 同步等待的方式获取通知
        try {
//            SendResult sendResult = future.get();
            SendResult sendResult = future.get(5, TimeUnit.SECONDS);

            // 发送到 kafka 的 topic
            String _topic = sendResult.getRecordmetadata().topic();
            // 消息发送到的分区
            int partition = sendResult.getRecordmetadata().partition();
            // 消息在分区内的 offset
            long offset = sendResult.getRecordmetadata().offset();

            log.info("发送 kafka 信息成功: [{}], [{}], [{}]",
                    _topic, partition, offset);
        } catch (Exception ex) {
            log.error("发送 kafka 信息失败: [{}], [{}], [{}]",
                    key, value, topic);
        }
    }

}
13.3.1.1 kafka 消费者

通过 Kafka 传递的消息对象VO

/**
 * Value object passed through Kafka as a JSON string.
 * Accessors and both constructors are generated by Lombok.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageVo {

    /** Identifier carried in the message. */
    private Integer id;

    /** Project name carried in the message. */
    private String projectName;

}

Kafka 消费者

package com.edcode.ecommerce.kafka;

import com.edcode.ecommerce.vo.MessageVo;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;


/**
 * Two listeners on the same topic that belong to different consumer groups.
 * Both deserialize the record value into {@link MessageVo} and log it.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {

	private final ObjectMapper mapper;

	/**
	 * Listener in group {@code edcode-springboot-kafka}; reads key and value as Strings.
	 * A record with a {@code null} value is not guarded against here; {@link #listener02} is.
	 *
	 * @param consumerRecord record received from topic {@code edcode-springboot}
	 * @throws JsonProcessingException if the value is not valid JSON for {@link MessageVo}
	 */
	@KafkaListener(topics = { "edcode-springboot" }, groupId = "edcode-springboot-kafka")
	public void listener01(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {

		String key = consumerRecord.key();
		String value = consumerRecord.value();

		MessageVo kafkaMessage = mapper.readValue(value, MessageVo.class);
		log.info("监听器01 消费 kafka 信息: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage));
	}

	/**
	 * Listener in group {@code edcode-springboot-kafka-1}; accepts any key/value type and skips
	 * records whose value is {@code null}.
	 *
	 * @param consumerRecord record received from topic {@code edcode-springboot}
	 * @throws JsonProcessingException if the value is not valid JSON for {@link MessageVo}
	 */
	@KafkaListener(topics = { "edcode-springboot" }, groupId = "edcode-springboot-kafka-1")
	public void listener02(ConsumerRecord<?, ?> consumerRecord) throws JsonProcessingException {

		Object key = consumerRecord.key();
		Object message = consumerRecord.value();
		if (message == null) {
			// Nothing to deserialize; the original code also ignored null values silently.
			return;
		}
		MessageVo messageVo = mapper.readValue(message.toString(), MessageVo.class);
		log.info("监听器02 消费 kafka 信息: [{}], [{}]", key, mapper.writeValueAsString(messageVo));
	}
}

SpringBoot 集成 kafka 发送消息

package com.edcode.ecommerce.controller;

import com.edcode.ecommerce.kafka.KafkaProducer;
import com.edcode.ecommerce.vo.MessageVo;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


/**
 * HTTP entry point that publishes a fixed sample {@link MessageVo} to Kafka.
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {

    private final ObjectMapper mapper;

    private final KafkaProducer kafkaProducer;

    /**
     * Serializes a sample message to JSON and sends it to the given topic.
     *
     * @param key   optional record key
     * @param topic destination topic
     * @throws Exception if the message cannot be serialized or the producer rejects the arguments
     */
    @GetMapping("/send-message")
    public void sendMessage(@RequestParam(required = false) String key,
                            @RequestParam String topic) throws Exception {

        MessageVo message = new MessageVo(
                1,
                "EdCode-Study-ScaCommerce"
        );
        // ObjectMapper's method is writeValueAsString (capital V).
        kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
    }

}

kafka-controller.http

### kafka-send-message
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/kafka/send-message?key=edcode&topic=edcode-springboot
Content-Type: application/json

### kafka-send-message
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/kafka/send-message?topic=edcode-springboot
Content-Type: application/json

###

日志打印

2021-12-14 11:52:46.908  INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2021-12-14 11:52:46.908  INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2021-12-14 11:52:46.908  INFO 20556 --- [nio-8001-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1639453966907
2021-12-14 11:52:46.934  INFO 20556 --- [ad | producer-1] org.apache.kafka.clients.metadata        : [Producer clientId=producer-1] Cluster ID: gliuLj3bTNSg2ht1Cn32Dg
2021-12-14 11:52:46.973  INFO 20556 --- [ad | producer-1] c.edcode.ecommerce.kafka.KafkaProducer   : 发送 kafka 信息成功: [edcode-springboot], [0], [1]
2021-12-14 11:52:46.973  INFO 20556 --- [nio-8001-exec-2] c.edcode.ecommerce.kafka.KafkaProducer   : 发送 kafka 信息成功: [edcode-springboot], [0], [1]
2021-12-14 11:52:47.005  INFO 20556 --- [ntainer#0-0-C-1] c.edcode.ecommerce.kafka.KafkaConsumer   : 监听器02 消费 kafka 信息: [edcode], [{"id":1,"projectName":"EdCode-Study-ScaCommerce"}]
2021-12-14 11:52:47.005  INFO 20556 --- [ntainer#1-0-C-1] c.edcode.ecommerce.kafka.KafkaConsumer   : 监听器01 消费 kafka 信息: [edcode], [{"id":1,"projectName":"EdCode-Study-ScaCommerce"}]
13.4.1 SpringBoot集成RocketMQ构建消息驱动微服务
  • 下载与安装RocketMQ
    • 下载 RocketMQ:https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-source-release.zip
    • RocketMQ 快速开始:https://rocketmq.apache.org/docs/quick-start/
    • 下载以 bin-release 结尾的 zip 包解压即可完成安装
13.4.1.1 RocketMQ 二进制部署与启动MQ

构建二进制

# Build the binary distribution from the source release (paths assume the zip sits in /opt).
unzip rocketmq-all-4.9.2-source-release.zip
cd rocketmq-all-4.9.2/
mvn -Prelease-all -DskipTests clean install -U
# The build output is in distribution/target/rocketmq-4.9.2/rocketmq-4.9.2; link it to a short path.
ln -s /opt/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2 /opt/rocketmq-4.9.2
# Use the absolute path: the current directory is still the source tree, not /opt.
cd /opt/rocketmq-4.9.2/

启动 NameServer

nohup sh bin/mqnamesrv >/dev/null 2>&1 &  

runbroker.sh 和 runserver.sh 中可以调整 JVM 参数;默认分配的内存比较大,个人测试时可以适当调整

启动 Broker

nohup sh bin/mqbroker -n 192.168.3.250:9876 >/dev/null 2>&1 & 

测试发送

export NAMESRV_ADDR=192.168.3.250:9876 
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer    

关闭

[root@zk1 rocketmq-4.9.2]# sh bin/mqshutdown broker
The mqbroker(10613) is running...
Send shutdown request to mqbroker(10613) OK

[root@zk1 rocketmq-4.9.2]# sh bin/mqshutdown namesrv
The mqnamesrv(10271) is running...
Send shutdown request to mqnamesrv(10271) OK
13.4.1.2 通过 RocketMQ 生产者
package com.edcode.ecommerce.rocketmq;

import com.alibaba.fastjson.JSON;
import com.edcode.ecommerce.vo.MessageVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


/**
 * Sends messages to the {@code edcode-study-rocketmq} topic through {@link RocketMQTemplate},
 * demonstrating sends with a plain value, a key, a tag, and both key and tag.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQProducer {

	/** Topic every method of this producer sends to. */
	private static final String TOPIC = "edcode-study-rocketmq";

	private final RocketMQTemplate rocketMQTemplate;

	/**
	 * Sends {@code value} twice synchronously: once with {@code syncSend} and once with
	 * {@code syncSendOrderly} using the fixed hash key {@code "Eddie"}.
	 *
	 * @param value message payload
	 */
	public void sendMessageWithValue(String value) {
		// Original author's note: a random message queue of the topic is chosen for this send.
		SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value);
		log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(sendResult));

		SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly(TOPIC, value, "Eddie");
		log.info("sendMessageWithValue orderly result: [{}]", JSON.toJSONString(sendResultOrderly));
	}

	/**
	 * Sends {@code value} asynchronously with {@code key} set in the KEYS header; the outcome is
	 * logged from the callback.
	 *
	 * @param key   message key
	 * @param value message payload
	 */
	public void sendMessageWithKey(String key, String value) {
		Message<String> message = keyedMessage(key, value);
		rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() {
			@Override
			public void onSuccess(SendResult sendResult) {
				log.info("sendMessageWithKey success result: [{}]", JSON.toJSONString(sendResult));
			}

			@Override
			public void onException(Throwable e) {
				log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
			}
		});
	}

	/**
	 * Parses {@code value} into a {@link MessageVo} and sends that object synchronously to
	 * destination {@code TOPIC:tag}.
	 *
	 * @param tag   tag appended to the topic
	 * @param value JSON text of a {@link MessageVo}
	 */
	public void sendMessageWithTag(String tag, String value) {
		MessageVo messageVo = JSON.parseObject(value, MessageVo.class);
		SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), messageVo);
		log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult));
	}

	/**
	 * Sends {@code value} synchronously to destination {@code TOPIC:tag} with {@code key} set in
	 * the KEYS header.
	 *
	 * @param key   message key
	 * @param tag   tag appended to the topic
	 * @param value message payload
	 */
	public void sendMessageWithAll(String key, String tag, String value) {
		Message<String> message = keyedMessage(key, value);
		SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), message);
		log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult));
	}

	/** Wraps {@code value} in a Spring message carrying {@code key} in the RocketMQ KEYS header. */
	private static Message<String> keyedMessage(String key, String value) {
		return MessageBuilder.withPayload(value).setHeader(RocketMQHeaders.KEYS, key).build();
	}

}
13.4.1.3 RocketMQ 消费者方式汇总

使用同步的方式发送消息, 不指定 key 和 tag

/**
 * Consumes every message of topic {@code edcode-study-rocketmq} as a String, with no tag filter,
 * in consumer group {@code edcode-springboot-rocketmq-string}.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "edcode-study-rocketmq",
        consumerGroup = "edcode-springboot-rocketmq-string"
)
public class RocketMQConsumerString implements RocketMQListener<String> {

    /**
     * Parses the payload into a {@link MessageVo} and logs it.
     *
     * @param message JSON text of a {@link MessageVo}
     */
    @Override
    public void onMessage(String message) {
        MessageVo rocketMessage = JSON.parseObject(message, MessageVo.class);
        log.info("consume message in RocketMQConsumerString: [{}]",
                JSON.toJSONString(rocketMessage));
    }

}

指定了消费带有 tag 的消息

/**
 * Consumes messages of topic {@code edcode-study-rocketmq} that carry the tag {@code edcode},
 * receiving the payload as a String.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "edcode-study-rocketmq",
        consumerGroup = "edcode-springboot-rocketmq-tag-string",
        selectorExpression = "edcode"        // filter by tag
)
public class RocketMQConsumerTagString implements RocketMQListener<String> {

    /**
     * Parses the payload into a {@link MessageVo} and logs it.
     *
     * @param message JSON text of a {@link MessageVo}
     */
    @Override
    public void onMessage(String message) {
        MessageVo rocketMessage = JSON.parseObject(message, MessageVo.class);
        log.info("consume message in RocketMQConsumerTagString: [{}]",
                JSON.toJSONString(rocketMessage));
    }

}

指定消费带有 tag 的消息, 且消费的是 Java Pojo

/**
 * Consumes messages of topic {@code edcode-study-rocketmq} that carry the tag {@code edcode},
 * receiving the payload already converted to a {@link MessageVo}.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "edcode-study-rocketmq",
        consumerGroup = "edcode-springboot-rocketmq-tag-object",
        selectorExpression = "edcode"    // filter by tag
)
public class RocketMQConsumerObject implements RocketMQListener<MessageVo> {

    /**
     * Logs the received object as JSON.
     *
     * @param message payload converted to {@link MessageVo} by the listener container
     */
    @Override
    public void onMessage(MessageVo message) {
        log.info("consume message in RocketMQConsumerObject: [{}]",
                JSON.toJSONString(message));
        // do something
    }

}

扩展 MessageExt, 可以获取 Keys 之类字段

/**
 * Consumes messages of topic {@code edcode-study-rocketmq} as raw {@link MessageExt}, which gives
 * access to fields such as the message keys in addition to the body.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "edcode-study-rocketmq",
        consumerGroup = "edcode-springboot-rocketmq-message-ext"
)
public class RocketMQConsumerMessageExt implements RocketMQListener<MessageExt> {

    /**
     * Logs the keys and body of the message, then the whole message as JSON.
     *
     * @param message raw RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {

        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        // NOTE(review): assumes the producer encodes bodies as UTF-8 — confirm.
        String value = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]",
                message.getKeys(), value);
        // Original author's note: serializing the whole MessageExt is somewhat slower.
        log.info("MessageExt: [{}]", JSON.toJSONString(message));
    }
}
13.6.1.1 RocketMQ Producer Api 发送消息
/**
 * HTTP endpoints that trigger each send variant of {@link RocketMQProducer} with one fixed
 * sample message.
 */
@Slf4j
@RestController
@RequestMapping("/rocket-mq")
@RequiredArgsConstructor
public class RocketMQController {

	/** Sample message serialized to JSON on every request. */
	private static final MessageVo SAMPLE_MESSAGE =
			new MessageVo(1, "Edcode-Study-RocketMQ-In-SpringBoot");

	private final RocketMQProducer rocketMQProducer;

	/** Sends the sample message without key or tag. */
	@GetMapping("/message-with-value")
	public void sendMessageWithValue() {
		final String json = sampleJson();
		rocketMQProducer.sendMessageWithValue(json);
	}

	/** Sends the sample message with key {@code "Edcode"}. */
	@GetMapping("/message-with-key")
	public void sendMessageWithKey() {
		final String json = sampleJson();
		rocketMQProducer.sendMessageWithKey("Edcode", json);
	}

	/** Sends the sample message with tag {@code "edcode"}. */
	@GetMapping("/message-with-tag")
	public void sendMessageWithTag() {
		final String json = sampleJson();
		rocketMQProducer.sendMessageWithTag("edcode", json);
	}

	/** Sends the sample message with key {@code "Edcode"} and tag {@code "edcode"}. */
	@GetMapping("/message-with-all")
	public void sendMessageWithAll() {
		final String json = sampleJson();
		rocketMQProducer.sendMessageWithAll("Edcode", "edcode", json);
	}

	/** Serializes the sample message to its JSON text. */
	private static String sampleJson() {
		return JSON.toJSONString(SAMPLE_MESSAGE);
	}
}
13.7.1.1 rocket-mq-controller.http
### message-with-value
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-value
Content-Type: application/json

### message-with-key
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-key
Content-Type: application/json

### message-with-tag
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-tag
Content-Type: application/json

### message-with-all
GET http://127.0.0.1:8001/edcode-study-scacommerce-dev/rocket-mq/message-with-all
Content-Type: application/json

###
13.8.1 SpringCloud Stream 消息驱动组件概览
  • 为什么要有 SpringCloud Stream
    • 如果没有 SpringCloud Stream ,那么我们怎么玩消息驱动?




  • SpringCloud Stream 应用模型
    • SpringCloud Stream 中的核心概念
      • 负责与中间件交互的抽象绑定器:Binder
      • 发送消息与接收消息的应用通信信道:Input、Output




  • SpringCloud Stream 应用模型
    • 经典的 SpringCloud Stream 发布-订阅模型
      • Topic 可以认为就是 Kafka 中的 Topic 概念
      • Producer 通过 Output 信道发布消息到 Topic 上
      • Consumer 通过 Input 信道消费 Topic 上的消息

13.9.1 基于SpringCloud Stream消息驱动的简单应用 13.9.1.1 新增 sca-commerce-stream-client 子工程

Maven 依赖



    
        sca-commerce
        com.edcode.commerce
        1.0-SNAPSHOT
    
    4.0.0

    sca-commerce-stream-client
    1.0-SNAPSHOT
    jar

    
    sca-commerce-stream-client
    Stream Client

    
        
        
            com.alibaba.cloud
            spring-cloud-starter-alibaba-nacos-discovery
        
        
        
            com.edcode.commerce
            sca-commerce-mvc-config
            1.0-SNAPSHOT
        
        
        
            org.springframework.cloud
            spring-cloud-starter-zipkin
        
        
            org.springframework.kafka
            spring-kafka
            2.5.0.RELEASE
        
        
        
            org.springframework.cloud
            spring-cloud-stream
        
        
        
            org.springframework.cloud
            spring-cloud-stream-binder-kafka
        
        
        
        
        
        
    

    
    
        ${artifactId}
        
            
                org.springframework.boot
                spring-boot-maven-plugin
                
                    
                        
                            repackage
                        
                    
                
            
        
    


bootstrap.yml

server:
  port: 8006
  servlet:
    context-path: /scacommerce-stream-client

spring:
  application:
    name: sca-commerce-stream-service
  cloud:
    nacos:
      # 服务注册发现
      discovery:
        enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可
        #server-addr: ${NACOS_ADDR:127.0.0.1}:8848
        server-addr: ${NACOS_ADDR_1:127.0.0.1}:8848,${NACOS_ADDR_2:127.0.0.1}:8858,${NACOS_ADDR_3:127.0.0.1}:8868 # Nacos 服务器地址
        namespace: ${NAMESPACE_ID:1adcfdd8-5763-4768-9a15-9c7157988950}
        metadata:
          management:
            context-path: ${server.servlet.context-path}/actuator
    stream:
      # SpringCloud Stream + RocketMQ
#      rocketmq:
#        binder:
#          name-server: ${ROCKETMQ_SERVER:127.0.0.1}:${ROCKETMQ_PORT:9876}
      kafka:
        binder:
          brokers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
          auto-create-topics: true  # if false, topics are not created automatically and must exist beforehand; false is recommended for production
      # Channel bindings belong directly under spring.cloud.stream.bindings;
      # spring.cloud.stream.kafka.bindings is only for Kafka-specific producer/consumer options.
      bindings:
        # Default sender
        output:      # the default output channel provided by Stream
          destination: scacommerce-stream-client-default    # where messages are sent; for Kafka this is the topic
          content-type: text/plain    # format of outgoing messages (original note: needed on the sender, not on the receiver)
        # Default receiver (input and output would normally live in different projects)
        input:      # the default input channel provided by Stream
          destination: scacommerce-stream-client-default
        # Edcode sender
        edcodeOutput:
          destination: scacommerce-stream-client-edcode
          content-type: text/plain
        # Edcode receiver
        edcodeInput:
          destination: scacommerce-stream-client-edcode
  kafka:
    bootstrap-servers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
    producer:
      retries: 3
    consumer:
      auto-offset-reset: latest
  zipkin:
    sender:
      type: ${ZIPKIN_KAFKA_SENDER:web} # 默认是 web
    base-url: http://${ZIPKIN_URL:localhost}:${ZIPKIN_PORT:9411}/
  sleuth:
    sampler:
      # RateLimitingSampler 抽样策略,设置了限速采样,spring.sleuth.sampler.probability 属性值无效
      rate: 100 # 每秒间隔接受的 trace 量
      # Probability 抽样策略
      probability: 1.0 # 采样比例,1.0 表示 100%, 默认:0.1
  redis:
    database: 0
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    timeout: 5000


# 暴露端点
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

创建 SpringBoot 启动类

/**
 * Boot entry point of the stream client service; service discovery is enabled.
 */
@SpringBootApplication
@EnableDiscoveryClient
public class StreamClientApplication {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(Class, String...) shortcut, written out.
        SpringApplication application = new SpringApplication(StreamClientApplication.class);
        application.run(args);
    }

}

消息传递对象: SpringCloud Stream + Kafka/RocketMQ

/**
 * Message object exchanged through SpringCloud Stream (Kafka/RocketMQ).
 * Accessors and both constructors are generated by Lombok.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageVo {

	private Integer id;

	private String projectName;

	private String org;

	private String author;

	private String version;

    /**
     * Creates the fixed sample message used by the demo endpoints.
     *
     * @return a new message populated with the demo values
     */
    public static MessageVo defaultMessage() {
        MessageVo sample = new MessageVo();
        sample.setId(1);
        sample.setProjectName("sca-commerce-stream-client");
        sample.setOrg("blog.eddilee.cn");
        sample.setAuthor("Eddie");
        sample.setVersion("1.0");
        return sample;
    }
}

使用默认的通信信道实现消息的发送

package com.edcode.commerce.stream;

import com.alibaba.fastjson.JSON;
import com.edcode.commerce.vo.MessageVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;


/**
 * Publishes messages through the default {@link Source} output channel.
 */
@Slf4j
@EnableBinding(Source.class)
@RequiredArgsConstructor
public class DefaultSendService {

    private final Source source;

    /**
     * Serializes the message to JSON, logs it and sends it on the default output channel.
     *
     * @param message message to publish
     */
    public void sendMessage(MessageVo message) {
        final String payload = JSON.toJSONString(message);
        log.info("在 DefaultSendService 中发送消息: [{}]", payload);

        // Spring Messaging provides the unified message programming model used by Stream.
        source.output()
                .send(MessageBuilder
                        .withPayload(payload)
                        .build());
    }

}

使用默认的信道实现消息的接收

package com.edcode.commerce.stream;

import com.alibaba.fastjson.JSON;
import com.edcode.commerce.vo.MessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;


/**
 * Receives messages from the default {@link Sink} input channel.
 */
@Slf4j
@EnableBinding(Sink.class)
public class DefaultReceiveService {

	/**
	 * Parses the received payload into a {@link MessageVo} and logs it.
	 *
	 * @param payload raw payload delivered on the default input channel
	 */
	@StreamListener(Sink.INPUT)
	public void receiveMessage(Object payload) {
		log.info("在 DefaultReceiveService 消费消息中启动  ");

		final String json = payload.toString();
		final MessageVo received = JSON.parseObject(json, MessageVo.class);

		// Consume the message: here it is only logged.
		final String rendered = JSON.toJSONString(received);
		log.info("在 DefaultReceiveService 中使用消息成功: [{}]", rendered);
	}

}
13.10.1 自定义Stream消息通信信道实现定制分发 13.10.1.1 构建消息驱动
/**
 * HTTP endpoints that publish the sample message through the default channel or the custom
 * Edcode channel.
 */
@Slf4j
@RestController
@RequestMapping("/message")
@RequiredArgsConstructor
public class MessageController {

	private final DefaultSendService defaultSendService;

	private final EdcodeSendService edcodeSendService;

	/** Sends the sample message through the default output channel. */
	@GetMapping("/default")
	public void defaultSend() {
		final MessageVo sample = MessageVo.defaultMessage();
		defaultSendService.sendMessage(sample);
	}

	/** Sends the sample message through the custom Edcode output channel. */
	@GetMapping("/edcode")
	public void qinyiSend() {
		final MessageVo sample = MessageVo.defaultMessage();
		edcodeSendService.sendMessage(sample);
	}

}
13.10.1.2 message.http
### 发送默认信道消息
GET http://127.0.0.1:8006/scacommerce-stream-client/message/default
Content-Type: application/json

### 发送自定义信道消息
GET http://127.0.0.1:8006/scacommerce-stream-client/message/edcode
Content-Type: application/json

###
13.11.1 SpringCloud Stream消息分组和消费分区的配置与说明 13.11.1.1 SpringCloud Stream消息分组
  • SpringCloud Stream消息分组模型
    • 应用的不同实例放在一个消费者组中,每一条消息只会被一个实例消费
    • 消费者组的思想是通过多实例扩展服务吞吐量,且不会造成消息的重复消费

13.11.1.2 SpringCloud Stream消息分区
  • SpringCloud Stream消息分区
    • 消费分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理


本章完整配置文件 bootstrap.yml

server:
  port: 8006
  servlet:
    context-path: /scacommerce-stream-client

spring:
  application:
    name: sca-commerce-stream-service
  cloud:
    nacos:
      # 服务注册发现
      discovery:
        enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可
        #server-addr: ${NACOS_ADDR:127.0.0.1}:8848
        server-addr: ${NACOS_ADDR_1:127.0.0.1}:8848,${NACOS_ADDR_2:127.0.0.1}:8858,${NACOS_ADDR_3:127.0.0.1}:8868 # Nacos 服务器地址
        namespace: ${NAMESPACE_ID:1adcfdd8-5763-4768-9a15-9c7157988950}
        metadata:
          management:
            context-path: ${server.servlet.context-path}/actuator
    stream:
      # SpringCloud Stream + RocketMQ
#      rocketmq:
#        binder:
#          name-server: ${ROCKETMQ_SERVER:127.0.0.1}:${ROCKETMQ_PORT:9876}
      # Enable stream partition support
      instanceCount: 1  # total number of consumer instances
      instanceIndex: 0  # index of the current consumer instance
      kafka:
        binder:
          brokers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
          auto-create-topics: true  # if false, topics are not created automatically and must exist beforehand; false is recommended for production
      # Channel bindings belong directly under spring.cloud.stream.bindings;
      # spring.cloud.stream.kafka.bindings is only for Kafka-specific producer/consumer options.
      bindings:
        # Default sender
        output:      # the default output channel provided by Stream
          destination: scacommerce-stream-client-default    # where messages are sent; for Kafka this is the topic
          content-type: text/plain    # format of outgoing messages (original note: needed on the sender, not on the receiver)
          # Message partitioning
          producer:
            # partitionKeyExpression: payload.author  # partition key; payload is the object being sent, author is one of its properties
            partitionCount: 1   # number of partitions
            # Use the custom partition strategies below instead of partitionKeyExpression
            partitionKeyExtractorName: edcodePartitionKeyExtractorStrategy  # com.edcode.commerce.partition.EdcodePartitionKeyExtractorStrategy
            partitionSelectorName: edcodePartitionSelectorStrategy # com.edcode.commerce.partition.EdcodePartitionSelectorStrategy

        # Default receiver (input and output would normally live in different projects)
        input:      # the default input channel provided by Stream
          destination: scacommerce-stream-client-default
          # Consumer group
          group: sca-commerce-edcode-default
          # Enable partition support on the consumer side
          consumer:
            partitioned: true

        # Edcode sender
        edcodeOutput:
          destination: scacommerce-stream-client-edcode
          content-type: text/plain

        # Edcode receiver
        edcodeInput:
          destination: scacommerce-stream-client-edcode
          # Consumer group
          group: sca-commerce-edcode-edcode
  kafka:
    bootstrap-servers: ${KAFKA_SERVER:127.0.0.1}:${KAFKA_PORT:9092}
    producer:
      retries: 3
    consumer:
      auto-offset-reset: latest
  zipkin:
    sender:
      type: ${ZIPKIN_KAFKA_SENDER:web} # 默认是 web
    base-url: http://${ZIPKIN_URL:localhost}:${ZIPKIN_PORT:9411}/
  sleuth:
    sampler:
      # RateLimitingSampler 抽样策略,设置了限速采样,spring.sleuth.sampler.probability 属性值无效
      rate: 100 # 每秒间隔接受的 trace 量
      # Probability 抽样策略
      probability: 1.0 # 采样比例,1.0 表示 100%, 默认:0.1
  redis:
    database: 0
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    timeout: 5000


# 暴露端点
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

分区部分

自定义从 Message 中提取 partition key 的策略

/**
 * Custom strategy that extracts the partition key from an outgoing message.
 * The key is the {@code projectName} of the {@link MessageVo} carried as JSON in the payload.
 */
@Slf4j
@Component
public class EdcodePartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy {

	/**
	 * Parses the payload as a {@link MessageVo} and returns its project name as the key.
	 *
	 * @param message outgoing message whose payload renders to the JSON text of a {@link MessageVo}
	 * @return the project name used as partition key
	 */
	@Override
	public Object extractKey(Message<?> message) {
		MessageVo messageVo = JSON.parseObject(message.getPayload().toString(), MessageVo.class);
		// Custom key extraction: partition by project name.
		String key = messageVo.getProjectName();
		log.info("SpringCloud Stream EdCode Partition Key: [{}]", key);
		return key;
	}
}

决定 message 发送到哪个分区的策略

/**
 * Custom strategy that decides which partition a message is sent to, based on the hash of the
 * partition key's string form.
 */
@Slf4j
@Component
public class EdcodePartitionSelectorStrategy implements PartitionSelectorStrategy {

	/**
	 * Maps the key to a partition index in the range {@code [0, partitionCount)}.
	 *
	 * @param key            partition key produced by the key extractor
	 * @param partitionCount number of partitions configured for the binding
	 * @return non-negative partition index
	 */
	@Override
	public int selectPartition(Object key, int partitionCount) {
		// String.hashCode() may be negative, and Java's % keeps the sign of the dividend.
		// The remainder's magnitude is below partitionCount, so Math.abs cannot overflow.
		int partition = Math.abs(key.toString().hashCode() % partitionCount);
		log.info("SpringCloud Stream EdCode Selector info: [{}], [{}], [{}]", key, partitionCount, partition);
		return partition;
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678834.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存