- 消息驱动
MQ使用场景:异步处理,流量削峰填谷,解耦微服务
-
安装
-
下载 http://rocketmq.apache.org/release_notes/release-notes-4.5.1/
-
解压
unzip rocketmq-all-4.5.1-bin-release.zip
-
切换目录到RocketMQ根目录
cd rocketmq-all-4.5.1-bin-release
-
启动Name Server
nohup sh bin/mqnamesrv &
-
启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
-
验证name server是否启动OK:
tail -f ~/logs/rocketmqlogs/namesrv.log
-
验证broker是否启动OK:
tail -f ~/logs/rocketmqlogs/broker.log
-
停止name server
sh bin/mqshutdown namesrv
-
停止broker
sh bin/mqshutdown broker
-
控制台的安装:
-
参考 http://www.imooc.com/article/290092
-
我项目中使用了懒人包,不过最好两个都看一下,试验一下。
编写生产者
- 依赖
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.0.3version>
dependency>
- 注解:无
- 配置:
rocketmq:
name-server: 127.0.0.1:9876
producer:
#必须指定group
group: test-group
- 测试:
package com.zstu.contentcenter.controller.content;
import com.zstu.contentcenter.domain.dto.content.ShareAuditDTO;
import com.zstu.contentcenter.domain.entity.content.Share;
import com.zstu.contentcenter.service.content.ShareService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* TODO
*
* @author darksideinxx
* @date 2022/4/22 12:44
*/
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RequestMapping("/admin/shares")
public class ShareAdminController {
private final ShareService shareService;
@PutMapping("/audit/{id}")
public Share auditById(@PathVariable Integer id,
@RequestBody ShareAuditDTO shareAuditDTO) {
//TODO 认证授权
return this.shareService.auditById(id, shareAuditDTO);
}
}
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class ShareService {
private final ShareMapper shareMapper;
private final UserCenterFeignClient userCenterFeignClient;
private final RocketMQTemplate rocketMQTemplate;
public ShareDTO findById(Integer id) {
Share share = this.shareMapper.selectByPrimaryKey(id);
//获取发布人的id
Integer userId = share.getUserId();
UserDTO userDTO = this.userCenterFeignClient.findById(userId);
//拿到用户中心的所有实例信息
//怎么调用用户微服务的/users/{userId}?
//消息的装配
ShareDTO shareDTO = new ShareDTO();
BeanUtils.copyProperties(share, shareDTO);
shareDTO.setWxNickname(userDTO.getWxNickname());
return shareDTO;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ShareAuditDTO {
/**
* 审核状态
*/
private AuditStatusEnum auditStatusEnum;
/**
* 原因
*/
private String reason;
}
@Getter
@AllArgsConstructor
public enum AuditStatusEnum {
/**
* 待审核
*/
NOT_YET,
/**
* 审核通过
*/
PASS,
/**
* 审核不通过
*/
REJECT
}
然后我们编写user-center
package com.zstu.usercenter.rocketmq;
import com.zstu.usercenter.dao.bonus.BonusEventLogMapper;
import com.zstu.usercenter.dao.user.UserMapper;
import com.zstu.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.zstu.usercenter.domain.entity.bonus.BonusEventLog;
import com.zstu.usercenter.domain.entity.user.User;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* TODO
*
* @author DARKSIDEinxx
* @date 2022/4/23 17:01
*/
@Service
//注意:这里的topic一定要和content-center发送过来的消息一致 这里的consumerGroup和生产者的group一样的,可以随便填写,只是生产者是放在yml文件
//消费者是放在注释里面的
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
private final UserMapper userMapper;
private final BonusEventLogMapper bonusEventLogMapper;
/**
* @param
* @return
* @author DARKSIDEinxx
* @description 执行业务
* @date 2022/4/23 17:03
*/
@Override
public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {
//当收到消息的时候,执行业务
//1.为用户加积分
Integer userId = userAddBonusMsgDTO.getUserId();
Integer bonus = userAddBonusMsgDTO.getBonus();
User user = this.userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + userAddBonusMsgDTO.getBonus());
this.userMapper.updateByPrimaryKey(user);
//2.记录日志到bonus_event_log表里面
this.bonusEventLogMapper.insert(
BonusEventLog.builder()
.userId(userId)
.value(bonus)
.event("contribute")
.createTime(new Date())
.description("contribute add bonus")
.build()
);
}
}
package com.zstu.usercenter.domain.dto.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* TODO
*
* @author darksideinxx
* @date 2022/4/22 17:39
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserAddBonusMsgDTO {
/**
* 为谁加积分
*/
private Integer userId;
/**
* 加多少积分
*/
private Integer bonus;
}
然后我们测试运行一下:
我们发现积分已经变成了150
总结:
当然我们可能不是用的rocketmq,可能是别的,
如果使用的是rocketmq:RocketMQMessageListener
如果是ActiveMQ or Artemis :JmsListener;
如果是RabbitMQ:RabbitListener;
如果是Kafka:KafkaListener
事务
@Transactional(rollbackFor = Exception.class)
public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
//1.查询share是否存在,不存在或者当前的audit-status!=noyet 那么抛异常
System.out.println("---------");
log.info("id={},share = {}",id,shareAuditDTO);
Share share = this.shareMapper.selectByPrimaryKey(id);
System.out.println("---------");
log.info("---------{}",share);
if (share == null) {
throw new IllegalArgumentException("参数非法,该分享不存在");
}
if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
throw new IllegalArgumentException("参数非法,该分享已审核通过或者审核不通过");
}
//2,审核资源,将状态设为pass/reject
share.setAuditStatus(shareAuditDTO.getAuditStatusEnum().toString());
this.shareMapper.updateByPrimaryKey(share);
//3. 如果是pass,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分
//目标 topic;消息体:
this.rocketMQTemplate.convertAndSend(
"add-bonus",
UserAddBonusMsgDTO.builder()
.userId(share.getUserId())
.bonus(50)
.build());
//他是一个附属 *** 作。可以异步执行
//4.把share写到缓存
return share;
}
如果这个事务发生异常,那么这个事务就会回滚
但是,这里我们需要理解一个问题,就是我的事务在前三个都是正常的运行的,运行到第四步,share写到缓存是时候失败了,那么事务就会回滚,但是了,我消息已经发出去了,积分已经加了,这样就造成了乌龙
rocketmq提供了事务消息,来解决这个消息
- 第一步:发送半消息:生产者发送半消息给mqserver,半消息是一种特殊的消息,这种消息存储到mqserver中,但是会被标记位不能投递状态,所以消费者不会接收到这种消息。
- 什么情况会导致第四部的确认消息不能成功发送?举例:在发送二次确认的瞬间断网了,或者应用重启了
- 简单描述这个步骤就是:生产者发送消息给mqserver,但是mqserver先标记了这个消息,不让人发送这条消息,然后生产者去执行本地事务,然后可以知道是应该投递这条消息还是回滚这条消息,如果本地消息执行成功的话,就让mqserver投递这条消息,否则就把这条消息删掉(5,6,7这三条消息回查,其实是防止在特殊情况下mqserver没有收到二次确认做的容错处理)
- content-center
package com.zstu.contentcenter.rocketmq;
import com.zstu.contentcenter.dao.messaging.RocketmqTransationLogMapper;
import com.zstu.contentcenter.domain.dto.content.ShareAuditDTO;
import com.zstu.contentcenter.domain.entity.messaging.RocketmqTransationLog;
import com.zstu.contentcenter.service.content.ShareService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Arg;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import javax.xml.transform.Source;
/**
* TODO
*
* @author DARKSIDEinxx
* @date 2022/4/24 20:50
*/
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
private final ShareService shareService;
private final RocketmqTransationLogMapper rocketmqTransationLogMapper;
/**
* 执行本地事务
* 这里的message是shareServer的MessageBuilder,这里的object o是shareServer中的arg,
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
/**
*拿到headers
*/
MessageHeaders headers = message.getHeaders();
/**
*拿到shareserver中的transactionId
*/
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
/**
*获取share_id 的id
*/
Integer shareId = Integer.valueOf(headers.get("shareId").toString());
try {
this.shareService.auditByIdWithRocketMqLog(shareId, (ShareAuditDTO) o, transactionId);
/**
*这里我们需要注意一点的是:为什么需要回查?
* 假如代码走到了这里,还没有return的那个瞬间,就断网了,或者这个内容中心挂了,或者正在重启。
* 本地事务已经提交了,但是还没有来的及告诉rocketmq,要把消息提交,就挂了,
* 所以rocketmq会定时的请求内容中心,让其告诉本事事务状态是什么
* checkLocalTransaction()这个函数就是执行这个功能的
* 我根据什么东西来检查本地事务的状态?所以这里利用到刚才创建的表,rocketmq_transaction_log
* 可以在执行完本地日志之后,记录一条消息,然后回查的时候,到表里面查询一下,如果有消息,就证明本地事务执行成功了;
* 如果查询表中没有消息的话,就证明失败了
*/
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事务的检查接口,如果mq没有收到本地检查确认信息,就会执行这个方法
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
/**
*拿到headers
*/
MessageHeaders headers = message.getHeaders();
/**
*拿到shareserver中的transactionId
*/
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
/**
*select * from xxx where transaction_id = xxx
*/
this.rocketmqTransationLogMapper.selectOne(
RocketmqTransationLog.builder()
.transactionId(transactionId)
.build()
);
/**
*如果transaction——id不是空,则commit
*/
if (transactionId != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
package com.zstu.contentcenter.service.content;
import com.zstu.contentcenter.dao.content.ShareMapper;
import com.zstu.contentcenter.dao.messaging.RocketmqTransationLogMapper;
import com.zstu.contentcenter.domain.dto.content.ShareAuditDTO;
import com.zstu.contentcenter.domain.dto.content.ShareDTO;
import com.zstu.contentcenter.domain.dto.message.UserAddBonusMsgDTO;
import com.zstu.contentcenter.domain.dto.user.UserDTO;
import com.zstu.contentcenter.domain.entity.content.Share;
import com.zstu.contentcenter.domain.entity.messaging.RocketmqTransationLog;
import com.zstu.contentcenter.domain.enums.AuditStatusEnum;
import com.zstu.contentcenter.feignclient.UserCenterFeignClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
/**
* TODO
*
* @author darksideinxx
* @date 2022/4/18 17:12
*/
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class ShareService {
private final ShareMapper shareMapper;
private final UserCenterFeignClient userCenterFeignClient;
private final RocketMQTemplate rocketMQTemplate;
private final RocketmqTransationLogMapper rocketmqTransationLogMapper;
public ShareDTO findById(Integer id) {
Share share = this.shareMapper.selectByPrimaryKey(id);
//获取发布人的id
Integer userId = share.getUserId();
UserDTO userDTO = this.userCenterFeignClient.findById(userId);
//拿到用户中心的所有实例信息
//怎么调用用户微服务的/users/{userId}?
//消息的装配
ShareDTO shareDTO = new ShareDTO();
BeanUtils.copyProperties(share, shareDTO);
shareDTO.setWxNickname(userDTO.getWxNickname());
return shareDTO;
}
public static void main(String[] args) {
RestTemplate restTemplate = new RestTemplate();
//用http的get方法请求,返回一个对象
ResponseEntity<String> forEntity = restTemplate.getForEntity(
"http://localhost:8080/users/{id}",
String.class,
2
);
System.out.println(forEntity);
}
public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
//1.查询share是否存在,不存在或者当前的audit-status!=noyet 那么抛异常
Share share = this.shareMapper.selectByPrimaryKey(id);
if (share == null) {
throw new IllegalArgumentException("参数非法,该分享不存在");
}
if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
throw new IllegalArgumentException("参数非法,该分享已审核通过或者审核不通过");
}
//3. 如果是pass,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分
if (AuditStatusEnum.PASS.equals(shareAuditDTO.getAuditStatusEnum())) {
//发送半消息 txProducergroup:起一个名称就可以了 destination:一样的,用add-bonux,这个也是随便的,但是要和user-center一样
//MessageBuilder:消息体
String transactionId = UUID.randomUUID().toString();
this.rocketMQTemplate.sendMessageInTransaction(
"tx-add-bonus-group",
"add-bonus",
MessageBuilder
.withPayload(
UserAddBonusMsgDTO.builder()
.userId(share.getUserId())
.bonus(50)
.build()
)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("shareId", id)
.build(),
//arg有大用处
shareAuditDTO
);
} else {
//如果是reject,不需要发消息 只做数据库的审核
this.auditByIdInDB(id, shareAuditDTO);
}
//他是一个附属 *** 作。可以异步执行
//4.把share写到缓存
return share;
}
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO shareAuditDTO) {
Share share = Share.builder()
.id(id)
.auditStatus(shareAuditDTO.getAuditStatusEnum().toString())
.reason(shareAuditDTO.getReason())
.build();
this.shareMapper.updateByPrimaryKeySelective(share);
//4.把share写到缓存
}
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO shareAuditDTO, String transactionId) {
this.auditByIdInDB(id, shareAuditDTO);
log.info("++++++++{}",shareAuditDTO);
System.out.println("-----------id" + id);
System.out.println("-----------shareAuditDTO"+shareAuditDTO.getReason());
this.rocketmqTransationLogMapper.insertSelective(
RocketmqTransationLog.builder()
.transactionId(transactionId)
.log("check and share")
.build()
);
}
}
- user-center
package com.zstu.usercenter.rocketmq;
import com.zstu.usercenter.dao.bonus.BonusEventLogMapper;
import com.zstu.usercenter.dao.user.UserMapper;
import com.zstu.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.zstu.usercenter.domain.entity.bonus.BonusEventLog;
import com.zstu.usercenter.domain.entity.user.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* TODO
*
* @author DARKSIDEinxx
* @date 2022/4/23 17:01
*/
@Service
//注意:这里的topic一定要和content-center发送过来的消息一致 这里的consumerGroup和生产者的group一样的,可以随便填写,只是生产者是放在yml文件
//消费者是放在注释里面的
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
private final UserMapper userMapper;
private final BonusEventLogMapper bonusEventLogMapper;
/**
* @param
* @return
* @author DARKSIDEinxx
* @description 执行业务
* @date 2022/4/23 17:03
*/
@Override
public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {
//当收到消息的时候,执行业务
//1.为用户加积分
Integer userId = userAddBonusMsgDTO.getUserId();
Integer bonus = userAddBonusMsgDTO.getBonus();
User user = this.userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + userAddBonusMsgDTO.getBonus());
this.userMapper.updateByPrimaryKey(user);
//2.记录日志到bonus_event_log表里面
this.bonusEventLogMapper.insert(
BonusEventLog.builder()
.userId(userId)
.value(bonus)
.event("contribute")
.createTime(new Date())
.description("contribute add bonus")
.build()
);
log.info("积分添加完毕");
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserAddBonusMsgDTO {
/**
* 为谁加积分
*/
private Integer userId;
/**
* 加多少积分
*/
private Integer bonus;
}
springcloud stream—通用的rocketmq的方法
用于构建消息驱动的微服务框架
简化mq
destination binder 目标绑定器;与消息中间件通信的组件
destination bindings 目标绑定:binding是连接应用程序与消息中间价的桥梁,用于消息的消费和生产,由binder创建
bindings由binder生成
三板斧
- 依赖
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-stream-rocketmqartifactId>
dependency>
- 注解:启动类上加
@EnableBinding(Source.class)
- 配置
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
#用来指定topic
destination: stream-test-topic
- 测试
@Autowired
private Source source;
@GetMapping("/test-stream")
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
.build()
);
return "success";
}
这样运行后,会一直打印日志,所以添加设置
logging:
level:
com.zstu.contentcenter.feignclient.UserCenterFeignClient: debug
com.alibaba.nacos: error
三板斧
- 依赖
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-stream-rocketmqartifactId>
dependency>
- 注解:启动类上加
@EnableBinding(Sink.class)
- 配置
stream:
bindings:
input:
#要和内容中心的topic匹配
destination: stream-test-topic
#如果用的是rocketmq,一定要设置,
#如果用的是其他mq,可以留空
group: binder-group
代码
package com.zstu.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;
/**
* TODO
*
* @author DARKSIDEinxx
* @date 2022/4/25 11:40
*/
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody) {
log.info("通过stream收到了消息,message:{}", messageBody);
}
}
接收到了消息,成功。
keep moving~~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)