@[TOC](java redis redisson 分布式锁 阻塞队列(BlockingQueue) 线程池 的使用)
一、场景用户付款成功->上传到第三方订单系统 订单付款成功 -> 添加到阻塞队列 -> 触发上传接口 -> 获取到锁(未获取到锁返回)-> 循环取出队列里的数据 -> 线程池 -> 多个线程处理 -> 处理完队列里面所有数据释放锁 -> 上传完成二、代码实现
redisson工具类
//redisson配置此处省略} org.redisson redisson-spring-boot-starter3.16.7
import org.redisson.api.RBlockingQueue; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component public class RedissonUtils{ private RedissonClient redisson; @Autowired public RedissonUtils(RedissonClient redisson) { this.redisson = redisson; } public Boolean setBlockingQueue(E e, String queueName) throws Throwable { Boolean isBoolean = false; RBlockingQueue queue = redisson.getBlockingQueue(queueName); //offer 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程); isBoolean = queue.offer(e); return isBoolean; } public E getBlockingQueue(String queueName) throws Throwable { RBlockingQueue queue = redisson.getBlockingQueue(queueName); //取走BlockingQueue里排在首位的对象,取不到时返回null; E value = queue.poll(); return value; } public RLock getTryLock(String lockName, int time) throws Throwable { RLock lock = redisson.getLock(lockName); // 拿锁失败时会不停的重试 // 尝试拿锁time s后停止重试,返回false 具有Watch Dog 自动延期机制 默认续30s boolean isBoolean = lock.tryLock(time, TimeUnit.SECONDS); if (!isBoolean){ return null; } return lock; } } }
线程池配置类
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync//开启异步 public class ThreadPoolConfig { //在@SpringBootApplication启动类 添加注解@EnableAsync //异步方法使用注解@Async("@Bean的名称") ,返回值为void或者Future //切记一点 ,异步方法和调用方法一定要写在不同的类中,如果写在一个类中,是没有效果的 // 在@Async标注的方法,同时也使用@Transactional进行标注;在其调用数据库 *** 作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的 *** 作。 // 那该如何给这些 *** 作添加事务管理呢? // 可以将需要事务管理 *** 作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional // 示例: // 方法A, 使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。 // 方法B, 使用了@Async来标注,B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的 @Bean("pushGoodsOrderExecutor") public ThreadPoolTaskExecutor pushGoodsOrderExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(2); // 设置最大线程数 executor.setMaxPoolSize(3); // 设置队列容量 executor.setQueueCapacity(10); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置默认线程名称 executor.setThreadNamePrefix("pushGoodsOrderExecutor-"); // 设置拒绝策略 ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; } }
基类
import java.util.List; public abstract class AbstractJstService{ public abstract void pushOrder(); }
派生类
import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.util.List; import java.util.concurrent.RejectedExecutionException; @Service public class JstOrderServiceImpl extends AbstractJstService { private final Logger log = LoggerFactory.getLogger(JstOrderServiceImpl.class); private RedissonUtils redissonUtils; private AsyncJstOrderService asyncJstOrderService; @Autowired public JstOrderServiceImpl(RedissonUtils redissonUtils, AsyncJstOrderService asyncJstOrderService) { this.redissonUtils = redissonUtils; this.asyncJstOrderService = asyncJstOrderService; } @Override public void pushOrder() { RLock rLock = null; Boolean isWhile = true; try { rLock = redissonUtils.getTryLock(BlockingQueueEnum.商品订单上传队列锁.getName(), 10); if (rLock != null) { log.info("获取锁成功"); while(isWhile){ String id = (String) redissonUtils.getBlockingQueue(BlockingQueueEnum.商品订单上传队列.getName()); System.out.println("id:"+id); if (!StringUtils.isEmpty(id)) { // Thread.sleep(500);//当前线程暂停0.5秒钟,再进行处理 //队列里面获取到数据了,开始执行业务逻辑 try { asyncJstOrderService.pushOrder(Long.valueOf(id)); }catch (RejectedExecutionException rejectedExecutionException){ //线程池已满,触发RejectedExecutionException 返回 redissonUtils.setBlockingQueue(id,BlockingQueueEnum.商品订单上传队列.getName());//把没有处理id重新放回队列 log.info("线程池队列已满"); Thread.sleep(1000);//当前线程暂停1秒钟,再进行处理 } }else { //队列里面获取不到数据了,取消while循环 isWhile = false; } } }else { log.info("获取锁失败"); } } catch (Throwable t) { t.printStackTrace(); if (rLock != null && rLock.isLocked() && rLock.isHeldByCurrentThread()) { rLock.unlock(); } }finally { // lock.isLocked():判断要解锁的key是否已被锁定。 // lock.isHeldByCurrentThread():判断要解锁的key是否被当前线程持有。 if (rLock != null && rLock.isLocked() && rLock.isHeldByCurrentThread()) { rLock.unlock(); } } } }
异步方法类
package com.gemo.bear.service.order.jst.util; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @Component public class AsyncJstOrderService { @Async("pushGoodsOrderExecutor") public void pushOrder(Long id){ System.out.println("商品订单异步线程ID:" + Thread.currentThread().getId() + "线程名字:" +Thread.currentThread().getName()+"执行异步任务:"+id); } }
.枚举类
package com.gemo.bear.bean.order.enums; public enum BlockingQueueEnum { 商品订单上传队列("goods_order_push"), 商品订单上传队列锁("goods_order_push_luck"), 商品订单上传Error队列("goods_order_push_error"); private String name; BlockingQueueEnum(String name) { this.name = name; } public String getName() { return name; } }
Controller类
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @Api(tags = "订单相关") @RestController @RequestMapping("/order") public class OrderController { @Resource(type = JstOrderServiceImpl.class) private AbstractJstService abstractJstService; @ApiOperation("测试添加队列数据") @PostMapping("test-setBlockingQueue") public ResponseBuild setBlockingQueue(@RequestBody Order order) throws Throwable { try { for (int i = 0; i < 15; i++) { boolean b = redissonUtils.setBlockingQueue(String.valueOf(i), BlockingQueueEnum.商品订单上传队列.getName()); if (b) { System.out.println("加入队列成功"); } } } catch (Exception ex) { System.out.println(ex.getMessage()); } finally { } return ResponseBuild.success(); } @ApiOperation("测试上传") @PostMapping("test-pullOrder") public ResponseBuild testPullOrder() throws Throwable { try { abstractJstService.pullOrder(); } catch (Exception ex) { System.out.println(ex.getMessage()); } finally { } return ResponseBuild.success(); } }
线程运行情况
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)