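MQ (Message Queue) is built on the queue, a basic first-in, first-out (FIFO) data structure.
The data to be transferred (the messages) is placed in a queue, and the queue mechanism carries out the delivery: a producer creates a message and puts it into the queue, and a consumer then processes it.
A consumer can pull messages from a specified queue, or subscribe to the queue and have the MQ server push messages to it.
Message queue middleware is an important component of distributed systems. It mainly addresses application decoupling, asynchronous messaging, and traffic peak shaving, and helps build architectures that are high-performance, highly available, scalable, and eventually consistent.
Decoupling: when a business flow is implemented by several modules together, or one message must be handled by several systems, the main flow only has to publish one MQ message when it finishes; the other modules consume that message to do their part, which lowers the coupling between modules.
Asynchrony: after the main flow finishes, secondary work runs asynchronously through the MQ, which shortens the response time and improves the user experience.
Peak shaving: under high concurrency, work is processed asynchronously, which raises the system's capacity during peak periods and keeps it from collapsing.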
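To make the decoupling and asynchrony points concrete, here is a minimal in-process sketch that uses `java.util.concurrent.BlockingQueue` as a stand-in for a real MQ. The class name `InMemoryQueueDemo` and the `STOP` sentinel are made up for illustration and are not part of the middleware discussed in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-process stand-in for an MQ: the producer only enqueues, the consumer works on its own thread. */
final class InMemoryQueueDemo {
    // Hypothetical sentinel that tells the consumer thread to stop.
    private static final String STOP = "__STOP__";

    /** Sends every message through a FIFO queue and returns them in the order the consumer handled them. */
    static List<String> run(List<String> messages) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final List<String> handled = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take(); // blocks until a message is available
                    if (STOP.equals(message)) {
                        return;
                    }
                    handled.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (String message : messages) {
                queue.put(message); // the producer does not wait for processing
            }
            queue.put(STOP);
            consumer.join(); // join also makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("order-1", "order-2", "order-3")));
    }
}
```

The producer loop never calls the consumer directly; the queue is the only thing the two sides share, which is the decoupling described above.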
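Consumer
Consumer pulls messages
The consumer pulls messages from the broker on a timer. The advantages are that this is simple to implement and that the consumer can consume at a rate matched to its own processing capacity.
The drawback is relatively poor real-time behavior.
Scheduled pulling
We implement scheduled pulling by overriding the afterInit method of the push strategy.
After pulling a message, the consumer has to send an acknowledgment that tells the broker the pull succeeded. This is not complicated to implement, so that code is not reproduced here.
Note that the same message can be consumed under different groupNames, so acknowledgments have to be kept separate per groupName.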
public class MqConsumerPull extends MqConsumerPush {
@Override
public void subscribe(String topicName, String tagRegex) {
MqTopicTagDto tagDto = buildMqTopicTagDto(topicName,tagRegex);
if(!subscribeList.contains(tagDto)){
subscribeList.add(tagDto);
}
}
@Override
public void unSubscribe(String topicName, String tagRegex) {
MqTopicTagDto tagDto = buildMqTopicTagDto(topicName,tagRegex);
subscribeList.remove(tagDto);
}
private MqTopicTagDto buildMqTopicTagDto(String topicName,String tagRegex){
MqTopicTagDto dto = new MqTopicTagDto();
dto.setTagRegex(tagRegex);
dto.setTopicName(topicName);
//Active pulling is likely to be a problem here: a message consumed by one groupName would be treated as already consumed for the others.
//There should really be a message + group mapping table, so that a single message can be consumed once by each group.
//groupName + messageId + status ==> implemented at the database level
dto.setGroupName(groupName);
return dto;
}
/**
* Starts the scheduled message pull
*/
@Override
protected void afterInit() {
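//pull on a fixed schedule; the initial delay and period come from pullInitDelaySeconds and pullPeriodSeconds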
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(CollectionUtil.isEmpty(subscribeList)) {
log.warn("订阅列表为空,忽略处理。");
return;
}
for(MqTopicTagDto tagDto:subscribeList){
final String topicName = tagDto.getTopicName();
final String tagRegex = tagDto.getTagRegex();
MqConsumerPullResp resp = consumerBrokerService.pull(topicName,tagRegex,size);
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())){
List<MqMessage> mqMessageList = resp.getList();
if(CollectionUtil.isNotEmpty(mqMessageList)){
List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size());
for(MqMessage mqMessage : mqMessageList){
IMqConsumerListenerContext context = new MqConsumerListenerContext();
final String messageId = mqMessage.getTraceId();
ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage,context);
log.info("消息:{} 消费结果 {}",messageId,consumerStatus);
//acknowledge each message individually
if(!ackBatchFlag){
MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId,consumerStatus);
log.info("消息:{} 状态回执结果 {}",messageId, JSON.toJSON(ackResp));
}else {
//collect the status for a batch acknowledgment
MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto();
statusDto.setMessageId(messageId);
statusDto.setMessageStatus(consumerStatus.getCode());
statusDto.setConsumerGroupName(groupName);
statusDtoList.add(statusDto);
}
}
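//send the batch acknowledgment
if(ackBatchFlag){
MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList);
log.info("Messages: {} batch ack result: {}",statusDtoList,JSON.toJSON(ackResp));
statusDtoList = null;
}
}
}else {
//the failure branch belongs to the response-code check, not to the empty-list check
log.error("Failed to pull messages: {}",JSON.toJSON(resp));
}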
}
}
},pullInitDelaySeconds,pullPeriodSeconds, TimeUnit.SECONDS);
}
}
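The comments in buildMqTopicTagDto suggest tracking consumption status per groupName and messageId. The following is a minimal in-memory sketch of that idea, not the broker's actual persistence layer; the class name `GroupConsumeStatusTable`, the key format, and the status codes `W` and `S` are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory sketch of a (groupName, messageId) -> status table. */
final class GroupConsumeStatusTable {
    // Hypothetical status codes: waiting for consumption / consumed successfully.
    static final String WAIT = "W";
    static final String SUCCESS = "S";

    private final Map<String, String> statusByGroupAndMessage = new ConcurrentHashMap<>();

    // Assumes groupName never contains '|'; a real table would use a composite key.
    private static String key(String groupName, String messageId) {
        return groupName + "|" + messageId;
    }

    /** Records the acknowledgment of one message for one consumer group only. */
    void ack(String groupName, String messageId, String status) {
        statusByGroupAndMessage.put(key(groupName, messageId), status);
    }

    /** Returns the status seen by this group; other groups' acknowledgments do not affect it. */
    String statusOf(String groupName, String messageId) {
        return statusByGroupAndMessage.getOrDefault(key(groupName, messageId), WAIT);
    }
}
```

With this layout, an acknowledgment from one group leaves the same message in the waiting state for every other group, which is the behavior the comments ask for.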
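Consumer receives messages via push
Messages are pushed directly to the consumer by the broker, so real-time behavior is good.
The drawback is that if the consumer cannot keep up, this can cause a lot of problems.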
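One common way to limit the overload risk just mentioned is to put a bounded buffer in front of the consumer's worker and refuse pushes when it is full. The sketch below only illustrates that idea; `BoundedInbox` is a hypothetical class and is not part of the consumer code in this post.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Bounded buffer between the network handler and the worker that processes messages. */
final class BoundedInbox {
    private final BlockingQueue<String> buffer;

    BoundedInbox(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Called when a pushed message arrives; returns false instead of blocking when the buffer is full. */
    boolean offer(String message) {
        return buffer.offer(message);
    }

    /** Called by the worker; returns null when nothing is waiting. */
    String poll() {
        return buffer.poll();
    }
}
```

A `false` from `offer` could be reported back to the broker as a failed consumption so that the message is retried later, rather than letting unprocessed messages accumulate in memory.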
public class MqConsumerPush extends Thread implements IMqConsumer {
/**
* Validates the required parameters
*/
private void paramCheck(){
ArgUtil.notEmpty(brokerAddress,"brokerAddress");
ArgUtil.notEmpty(groupName,"groupName");
}
@Override
public void run() {
// start the consumer
log.info("MQ consumer starting, groupName: {}, brokerAddress: {}",groupName,brokerAddress);
//1. validate parameters
this.paramCheck();
try {
//0. configuration
ConsumerBrokerConfig config = ConsumerBrokerConfig.newInstance()
.groupName(groupName).brokerAddress(brokerAddress).check(check)
.respTimeoutMills(respTimeoutMills).invokeService(invokeService)
.statusManager(statusManager).mqListenerService(mqListenerService)
.loadBalance(loadBalance).subscribeMaxAttempt(subscribeMaxAttempt)
.unSubscribeMaxAttempt(unSubscribeMaxAttempt).consumerStatusMaxAttempt(consumerStatusMaxAttempt)
.appKey(appKey).appSecret(appSecret);
//1. initialize the channels
this.consumerBrokerService.initChannelFutureList(config);
//2. register with the broker
this.consumerBrokerService.registerToBroker();
//3. mark as available
statusManager.status(true);
//4. add the shutdown hook
final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook();
rpcShutdownHook.setStatusManager(statusManager);
rpcShutdownHook.setInvokeService(invokeService);
rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest);
rpcShutdownHook.setDestroyable(this.consumerBrokerService);
ShutdownHooks.rpcShutdownHook(rpcShutdownHook);
//5. post-startup hook
afterInit();
log.info("MQ consumer started");
}catch (Exception e){
log.error("MQ consumer failed to start",e);
statusManager.initFailed(true);
throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
}
}
/**
* Hook invoked after initialization completes
*/
protected void afterInit(){}
@Override
public void subscribe(String topicName, String tagRegex) {
final String consumerType = getConsumerType();
consumerBrokerService.subscribe(topicName,tagRegex,consumerType);
}
/**
* Returns the consumer strategy type
* @return the consumer type
*/
protected String getConsumerType(){
return ConsumerTypeConst.PUSH;
}
@Override
public void unSubscribe(String topicName, String tagRegex) {
final String consumerType = getConsumerType();
consumerBrokerService.unSubscribe(topicName,tagRegex,consumerType);
}
@Override
public void registerListener(IMqConsumerListener listener) {
this.mqListenerService.register(listener);
}
}
Consumer handler class (business processing logic)
public class MqConsumerHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFactory.getLog(MqConsumerHandler.class);
/**
* Invocation manager
*/
private IInvokeService invokeService;
/**
* Message listener service
*/
private IMqListenerService mqListenerService;
public void setInvokeService(IInvokeService invokeService){
this.invokeService = invokeService;
}
public void setMqListenerService(IMqListenerService mqListenerService){
this.mqListenerService = mqListenerService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
RpcMessageDto rpcMessageDto = null;
try {
rpcMessageDto = JSON.parseObject(bytes,RpcMessageDto.class);
}catch (Exception e){
log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes));
return;
}
if(rpcMessageDto.isRequest()){
MqCommonResp commonResp = this.dispatch(rpcMessageDto,ctx);
if(commonResp == null){
log.debug("当前消息为null,忽略处理.");
return;
}
writeResponse(rpcMessageDto,commonResp,ctx);
}else {
final String traceId = rpcMessageDto.getTraceId();
//discard responses whose traceId is blank
if(StringUtil.isBlank(traceId)){
log.debug("[server Response] response traceId is blank, discarding: {}",JSON.toJSON(rpcMessageDto));
return;
}
//hand the response to the waiting caller
invokeService.addResponse(traceId,rpcMessageDto);
}
}
/**
* Dispatches an incoming message
*
* @param rpcMessageDto the incoming message
* @param ctx the channel context
* @return the response
*/
private MqCommonResp dispatch(RpcMessageDto rpcMessageDto,ChannelHandlerContext ctx){
final String methodType = rpcMessageDto.getMethodType();
final String json = rpcMessageDto.getJson();
String channelId = ChannelUtil.getChannelId(ctx);
log.debug("channelId:{} 接收到 method:{} 内容:{}",channelId,methodType,json);
//message pushed by the broker
if(MethodType.B_MESSAGE_PUSH.equals(methodType)){
//log the received message
log.info("Received message from the broker: {}",json);
return this.consumer(json);
}
throw new UnsupportedOperationException("Unsupported method type");
}
/**
* Consumes a message
* @param json the raw request
* @return the result
*/
private MqCommonResp consumer(final String json){
try {
//if this were the broker, preprocessing and similar operations should happen here
MqMessage mqMessage = JSON.parseObject(json,MqMessage.class);
IMqConsumerListenerContext context = new MqConsumerListenerContext();
ConsumerStatus consumerStatus = this.mqListenerService.consumer(mqMessage,context);
MqConsumerResultResp resp = new MqConsumerResultResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
resp.setConsumerStatus(consumerStatus.getCode());
return resp;
}catch (MqException mqException){
log.error("消息消费业务异常",mqException);
MqConsumerResultResp resp = new MqConsumerResultResp();
resp.setRespCode(mqException.getCode());
resp.setRespMessage(mqException.getMsg());
return resp;
}catch (Exception exception){
log.error("消息消费系统异常",exception);
MqConsumerResultResp resp = new MqConsumerResultResp();
resp.setRespCode(MqCommonRespCode.FAIL.getCode());
resp.setRespMessage(MqCommonRespCode.FAIL.getMsg());
return resp;
}
}
/**
* Writes the result back
*
* @param req the request
* @param resp the response
* @param ctx the channel context
*/
private void writeResponse(RpcMessageDto req,
Object resp,
ChannelHandlerContext ctx){
final String id = ctx.channel().id().asLongText();
RpcMessageDto rpcMessageDto = new RpcMessageDto();
//mark as a response message
rpcMessageDto.setRequest(false);
rpcMessageDto.setTraceId(req.getTraceId());
rpcMessageDto.setMethodType(req.getMethodType());
rpcMessageDto.setRequestTime(System.currentTimeMillis());
String json = JSON.toJSONString(resp);
rpcMessageDto.setJson(json);
//write back to the peer
ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
ctx.writeAndFlush(byteBuf);
log.debug("[server] channel {} response {}",id,JSON.toJSON(rpcMessageDto));
}
}
Consumer and broker
public class ConsumerBrokerService implements IConsumerBrokerService {
/**
* Starts the heartbeat
*/
private void initHeartbeat(){
//send a heartbeat every 5 seconds
scheduledExecutorService.scheduleAtFixedRate(()->{heartbeat();},5,5, TimeUnit.SECONDS);
}
private ChannelHandler initChannelHandler(){
final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);
final MqConsumerHandler mqConsumerHandler = new MqConsumerHandler();
mqConsumerHandler.setInvokeService(invokeService);
mqConsumerHandler.setMqListenerService(mqListenerService);
//the handler is actually used by several channels; unless it is marked @ChannelHandler.Sharable, a new instance should be created each time
ChannelHandler handler = new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
.addLast(mqConsumerHandler);
}
};
return handler;
}
@Override
public void registerToBroker() {
int successCount = 0;
for(RpcChannelFuture channelFuture : this.channelFutureList){
ServiceEntry serviceEntry = new ServiceEntry();
serviceEntry.setGroupName(groupName);
serviceEntry.setAddress(channelFuture.getAddress());
serviceEntry.setPort(channelFuture.getPort());
serviceEntry.setWeight(channelFuture.getWeight());
BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
brokerRegisterReq.setServiceEntry(serviceEntry);
brokerRegisterReq.setMethodType(MethodType.C_REGISTER);
brokerRegisterReq.setTraceId(IdHelper.uuid32());
brokerRegisterReq.setAppKey(appKey);
brokerRegisterReq.setAppSecret(appSecret);
log.info("[Register] 开始注册到 broker :{}", JSON.toJSON(brokerRegisterReq));
final Channel channel = channelFuture.getChannelFuture().channel();
MqCommonResp resp = callServer(channel,brokerRegisterReq,MqCommonResp.class);
log.info("[Register] 完成注册到 broker :{}",JSON.toJSON(resp));
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())){
successCount++;
}
}
if(successCount <=0 &&check){
log.error("校验 broker 可用性, 可连接成功数为 0");
throw new MqException(MqCommonRespCode.C_REGISTER_TO_BROKER_FAILED);
}
}
@Override
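//generic parameters were lost in the listing; the bound types MqCommonReq and MqCommonResp are assumed
public <T extends MqCommonReq, R extends MqCommonResp> R callServer(Channel channel, T commonReq, Class<R> respClass) {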
final String traceId = commonReq.getTraceId();
final long requestTime = System.currentTimeMillis();
RpcMessageDto rpcMessageDto = new RpcMessageDto();
rpcMessageDto.setTraceId(traceId);
rpcMessageDto.setRequestTime(requestTime);
rpcMessageDto.setJson(JSON.toJSONString(commonReq));
rpcMessageDto.setMethodType(commonReq.getMethodType());
rpcMessageDto.setRequest(true);
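//register the pending request so the response can be matched by traceId
invokeService.addRequest(traceId,respTimeoutMills);
//serialize the message and append the frame delimiter
ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
//write to the channel chosen by the caller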
channel.writeAndFlush(byteBuf);
String channelId = ChannelUtil.getChannelId(channel);
log.debug("[Client] channelId {} 发送消息 {}",channelId,JSON.toJSON(rpcMessageDto));
if(respClass == null){
log.debug("[Client] 当前消息为 one-way 消息,忽略响应");
return null;
}else {
//the channel handler stores the response; wait for it here
RpcMessageDto messageDto = invokeService.getResponse(traceId);
if(MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())){
throw new MqException(MqCommonRespCode.TIMEOUT);
}
String respJson = messageDto.getJson();
return JSON.parseObject(respJson,respClass);
}
}
@Override
public Channel getChannel(String key) {
//wait until startup has completed
while (!statusManager.status()){
if(statusManager.initFailed()){
log.error("初始化失败");
throw new MqException(MqCommonRespCode.C_INIT_FAILED);
}
log.debug("等待初始化完成...");
DateUtil.sleep(100);
}
RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
channelFutureList,key);
return rpcChannelFuture.getChannelFuture().channel();
}
@Override
public void subscribe(String topicName, String tagRegex, String consumerType) {
final ConsumerSubscribeReq req = new ConsumerSubscribeReq();
String messageId = IdHelper.uuid32();
req.setTraceId(messageId);
req.setMethodType(MethodType.C_SUBSCRIBE);
req.setTopicName(topicName);
req.setTagRegex(tagRegex);
req.setGroupName(groupName);
req.setConsumerType(consumerType);
//subscribe with retries
Retryer.newInstance()
.maxAttempt(subscribeMaxAttempt)
.callable(new Callable() {
@Override
public String call() throws Exception {
Channel channel = getChannel(null);
MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
throw new MqException(ConsumerRespCode.SUBSCRIBE_FAILED);
}
return resp.getRespCode();
}
}).retryCall();
}
@Override
public void unSubscribe(String topicName, String tagRegex, String consumerType) {
final ConsumerUnSubscribeReq req = new ConsumerUnSubscribeReq();
String messageId = IdHelper.uuid32();
req.setTraceId(messageId);
req.setMethodType(MethodType.C_UN_SUBSCRIBE);
req.setTopicName(topicName);
req.setTagRegex(tagRegex);
req.setGroupName(groupName);
req.setConsumerType(consumerType);
// unsubscribe with retries
Retryer.newInstance()
.maxAttempt(unSubscribeMaxAttempt)
.callable(new Callable() {
@Override
public String call() throws Exception {
Channel channel = getChannel(null);
MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
throw new MqException(ConsumerRespCode.UN_SUBSCRIBE_FAILED);
}
return resp.getRespCode();
}
}).retryCall();
}
@Override
public MqConsumerPullResp pull(String topicName, String tagRegex, int fetchSize) {
MqConsumerPullReq req = new MqConsumerPullReq();
req.setSize(fetchSize);
req.setGroupName(groupName);
req.setTagRegex(tagRegex);
req.setTopicName(topicName);
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_MESSAGE_PULL);
Channel channel = getChannel(null);
return this.callServer(channel,req,MqConsumerPullResp.class);
}
@Override
public void heartbeat() {
final MqHeartBeatReq req = new MqHeartBeatReq();
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_HEARTBEAT);
req.setAddress(NetUtil.getLocalHost());
req.setPort(0);
req.setTime(System.currentTimeMillis());
log.debug("[HEARTBEAT] 往服务端发送心跳包 {}",JSON.toJSON(req));
//send to every broker connection
for (RpcChannelFuture channelFuture : channelFutureList){
try{
Channel channel = channelFuture.getChannelFuture().channel();
callServer(channel,req,null);
}catch (Exception exception){
log.error("[HEARTBEAT] 往服务端处理异常",exception);
}
}
}
@Override
public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {
final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
req.setMessageId(messageId);
req.setMessageStatus(consumerStatus.getCode());
req.setConsumerGroupName(groupName);
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_CONSUMER_STATUS);
//retry on failure
return Retryer.newInstance()
.maxAttempt(consumerStatusMaxAttempt)
.callable(new Callable() {
@Override
public MqCommonResp call() throws Exception {
Channel channel = getChannel(null);
MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED);
}
return resp;
}
}).retryCall();
}
@Override
public MqCommonResp consumerStatusAckBatch(List statusDtoList) {
final MqConsumerUpdateStatusBatchReq req = new MqConsumerUpdateStatusBatchReq();
req.setStatusList(statusDtoList);
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_CONSUMER_STATUS_BATCH);
//retry on failure
return Retryer.newInstance()
.maxAttempt(consumerStatusMaxAttempt)
.callable(new Callable() {
@Override
public MqCommonResp call() throws Exception {
Channel channel = getChannel(null);
MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_BATCH_FAILED);
}
return resp;
}
}).retryCall();
}
@Override
public void destroyAll() {
for(RpcChannelFuture channelFuture : channelFutureList){
Channel channel = channelFuture.getChannelFuture().channel();
final String channelId = ChannelUtil.getChannelId(channel);
log.info("开始注销:{}",channelId);
ServiceEntry serviceEntry = InnerChannelUtils.buildServiceEntry(channelFuture);
BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
brokerRegisterReq.setServiceEntry(serviceEntry);
String messageId = IdHelper.uuid32();
brokerRegisterReq.setTraceId(messageId);
brokerRegisterReq.setMethodType(MethodType.C_UN_REGISTER);
this.callServer(channel,brokerRegisterReq,null);
log.info("完成注销:{}",channelId);
}
}
}
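callServer relies on invokeService to match each response to its request through the traceId: addRequest is called before the write, the channel handler calls addResponse, and getResponse waits for the result. The post does not show that service, so the following is only a sketch of one way such matching could work, using `CompletableFuture`. The class `PendingResponses` and its method signatures are assumptions and may differ from the real IInvokeService.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Matches asynchronous responses to blocked callers by traceId. */
final class PendingResponses {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called by the sender before the request is written to the channel. */
    void addRequest(String traceId) {
        pending.put(traceId, new CompletableFuture<String>());
    }

    /** Called by the channel handler when a response arrives; unknown traceIds are ignored. */
    void addResponse(String traceId, String responseJson) {
        CompletableFuture<String> future = pending.get(traceId);
        if (future != null) {
            future.complete(responseJson);
        }
    }

    /** Blocks the caller until the response arrives; returns null on timeout or unknown traceId. */
    String getResponse(String traceId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(traceId);
        if (future == null) {
            return null;
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(traceId); // always clean up so the map cannot grow without bound
        }
    }
}
```

Registering the request before writing to the channel matters: if the order were reversed, a fast response could arrive before anything is waiting for it and would be dropped.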
Producer
Only the core code is shown.
Producer startup class
public class MqProducer extends Thread implements IMqProducer {
@Override
public void run() {
this.paramCheck();
//start the producer client
log.info("MQ producer starting, GROUP: {} brokerAddress: {}",
groupName,brokerAddress);
try {
//0. configuration
ProducerBrokerConfig config = ProducerBrokerConfig.newInstance()
.groupName(groupName)
.brokerAddress(brokerAddress)
.check(check)
.respTimeoutMills(respTimeoutMills)
.invokeService(invokeService)
.statusManager(statusManager)
.loadBalance(loadBalance)
.maxAttempt(maxAttempt)
.appKey(appKey)
.appSecret(appSecret);
//1. initialize the channels
this.producerBrokerService.initChannelFutureList(config);
//2. register with the broker
this.producerBrokerService.registerToBroker();
//3. mark as available
statusManager.status(true);
//4. add the shutdown hook
final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook();
rpcShutdownHook.setStatusManager(statusManager);
rpcShutdownHook.setInvokeService(invokeService);
rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest);
rpcShutdownHook.setDestroyable(this.producerBrokerService);
ShutdownHooks.rpcShutdownHook(rpcShutdownHook);
log.info("MQ producer started");
}catch (Exception e){
log.error("MQ producer failed to start",e);
//mark initialization as failed
statusManager.initFailed(true);
throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
}
}
}
public class MqProducerHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFactory.getLog(MqProducerHandler.class);
/**
* Invocation manager
*/
private IInvokeService invokeService;
public void setInvokeService(IInvokeService invokeService) {
this.invokeService = invokeService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String text = new String(bytes);
log.debug("[Client] channelId {} 接收到消息 {}", ChannelUtil.getChannelId(ctx),text);
RpcMessageDto rpcMessageDto = null;
try {
rpcMessageDto = JSON.parseObject(bytes,RpcMessageDto.class);
}catch (Exception e){
log.error("RpcMessageDto json 格式转换异常 {}",text);
return;
}
if(rpcMessageDto.isRequest()){
//request message
final String methodType = rpcMessageDto.getMethodType();
final String json = rpcMessageDto.getJson();
}else {
//discard responses whose traceId is blank
if(StringUtil.isBlank(rpcMessageDto.getTraceId())){
log.debug("[Client] response traceId is blank, discarding: {}",JSON.toJSON(rpcMessageDto));
return;
}
invokeService.addResponse(rpcMessageDto.getTraceId(),rpcMessageDto);
log.debug("[Client] response is :{}",JSON.toJSON(rpcMessageDto));
}
}
}
public class ProducerBrokerService implements IProducerBrokerService {
@Override
public void initChannelFutureList(ProducerBrokerConfig config) {
//1. copy the configuration
this.invokeService = config.invokeService();
this.check = config.check();
this.respTimeoutMills = config.respTimeoutMills();
this.brokerAddress = config.brokerAddress();
this.groupName = config.groupName();
this.statusManager = config.statusManager();
this.loadBalance = config.loadBalance();
this.maxAttempt = config.maxAttempt();
this.appKey = config.appKey();
this.appSecret = config.appSecret();
//2. initialize the channels
this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
initChannelHandler(),check);
}
private ChannelHandler initChannelHandler(){
final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);
final MqProducerHandler mqProducerHandler = new MqProducerHandler();
mqProducerHandler.setInvokeService(invokeService);
//the handler is actually used by several channels; unless it is marked @ChannelHandler.Sharable, a new instance should be created each time
ChannelHandler handler = new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH,delimiterBuf))
.addLast(mqProducerHandler);
}
};
return handler;
}
@Override
public void registerToBroker() {
int successCount = 0;
for(RpcChannelFuture channelFuture : this.channelFutureList) {
ServiceEntry serviceEntry = new ServiceEntry();
serviceEntry.setGroupName(groupName);
serviceEntry.setAddress(channelFuture.getAddress());
serviceEntry.setPort(channelFuture.getPort());
serviceEntry.setWeight(channelFuture.getWeight());
BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
brokerRegisterReq.setServiceEntry(serviceEntry);
brokerRegisterReq.setMethodType(MethodType.P_REGISTER);
brokerRegisterReq.setTraceId(IdHelper.uuid32());
brokerRegisterReq.setAppKey(appKey);
brokerRegisterReq.setAppSecret(appSecret);
log.info("[Register] 开始注册到 broker:{}", JSON.toJSON(brokerRegisterReq));
final Channel channel = channelFuture.getChannelFuture().channel();
MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class);
log.info("[Register] 完成注册到 broker:{}", JSON.toJSON(resp));
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
successCount++;
}
}
if(successCount <= 0 && check) {
log.error("校验 broker 可用性,可连接成功数为 0");
throw new MqException(MqCommonRespCode.P_REGISTER_TO_BROKER_FAILED);
}
}
@Override
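//generic parameters were lost in the listing; the bound types MqCommonReq and MqCommonResp are assumed
public <T extends MqCommonReq, R extends MqCommonResp> R callServer(Channel channel, T commonReq, Class<R> respClass) {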
final String traceId = commonReq.getTraceId();
final long requestTime = System.currentTimeMillis();
RpcMessageDto rpcMessageDto = new RpcMessageDto();
rpcMessageDto.setTraceId(traceId);
rpcMessageDto.setRequestTime(requestTime);
rpcMessageDto.setJson(JSON.toJSONString(commonReq));
rpcMessageDto.setMethodType(commonReq.getMethodType());
rpcMessageDto.setRequest(true);
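//register the pending request so the response can be matched by traceId
invokeService.addRequest(traceId,respTimeoutMills);
//serialize the message and append the frame delimiter
ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
//write to the channel chosen by the caller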
channel.writeAndFlush(byteBuf);
String channelId = ChannelUtil.getChannelId(channel);
log.debug("[Client] channelId {} 发送消息 {}",channelId,JSON.toJSON(rpcMessageDto));
if(respClass == null){
log.debug("[Client] 当前消息为 one-way消息,忽略响应");
return null;
}else {
//the channel handler stores the response; wait for it here
RpcMessageDto messageDto = invokeService.getResponse(traceId);
if(MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())){
throw new MqException(MqCommonRespCode.TIMEOUT);
}
String respJson = messageDto.getJson();
return JSON.parseObject(respJson,respClass);
}
}
}
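Both callServer implementations frame each message with DelimiterUtil.getMessageDelimiterBuffer, and the receiving pipeline splits the byte stream with DelimiterBasedFrameDecoder. DelimiterUtil itself is not shown in this post, so the sketch below illustrates delimiter-based framing on plain strings, without Netty. The class `DelimiterFraming` and the delimiter value are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Delimiter-based framing on strings: the sender appends a delimiter, the receiver splits on it. */
final class DelimiterFraming {
    // Hypothetical delimiter; it must never occur inside a payload.
    static final String DELIMITER = "~!@#$%^&*";

    /** Sender side: one frame is the payload followed by the delimiter. */
    static String encode(String json) {
        return json + DELIMITER;
    }

    /** Receiver side: returns every complete frame; a trailing partial frame is left out. */
    static List<String> decode(String stream) {
        List<String> frames = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = stream.indexOf(DELIMITER, start)) >= 0) {
            frames.add(stream.substring(start, end));
            start = end + DELIMITER.length();
        }
        return frames;
    }
}
```

TCP delivers a byte stream rather than discrete messages, so several messages can arrive in one read or one message can be split across reads; framing is what lets the handler see exactly one JSON document per channelRead0 call.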
broker
Introducing a broker, a middleman for messages, decouples producers from consumers.
Broker startup class
public class MqBroker extends Thread implements IMqBroker{
@Override
public void run() {
//start the server
log.info("MQ broker starting, port: {}",port);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH,delimiterBuf))
.addLast(initChannelHandler());
}
})
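//SO_BACKLOG affects connections that have not yet been taken by accept
.option(ChannelOption.SO_BACKLOG,128)
//SO_KEEPALIVE: if the client has been silent for some time, the server sends a keep-alive probe to check whether the client is still alive
.childOption(ChannelOption.SO_KEEPALIVE,true);
//bind the port and start accepting incoming connections
ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
log.info("MQ broker started, listening on port {}",port);
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("MQ broker shut down");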
}catch (Exception e){
log.error("MQ 中间人启动异常",e);
throw new MqException(BrokerRespCode.RPC_INIT_FAILED);
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
public class MqBrokerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
RpcMessageDto rpcMessageDto = null;
try {
rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
} catch (Exception e) {
log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes));
return;
}
if (rpcMessageDto.isRequest()) {
MqCommonResp commonResp = this.dispatch(rpcMessageDto,ctx);
if(commonResp == null){
log.debug("当前消息为null, 忽略处理。");
return;
}
writeResponse(rpcMessageDto,commonResp,ctx);
}else {
final String traceId = rpcMessageDto.getTraceId();
//discard responses whose traceId is blank
if(StringUtil.isBlank(traceId)){
log.debug("[Server Response] response traceId is blank, discarding: {}",JSON.toJSON(rpcMessageDto));
return;
}
//hand the response to the waiting caller
invokeService.addResponse(traceId,rpcMessageDto);
}
}
/**
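* Dispatches an incoming message
*
* @param rpcMessageDto the incoming message
* @param ctx the channel context
* @return the response, or null for one-way requests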
*/
private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {
try {
final String methodType = rpcMessageDto.getMethodType();
final String json = rpcMessageDto.getJson();
String channelId = ChannelUtil.getChannelId(ctx);
final Channel channel = ctx.channel();
log.debug("channelId :{} 接收到 method :{} 内容 :{}", channelId, methodType, json);
// producer registration
if (MethodType.P_REGISTER.equals(methodType)) {
BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
if (!brokerRegisterValidService.producerValid(registerReq)) {
log.error("{} 生产者注册验证失败", JSON.toJSON(registerReq));
throw new MqException(MqBrokerRespCode.P_REGISTER_VALID_FAILED);
}
return registerProducerService.register(registerReq.getServiceEntry(), channel);
}
// producer deregistration
if (MethodType.P_UN_REGISTER.equals(methodType)) {
registerProducerService.checkValid(channelId);
BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
return registerProducerService.unRegister(registerReq.getServiceEntry(), channel);
}
// producer sends a message
if (MethodType.P_SEND_MSG.equals(methodType)) {
registerProducerService.checkValid(channelId);
return handleProducerSendMsg(channelId, json);
}
// producer sends a message, one-way
if (MethodType.P_SEND_MSG_ONE_WAY.equals(methodType)) {
registerProducerService.checkValid(channelId);
handleProducerSendMsg(channelId, json);
return null;
}
// producer sends a batch of messages
if (MethodType.P_SEND_MSG_BATCH.equals(methodType)) {
registerProducerService.checkValid(channelId);
return handleProducerSendMsgBatch(channelId, json);
}
// producer sends a batch of messages, one-way
if (MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) {
registerProducerService.checkValid(channelId);
handleProducerSendMsgBatch(channelId, json);
return null;
}
// consumer registration
if (MethodType.C_REGISTER.equals(methodType)) {
BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
if (!brokerRegisterValidService.consumerValid(registerReq)) {
log.error("{} 消费者注册验证失败", JSON.toJSON(registerReq));
throw new MqException(MqBrokerRespCode.C_REGISTER_VALID_FAILED);
}
return registerConsumerService.register(registerReq.getServiceEntry(), channel);
}
// consumer deregistration
if (MethodType.C_UN_REGISTER.equals(methodType)) {
registerConsumerService.checkValid(channelId);
BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
return registerConsumerService.unRegister(registerReq.getServiceEntry(), channel);
}
// consumer subscribes
if (MethodType.C_SUBSCRIBE.equals(methodType)) {
registerConsumerService.checkValid(channelId);
ConsumerSubscribeReq req = JSON.parseObject(json, ConsumerSubscribeReq.class);
return registerConsumerService.subscribe(req, channel);
}
// consumer unsubscribes
if (MethodType.C_UN_SUBSCRIBE.equals(methodType)) {
registerConsumerService.checkValid(channelId);
ConsumerUnSubscribeReq req = JSON.parseObject(json, ConsumerUnSubscribeReq.class);
return registerConsumerService.unSubscribe(req, channel);
}
// consumer pulls messages
if (MethodType.C_MESSAGE_PULL.equals(methodType)) {
registerConsumerService.checkValid(channelId);
MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);
return mqBrokerPersist.pull(req, channel);
}
// consumer heartbeat
if (MethodType.C_HEARTBEAT.equals(methodType)) {
registerConsumerService.checkValid(channelId);
MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);
registerConsumerService.heartbeat(req, channel);
return null;
}
// consumer consumption-status ACK
if (MethodType.C_CONSUMER_STATUS.equals(methodType)) {
registerConsumerService.checkValid(channelId);
MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
final String messageId = req.getMessageId();
final String messageStatus = req.getMessageStatus();
final String consumerGroupName = req.getConsumerGroupName();
return mqBrokerPersist.updateStatus(messageId, consumerGroupName, messageStatus);
}
// consumer consumption-status ACK, batch
if (MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) {
registerConsumerService.checkValid(channelId);
MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class);
final List statusDtoList = req.getStatusList();
return mqBrokerPersist.updateStatusBatch(statusDtoList);
}
log.error("暂时不支持的方法类型 {}", methodType);
throw new MqException(MqBrokerRespCode.B_NOT_SUPPORT_METHOD);
} catch (MqException mqException) {
log.error("业务执行异常", mqException);
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(mqException.getCode());
resp.setRespMessage(mqException.getMsg());
return resp;
} catch (Exception exception) {
log.error("执行异常", exception);
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.FAIL.getCode());
resp.setRespMessage(MqCommonRespCode.FAIL.getMsg());
return resp;
}
}
/**
* Handles a message sent by a producer
*
* @param channelId the channel identifier
* @param json the message body
*/
private MqCommonResp handleProducerSendMsg(String channelId,String json){
MqMessage mqMessage = JSON.parseObject(json,MqMessage.class);
MqMessagePersistPut persistPut = new MqMessagePersistPut();
persistPut.setMqMessage(mqMessage);
persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
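//attach the producer's rpc address
final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);
persistPut.setRpcAddress(serviceEntry);
MqCommonResp commonResp = mqBrokerPersist.put(persistPut);
//push to subscribed consumers asynchronously, as the batch path does
this.asyncHandleMessage(persistPut);
return commonResp;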
}
/**
* Handles a message asynchronously by pushing it to subscribed consumers
* @param put the persisted message
*/
private void asyncHandleMessage(MqMessagePersistPut put){
final MqMessage mqMessage = put.getMqMessage();
List channelList = registerConsumerService.getPushSubscribeList(mqMessage);
if(CollectionUtil.isEmpty(channelList)){
log.info("监听列表为空,忽略处理");
return;
}
BrokerPushContext brokerPushContext = BrokerPushContext.newInstance()
.channelList(channelList)
.mqMessagePersistPut(put)
.mqBrokerPersist(mqBrokerPersist)
.invokeService(invokeService)
.respTimeoutMills(respTimeoutMills)
.pushMaxAttempt(pushMaxAttempt);
brokerPushService.asyncPush(brokerPushContext);
}
/**
* Handles a batch of messages sent by a producer
* @param channelId the channel identifier
* @param json the message body
*/
private MqCommonResp handleProducerSendMsgBatch(String channelId,String json){
MqMessageBatchReq batchReq = JSON.parseObject(json,MqMessageBatchReq.class);
final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);
List putList = buildPersistPutList(batchReq,serviceEntry);
MqCommonResp commonResp = mqBrokerPersist.putBatch(putList);
//遍历异步推送
for(MqMessagePersistPut persistPut : putList){
this.asyncHandleMessage(persistPut);
}
return commonResp;
}
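/**
* Builds the list of entries to persist.
* @param batchReq batch request
* @param serviceEntry producer service entry
* @return one persist entry per message
*/
private List<MqMessagePersistPut> buildPersistPutList(MqMessageBatchReq batchReq,
final ServiceEntry serviceEntry){
List<MqMessagePersistPut> resultList = new ArrayList<>();
//build one entry per message
List<MqMessage> messageList = batchReq.getMqMessageList();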
for(MqMessage mqMessage : messageList){
MqMessagePersistPut put = new MqMessagePersistPut();
put.setRpcAddress(serviceEntry);
put.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
put.setMqMessage(mqMessage);
resultList.add(put);
}
return resultList;
}
/**
* Writes the result back to the client.
* @param req request
* @param resp response
* @param ctx channel handler context
*/
private void writeResponse(RpcMessageDto req,
Object resp,
ChannelHandlerContext ctx){
final String id = ctx.channel().id().asLongText();
RpcMessageDto rpcMessageDto = new RpcMessageDto();
//mark as a response message
rpcMessageDto.setRequest(false);
rpcMessageDto.setTraceId(req.getTraceId());
rpcMessageDto.setMethodType(req.getMethodType());
rpcMessageDto.setRequestTime(System.currentTimeMillis());
String json = JSON.toJSONString(resp);
rpcMessageDto.setJson(json);
//write back to the client
ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
ctx.writeAndFlush(byteBuf);
log.debug("[Server] channel {} response {}",id,JSON.toJSON(rpcMessageDto));
}
}
public class BrokerPushService implements IBrokerPushService {
private static final Log log = LogFactory.getLog(BrokerPushService.class);
private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
@Override
public void asyncPush(final BrokerPushContext context) {
EXECUTOR_SERVICE.submit(new Runnable() {
@Override
public void run() {
log.info("Start async push {}", JSON.toJSON(context));
final MqMessagePersistPut persistPut = context.mqMessagePersistPut();
final MqMessage mqMessage = persistPut.getMqMessage();
final List<ChannelGroupNameDto> channelList = context.channelList();
final IMqBrokerPersist mqBrokerPersist = context.mqBrokerPersist();
final IInvokeService invokeService = context.invokeService();
final long responseTime = context.respTimeoutMills();
final int pushMaxAttempt = context.pushMaxAttempt();
//mark the message as being processed
final String messageId = mqMessage.getTraceId();
log.info("Marking message as processing: {}",messageId);
for(final ChannelGroupNameDto channelGroupNameDto : channelList){
final Channel channel = channelGroupNameDto.getChannel();
final String consumerGroupName = channelGroupNameDto.getConsumerGroupName();
try {
mqBrokerPersist.updateStatus(messageId,consumerGroupName, MessageStatusConst.TO_CONSUMER_PROCESS);
String channelId = ChannelUtil.getChannelId(channel);
log.info("Start handling channelId: {}",channelId);
//1. invoke the consumer
mqMessage.setMethodType(MethodType.B_MESSAGE_PUSH);
//push with retry
MqConsumerResultResp resultResp = Retryer.<MqConsumerResultResp>newInstance()
.maxAttempt(pushMaxAttempt)
.callable(new Callable<MqConsumerResultResp>() {
@Override
public MqConsumerResultResp call() throws Exception {
MqConsumerResultResp resp = callServer(channel,mqMessage,
MqConsumerResultResp.class,invokeService,responseTime);
//a missing response or a non-success consumer status counts as a failure, so the retryer tries again
if(resp == null
|| !ConsumerStatus.SUCCESS.getCode().equals(resp.getConsumerStatus())){
throw new MqException(MqBrokerRespCode.MSG_PUSH_FAILED);
}
return resp;
}
}).retryCall();
//2. update the status
//2.1 on success, store the consumer status returned by the push
if(MqCommonRespCode.SUCCESS.getCode().equals(resultResp.getRespCode())){
mqBrokerPersist.updateStatus(messageId,consumerGroupName,resultResp.getConsumerStatus());
}else {
//2.2 on failure
log.error("Consume failed: {}",JSON.toJSON(resultResp));
mqBrokerPersist.updateStatus(messageId,consumerGroupName,MessageStatusConst.TO_CONSUMER_FAILED);
}
log.info("Finished handling channelId: {}",channelId);
}catch (Exception e){
log.error("Push failed", e);
mqBrokerPersist.updateStatus(messageId,consumerGroupName,MessageStatusConst.TO_CONSUMER_FAILED);
}
}
log.info("Finished async push");
}
});
}
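/**
* Sends a request over the channel and, unless it is one-way, waits for the response.
* The type bounds were lost in this listing; MqCommonReq and MqCommonResp are assumed here
* as the base request/response types (T must expose getTraceId() and getMethodType()).
* @param channel channel to call
* @param commonRep request
* @param respClass response class, or null for a one-way message
* @param invokeService invocation manager
* @param respTimeoutMills response timeout in milliseconds
* @param <T> request type
* @param <R> response type
* @return the response, or null for a one-way message
*/
private <T extends MqCommonReq, R extends MqCommonResp> R callServer(Channel channel,
T commonRep,
Class<R> respClass,
IInvokeService invokeService,
long respTimeoutMills){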
final String traceId = commonRep.getTraceId();
final long requestTime = System.currentTimeMillis();
RpcMessageDto rpcMessageDto = new RpcMessageDto();
rpcMessageDto.setTraceId(traceId);
rpcMessageDto.setRequestTime(requestTime);
rpcMessageDto.setJson(JSON.toJSONString(commonRep));
rpcMessageDto.setMethodType(commonRep.getMethodType());
rpcMessageDto.setRequest(true);
//register the pending request
invokeService.addRequest(traceId,respTimeoutMills);
//serialize the request and append the frame delimiter
ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
//the channel was already chosen by the caller through load balancing
channel.writeAndFlush(byteBuf);
String channelId = ChannelUtil.getChannelId(channel);
log.debug("[Client] channelId {} sent message {}",channelId,JSON.toJSON(rpcMessageDto));
if(respClass == null){
log.debug("[Client] one-way message, ignoring the response");
return null;
}else {
//the response is stored by the channelHandler
RpcMessageDto messageDto = invokeService.getResponse(traceId);
if(MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())){
throw new MqException(MqCommonRespCode.TIMEOUT);
}
String respJson = messageDto.getJson();
return JSON.parseObject(respJson,respClass);
}
}
}
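The push above relies on a retry helper that calls the consumer again until it succeeds or the attempt limit is reached. The following is a minimal sketch of that idea, not the `Retryer` library's actual implementation; `RetrySketch` and `retryCall` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in that illustrates retry-until-success; not the real Retryer API. */
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Calls the task until it returns without throwing, at most maxAttempt times.
     * When every attempt fails, the last failure is wrapped and rethrown.
     */
    static <T> T retryCall(Callable<T> task, int maxAttempt) {
        if (maxAttempt < 1) {
            throw new IllegalArgumentException("maxAttempt must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempt; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("all " + maxAttempt + " attempts failed", last);
    }
}
```

In the broker, the task throws whenever the consumer does not report success, which is what turns a failed consume into another attempt. A production retry would usually also wait between attempts.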
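Default consumer-side implementation
The logic of getPushSubscribeList may look slightly complex, but it simply finds the subscribed consumers that match an incoming message.
Within one groupName a message is consumed only once, so the matching consumers are first grouped by groupName and one consumer is chosen per group.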
public class LocalBrokerConsumerService implements IBrokerConsumerService {
public LocalBrokerConsumerService(){
//scan every 120 seconds
final long limitMills = 2*60*1000;
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()){
String key = entry.getKey();
long lastAccessTime = entry.getValue().getLastAccessTime();
long currentTime = System.currentTimeMillis();
if(currentTime - lastAccessTime > limitMills){
removeByChannelId(key);
}
}
}
},2*60,2*60, TimeUnit.SECONDS);
}
/**
* Removes the registration and heartbeat entries of a channel.
* @param channelId channel identifier
*/
private void removeByChannelId(final String channelId){
BrokerServiceEntryChannel channelRegister = registerMap.remove(channelId);
log.info("Removed registration, id: {}, channel: {}",channelId, JSON.toJSON(channelRegister));
BrokerServiceEntryChannel channelHeartBeat = heartbeatMap.remove(channelId);
log.info("Removed heartbeat, id: {}, channel: {}",channelId,JSON.toJSON(channelHeartBeat));
}
@Override
public void loadBalance(ILoadBalance loadBalance) {
this.loadBalance = loadBalance;
}
@Override
public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
final String channelId = ChannelUtil.getChannelId(channel);
BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
registerMap.put(channelId, entryChannel);
entryChannel.setLastAccessTime(System.currentTimeMillis());
heartbeatMap.put(channelId, entryChannel);
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return resp;
}
@Override
public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
final String channelId = ChannelUtil.getChannelId(channel);
removeByChannelId(channelId);
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return resp;
}
@Override
public MqCommonResp subscribe(ConsumerSubscribeReq serviceEntry, Channel clientChannel) {
final String channelId = ChannelUtil.getChannelId(clientChannel);
final String topicName = serviceEntry.getTopicName();
final String consumerType = serviceEntry.getConsumerType();
Map<String, Set<ConsumerSubscribeBo>> subscribeMap = getSubscribeMapByConsumerType(consumerType);
ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
subscribeBo.setChannelId(channelId);
subscribeBo.setGroupName(serviceEntry.getGroupName());
subscribeBo.setTopicName(topicName);
subscribeBo.setTagRegex(serviceEntry.getTagRegex());
//add to the subscription set of this topic
MapUtil.putToSetMap(subscribeMap,topicName,subscribeBo);
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return resp;
}
private Map<String, Set<ConsumerSubscribeBo>> getSubscribeMapByConsumerType(String consumerType){
return pushSubscribeMap;
}
@Override
public MqCommonResp unSubscribe(ConsumerUnSubscribeReq serviceEntry, Channel clientChannel) {
final String channelId = ChannelUtil.getChannelId(clientChannel);
final String topicName = serviceEntry.getTopicName();
final String consumerType = serviceEntry.getConsumerType();
Map<String, Set<ConsumerSubscribeBo>> subscribeMap = getSubscribeMapByConsumerType(consumerType);
ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
subscribeBo.setChannelId(channelId);
subscribeBo.setGroupName(serviceEntry.getGroupName());
subscribeBo.setTopicName(topicName);
subscribeBo.setTagRegex(serviceEntry.getTagRegex());
//remove from the subscription set of this topic
Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
if(CollectionUtil.isNotEmpty(set)){
set.remove(subscribeBo);
}
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return resp;
}
@Override
public List<ChannelGroupNameDto> getPushSubscribeList(MqMessage mqMessage) {
final String topicName = mqMessage.getTopic();
Set<ConsumerSubscribeBo> set = pushSubscribeMap.get(topicName);
if(CollectionUtil.isEmpty(set)){
return Collections.emptyList();
}
//2. keep the subscriptions whose tag regex matches, grouped by groupName
final List<String> tagNameList = mqMessage.getTags();
Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
for(ConsumerSubscribeBo bo:set){
String tagRegex = bo.getTagRegex();
if(RegexUtil.hasMatch(tagNameList,tagRegex)){
String groupName = bo.getGroupName();
MapUtil.putToListMap(groupMap,groupName,bo);
}
}
//3. after grouping by groupName, return only one consumer per group; ideally it is chosen by shardingKey
final String shardingKey = mqMessage.getShardingKey();
List<ChannelGroupNameDto> channelGroupNameList = new ArrayList<>();
for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()){
List<ConsumerSubscribeBo> list = entry.getValue();
ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance,list,shardingKey);
final String channelId = bo.getChannelId();
BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
if(entryChannel == null){
log.warn("channelId: {} has no registered channel",channelId);
continue;
}
final String groupName = entry.getKey();
ChannelGroupNameDto channelGroupNameDto = ChannelGroupNameDto.of(groupName, entryChannel.getChannel());
channelGroupNameList.add(channelGroupNameDto);
}
return channelGroupNameList;
}
@Override
public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {
final String channelId = ChannelUtil.getChannelId(channel);
log.info("[HEARTBEAT] received consumer heartbeat {}, channelId: {}",
JSON.toJSON(mqHeartBeatReq),channelId);
}
@Override
public void checkValid(String channelId) {
if(!registerMap.containsKey(channelId)){
log.error("channelId: {} is not registered",channelId);
throw new MqException(MqBrokerRespCode.C_REGISTER_CHANNEL_NOT_VALID);
}
}
}
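The matching step in getPushSubscribeList can be sketched in isolation: filter the subscriptions by tag regex, then group the survivors by groupName so that each group receives the message once. This is a simplified illustration, not the project's code. `SubscribeMatchSketch` and `Sub` are hypothetical names, and the sketch assumes that a subscription matches when any tag fully matches its regex, which may differ from what `RegexUtil.hasMatch` actually does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch of tag matching followed by grouping by groupName. */
final class SubscribeMatchSketch {
    /** Simplified subscription holding only the fields needed for matching. */
    static final class Sub {
        final String channelId;
        final String groupName;
        final String tagRegex;

        Sub(String channelId, String groupName, String tagRegex) {
            this.channelId = channelId;
            this.groupName = groupName;
            this.tagRegex = tagRegex;
        }
    }

    /** Assumption: a match means at least one tag fully matches the regex. */
    static boolean hasMatch(List<String> tags, String tagRegex) {
        Pattern pattern = Pattern.compile(tagRegex);
        for (String tag : tags) {
            if (pattern.matcher(tag).matches()) {
                return true;
            }
        }
        return false;
    }

    /** Returns the matching subscriptions keyed by groupName. */
    static Map<String, List<Sub>> groupMatching(List<Sub> subs, List<String> tags) {
        Map<String, List<Sub>> groups = new LinkedHashMap<>();
        for (Sub sub : subs) {
            if (hasMatch(tags, sub.tagRegex)) {
                groups.computeIfAbsent(sub.groupName, k -> new ArrayList<>()).add(sub);
            }
        }
        return groups;
    }
}
```

The broker then picks exactly one entry from each group's list, so two consumers in the same group never both receive the same message.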
Default producer-side implementation
/**
* Producer registration service
*/
public class LocalBrokerProducerService implements IBrokerProducerService {
private static final Log log = LogFactory.getLog(LocalBrokerProducerService.class);
private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();
@Override
public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
final String channelId = ChannelUtil.getChannelId(channel);
BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry,channel);
registerMap.put(channelId,entryChannel);
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return resp;
}
@Override
public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
final String channelId = ChannelUtil.getChannelId(channel);
registerMap.remove(channelId);
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return resp;
}
@Override
public ServiceEntry getServiceEntry(String channelId) {
return registerMap.get(channelId);
}
@Override
public void checkValid(String channelId) {
if(!registerMap.containsKey(channelId)){
log.error("channelId: {} is not registered",channelId);
throw new MqException(MqBrokerRespCode.P_REGISTER_CHANNEL_NOT_VALID);
}
}
}
Invocation manager
/**
* Invocation service: tracks pending requests and their responses.
*/
public class InvokeService implements IInvokeService {
private static final Log logger = LogFactory.getLog(InvokeService.class);
/**
* Pending requests.
*
* key: seqId, uniquely identifies a request
* value: the expiry time of the request, used for timeout checks and scheduled cleanup
*/
private final ConcurrentHashMap<String, Long> requestMap;
/**
* Responses by seqId
*/
private final ConcurrentHashMap<String, RpcMessageDto> responseMap;
public InvokeService() {
this.requestMap = new ConcurrentHashMap<>();
this.responseMap = new ConcurrentHashMap<>();
final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(timeoutThread, 60, 60, TimeUnit.SECONDS);
}
@Override
public IInvokeService addRequest(String seqId, long timeoutMills) {
logger.debug("[Invoke] start add request for seqId :{}, timeoutMills:{}", seqId, timeoutMills);
final long expireTime = System.currentTimeMillis() + timeoutMills;
requestMap.putIfAbsent(seqId, expireTime);
return this;
}
@Override
public IInvokeService addResponse(String seqId, RpcMessageDto rpcResponse) {
//1. check that the request is still pending
Long expireTime = this.requestMap.get(seqId);
//if absent, the request probably timed out and was removed by the scheduled job before the response arrived; ignore it
if (ObjectUtil.isNull(expireTime))
return this;
//2. check for timeout
if (System.currentTimeMillis() > expireTime) {
logger.debug("[Invoke] seqId: {} has timed out, returning a timeout result", seqId);
rpcResponse = RpcMessageDto.timeout();
}
//store the response, then notify all waiters
responseMap.putIfAbsent(seqId, rpcResponse);
logger.debug("[Invoke] stored response, seqId: {}, rpcResponse: {}", seqId, rpcResponse);
//remove the pending request
requestMap.remove(seqId);
logger.debug("[Invoke] seqId: {} removed from request map", seqId);
//wake up the waiting threads
synchronized (this) {
this.notifyAll();
logger.debug("[Invoke] {} notifyAll()", seqId);
}
return this;
}
@Override
public RpcMessageDto getResponse(String seqId) {
try {
//check and wait under the same lock that addResponse uses for notifyAll, so a wake-up cannot be missed
synchronized (this) {
RpcMessageDto rpcResponse = this.responseMap.get(seqId);
while (rpcResponse == null) {
logger.debug("[Invoke] seq {} has no response yet, waiting", seqId);
//timed wait: TimeoutCheckThread stores timeout results without notifying, so re-check periodically
this.wait(1000);
rpcResponse = this.responseMap.get(seqId);
}
logger.debug("[Invoke] seq {} got response: {}", seqId, rpcResponse);
return rpcResponse;
}
} catch (InterruptedException ex) {
//restore the interrupt flag before reporting the failure
Thread.currentThread().interrupt();
logger.error("Failed to get response", ex);
throw new MqException(MqCommonRespCode.RPC_GET_RESP_FAILED);
}
}
@Override
public boolean remainRequest () {
return this.requestMap.size() > 0;
}
}
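getResponse blocks when no response is available yet, until addResponse wakes it up.
But what if the response to a request is lost? The waiter would block forever, so a timeout detection thread stores a timeout result for every expired request. Because the scan runs every 60 seconds, a timeout may be reported up to 60 seconds after the request expired.
Timeout detection thread
/**
* Timeout detection thread
*/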
public class TimeoutCheckThread implements Runnable {
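/**
* Pending requests: seqId to expiry time
*/
private final ConcurrentHashMap<String, Long> requestMap;
/**
* Responses by seqId
*/
private final ConcurrentHashMap<String, RpcMessageDto> responseMap;
/**
* Creates the check task.
* @param requestMap request map
* @param responseMap response map
*/
public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
ConcurrentHashMap<String, RpcMessageDto> responseMap){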
ArgUtil.notNull(requestMap,"requestMap");
this.requestMap = requestMap;
this.responseMap = responseMap;
}
@Override
public void run() {
for(Map.Entry<String, Long> entry:requestMap.entrySet()){
long expireTime = entry.getValue();
long currentTime = System.currentTimeMillis();
if(currentTime>expireTime){
final String key = entry.getKey();
//store a timeout result and remove the entry from the request map
responseMap.putIfAbsent(key,RpcMessageDto.timeout());
requestMap.remove(key);
}
}
}
}
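As an alternative to wait/notify plus a periodic scan, the same request/response pairing could be sketched with one `CompletableFuture` per request, where the waiter enforces its own timeout. This is a swapped-in technique, not what the project does. `PendingResponseSketch` is a hypothetical name, and plain strings stand in for `RpcMessageDto`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: pending requests tracked as futures, keyed by seqId. */
final class PendingResponseSketch {
    /** Placeholder for a timeout response; the real code uses RpcMessageDto.timeout(). */
    static final String TIMEOUT = "TIMEOUT";

    private final ConcurrentHashMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    /** Registers a request before it is written to the channel. */
    void addRequest(String seqId) {
        pending.putIfAbsent(seqId, new CompletableFuture<>());
    }

    /** Called by the channel handler; late responses for unknown seqIds are ignored. */
    void addResponse(String seqId, String response) {
        CompletableFuture<String> future = pending.get(seqId);
        if (future != null) {
            future.complete(response);
        }
    }

    /** Waits for the response, or returns TIMEOUT once timeoutMills has elapsed. */
    String getResponse(String seqId, long timeoutMills) {
        CompletableFuture<String> future = pending.get(seqId);
        if (future == null) {
            return TIMEOUT; // sketch simplification: an unknown seqId is treated as timed out
        }
        try {
            return future.get(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return TIMEOUT;
        } finally {
            pending.remove(seqId); // always clean up so the map cannot grow without bound
        }
    }

    /** True while requests are still in flight. */
    boolean remainRequest() {
        return !pending.isEmpty();
    }
}
```

With this shape each waiter wakes only for its own response, and a lost response costs at most `timeoutMills` instead of depending on the scan interval.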
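Solving the Netty sticky-packet problem
TCP delivers a byte stream, so messages may arrive merged or split. Here every message is terminated with a fixed delimiter, which means the JSON payload itself must not contain that delimiter.
public class DelimiterUtil {
private DelimiterUtil(){}
/**
* Delimiter
*/
public static final String DELIMITER = "~!@#$%^&*";
/**
* Maximum frame length.
*
* A limit is required so that an unterminated message cannot exhaust the buffer.
*/
public static final int LENGTH = 65535;
/**
* Delimiter buffer
*/
public static final ByteBuf DELIMITER_BUF = Unpooled.copiedBuffer(DELIMITER.getBytes());
/**
* Converts text to a byte buffer.
* @param text text
* @return buffer holding the bytes of the text
*/
public static ByteBuf getByteBuf(String text){
return Unpooled.copiedBuffer(text.getBytes());
}
/**
* Serializes a message and appends the delimiter.
* @param rpcMessageDto message
* @return buffer holding the JSON followed by the delimiter
*/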
public static ByteBuf getMessageDelimiterBuffer(RpcMessageDto rpcMessageDto){
String json = JSON.toJSONString(rpcMessageDto);
String jsonDelimiter = json+DELIMITER;
return Unpooled.copiedBuffer(jsonDelimiter.getBytes());
}
}
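To see why a delimiter fixes the sticky-packet problem, here is a minimal sketch of the receiving side: chunks are accumulated and a frame is emitted only when a complete delimiter has been seen. It is an illustration under simplifying assumptions (it works on characters rather than bytes, and `DelimiterFramerSketch` is a hypothetical name). In a Netty pipeline this job is normally done by a frame decoder such as `DelimiterBasedFrameDecoder`, presumably configured with the `LENGTH` and `DELIMITER_BUF` constants above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of delimiter-based framing on the receiving side. */
final class DelimiterFramerSketch {
    static final String DELIMITER = "~!@#$%^&*";
    static final int MAX_LENGTH = 65535;

    /** Data received so far that has not yet formed a complete frame. */
    private final StringBuilder buffer = new StringBuilder();

    /** Appends a received chunk and returns every frame completed by it. */
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int idx;
        while ((idx = buffer.indexOf(DELIMITER)) >= 0) {
            frames.add(buffer.substring(0, idx));
            buffer.delete(0, idx + DELIMITER.length());
        }
        // refuse to buffer an unterminated message forever
        if (buffer.length() > MAX_LENGTH) {
            throw new IllegalStateException("frame exceeds " + MAX_LENGTH + " characters");
        }
        return frames;
    }
}
```

Feeding one and a half messages yields only the first one; the second is emitted once the rest of its delimiter arrives, even when the delimiter itself is split across chunks.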
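Message persistence
This is a simplified implementation that only keeps messages in an in-memory map, so they are lost when the broker restarts. A real implementation could use Spring Boot with MySQL.
/**
* Local (in-memory) persistence strategy
*/
public class LocalMqBrokerPersist implements IMqBrokerPersist {
private static final Log log = LogFactory.getLog(LocalMqBrokerPersist.class);
/**
* Messages by topic
* ps: simplified implementation, concurrency is not fully handled
*/
private final Map<String, List<MqMessagePersistPut>> map = new ConcurrentHashMap<>();
//1. receive
//2. persist
//3. notify consumers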
@Override
public synchronized MqCommonResp put(MqMessagePersistPut mqMessage) {
this.doPut(mqMessage);
MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return commonResp;
}
private void doPut(MqMessagePersistPut put){
log.info("put elem :{}", JSON.toJSON(put));
MqMessage mqMessage = put.getMqMessage();
final String topic = mqMessage.getTopic();
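//append to the list of this topic
MapUtil.putToListMap(map,topic,put);
}
@Override
public synchronized MqCommonResp putBatch(List<MqMessagePersistPut> putList) {
//store every message; synchronized like put, because doPut is not otherwise guarded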
for(MqMessagePersistPut put:putList){
this.doPut(put);
}
MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
return commonResp;
}
@Override
public MqCommonResp updateStatus(String messageId, String consumerGroupName, String status) {
//poor performance: for testing only, not for production
this.doUpdateStatus(messageId,consumerGroupName,status);
MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return commonResp;
}
private void doUpdateStatus(String messageId, String consumerGroupName,String status){
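//poor performance (full scan): for testing only, not for production
//note: consumerGroupName is ignored here, the status is stored per message rather than per group
for(List<MqMessagePersistPut> list : map.values()){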
for(MqMessagePersistPut put : list){
MqMessage mqMessage = put.getMqMessage();
if(mqMessage.getTraceId().equals(messageId)){
put.setMessageStatus(status);
break;
}
}
}
}
@Override
public MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) {
for(MqConsumerUpdateStatusDto statusDto : statusDtoList){
this.doUpdateStatus(statusDto.getMessageId(),statusDto.getConsumerGroupName(),
statusDto.getMessageStatus());
}
MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return commonResp;
}
@Override
public MqConsumerPullResp pull(MqConsumerPullReq pull, Channel channel) {
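//1. fetch the matching messages
//2. mark them as processing
//3. how should the corresponding consume status be updated?
//only messages in a waiting status are returned
final int fetchSize = pull.getSize();
final String topic = pull.getTopicName();
final String tagRegex = pull.getTagRegex();
List<MqMessage> resultList = new ArrayList<>(fetchSize);
List<MqMessagePersistPut> putList = map.get(topic);
//poor performance: linear scan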
if(CollectionUtil.isNotEmpty(putList)){
for(MqMessagePersistPut put : putList){
if(!isEnableStatus(put)){
continue;
}
final MqMessage mqMessage = put.getMqMessage();
List<String> tagList = mqMessage.getTags();
if(RegexUtil.hasMatch(tagList,tagRegex)){
//mark as processing
//TODO: when is the final status of the message updated?
//the consumer can send an ACK to the broker
put.setMessageStatus(MessageStatusConst.TO_CONSUMER_PROCESS);
resultList.add(mqMessage);
}
if (resultList.size() >= fetchSize){
break;
}
}
}
MqConsumerPullResp resp = new MqConsumerPullResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
resp.setList(resultList);
return resp;
}
private boolean isEnableStatus(final MqMessagePersistPut persistPut){
final String status = persistPut.getMessageStatus();
//a database could add a field such as the time to consume and sort by it
//simplified implementation, for testing only
List<String> statusList = Arrays.asList(MessageStatusConst.WAIT_CONSUMER,MessageStatusConst.CONSUMER_LATER);
return statusList.contains(status);
}
}
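The local implementation keeps a single status per message, although the same message can be consumed by several groups and the ACK has to be tracked per groupName. A minimal sketch of per-group status tracking could look as follows. `GroupStatusSketch` is a hypothetical name, and the status code `"W"` for "waiting" is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: consume status stored per (messageId, groupName) pair. */
final class GroupStatusSketch {
    /** Assumed code for "waiting to be consumed"; the real constant lives in MessageStatusConst. */
    static final String WAIT_CONSUMER = "W";

    private final Map<String, String> statusByKey = new ConcurrentHashMap<>();

    private static String key(String messageId, String groupName) {
        return messageId + "|" + groupName;
    }

    /** Records the status that one consumer group reported for a message. */
    void updateStatus(String messageId, String groupName, String status) {
        statusByKey.put(key(messageId, groupName), status);
    }

    /** A group that has not reported anything yet is still waiting for the message. */
    String getStatus(String messageId, String groupName) {
        return statusByKey.getOrDefault(key(messageId, groupName), WAIT_CONSUMER);
    }
}
```

With this layout an ACK from one group no longer hides the message from the other groups. In a database the same idea becomes a table keyed by groupName and messageId with a status column.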
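Graceful shutdown
public class DefaultShutdownHook extends AbstractShutdownHook {
/**
* (1) set the status to closing
* (2) check via {@link IInvokeService#remainRequest()} whether requests are still pending
* (3) timeout check: optional; if the shutdown cannot complete, force it
* (4) release all thread pools and other resources
* (5) set the status to closed
*/
@Override
protected void doHook() {
//set the status to waiting for shutdown
statusManager.status(false);
logger.info("[Shutdown] set status to wait for shutdown.");
//wait for the in-flight requests to complete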
long startMills = System.currentTimeMillis();
while (invokeService.remainRequest()){
long currentMills = System.currentTimeMillis();
long costMills = currentMills - startMills;
if(costMills >= waitMillsForRemainRequest){
logger.warn("[Shutdown] still remains request, but timeout, break.");
break;
}
logger.debug("[Shutdown] still remains request,wait for a while");
DateUtil.sleep(100);
}
//release resources
destroyable.destroyAll();
//set the status to shutdown success
statusManager.status(false);
logger.info("[Shutdown] set status to shutdown success");
}
}
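The core of the hook is a bounded wait: poll until no request is in flight, but give up after a timeout. A self-contained sketch of just that loop is shown below; `DrainWaitSketch` and `awaitDrain` are hypothetical names, and the supplier stands in for `invokeService.remainRequest()`.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the bounded wait used during graceful shutdown. */
final class DrainWaitSketch {
    private DrainWaitSketch() {}

    /**
     * Polls until remainRequest reports false or the timeout elapses.
     *
     * @return true if all requests finished, false on timeout or interruption
     */
    static boolean awaitDrain(BooleanSupplier remainRequest, long timeoutMills, long pollMills) {
        final long deadline = System.currentTimeMillis() + timeoutMills;
        while (remainRequest.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // still busy, the caller may force the shutdown
            }
            try {
                Thread.sleep(pollMills);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Such a method would typically run inside a thread registered with `Runtime.getRuntime().addShutdownHook(...)`, followed by releasing the thread pools whether or not the wait succeeded.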
Heartbeat detection
Heartbeat implementation
A heartbeat can be a very simple message body.
@Override
public void heartbeat() {
final MqHeartBeatReq req = new MqHeartBeatReq();
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_HEARTBEAT);
req.setAddress(NetUtil.getLocalHost());
req.setPort(0);
req.setTime(System.currentTimeMillis());
log.debug("[HEARTBEAT] sending heartbeat to broker {}", JSON.toJSON(req));
// notify every broker
for(RpcChannelFuture channelFuture : channelFutureList) {
try {
Channel channel = channelFuture.getChannelFuture().channel();
callServer(channel, req, null);
} catch (Exception exception) {
log.error("[HEARTBEAT] failed to send heartbeat to broker", exception);
}
}
}
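The consumer sends its heartbeat to every broker.
Scheduling the heartbeat
We start a scheduled task that runs every 5 seconds.
/**
* Initializes the heartbeat task
* @since 0.0.6
*/
private void initHeartbeat() {
//send a heartbeat every 5 seconds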
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
heartbeat();
}
}, 5, 5, TimeUnit.SECONDS);
}
The heartbeat starts as soon as the connection to the broker is established:
@Override
public void initChannelFutureList(ConsumerBrokerConfig config) {
//1. initialize the configuration
//...
//2. initialize the channels
this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
initChannelHandler(), check);
//3. initialize the heartbeat
this.initHeartbeat();
}
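Broker implementation
The consumer sends heartbeats periodically, so the broker has to receive them.
Receiving heartbeats
To keep things simple, the heartbeat is a ONE-WAY message: the broker does not reply.
// consumer heartbeat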
if(MethodType.C_HEARTBEAT.equals(methodType)) {
MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);
registerConsumerService.heartbeat(req, channel);
return null;
}
Heartbeat handling
Each time a heartbeat arrives, we record the channelId of the request and store the latest access time.
@Override
public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {
final String channelId = ChannelUtil.getChannelId(channel);
log.info("[HEARTBEAT] received consumer heartbeat {}, channelId: {}",
JSON.toJSON(mqHeartBeatReq), channelId);
ServiceEntry serviceEntry = new ServiceEntry();
serviceEntry.setAddress(mqHeartBeatReq.getAddress());
serviceEntry.setPort(mqHeartBeatReq.getPort());
BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
entryChannel.setLastAccessTime(mqHeartBeatReq.getTime());
heartbeatMap.put(channelId, entryChannel);
}
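Removing consumers
If a consumer has not sent a heartbeat for a long time, we consider it dead.
When LocalBrokerConsumerService starts, it also starts a scheduled cleanup task.
public LocalBrokerConsumerService() {
//scan every 120 seconds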
final long limitMills = 2 * 60 * 1000;
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()) {
String key = entry.getKey();
long lastAccessTime = entry.getValue().getLastAccessTime();
long currentTime = System.currentTimeMillis();
if(currentTime - lastAccessTime > limitMills) {
removeByChannelId(key);
}
}
}
}, 2 * 60, 2 * 60, TimeUnit.SECONDS);
}
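This task runs every 2 minutes and removes every consumer whose last heartbeat is more than 2 minutes old. In the worst case a dead consumer is therefore removed almost 4 minutes after its last heartbeat.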
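The heartbeat bookkeeping and the cleanup scan can be sketched without Netty by passing the current time in explicitly, which also makes the expiry rule easy to test. `HeartbeatRegistrySketch` is a hypothetical name, and the sketch stores only a timestamp per channelId instead of the full `BrokerServiceEntryChannel`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of heartbeat tracking with expiry. */
final class HeartbeatRegistrySketch {
    private final Map<String, Long> lastAccess = new ConcurrentHashMap<>();
    private final long limitMills;

    HeartbeatRegistrySketch(long limitMills) {
        this.limitMills = limitMills;
    }

    /** Records a heartbeat; nowMills should come from the broker's own clock. */
    void heartbeat(String channelId, long nowMills) {
        lastAccess.put(channelId, nowMills);
    }

    /** Removes every channel whose last heartbeat is older than the limit. */
    List<String> evictExpired(long nowMills) {
        List<String> removed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastAccess.entrySet()) {
            if (nowMills - entry.getValue() > limitMills) {
                // remove(key, value) leaves the entry alone if a newer heartbeat arrived meanwhile
                if (lastAccess.remove(entry.getKey(), entry.getValue())) {
                    removed.add(entry.getKey());
                }
            }
        }
        return removed;
    }

    boolean isAlive(String channelId) {
        return lastAccess.containsKey(channelId);
    }
}
```

Using the broker's clock for the timestamp, rather than the time carried in the heartbeat request, avoids evicting healthy consumers whose clocks are skewed.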
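Load balancing
Load balancing was covered in an earlier blog post, so the existing interface is simply called here and not described again.
Where MQ uses load balancing
Producer sending: when a producer sends a message, it can send it to any broker.
Broker pushing to consumers: after receiving a message, the broker can also pick any matching consumer to push it to.
Consumer ACK: after consuming, the consumer reports the status back to a broker and can choose any one of them.
Sticky messages: some messages are special, for example those that must be consumed in order. With a shardingKey, load balancing can pin them to a fixed target.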
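The sticky selection mentioned above can be sketched as a hash of the shardingKey over the candidate list, falling back to a random pick when no key is given. This is one common way to do it and not necessarily how the project's `ILoadBalance` implementation works; `StickySelectSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch: random selection by default, sticky selection with a shardingKey. */
final class StickySelectSketch {
    private StickySelectSketch() {}

    static <T> T select(List<T> candidates, String shardingKey) {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("no candidates");
        }
        if (shardingKey == null || shardingKey.isEmpty()) {
            // no key: any candidate will do
            return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
        }
        // floorMod keeps the index non-negative even for negative hash codes
        return candidates.get(Math.floorMod(shardingKey.hashCode(), candidates.size()));
    }
}
```

The same key maps to the same target only while the candidate list keeps its size and order, so ordering guarantees weaken whenever consumers join or leave.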
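Registration authentication
The basic MQ features are implemented above, but one problem remains: the MQ performs no authentication.
If it is deployed on the public network, any machine can connect to our service, which is clearly unacceptable.
The author uses a username and password: both properties are sent when a producer registers with the broker, and the same applies to consumers. The credentials are then verified when messages are sent, pushed, or pulled. This is very simple.
Token-based authentication is also possible and more secure, though it may be more complex to implement.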
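A minimal sketch of the username/password check on the broker side might look like this. The class and field names (`RegisterAuthSketch`, `appKey`, `appSecret`) are hypothetical and do not come from the project; the only deliberate design choice is the constant-time comparison, so that response timing leaks less about how much of a credential was correct.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

/** Hypothetical sketch of credential verification at registration time. */
final class RegisterAuthSketch {
    private final byte[] appKey;
    private final byte[] appSecret;

    RegisterAuthSketch(String appKey, String appSecret) {
        this.appKey = appKey.getBytes(StandardCharsets.UTF_8);
        this.appSecret = appSecret.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns true only when both the key and the secret match the configured values. */
    boolean isValid(String key, String secret) {
        if (key == null || secret == null) {
            return false;
        }
        boolean keyOk = MessageDigest.isEqual(appKey, key.getBytes(StandardCharsets.UTF_8));
        boolean secretOk = MessageDigest.isEqual(appSecret, secret.getBytes(StandardCharsets.UTF_8));
        // non-short-circuit AND so both comparisons always run
        return keyOk & secretOk;
    }
}
```

The broker would call such a check in its register handler and reject the channel when it fails. Sending a plain password only makes sense over an encrypted connection.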
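Summary
Some features of the project:
- check: broker startup check
- notify the register center on shutdown
- timeout setting for graceful shutdown
- heartbeat detection mechanism
- improved load-balance implementation + shardingKey sticky consumption and requests
- extensible failure retry
- consumer pull strategy
- ACK handling for pulled messages
- Spring Boot implementation of the broker
- message ACK handling based on groupName
- replaying consumption by offset
- batch sending and batch ACK of messages
- registration authentication for security
- ordered messages
- transactional messages
- scheduled messages
- flow control with back-pressure
- message reliability
- offline messages
- dead-letter queue
- reconnection after disconnect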
Project repository: Zzzzs1/MQ-demo: an MQ implemented in Java (github.com)
Sharing is welcome; when reposting, please credit the source: 内存溢出