线程池:控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动排在队列中的任务, 如果线程数量超过了最大数量,超出数量的线程进入队列,等待其他线程执行完毕,再从队列中取出任务执行。 优点: (1)线程复用:避免了反复创建销毁线程的消耗,提高了线程的响应速度。 (2)控制最大并发数:避免并发数过高,服务崩溃。 (3)集中管理线程:使用线程池对线程进行统一管理和处理。
线程池里的7大参数:
/**
* 线程池的七大参数:
* 1.corePoolSize:核心线程数
* 2.maximumPoolSize:最大线程数
* 3.keepaliveTime:时间大小
* 4.unit:时间单位
* 5.workQueue:任务队列
* 6.threadFactory:线程工厂
* 7.handler:拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
newFixedThreadPool(int)线程池:
package org.lee.study.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lee
* @date 2022/4/20 15:19
*/
public class ThreadPoolTest {
public static void main(String[] args) {
/**
* 1.newFixedThreadPool();线程池中有固定的线程数:
* 通过源码得知,核心线程数和最大线程数都是固定的,适合执行长期固定任务。
* 缺点:通过源码得知,等待队列大小为int的最大值,会造成任务堆积,不建议使用。
*/
int threadPoolNumber = 10;
ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
for (int i = 1; i <= threadPoolNumber; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
//源码:传入的int值为核心线程数和最大线程数
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
//任务队列的大小为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
newSingleThreadExecutor()
package org.lee.study.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lee
* @date 2022/4/20 15:19
*/
public class ThreadPoolTest {
public static void main(String[] args) {
/**
* 2.newSingleThreadExecutor();线程池中只有一个线程。
* 通过源码得知,核心线程数和最大线程数都是1。
* 缺点:通过源码得知,等待队列大小为int的最大值,会造成任务堆积,不建议使用。
*/
int threadPoolNumber = 10;
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
for (int i = 1; i <= threadPoolNumber; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
newCachedThreadPool
package org.lee.study.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lee
* @date 2022/4/20 15:19
*/
public class ThreadPoolTest {
public static void main(String[] args) {
/**
* 3.newCachedThreadPool();线程池中的数量可以动态变化。
* 通过源码得知,核心线程数是0,最大线程数是Integer.MAX_VALUE。
* 缺点:通过源码得知,最大线程池数大小为int的最大值,会造成任务堆积,造成OOM,不建议使用。
*/
int threadPoolNumber = 10;
ExecutorService executorService = Executors.newCachedThreadPool();
try {
for (int i = 1; i <= threadPoolNumber; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
此处略有不同的地方是,任务队列的数据类型是SynchronousQueue队列,源码默认是false,使用的是TransferStack;
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
/**
* Creates a {@code SynchronousQueue} with nonfair access policy.
*创建具有非公平访问策略的 SynchronousQueue。
*/
public SynchronousQueue() {
this(false);
}
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
以上3种创建线程池的方式仅供了解,实际开发中不建议使用,都有造成OOM的风险。实际开发中使用的自定义创建线程池的方式。
package org.lee.study.concurrent;
import java.util.concurrent.*;
/**
* @author lee
* @date 2022/4/20 15:19
*/
public class ThreadPoolTest {
public static void main(String[] args) {
int threadPoolNumber = 10;
/**
* 线程池的七大参数:
* 1.corePoolSize:核心线程数
* 2.maximumPoolSize:最大线程数
* 3.keepaliveTime:时间大小
* 4.unit:时间单位
* 5.workQueue:任务队列
* 6.threadFactory:线程工厂
* 7.handler:拒绝策略
*/
ExecutorService executorService = new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 1; i <= threadPoolNumber; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
通过源码了解线程池的工作模式
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
通过源码里面的注释可以得知:
1.核心线程数没满,新建线程执行任务。如果核心线程数满了,新任务将进入队列。
2.任务队列满了,将使用最大线程数。
3.最大线程数满了,将使用拒绝策略。
4.当一个线程无事可做并且当前线程数大于核心线程数,将根据最大的存活时间进行回收,所有线程池的任务完成后,线程池的大小会回缩到核心线程数的大小。
线程池的拒绝策略
/**
*通过源码可知,拒绝策略的其中一种实现了RejectedExecutionHandler类,这是其中一种直接抛出异常。
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
通过这个类得知,一共有以下4中拒绝策略。
1.AbortPolicy拒绝策略:丢弃任务并直接抛出异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2.CallerRunsPolicy拒绝策略:如果任务被拒绝了,将由调用线程直接执行此任务。
举例:用main方法写了一个线程池,任务队列满了的情况下,将由main主线程来执行这个任务。
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
3.DiscardOldestPolicy拒绝策略:如果线程池没有关闭,将队列第一个元素出队(抛弃),然后重新提交这个任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
4.DiscardPolicy拒绝策略:Does nothing, which has the effect of discarding task r.
丢弃任务什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)