视频地址:https://www.bilibili.com/video/BV1mr4y1J77n
之前面试的时候都会被问到为什么使用MQ,使用MQ的好处是什么,我都会照本宣科的说:异步、解耦、削峰,这几个词也好理解,都是字面意思,今天我们就来进一步加深理解异步和结解耦。
一、引入问题
先思考这样一个问题,在多个系统之间我们想要异步的调用怎么做呢?当然MQ就是一个很好的解决办法
- 如何去用呢?在A系统引入MQ,作为生产者,在B系统也引入MQ做消费者,当然可以实现功能,但会不会很麻烦?每个系统都要引入一套重复的东西。
- 大多数我们业务场景的并发量其实很小,如果我们对每个业务场景都弄一个自己的
queue
是不是很浪费?管理起来也很麻烦。 - 如果有一个场景当我们做了某个 *** 作之后,我们要通知A、B、C…系统来做相对应的处理,又该如何去做呢?(系统只会越来越多)
对于上面的问题,我们可以给出一个解决方案,那就是我们可以定义一个平台MQ,做成一个starter
,谁要用我们就引入这个pom
,每个项目都有自己的spring.application
我们以这个为队列名称,注册到我们MQ里面,做一个广播消息,每一个服务既可以做生产者,也可以做消费者。
其实这里会有一个问题,比如我A服务发送一个消息了,B、C、D…服务都接受到了这个消息,但实际上只有B服务是需要消费这个消息的。很多人可能和我最开始的思路一样我在消息体里面加一个type
,根据这个type来去判断谁消费。
// 接受到了消息,拿到了type
if(type == 1) {
// ...
}else if(type == 2) {
// ...
}
....
上面的代码当然可以解决我们的问题,但是想想每次新增一个事件都得去修改原本的接受逻辑,太low了。Spring框架里面已经做了这个 *** 作ApplicationEventPublisher
通过这个类,我们就可以做到请求分发,根据class类型来。(具体后面讲解)
二、流程图
基于上面的理解,我画出了基础的流程图
2-1、系统交互流程图
每个服务都引入基础的mq-starter
底包
每一个服务都可以作生产者,但每一个服务都是消费者。
2-2、具体服务内部流转图
三、代码实现
3-1、代码 (基于RabbitMQ实现) AutoConfigurationPlatformMq
这就是自动注入的核心代码了
import com.xdx97.mq.consumer.PlatformConsumer;
import com.xdx97.mq.provider.PlatformMqProvider;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* 平台队列自动注入
*
* @author xdx
* @date 22/04/15
*/
@Configuration
public class AutoConfigurationPlatformMq {
/**
* 平台队列交换机名称
*/
public static final String PLATFORM_EXCHANGE = "platform_exchange";
/**
* 当前项目队列名称
*/
private final String projectQueue;
/**
* 当前项目路由key
*/
private final String projectRouteKey;
public AutoConfigurationPlatformMq(Environment environment) {
this.projectQueue = environment.getRequiredProperty("spring.application.name") + ".platform";
this.projectRouteKey = "spring.application.name." + environment.getRequiredProperty("spring.application.name");
}
/**
* 注入队列
* @return
*/
@Bean
public Queue platformQueue() {
return QueueBuilder
.durable(projectQueue)
.build();
}
/**
* 注入交换机
* @return
*/
@Bean
public Exchange platformExchange() {
return ExchangeBuilder
.fanoutExchange(PLATFORM_EXCHANGE)
.durable(true)
.build();
}
/**
* 队列绑定交换机
* @return
*/
@Bean
public Binding platformBinding() {
return BindingBuilder
.bind(platformQueue())
.to(platformExchange())
.with("*")
.and(null);
}
/**
* 注入生产者
* @return
*/
@Bean
public PlatformMqProvider platformMqProvider(){
return new PlatformMqProvider();
}
/**
* 注入消费者
* @return
*/
@Bean
public PlatformConsumer platformConsumer(){
return new PlatformConsumer();
}
}
PlatformMqProvider
import com.xdx97.mq.AutoConfigurationPlatformMq;
import com.xdx97.mq.event.PlatformEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.Resource;
import java.util.function.Consumer;
/**
* 消息发送类
*
* @author xdx
* @date 22/04/15
*/
@Slf4j
public class PlatformMqProvider {
@Value("${spring.application.name}")
private String source;
@Resource
private AmqpTemplate rabbitMqTemplate;
/**
* 发送平台消息
*
* @param platformEvent 平台事件消息
*/
public void sendPlatformMessage(PlatformEvent platformEvent) {
platformEvent.setSource(source);
rabbitMqTemplate.convertAndSend(AutoConfigurationPlatformMq.PLATFORM_EXCHANGE,"*", platformEvent);
}
/**
* 发送其它消息
*/
public void sendOtherMessage(Consumer<AmqpTemplate> consumer) {
consumer.accept(this.rabbitMqTemplate);
}
}
PlatformConsumer
import com.xdx97.mq.event.PlatformEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.messaging.handler.annotation.Payload;
/**
* 平台事件消费者
*
* @author xdx
* @date 22/04/15
*/
@Slf4j
public class PlatformConsumer implements ApplicationEventPublisherAware {
@Autowired
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
@RabbitHandler
@RabbitListener(queues = "${spring.application.name}.platform")
public void handler(@Payload PlatformEvent message) {
log.info("接受到平台事件消息:{}",message.toString());
publisher.publishEvent(message);
}
}
PlatformEvent
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 平台事件消息父类
*
* @author xdx
* @date 22/04/15
*/
@Data
public class PlatformEvent implements Serializable {
private static final long serialVersionUID=1L;
/**
* 系统来源
*/
private String source;
/**
* 唯一标示id
*/
private String transactionNo = UUID.randomUUID().toString();
/**
* 发送消息时间
*/
private LocalDateTime eventTimeStamp = LocalDateTime.now();
}
pom
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<groupId>com.xdx97.mqgroupId>
<artifactId>xdx-mq-starterartifactId>
<version>1.0.0-SNAPSHOTversion>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-dependenciesartifactId>
<version>2.2.1.RELEASEversion>
<type>pomtype>
<scope>importscope>
dependency>
dependencies>
dependencyManagement>
<properties>
<maven.build.timestamp.format>yyyyMMddHHmmssmaven.build.timestamp.format>
<maven.compiler.source>8maven.compiler.source>
<maven.compiler.target>8maven.compiler.target>
properties>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
<scope>runtimescope>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
dependencies>
project>
3-2、使用 导入pom依赖
<groupId>com.xdx97.mqgroupId>
<artifactId>xdx-mq-starterartifactId>
<version>1.0.0-SNAPSHOTversion>
定义一个事件
其实就是一个继承PlatformEvent
的实体类 (注:这个是要放在底包里面的)
@Data
public class TestEvent extends PlatformEvent{
private String name;
}
发送消息
任意服务
@Resource
private PlatformMqProvider platformMqProvider;
public void fun() {
TestEvent testEvent = new TestEvent();
testEvent.setName("小道仙");
platformMqProvider.sendPlatformMessage(testEvent);
}
消费消息
任意服务
@EventListener(TestEvent.class)
public void testListener(TestEvent testEvent){
// 业务处理
}
四、衍生问题 4-1、是否需要配置化注入
Spring里面提供一类以 @ConditionalOn
开头的注解,可以理解成在一定条件下进行注入
-
@ConditionalOnBean
当容器中存在某个bean才进行注入 -
@ConditionalOnProperty
当配置文件满足什么条件才进行注入 -
…
所以我在设计之初考虑如果我可以用这种方式去控制何时注入queue、何时注入exchange、何时注入生产者…
这看似提高了灵活度,但是仔细思考一下,别人如果引入你的包,不去把队列绑定到平台事件上,那就相当于无法发送消息和消费消息,那引入这个包的意义何在?
4-2、Spring组件扫描
最开始我的消费者和生产者都是如下这样去定义的,每个类上面都加了两个注解@Slf4j
@Component
@Slf4j
@Component
public class PlatformConsumer implements ApplicationEventPublisherAware
@Slf4j
是日志注解自不必说,@Component
是注入bean的,但有一个前提是你的项目可以扫描到这个包
我们项目包名都是以公司的域名来命名的,而且扫描的范围一般都很大,基于这两个前提下我这个mq-starter是没有问题的,可以发送可以接受。
但如果使用者的项目包名不是以你这个命名的,那就完蛋。
这里理解一下自动注入,也就是引入了你的pom文件后,底包里面的bean应该是要自动注入的,上面这种做法不是自动注入,而是使用者的项目去扫描到而注入的。
改造后直接把bean注入放在AutoConfigurationPlatformMq
里面就好了
4-3、如何定义事件类型公共属性
所谓事件类型公共属性就是PlatformEvent
类了,这个其实和功能实现无关,在我最开始实现了接受消息和发送消息后,我就以为我这个mq-starter已经完成了,当时这个类里面只有一个transactionNo
唯一标示id
这也是demo和生产最大的区别,如果我自己搭建demo,至此已经完美结束了
但在生产不行,平台事件是每个服务都可以发送的,只一个id无法知道具体的来源,后面在组长的帮助下加了系统来源和时间
这一点也很重要,一个完整的工程,不仅仅是代码功能的实现,还有业务的考量,还有代码的优美,比如你都命名a、b、c 这合理吗?
4-4、事件类型如何处理
所谓的事件类型就是一个个消息class,只要是继承了PlatformEvent
的类都算
原本我是想在生产者端定义一个A.class 去继承PlatformEvent,在消费者端也同样来一个A.class 去继承PlatformEvent
想一想,这样的两个A.class是一样的吗?
最终解决办法是把这A.class放在底包里面就好了,这样消费者和生产者引入的都是一个A.class了,每次新增只需要重新 mvn deploy
一下即可。需要用到的服务去更新一个maven之前旧的服务不更新也不会出错
五、源码获取
关注微信公众号回复关键字获取
- 公众号:小道仙97
- 关键字:xdx-mq-starter
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)