Spring Cloud Alibaba 学习笔记

Spring Cloud Alibaba 学习笔记,第1张

RocketMQ
  • 消息驱动

消息驱动


MQ使用场景:异步处理,流量削峰填谷,解耦微服务

  • 安装

  • 下载 http://rocketmq.apache.org/release_notes/release-notes-4.5.1/

  • 解压 unzip rocketmq-all-4.5.1-bin-release.zip

  • 切换目录到RocketMQ根目录 cd rocketmq-all-4.5.1-bin-release

  • 启动Name Server nohup sh bin/mqnamesrv &

  • 启动 Broker nohup sh bin/mqbroker -n localhost:9876 &

  • 验证name server是否启动OK: tail -f ~/logs/rocketmqlogs/namesrv.log

  • 验证broker是否启动OK: tail -f ~/logs/rocketmqlogs/broker.log

  • 停止name server sh bin/mqshutdown namesrv

  • 停止broker sh bin/mqshutdown broker

  • 控制台的安装:

  • 参考 http://www.imooc.com/article/290092

  • 我项目中使用了懒人包,不过最好两个都看一下,试验一下。


编写生产者

  • 依赖
        <dependency>
            <groupId>org.apache.rocketmqgroupId>
            <artifactId>rocketmq-spring-boot-starterartifactId>
            <version>2.0.3version>
        dependency>
  • 注解:无
  • 配置:
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    #必须指定group
    group: test-group
  • 测试:
package com.zstu.contentcenter.controller.content;

import com.zstu.contentcenter.domain.dto.content.ShareAuditDTO;
import com.zstu.contentcenter.domain.entity.content.Share;
import com.zstu.contentcenter.service.content.ShareService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
 * TODO
 *
 * @author darksideinxx
 * @date 2022/4/22 12:44
 */
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RequestMapping("/admin/shares")
public class ShareAdminController {
    private final ShareService shareService;
    @PutMapping("/audit/{id}")
    public Share auditById(@PathVariable Integer id,
                           @RequestBody ShareAuditDTO shareAuditDTO) {
        //TODO 认证授权
        return this.shareService.auditById(id, shareAuditDTO);
    }
}


@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class ShareService {
    private final ShareMapper shareMapper;
    private final UserCenterFeignClient userCenterFeignClient;
    private final RocketMQTemplate rocketMQTemplate;
    public ShareDTO findById(Integer id) {
        Share share = this.shareMapper.selectByPrimaryKey(id);
        //获取发布人的id
        Integer userId = share.getUserId();

        UserDTO userDTO = this.userCenterFeignClient.findById(userId);


        //拿到用户中心的所有实例信息

        //怎么调用用户微服务的/users/{userId}?

        //消息的装配
        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(share, shareDTO);
        shareDTO.setWxNickname(userDTO.getWxNickname());

        return shareDTO;
    }
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ShareAuditDTO {
    /**
     * 审核状态
     */
    private AuditStatusEnum auditStatusEnum;
    /**
     * 原因
     */
    private String reason;
}


@Getter
@AllArgsConstructor
public enum AuditStatusEnum {
    /**
     * 待审核
     */
    NOT_YET,
    /**
     * 审核通过
     */
    PASS,
    /**
     * 审核不通过
     */
    REJECT
}





然后我们编写user-center

package com.zstu.usercenter.rocketmq;

import com.zstu.usercenter.dao.bonus.BonusEventLogMapper;
import com.zstu.usercenter.dao.user.UserMapper;
import com.zstu.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.zstu.usercenter.domain.entity.bonus.BonusEventLog;
import com.zstu.usercenter.domain.entity.user.User;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * TODO
 *
 * @author DARKSIDEinxx
 * @date 2022/4/23 17:01
 */
@Service
//注意:这里的topic一定要和content-center发送过来的消息一致  这里的consumerGroup和生产者的group一样的,可以随便填写,只是生产者是放在yml文件
//消费者是放在注释里面的
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    private final UserMapper userMapper;

    private final BonusEventLogMapper bonusEventLogMapper;
    /**
     * @param
     * @return
     * @author DARKSIDEinxx
     * @description 执行业务
     * @date 2022/4/23 17:03
     */
    @Override
    public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        //当收到消息的时候,执行业务
        //1.为用户加积分
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();
        User user = this.userMapper.selectByPrimaryKey(userId);
        user.setBonus(user.getBonus() + userAddBonusMsgDTO.getBonus());
        this.userMapper.updateByPrimaryKey(user);
        //2.记录日志到bonus_event_log表里面
        this.bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .userId(userId)
                        .value(bonus)
                        .event("contribute")
                        .createTime(new Date())
                        .description("contribute add bonus")
                        .build()
        );

    }
}



