昨天写的java多线程执行任务(工具)但是不能符合顺序执行计划的场景,下面升级一下原工具
java多线程执行任务(工具)
之前只支持两种模式,新工具支持四种模式
执行模式:
- 1:所有任务信息都执行
- 2:先执行部分任务,执行完后再执行其他任务
- 3:所有任务信息都执行,但是顺序执行每个任务中的计划
- 4:顺序先执行执行任务中的计划,执行完后再顺序执行其他任务
模式3,4在模式1,2上顺序执行每个任务中的计划
实现原理如图:
接着上代码
Scheder
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
* @Author: dinghao
* @Date: 2022/3/7 15:29
*/
@Slf4j
public class Scheder {
/**
* 任务信息数组
*/
private TaskInfo[] taskInfos;
/**
* 执行计划队列数组
*/
private LinkedBlockingQueue<Plan>[] planQueueArray;
/**
* 队列的数量
*/
int queueNum;
/**
* 队列的容量
*/
int queueSize;
/**
* 允许并行执行的线程池
*/
private ExecutorService loopExecutor;
/**
* 允许并行执行的线程数
*/
private int nThrends;
/**
* 执行模式:
* 1:所有任务信息都执行
* 2:先执行部分任务,执行完后再执行其他任务
* 3:顺序执行任务中的计划
* 4:顺序先执行执行任务中的计划,执行完后再顺序执行其他任务
*/
private int model;
/**
* 每批执行的任务数量
*/
private int modelSize;
/**
* model = 2,4 时有效
*/
private ArrayList<Integer> indexList = new ArrayList<>();
/**
* 协助判断,是否线程池的任务全部结束
*/
private AtomicInteger count;
/**
* 调度器的执行状态
*/
private volatile boolean status = false;
/**
* 构造器
*
* @param taskInfos 任务数组
* @param nThrends 同时执行任务中的计划线程数
* @param queueSize 计划执行队列
* @param model 执行模式 1:所有任务信息都执行 2:先执行部分任务,执行完后再执行其他任务
* @param modelSize 每批执行任务的数量
*/
public Scheder(TaskInfo[] taskInfos, int nThrends, int queueNum, int queueSize, int model, Integer modelSize) {
this.taskInfos = taskInfos;
this.nThrends = nThrends;
this.queueNum = queueNum;
this.queueSize = queueSize;
this.loopExecutor = Executors.newFixedThreadPool(this.nThrends);
this.model = model;
if (this.model < 3) {
this.planQueueArray = new LinkedBlockingQueue[1];
this.planQueueArray[0] = new LinkedBlockingQueue<>(this.queueSize);
} else {
// 初始化队列数组
this.planQueueArray = new LinkedBlockingQueue[this.queueNum];
IntStream.range(0, this.queueNum).forEach(i -> this.planQueueArray[i] = new LinkedBlockingQueue<>(this.queueSize));
}
// modelSize只有在等于2,4有效
if (this.model == 2 || this.model == 4) {
this.modelSize = modelSize > taskInfos.length ? taskInfos.length : modelSize;
}
count = countPlan();
}
/**
* 计算一共有多少执行计划
*
* @return /
*/
private AtomicInteger countPlan() {
int sum = 0;
for (int i = 0; i < this.taskInfos.length; i++) {
sum += this.taskInfos[i].getPlanQueue().size();
}
return new AtomicInteger(sum);
}
public Scheder(TaskInfo[] taskInfos, int nThrends, int model, Integer modelSize) {
this(taskInfos, nThrends, nThrends, 100, model, modelSize);
}
public Scheder(TaskInfo[] taskInfos, int nThrends) {
this(taskInfos, nThrends, nThrends, 100, 1, null);
}
public Scheder(TaskInfo[] taskInfos) {
this(taskInfos, 10, 10, 100, 1, null);
}
public void setModel(int model) {
this.model = model;
}
public int getModel() {
return model;
}
public void setModelSize(int modelSize) {
this.modelSize = modelSize;
}
public int getModelSize() {
return modelSize;
}
public ArrayList<Integer> getIndexList() {
return indexList;
}
public AtomicInteger getCount() {
return count;
}
public boolean isStatus() {
return status;
}
/**
* 执行方法
*/
public void run() {
if (this.status) {
log.warn("任务处于启动状态");
return;
}
this.status = true;
// 开启向队列中添加执行计划线程
init();
// 循环执行执行计划
while (this.status) {
// 所有执行计划执行完后,退出
if (this.taskInfos.length <= 0) {
if (this.model < 3) {
if (this.planQueueArray[0].size() == 0) {
this.status = false;
break;
}
} else {
ArrayList<Integer> notEmptyIndex = getNotEmptyIndex(this.planQueueArray);
if (CollUtil.isEmpty(notEmptyIndex)) {
this.status = false;
break;
}
}
}
// 执行计划
execute();
}
int size;
// 所有线程执行完毕出循环
for (; ; ) {
size = this.count.get();
if (size == 0) {
break;
}
}
//停止线程池
this.loopExecutor.shutdownNow();
for (; ; ) {
//只有当线程池中所有线程完成任务时才会返回true,并且需要先调用线程池的shutdown方法或者shutdownNow方法。
if (this.loopExecutor.isTerminated()) {
System.out.println("执行结束!");
break;
}
}
}
private void execute() {
if (this.model < 3) {
try {
// 获取一个执行计划
Plan plan = this.planQueueArray[0].take();
// 执行计划
this.loopExecutor.execute(() -> plan.run0(this.count));
} catch (InterruptedException e) {
log.error("任务执行中发生异常", e);
}
} else {
this.loopExecutor.execute(() -> {
try {
// 获取一个执行计划
Plan plan = null;
// 获取线程id
String name = Thread.currentThread().getName();
int lastIndexOf = name.lastIndexOf("-");
int id = Integer.parseInt(name.substring(lastIndexOf + 1));
ArrayList<Integer> notEmptyIndex2 = getNotEmptyIndex(this.planQueueArray);
Integer index = notEmptyIndex2.stream().filter(item -> item % this.nThrends == (id - 1)).findAny().orElse(null);
if (index == null) {
return;
}
LinkedBlockingQueue<Plan> plans = this.planQueueArray[index];
if (plans.size() > 0) {
plan = plans.take();
plan.run0(this.count);
}
} catch (InterruptedException e) {
log.error("任务执行中发生异常", e);
}
});
}
}
private ArrayList<Integer> getNotEmptyIndex(LinkedBlockingQueue<Plan>[] planQueueArray) {
ArrayList<Integer> indexArray = new ArrayList<>();
for (int i = 0; i < planQueueArray.length; i++) {
if (!CollUtil.isEmpty(planQueueArray[i])) {
indexArray.add(i);
}
}
return indexArray;
}
/**
* 开启一个线程,持续向执行计划队列添加执行计划,直到所有的计划任务添加完
*/
private void init() {
new Thread(() -> {
while (this.status) {
// 任务信息数组数量
int length = this.taskInfos.length;
// 执行完结束线程
if (length <= 0) {
break;
}
// 获取添加执行计划的的任务索引值
int index = getIndexOfModel(this.model, length);
TaskInfo taskInfo = null;
try {
taskInfo = this.taskInfos[index];
} catch (Exception e) {
e.printStackTrace();
}
LinkedList<Plan> plans = taskInfo.getPlanQueue();
if (plans.size() > 0) {
try {
if (this.model >= 3) {
int index2 = taskInfo.getId() % this.planQueueArray.length;
this.planQueueArray[index2].put(plans.removeFirst());
} else {
this.planQueueArray[0].put(plans.removeFirst());
}
} catch (InterruptedException e) {
log.error("向执行计划队列放入计划异常", e);
}
} else {
this.taskInfos = reBuildTaskInfos(this.taskInfos, index);
}
}
}).start();
}
/**
* 根据执行模式获取添加执行计划的的任务信息索引值
*
* @param model 执行模式
* @param length 任务信息数组数量
* @return 任务信息索引值
*/
private int getIndexOfModel(int model, int length) {
if (model == 1 || model == 3) {
return RandomUtil.randomInt(0, length) % length;
} else {
this.indexList.removeIf(item -> item >= length);
if (this.indexList.size() < this.modelSize) {
int index = RandomUtil.randomInt(0, length) % length;
this.indexList.add(index);
return index;
} else {
return this.indexList.get(RandomUtil.randomInt(0, length) % this.indexList.size());
}
}
}
/**
* 重新构建任务信息数组
*
* @param taskInfos 原来任务信息数组
* @param index 需要移除的任务信息
* @return 新的任务信息数组
*/
private TaskInfo[] reBuildTaskInfos(TaskInfo[] taskInfos, int index) {
TaskInfo[] newTaskINfo = new TaskInfo[taskInfos.length - 1];
for (int j = 0, i = 0; i < taskInfos.length; i++) {
if (i != index) {
newTaskINfo[j] = taskInfos[i];
j++;
}
}
return newTaskINfo;
}
}
TaskInfo
import lombok.Data;
import java.util.LinkedList;
/**
* @Author: dinghao
* @Date: 2022/3/7 15:31
*/
@Data
public class TaskInfo {
/**
* 唯一标识
*/
private int id;
/**
* 任务名称
*/
private String name;
/**
* 执行计划队列
*/
private LinkedList<Plan> planQueue;
public TaskInfo(int id, String name, LinkedList<Plan> planQueue) {
this.id = id;
this.name = name;
this.planQueue = planQueue;
}
}
这里加了id属性
Plan
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: dinghao
* @Date: 2022/3/7 15:37
*/
public interface Plan {
/**
* 线程池执行前
*/
default void before(){
}
void run();
/**
* 线程池执行后
*/
default void after(){
}
default void run0(AtomicInteger atomicInteger) {
try{
before();
run();
}finally {
after();
atomicInteger.decrementAndGet();
}
}
}
修改完成
实现自己的计划
MyPlan
import lombok.Data;
/**
* @Author: dinghao
* @Date: 2022/3/9 10:33
*/
@Data
public class MyPlan implements Plan {
private String name;
@Override
public void run() {
// if( name.startsWith("用户99")){
System.out.println(Thread.currentThread().getName() + ":" + name);
// }
}
}
Test
import java.util.LinkedList;
import java.util.stream.IntStream;
/**
* @Author: dinghao
* @Date: 2022/3/9 14:52
*/
public class Test {
public static void main(String[] args) {
int userSize = 100;
int jobSize = 1000;
TaskInfo[] taskInfos = new TaskInfo[userSize];
IntStream.range(0, userSize).parallel().forEach(i -> {
LinkedList<Plan> plans = new LinkedList<>();
for (int j = 0; j < jobSize; j++) {
MyPlan myPlan = new MyPlan();
myPlan.setName("用户" + i + ",执行计划" + j);
plans.add(myPlan);
}
taskInfos[i] = new TaskInfo(i, "用户" + i, plans);
});
Scheder scheder = new Scheder(taskInfos, 3, 10, 100, 3, 2);
scheder.run();
}
}
测试结果:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)