java redis redisson 分布式锁 阻塞队列(BlockingQueue) 线程池 的使用

java redis redisson 分布式锁 阻塞队列(BlockingQueue) 线程池 的使用,第1张

java redis redisson 分布式锁 阻塞队列(BlockingQueue) 线程池 的使用

@[TOC](java redis redisson 分布式锁 阻塞队列(BlockingQueue) 线程池 的使用)

一、场景
		用户付款成功->上传到第三方订单系统

	    订单付款成功 -> 添加到阻塞队列 -> 触发上传接口 -> 获取到锁(未获取到锁返回)-> 循环取出队列里的数据 ->
	    线程池 -> 多个线程处理 -> 处理完队列里面所有数据释放锁 -> 上传完成
二、代码实现
redisson工具类
		 //redisson配置此处省略
        
            org.redisson
            redisson-spring-boot-starter
            3.16.7
        
}
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;


@Component
public class RedissonUtils {

    private RedissonClient redisson;

    @Autowired
    public RedissonUtils(RedissonClient redisson) {
        this.redisson = redisson;
    }

    
    public Boolean setBlockingQueue(E e, String queueName) throws Throwable {
        Boolean isBoolean = false;
        RBlockingQueue queue = redisson.getBlockingQueue(queueName);
        //offer 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);      
        isBoolean = queue.offer(e);
        return isBoolean;
    }

    
    public E getBlockingQueue(String queueName) throws Throwable {
        RBlockingQueue queue = redisson.getBlockingQueue(queueName);
        //取走BlockingQueue里排在首位的对象,取不到时返回null;
        E value = queue.poll();
        return value;
    }

    
    public RLock getTryLock(String lockName, int time) throws Throwable {
        RLock lock = redisson.getLock(lockName);    // 拿锁失败时会不停的重试
        // 尝试拿锁time s后停止重试,返回false 具有Watch Dog 自动延期机制 默认续30s
        boolean isBoolean = lock.tryLock(time, TimeUnit.SECONDS);
        if (!isBoolean){
            return  null;
        }
        return lock;
    }


}

}

线程池配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;


@Configuration
@EnableAsync//开启异步
public class ThreadPoolConfig {
    //在@SpringBootApplication启动类 添加注解@EnableAsync
    //异步方法使用注解@Async("@Bean的名称") ,返回值为void或者Future
    //切记一点 ,异步方法和调用方法一定要写在不同的类中,如果写在一个类中,是没有效果的

    //    在@Async标注的方法,同时也使用@Transactional进行标注;在其调用数据库 *** 作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的 *** 作。
    //    那该如何给这些 *** 作添加事务管理呢?
    //    可以将需要事务管理 *** 作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional
    //    示例:
    //    方法A, 使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
    //    方法B, 使用了@Async来标注,B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的

    
    @Bean("pushGoodsOrderExecutor")
    public ThreadPoolTaskExecutor pushGoodsOrderExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(2);
        // 设置最大线程数
        executor.setMaxPoolSize(3);
        // 设置队列容量
        executor.setQueueCapacity(10);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix("pushGoodsOrderExecutor-");
        // 设置拒绝策略 ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

基类

import java.util.List;


public abstract class AbstractJstService {

    
    public abstract void pushOrder();


}

派生类

import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.concurrent.RejectedExecutionException;


@Service
public class JstOrderServiceImpl extends AbstractJstService {

    private final Logger log = LoggerFactory.getLogger(JstOrderServiceImpl.class);

    private RedissonUtils redissonUtils;

    private AsyncJstOrderService asyncJstOrderService;

    @Autowired
    public JstOrderServiceImpl(RedissonUtils redissonUtils, AsyncJstOrderService asyncJstOrderService) {
        this.redissonUtils = redissonUtils;
        this.asyncJstOrderService = asyncJstOrderService;
    }

    @Override
    public void pushOrder() {
        RLock rLock = null;
        Boolean isWhile = true;
        try {
            rLock = redissonUtils.getTryLock(BlockingQueueEnum.商品订单上传队列锁.getName(), 10);
            if (rLock != null) {
                log.info("获取锁成功");
                while(isWhile){
                    String id = (String) redissonUtils.getBlockingQueue(BlockingQueueEnum.商品订单上传队列.getName());
                    System.out.println("id:"+id);
                    if (!StringUtils.isEmpty(id)) {
//                        Thread.sleep(500);//当前线程暂停0.5秒钟,再进行处理
                        //队列里面获取到数据了,开始执行业务逻辑
                        try {
                            asyncJstOrderService.pushOrder(Long.valueOf(id));
                        }catch (RejectedExecutionException rejectedExecutionException){
                            //线程池已满,触发RejectedExecutionException 返回
                            redissonUtils.setBlockingQueue(id,BlockingQueueEnum.商品订单上传队列.getName());//把没有处理id重新放回队列
                            log.info("线程池队列已满");
                            Thread.sleep(1000);//当前线程暂停1秒钟,再进行处理
                        }

                    }else {
                        //队列里面获取不到数据了,取消while循环
                        isWhile = false;
                    }
                }
            }else {
                log.info("获取锁失败");
            }
        } catch (Throwable t) {
            t.printStackTrace();
            if (rLock != null && rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }finally {
            //   lock.isLocked():判断要解锁的key是否已被锁定。
            //   lock.isHeldByCurrentThread():判断要解锁的key是否被当前线程持有。
            if (rLock != null && rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }
    }



}


异步方法类

package com.gemo.bear.service.order.jst.util;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;


@Component
public class AsyncJstOrderService {

    @Async("pushGoodsOrderExecutor")
    public void pushOrder(Long id){
        System.out.println("商品订单异步线程ID:" + Thread.currentThread().getId() + "线程名字:" +Thread.currentThread().getName()+"执行异步任务:"+id);
    }
}

.枚举类

package com.gemo.bear.bean.order.enums;

public enum BlockingQueueEnum {
    商品订单上传队列("goods_order_push"),
    商品订单上传队列锁("goods_order_push_luck"),
    商品订单上传Error队列("goods_order_push_error");

    private String name;

    BlockingQueueEnum(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}


Controller类

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

@Api(tags = "订单相关")
@RestController
@RequestMapping("/order")
public class OrderController {
    @Resource(type = JstOrderServiceImpl.class)
    private AbstractJstService abstractJstService;
    
    @ApiOperation("测试添加队列数据")
    @PostMapping("test-setBlockingQueue")
    public ResponseBuild setBlockingQueue(@RequestBody Order order) throws Throwable {
        try {
            for (int i = 0; i < 15; i++) {
                boolean b = redissonUtils.setBlockingQueue(String.valueOf(i), BlockingQueueEnum.商品订单上传队列.getName());
                if (b) {
                    System.out.println("加入队列成功");
                }
            }
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        } finally {

        }
        return ResponseBuild.success();
    }

    @ApiOperation("测试上传")
    @PostMapping("test-pullOrder")
    public ResponseBuild testPullOrder() throws Throwable {
        try {
            abstractJstService.pullOrder();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        } finally {

        }
        return ResponseBuild.success();
    }

}

线程运行情况

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5686011.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存