package com.zstu.usercenter.domain.dto.message;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * TODO
 *
 * @author darksideinxx
 * @date 2022/4/22 17:39
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserAddBonusMsgDTO {
    /**
     * 为谁加积分
     */
    private Integer userId;
    /**
     * 加多少积分
     */
    private Integer bonus;
}



然后我们测试运行一下:

我们发现积分已经变成了150

总结:

当然我们可能不是用的rocketmq,可能是别的,
如果使用的是rocketmq:RocketMQMessageListener
如果是ActiveMQ or Artemis :JmsListener;
如果是RabbitMQ:RabbitListener;
如果是Kafka:KafkaListener

事务

@Transactional(rollbackFor = Exception.class)
    public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
        //1.查询share是否存在,不存在或者当前的audit-status!=noyet 那么抛异常
        System.out.println("---------");
        log.info("id={},share = {}",id,shareAuditDTO);
        Share share = this.shareMapper.selectByPrimaryKey(id);
        System.out.println("---------");
        log.info("---------{}",share);
        if (share == null) {
            throw new IllegalArgumentException("参数非法,该分享不存在");
        }
        if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
            throw new IllegalArgumentException("参数非法,该分享已审核通过或者审核不通过");
        }
        //2,审核资源,将状态设为pass/reject
        share.setAuditStatus(shareAuditDTO.getAuditStatusEnum().toString());
        this.shareMapper.updateByPrimaryKey(share);
        //3. 如果是pass,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分
        //目标 topic;消息体:
        this.rocketMQTemplate.convertAndSend(
                "add-bonus",
                UserAddBonusMsgDTO.builder()
                        .userId(share.getUserId())
                        .bonus(50)
                        .build());
        //他是一个附属 *** 作。可以异步执行
		//4.把share写到缓存

        return share;
    }

如果这个事务发生异常,那么这个事务就会回滚
但是,这里我们需要理解一个问题,就是我的事务在前三个都是正常的运行的,运行到第四步,share写到缓存是时候失败了,那么事务就会回滚,但是了,我消息已经发出去了,积分已经加了,这样就造成了乌龙
rocketmq提供了事务消息,来解决这个消息

  • 第一步:发送半消息:生产者发送半消息给mqserver,半消息是一种特殊的消息,这种消息存储到mqserver中,但是会被标记位不能投递状态,所以消费者不会接收到这种消息。
  • 什么情况会导致第四部的确认消息不能成功发送?举例:在发送二次确认的瞬间断网了,或者应用重启了
  • 简单描述这个步骤就是:生产者发送消息给mqserver,但是mqserver先标记了这个消息,不让人发送这条消息,然后生产者去执行本地事务,然后可以知道是应该投递这条消息还是回滚这条消息,如果本地消息执行成功的话,就让mqserver投递这条消息,否则就把这条消息删掉(5,6,7这三条消息回查,其实是防止在特殊情况下mqserver没有收到二次确认做的容错处理)

  • content-center
package com.zstu.contentcenter.rocketmq;

import com.zstu.contentcenter.dao.messaging.RocketmqTransationLogMapper;
import com.zstu.contentcenter.domain.dto.content.ShareAuditDTO;
import com.zstu.contentcenter.domain.entity.messaging.RocketmqTransationLog;
import com.zstu.contentcenter.service.content.ShareService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Arg;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import javax.xml.transform.Source;

