一种可扩展的消息路由实现方式,kafka基本原理及leader

一种可扩展的消息路由实现方式,kafka基本原理及leader,第1张

一种可扩展的消息路由实现方式,kafka基本原理及leader

@Builder

@Getter

@ToString

public class Message {

private String event;

private String msgType;

private String content;

private String fromUser;

private String toUser;

private Instant createTime;

//… as you wish

}

待消费消息对象包含一些Trait字段,包括event,msgType,content。这些字段既可以作为路由规则匹配的依据,也能作为具体消息处理的条件。可根据不同的业务类型去扩展。

消息处理结果

@Builder

@ToString

public class OutMessage {

private String msgType;

private String fromUser;

private String toUser;

private Instant createTime;

}

消息处理结果,根据具体业务自行定义吧。

3.2 消息处理器

抽象类型

public interface MessageHandler {

OutMessage handle(Message message, Map context);

}

这是所有消息处理器的抽象类型,自定义的处理器都必须实现它。简单实现一个消息日志记录处理器。

@Component

public class LogMessageHandler implements MessageHandler {

@Override

public OutMessage handle(Message message, Map context) {

System.out.println(message.toString());

// define your return value

return null;

}

}

3.2 路由相关

消息拦截器

public interface MessageInterceptor {

OutMessage handle(Message message, Map context);

}

拦截器可增强对消息处理。自行实现此接口。

消息匹配器

public interface MessageRouterMatcher {

boolean match(Message message);

}

匹配器可实现对消息的过滤,以实现对消息的规则匹配。

路由器

public class MessageRouter {

@Getter

private final List rules = new ArrayList<>();

public MessageRouterRule rule(){

return new MessageRouterRule(this);

}

private OutMessage route(Message message,Map context){

final List matchRules = new ArrayList<>();

final Iterator iterator = this.rules.iterator();

while (iterator.hasNext()){

final MessageRouterRule rule = iterator.next();

if (rule.test(message)){

matchRules.add(rule);

}

}

if(matchRules.size() == 0){

return null;

}else{

final Iterator matchIterator = matchRules.iterator();

while (matchIterator.hasNext()){

final MessageRouterRule rule = matchIterator.next();

//think think multi OutMessage

return rule.service(message, context);

}

}

return null;

}

public OutMessage route(Message message){

return this.route(message,new HashMap<>(2));

}

}

消息路由规则

public class MessageRouterRule {

//是否异步处理消息

private boolean async;

private String event;

private String msgType;

private String content;

private String fromUser;

private String toUser;

private MessageRouter router;

private MessageRouterMatcher matcher;

private List handlers = new ArrayList<>();

private List interceptors = new ArrayList

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

<>();

public MessageRouterRule async(boolean async){

this.async = async;

return this;

}

public MessageRouterRule msgType(String msgType){

this.msgType = msgType;

return this;

}

public MessageRouterRule event(String event){

this.event = event;

return this;

}

public MessageRouterRule content(String content){

this.content= content;

return this;

}

public MessageRouterRule fromUser(String fromUser){

this.fromUser= fromUser;

return this;

}

public MessageRouterRule toUser(String toUser){

this.toUser= toUser;

return this;

}

public MessageRouterRule handler(MessageHandler handler,MessageHandler… otherHandlers){

this.handlers.add(handler);

if(otherHandlers != null && otherHandlers.length>0){

Collections.addAll(this.handlers,otherHandlers);

}

return this;

}

public MessageRouterRule handle(MessageHandler handler){

return this.handler(handler,(MessageHandler[]) null);

}

public MessageRouter end(){

this.router.getRules().add(this);

return this.router;

}

protected boolean test(Message message){

//here can use matcher

return (this.fromUser == null || this.fromUser.equals(message.getFromUser())) && (this.msgType == null || this.msgType.toLowerCase().equals(message.getMsgType() == null ? null : message.getMsgType().toLowerCase())) && (this.event == null || this.event.toLowerCase().equals(message.getEvent() == null ? null : message.getEvent().toLowerCase())) && (this.content == null || this.content.equals(message.getContent() == null ? null : message.getContent().trim())) && (this.matcher == null || this.matcher.match(message));

}

public MessageRouterRule(MessageRouter router){

this.router = router;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653808.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存