- java多线程执行任务(工具再升级版)
- DelayPlan
- DelayPlanQueue
- Scheder
- TaskInfo
- Plan
- MyPlan
- Test
之前写的工具
java多线程执行任务(工具升级版)
少考虑了两种情况:
1:计划任务列表中如果有睡眠计划怎么办,线程就会等待了这样效率不高
2:计划之间没有上下文对象
针对这两点,在原来基础上我们再升级一下吧!
解决方法是:
1、针对第一种情况,新增一个延迟队列,将要休眠的计划对应的任务id放入延迟队列中,每次取计划时跳过延迟队列中存在的任务中的计划
2、在计划中添加上下文对象,方便后面计划获取前面计划放入的数据
实现原理如图:
这里面线程池取数据时较为复杂,没有画在图中,下面详细介绍存放数据的细节:
在介绍之前我们先看一下延迟队列中包含的属性
延迟队列的结构如下:
1、放数据
放数据比较简单,就在每次获取任务时判断原来的任务是否在延迟队列中,在的话就跳过获取下个数据,不在就将该任务的数据放入对应的执行计划队列中
2、取数据
取数据的逻辑就比较复杂了,这其中不同的模式取数据的方式还不太一样,但是实现逻辑是一样的,取数据时线程池获取对应执行列表中的计划,如果计划不存在,可以从延迟队列中对应任务中还未执行的计划中获取计划执行。如果计划存在,还需要判断该计划对应的任务是否在延迟队列中,如果在需要将该计划放入在延迟队列中对应任务的未执行计划列表中,等待后面执行;如果该计划对应的任务不在延迟队列中,那么可以直接执行计划
后面接着上代码
DelayPlanimport lombok.Data;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Author: dinghao
* @Date: 2022/3/25 15:44
*/
@Data
public class DelayPlan implements Delayed {
private int taskInfoId;
/* 触发时间*/
private long time;
public DelayPlan(int taskInfoId, long time, TimeUnit unit) {
this.taskInfoId = taskInfoId;
this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
DelayPlan DelayPlan = (DelayPlan) o;
long diff = this.time - DelayPlan.time;
if (diff <= 0) {// 改成>=会造成问题
return -1;
}else {
return 1;
}
}
}
DelayPlanQueue
import cn.hutool.core.collection.ConcurrentHashSet;
import lombok.Data;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
/**
* @Author: dinghao
* @Date: 2022/3/25 15:50
*/
@Data
public class DelayPlanQueue {
/**
* 延迟队列
*/
public static DelayQueue<DelayPlan> delayPlanQueue = new DelayQueue<>();
/**
* 任务id 集合
*/
public static Set<Integer> taskInfoIdSet = new ConcurrentHashSet<>();
/**
* key: 任务id
* value: 对应任务中因为前面计划有睡眠后面还未执行的计划列表
*/
public static Map<Integer, LinkedList<Plan>> delayPlanMap = new ConcurrentHashMap<>();
public static Thread thread;
public static void add(int taskInfoId, long time, TimeUnit unit) {
if (!contains(taskInfoId)) {
DelayPlan delayPlan = new DelayPlan(taskInfoId, time, unit);
delayPlanQueue.put(delayPlan);
taskInfoIdSet.add(taskInfoId);
}
}
public static void clearDelayPlanStart() {
thread = new Thread(() -> {
for (; ; ) {
try {
DelayPlan take = delayPlanQueue.take();
int taskInfoId = take.getTaskInfoId();
taskInfoIdSet.removeIf(item -> item == taskInfoId);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
thread.start();
}
public static void clearDelayPlanEnd() {
if (thread != null) {
thread.interrupt();
}
}
public static boolean contains(int taskInfoId) {
return taskInfoIdSet.contains(taskInfoId);
}
public static void main(String[] args) throws InterruptedException {
add(0, 5, TimeUnit.SECONDS);
add(1, 6, TimeUnit.SECONDS);
add(2, 7, TimeUnit.SECONDS);
add(3, 8, TimeUnit.SECONDS);
add(4, 10, TimeUnit.SECONDS);
add(5, 1, TimeUnit.MINUTES);
add(4, 30, TimeUnit.SECONDS);
clearDelayPlanStart();
for (int i = 1; i < 70; i++) {
Thread.sleep(1000);
for (int j = 0; j < 6; j++) {
boolean contains = contains(j);
System.out.println("第" + i + "秒," + j + "是否存在" + contains);
}
System.out.println("-----------------------------------------------");
}
}
}
Scheder
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
* @Author: dinghao
* @Date: 2022/3/7 15:29
*/
@Slf4j
public class Scheder {
/**
* 任务信息数组
*/
private TaskInfo[] taskInfos;
/**
* 执行计划队列数组
*/
private LinkedBlockingQueue<Plan>[] planQueueArray;
/**
* 队列的数量
*/
int queueNum;
/**
* 队列的容量
*/
int queueSize;
/**
* 允许并行执行的线程池
*/
private java.util.concurrent.ExecutorService loopExecutor;
/**
* 允许并行执行的线程数
*/
private int nThrends;
/**
* 执行模式:
* 1:所有任务信息都执行
* 2:先执行部分任务,执行完后再执行其他任务
* 3:顺序执行任务中的计划
* 4:顺序先执行执行任务中的计划,执行完后再顺序执行其他任务
*/
private int model;
/**
* 每批执行的任务数量
*/
private int modelSize;
/**
* model = 2,4 时有效
*/
private ArrayList<Integer> indexList = new ArrayList<>();
/**
* 协助判断,是否线程池的任务全部结束
*/
private AtomicInteger count;
/**
* 调度器的执行状态
*/
private volatile boolean status = false;
/**
* 构造器
*
* @param taskInfos 任务数组
* @param nThrends 同时执行任务中的计划线程数
* @param queueSize 计划执行队列
* @param model 执行模式 1:所有任务信息都执行 2:先执行部分任务,执行完后再执行其他任务
* @param modelSize 每批执行任务的数量
*/
public Scheder(TaskInfo[] taskInfos, int nThrends, int queueNum, int queueSize, int model, Integer modelSize) {
this.taskInfos = taskInfos;
this.nThrends = nThrends;
this.queueNum = queueNum;
this.queueSize = queueSize;
this.loopExecutor = Executors.newFixedThreadPool(this.nThrends);
this.model = model;
if (this.model < 3) {
this.planQueueArray = new LinkedBlockingQueue[1];
this.planQueueArray[0] = new LinkedBlockingQueue<>(this.queueSize);
} else {
// 初始化队列数组
this.planQueueArray = new LinkedBlockingQueue[this.queueNum];
IntStream.range(0, this.queueNum).forEach(i -> this.planQueueArray[i] = new LinkedBlockingQueue<>(this.queueSize));
}
// modelSize只有在等于2,4有效
if (this.model == 2 || this.model == 4) {
this.modelSize = modelSize > taskInfos.length ? taskInfos.length : modelSize;
}
count = countPlan();
}
/**
* 计算一共有多少执行计划
*
* @return /
*/
private AtomicInteger countPlan() {
int sum = 0;
for (int i = 0; i < this.taskInfos.length; i++) {
sum += this.taskInfos[i].getPlanQueue().size();
}
return new AtomicInteger(sum);
}
public Scheder(TaskInfo[] taskInfos, int nThrends, int model, Integer modelSize) {
this(taskInfos, nThrends, nThrends, 100, model, modelSize);
}
public Scheder(TaskInfo[] taskInfos, int nThrends) {
this(taskInfos, nThrends, nThrends, 100, 1, null);
}
public Scheder(TaskInfo[] taskInfos) {
this(taskInfos, 10, 10, 100, 1, null);
}
public void setModel(int model) {
this.model = model;
}
public int getModel() {
return model;
}
public void setModelSize(int modelSize) {
this.modelSize = modelSize;
}
public int getModelSize() {
return modelSize;
}
public ArrayList<Integer> getIndexList() {
return indexList;
}
public AtomicInteger getCount() {
return count;
}
public boolean isStatus() {
return status;
}
/**
* 执行方法
*/
public void run() {
if (this.status) {
log.warn("任务处于启动状态");
return;
}
this.status = true;
// 开启向队列中添加执行计划线程
init();
// 循环执行执行计划
while (this.status) {
// 所有执行计划执行完后,退出
if (this.taskInfos.length <= 0) {
if (this.model < 3) {
if (this.planQueueArray[0].size() == 0 && CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
this.status = false;
break;
}
} else {
ArrayList<Integer> notEmptyIndex = getNotEmptyIndex(this.planQueueArray);
if (CollUtil.isEmpty(notEmptyIndex) && CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
this.status = false;
break;
}
}
}
// 执行计划
execute();
}
int size;
// 所有线程执行完毕出循环
for (; ; ) {
size = this.count.get();
if (size == 0) {
break;
}
}
//停止线程池
this.loopExecutor.shutdownNow();
for (; ; ) {
//只有当线程池中所有线程完成任务时才会返回true,并且需要先调用线程池的shutdown方法或者shutdownNow方法。
if (this.loopExecutor.isTerminated()) {
System.out.println("执行结束!");
break;
}
}
DelayPlanQueue.clearDelayPlanEnd();
}
private void execute() {
if (this.model < 3) {
try {
// 获取一个执行计划
Plan peek = this.planQueueArray[0].peek();
if(peek == null){
if (CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
return;
}
// 计划队列为空的时候,如果休眠计划队列有数据可以执行休眠计划队列中的数据,这里的any一定会取到值
Optional<Map.Entry<Integer, LinkedList<Plan>>> any = DelayPlanQueue.delayPlanMap.entrySet().stream().findAny();
Integer taskInfoId = any.get().getKey();
LinkedList<Plan> planLinkedList = any.get().getValue();
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey().equals(taskInfoId));
}
this.loopExecutor.execute(() -> planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId)));
return;
}
// 判断这个任务是否在休眠
if (DelayPlanQueue.contains(peek.taskInfoId)) {
// 处于休眠,取出该任务计划
Plan take = this.planQueueArray[0].take();
// 将该任务放入休眠计划队列中
LinkedList<Plan> planLinkedList = DelayPlanQueue.delayPlanMap.computeIfAbsent(take.taskInfoId, k -> new LinkedList<>());
planLinkedList.push(take);
return;
}else{// 这个任务不在休眠
// 获取这个任务的休眠计划队列
LinkedList<Plan> planLinkedList = DelayPlanQueue.delayPlanMap.get(peek.taskInfoId);
// 判断是否休眠计划队列中还存在该任务的计划
if(!CollUtil.isEmpty(planLinkedList)){
// 存在,优先执行休眠计划队列的计划
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey() == peek.taskInfoId);
}
this.loopExecutor.execute(() -> planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId)));
}else{
// 不存在
Plan take = this.planQueueArray[0].take();
// 执行计划
this.loopExecutor.execute(() -> take.run0(this.count, Plan.context.get(take.taskInfoId)));
}
}
} catch (InterruptedException e) {
log.error("任务执行中发生异常", e);
}
} else {
this.loopExecutor.execute(() -> {
try {
// 获取一个执行计划
Plan plan = null;
// 获取线程id
String name = Thread.currentThread().getName();
int lastIndexOf = name.lastIndexOf("-");
int id = Integer.parseInt(name.substring(lastIndexOf + 1));
ArrayList<Integer> notEmptyIndex2 = getNotEmptyIndex(this.planQueueArray);
Integer index = notEmptyIndex2.stream().filter(item -> item % this.nThrends == (id - 1)).findAny().orElse(null);
if (index == null) {
return;
}
LinkedBlockingQueue<Plan> plans = this.planQueueArray[index];
if (plans.size() > 0) {
Plan peek = plans.peek();
if(peek == null){
if (CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
return;
}
// 计划队列为空的时候,如果休眠计划队列有数据可以执行休眠计划队列中的数据
Optional<Map.Entry<Integer, LinkedList<Plan>>> any = DelayPlanQueue.delayPlanMap.entrySet().stream().findAny();
Integer taskInfoId = any.get().getKey();
LinkedList<Plan> planLinkedList = any.get().getValue();
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey().equals(taskInfoId));
}
this.loopExecutor.execute(() -> planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId)));
return;
}
// 如果这个任务在睡眠
if (DelayPlanQueue.contains(peek.taskInfoId)) {
Plan take = plans.take();
LinkedList<Plan> planLinkedList = DelayPlanQueue.delayPlanMap.computeIfAbsent(take.taskInfoId, k -> new LinkedList<>());
planLinkedList.push(take);
return;
}else{
LinkedList<Plan> planLinkedList = DelayPlanQueue.delayPlanMap.get(peek.taskInfoId);
if(!CollUtil.isEmpty(planLinkedList)){
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey() == peek.taskInfoId);
}
planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId));
}else{
plan = plans.take();
plan.run0(this.count, Plan.context.get(plan.taskInfoId));
}
}
}
} catch (InterruptedException e) {
log.error("任务执行中发生异常", e);
}
});
}
}
private ArrayList<Integer> getNotEmptyIndex(LinkedBlockingQueue<Plan>[] planQueueArray) {
ArrayList<Integer> indexArray = new ArrayList<>();
for (int i = 0; i < planQueueArray.length; i++) {
if (!CollUtil.isEmpty(planQueueArray[i])) {
indexArray.add(i);
}
}
return indexArray;
}
/**
* 开启一个线程,持续向执行计划队列添加执行计划,直到所有的计划任务添加完
*/
private void init() {
DelayPlanQueue.clearDelayPlanStart();
new Thread(() -> {
while (this.status) {
// 任务信息数组数量
int length = this.taskInfos.length;
// 执行完结束线程
if (length <= 0) {
break;
}
// 获取添加执行计划的的任务索引值
int index = getIndexOfModel(this.model, length);
TaskInfo taskInfo = null;
try {
taskInfo = this.taskInfos[index];
} catch (Exception e) {
e.printStackTrace();
}
// 延迟队列中存在该任务id直接获取下一个任务
if (!DelayPlanQueue.contains(taskInfo.getId())) {
LinkedList<Plan> plans = taskInfo.getPlanQueue();
if (plans.size() > 0) {
try {
if (this.model >= 3) {
int index2 = taskInfo.getId() % this.planQueueArray.length;
this.planQueueArray[index2].put(plans.removeFirst());
} else {
this.planQueueArray[0].put(plans.removeFirst());
}
} catch (InterruptedException e) {
log.error("向执行计划队列放入计划异常", e);
}
} else {
this.taskInfos = reBuildTaskInfos(this.taskInfos, index);
}
}
}
}).start();
}
/**
* 根据执行模式获取添加执行计划的的任务信息索引值
*
* @param model 执行模式
* @param length 任务信息数组数量
* @return 任务信息索引值
*/
private int getIndexOfModel(int model, int length) {
if (model == 1 || model == 3) {
return RandomUtil.randomInt(0, length) % length;
} else {
this.indexList.removeIf(item -> item >= length);
if (this.indexList.size() < this.modelSize) {
int index = RandomUtil.randomInt(0, length) % length;
this.indexList.add(index);
return index;
} else {
return this.indexList.get(RandomUtil.randomInt(0, length) % this.indexList.size());
}
}
}
/**
* 重新构建任务信息数组
*
* @param taskInfos 原来任务信息数组
* @param index 需要移除的任务信息
* @return 新的任务信息数组
*/
private TaskInfo[] reBuildTaskInfos(TaskInfo[] taskInfos, int index) {
TaskInfo[] newTaskINfo = new TaskInfo[taskInfos.length - 1];
for (int j = 0, i = 0; i < taskInfos.length; i++) {
if (i != index) {
newTaskINfo[j] = taskInfos[i];
j++;
}
}
return newTaskINfo;
}
}
TaskInfo
import lombok.Data;
import java.util.LinkedList;
/**
* @Author: dinghao
* @Date: 2022/3/7 15:31
*/
@Data
public class TaskInfo {
/**
* 唯一标识
*/
private int id;
/**
* 任务名称
*/
private String name;
/**
* 执行计划队列
*/
private LinkedList<Plan> planQueue;
public TaskInfo(int id, String name, LinkedList<Plan> planQueue) {
this.id = id;
this.name = name;
this.planQueue = planQueue;
}
}
Plan
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: dinghao
* @Date: 2022/3/7 15:37
*/
public abstract class Plan {
/**
* 所有执行计划的上下文
*/
static Map<Integer, Map<String, Object>> context = new ConcurrentHashMap<>();
/**
* taskInfo id
*/
int taskInfoId;
/**
* 线程池执行前
*/
void before() {
}
abstract void run(Map<String, Object> context);
/**
* 线程池执行后
*/
void after() {
}
/**
* @param atomicInteger 计数器
* @param context 执行计划的上下文
*/
void run0(AtomicInteger atomicInteger, Map<String, Object> context) {
try {
before();
run(context);
} finally {
after();
atomicInteger.decrementAndGet();
}
}
}
这里加入context上下对象
实现自己的计划
MyPlanimport lombok.Data;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @Author: dinghao
* @Date: 2022/3/9 10:33
*/
@Data
public class MyPlan extends Plan {
private String name;
public void setTaskInfoId(int taskInfoId) {
this.taskInfoId = taskInfoId;
}
@Override
public void run(Map<String, Object> context) {
if (this.taskInfoId == 33 && name.indexOf("333")>0) {
DelayPlanQueue.add(this.taskInfoId, 30, TimeUnit.SECONDS);
}
System.out.println(Thread.currentThread().getName() + ":" + "用户:" + this.taskInfoId + name);
}
}
这里第33个任务的333个计划睡眠30s
Testimport java.util.LinkedList;
import java.util.stream.IntStream;
/**
* @Author: dinghao
* @Date: 2022/3/9 14:52
*/
public class Test {
public static void main(String[] args) {
int userSize = 100;
int jobSize = 1000;
TaskInfo[] taskInfos = new TaskInfo[userSize];
IntStream.range(0, userSize).parallel().forEach(i -> {
LinkedList<Plan> plans = new LinkedList<>();
for (int j = 0; j < jobSize; j++) {
MyPlan myPlan = new MyPlan();
myPlan.setTaskInfoId(i);
myPlan.setName("执行计划" + j);
plans.add(myPlan);
}
taskInfos[i] = new TaskInfo(i, "用户" + i, plans);
});
Scheder scheder = new Scheder(taskInfos, 10,10,100,3,2);
scheder.run();
}
}
测试选择的是模式3,即所有任务都在执行,并且每个任务中的计划顺序实现,由于我们在计划中第33个任务的第333个计划睡眠过所以第33个任务会最后顺序执行完
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)