/**
 * TODO
 *
 * @author DARKSIDEinxx
 * @date 2022/4/24 20:50
 */
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransationLogMapper rocketmqTransationLogMapper;

    /**
     * 执行本地事务
     * 这里的message是shareServer的MessageBuilder,这里的object o是shareServer中的arg,
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        /**
         *拿到headers
         */
        MessageHeaders headers = message.getHeaders();
        /**
         *拿到shareserver中的transactionId
         */
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        /**
         *获取share_id 的id
         */
        Integer shareId = Integer.valueOf(headers.get("shareId").toString());

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, (ShareAuditDTO) o, transactionId);
            /**
             *这里我们需要注意一点的是:为什么需要回查?
             * 假如代码走到了这里,还没有return的那个瞬间,就断网了,或者这个内容中心挂了,或者正在重启。
             * 本地事务已经提交了,但是还没有来的及告诉rocketmq,要把消息提交,就挂了,
             * 所以rocketmq会定时的请求内容中心,让其告诉本事事务状态是什么
             * checkLocalTransaction()这个函数就是执行这个功能的
             * 我根据什么东西来检查本地事务的状态?所以这里利用到刚才创建的表,rocketmq_transaction_log
             * 可以在执行完本地日志之后,记录一条消息,然后回查的时候,到表里面查询一下,如果有消息,就证明本地事务执行成功了;
             * 如果查询表中没有消息的话,就证明失败了
             */
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 本地事务的检查接口,如果mq没有收到本地检查确认信息,就会执行这个方法
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        /**
         *拿到headers
         */
        MessageHeaders headers = message.getHeaders();
        /**
         *拿到shareserver中的transactionId
         */
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        /**
         *select * from xxx where transaction_id = xxx
         */
        this.rocketmqTransationLogMapper.selectOne(
                RocketmqTransationLog.builder()
                        .transactionId(transactionId)
                        .build()
        );
        /**
         *如果transaction——id不是空,则commit
         */
        if (transactionId != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}


package com.zstu.contentcenter.service.content;

import com.zstu.contentcenter.dao.content.ShareMapper;
import com.zstu.contentcenter.dao.messaging.RocketmqTransationLogMapper;
import com.zstu.contentcenter.domain.dto.content.ShareAuditDTO;
import com.zstu.contentcenter.domain.dto.content.ShareDTO;
import com.zstu.contentcenter.domain.dto.message.UserAddBonusMsgDTO;
import com.zstu.contentcenter.domain.dto.user.UserDTO;
import com.zstu.contentcenter.domain.entity.content.Share;
import com.zstu.contentcenter.domain.entity.messaging.RocketmqTransationLog;
import com.zstu.contentcenter.domain.enums.AuditStatusEnum;
import com.zstu.contentcenter.feignclient.UserCenterFeignClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

import java.util.List;
import java.util.Objects;
import java.util.UUID;

/**
 * TODO
 *
 * @author darksideinxx
 * @date 2022/4/18 17:12
 */
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class ShareService {
    private final ShareMapper shareMapper;
    private final UserCenterFeignClient userCenterFeignClient;
    private final RocketMQTemplate rocketMQTemplate;
    private final RocketmqTransationLogMapper rocketmqTransationLogMapper;

    public ShareDTO findById(Integer id) {
        Share share = this.shareMapper.selectByPrimaryKey(id);
        //获取发布人的id
        Integer userId = share.getUserId();

        UserDTO userDTO = this.userCenterFeignClient.findById(userId);


        //拿到用户中心的所有实例信息

        //怎么调用用户微服务的/users/{userId}?

        //消息的装配
        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(share, shareDTO);
        shareDTO.setWxNickname(userDTO.getWxNickname());

        return shareDTO;
    }

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();
        //用http的get方法请求,返回一个对象
        ResponseEntity<String> forEntity = restTemplate.getForEntity(
                "http://localhost:8080/users/{id}",
                String.class,
                2
        );
        System.out.println(forEntity);
    }

    public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
        //1.查询share是否存在,不存在或者当前的audit-status!=noyet 那么抛异常
        Share share = this.shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法,该分享不存在");
        }
        if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
            throw new IllegalArgumentException("参数非法,该分享已审核通过或者审核不通过");
        }
        //3. 如果是pass,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分
        if (AuditStatusEnum.PASS.equals(shareAuditDTO.getAuditStatusEnum())) {
            //发送半消息  txProducergroup:起一个名称就可以了 destination:一样的,用add-bonux,这个也是随便的,但是要和user-center一样
            //MessageBuilder:消息体
            String transactionId = UUID.randomUUID().toString();
            this.rocketMQTemplate.sendMessageInTransaction(
                    "tx-add-bonus-group",
                    "add-bonus",
                    MessageBuilder
                            .withPayload(
                                    UserAddBonusMsgDTO.builder()
                                            .userId(share.getUserId())
                                            .bonus(50)
                                            .build()
                            )
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            .setHeader("shareId", id)
                            .build(),
                    //arg有大用处
                    shareAuditDTO
            );
        } else {
            //如果是reject,不需要发消息 只做数据库的审核
            this.auditByIdInDB(id, shareAuditDTO);
        }
        //他是一个附属 *** 作。可以异步执行
        //4.把share写到缓存
        return share;
    }

    @Transactional(rollbackFor = Exception.class)
    public void auditByIdInDB(Integer id, ShareAuditDTO shareAuditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(shareAuditDTO.getAuditStatusEnum().toString())
                .reason(shareAuditDTO.getReason())
                .build();
        this.shareMapper.updateByPrimaryKeySelective(share);
        //4.把share写到缓存

    }

    @Transactional(rollbackFor = Exception.class)
    public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO shareAuditDTO, String transactionId) {
        this.auditByIdInDB(id, shareAuditDTO);
        log.info("++++++++{}",shareAuditDTO);
        System.out.println("-----------id" + id);
        System.out.println("-----------shareAuditDTO"+shareAuditDTO.getReason());
        this.rocketmqTransationLogMapper.insertSelective(
                RocketmqTransationLog.builder()
                        .transactionId(transactionId)
                        .log("check and share")
                        .build()
        );
    }
}


  • user-center
package com.zstu.usercenter.rocketmq;

import com.zstu.usercenter.dao.bonus.BonusEventLogMapper;
import com.zstu.usercenter.dao.user.UserMapper;
import com.zstu.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.zstu.usercenter.domain.entity.bonus.BonusEventLog;
import com.zstu.usercenter.domain.entity.user.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * TODO
 *
 * @author DARKSIDEinxx
 * @date 2022/4/23 17:01
 */
@Service
//注意:这里的topic一定要和content-center发送过来的消息一致  这里的consumerGroup和生产者的group一样的,可以随便填写,只是生产者是放在yml文件
//消费者是放在注释里面的
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    private final UserMapper userMapper;

    private final BonusEventLogMapper bonusEventLogMapper;
    /**
     * @param
     * @return
     * @author DARKSIDEinxx
     * @description 执行业务
     * @date 2022/4/23 17:03
     */
    @Override
    public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        //当收到消息的时候,执行业务
        //1.为用户加积分
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();
        User user = this.userMapper.selectByPrimaryKey(userId);
        user.setBonus(user.getBonus() + userAddBonusMsgDTO.getBonus());
        this.userMapper.updateByPrimaryKey(user);
        //2.记录日志到bonus_event_log表里面
        this.bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .userId(userId)
                        .value(bonus)
                        .event("contribute")
                        .createTime(new Date())
                        .description("contribute add bonus")
                        .build()
        );
        log.info("积分添加完毕");
    }
}


@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserAddBonusMsgDTO {
    /**
     * 为谁加积分
     */
    private Integer userId;
    /**
     * 加多少积分
     */
    private Integer bonus;
}




springcloud stream—通用的rocketmq的方法
用于构建消息驱动的微服务框架
简化mq
destination binder 目标绑定器;与消息中间件通信的组件
destination bindings 目标绑定:binding是连接应用程序与消息中间价的桥梁,用于消息的消费和生产,由binder创建
bindings由binder生成

三板斧

  • 依赖
        <dependency>
            <groupId>org.springframework.cloudgroupId>
            <artifactId>spring-cloud-starter-stream-rocketmqartifactId>
        dependency>


  • 注解:启动类上加
@EnableBinding(Source.class)
  • 配置
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        output:
          #用来指定topic
          destination: stream-test-topic
  • 测试
    @Autowired
    private Source source;

    @GetMapping("/test-stream")
    public String testStream() {
        this.source.output()
                .send(
                        MessageBuilder
                                .withPayload("消息体")
                                .build()
                );
        return "success";

    }


这样运行后,会一直打印日志,所以添加设置

logging:
  level:
    com.zstu.contentcenter.feignclient.UserCenterFeignClient: debug
    com.alibaba.nacos: error
    


三板斧

  • 依赖
        <dependency>
            <groupId>org.springframework.cloudgroupId>
            <artifactId>spring-cloud-starter-stream-rocketmqartifactId>
        dependency>


  • 注解:启动类上加
@EnableBinding(Sink.class)
  • 配置
    stream:
      bindings:
        input:
          #要和内容中心的topic匹配
          destination: stream-test-topic
          #如果用的是rocketmq,一定要设置,
          #如果用的是其他mq,可以留空
          group: binder-group

代码

package com.zstu.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

/**
 * TODO
 *
 * @author DARKSIDEinxx
 * @date 2022/4/25 11:40
 */
@Service
@Slf4j
public class TestStreamConsumer {
    @StreamListener(Sink.INPUT)
    public void receive(String messageBody) {

        log.info("通过stream收到了消息,message:{}", messageBody);
    }
}



接收到了消息,成功。

keep moving~~

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/737139.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存