管理线程,当线程执行完当前任务,不会死掉而是 会从队列里面取
1降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;
2提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;
3提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。
本文主要是围绕 ThreadPoolExecutor(线程池框架的核心类)的构造方法参数 展开:
1corePoolSize
线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。
2maximumPoolSize
额外最大线程数。上面说到任务数足够多,且使用的是有界队列,如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,首先从队列里面取,如果队列里面的消息执行完毕,等下一定时间,额外线程自动销毁。
3keepAliveTime
线程空闲时的存活时间。默认情况下,可以理解成额外最大线程数没活干了,额外线程线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
4unit
keepAliveTime参数的时间单位。
5workQueue
任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行。
一般来说,这里的BlockingQueue有以下三种选择:
SynchronousQueue:一个不存储元素的阻塞队列,每个插入 *** 作必须等到另一个线程调用移除 *** 作,否则插入 *** 作一直处于阻塞状态。因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长。
LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为IntegerMAX_VALUE,即为无界队列。因此,如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。
6threadFactory
线程工厂,创建新线程时使用的线程工厂。
7handler
任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:
AbortPolicy :丢弃任务并抛出RejectedExecutionException异常,默认策略;
CallerRunsPolicy :由调用execute方法的线程执行该任务;
DiscardPolicy :丢弃任务,但是不抛出异常;
DiscardOldestPolicy :丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。
当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
总结下上诉参数:假设corePoolSize为10 ,maximumPoolSize为10,线程空闲时的存活时间为60s,队列采用的是有界队列ArrayBlockingQueue 设置阈值200,使 用拒绝策略 , 当前2000个任务提交过来 流程如下图
参数案例描述:
当前2000笔 任务进来,10个核心线程去处理,剩下的1990任务队列里面放200个。剩下的1790个任务。队列塞满会去创建10个额外线程和核心线程一起去 去执行剩下的1780个任务。 当还有剩下任务处理不了就会触发任务拒绝策略 。
当前220笔 任务进来,10个核心线程去处理,剩下的210任务队列里面放200个。剩下的10个任务。队列塞满会去创建10个额外线程 去执行队列放不下的任务。 当额外线程和核心线程处理完队列里面的队列。没有任务可执行时,额外线程会等待我们设置的keepAliveTime,还是没有任务的情况下,就会被回收了 。
以上是绝对理想的状况下。
由参数可知 核心线程 和额外线程值是相同的,额外线程被回收时间是0,采用的是无界队列。默认采用的拒绝策略为 AbortPolicy。分析得 核心线程和额外线程处理不过来得情况,会一直往队列里面放任务。
可能存在的问题:队列过大 导致内存溢出 OOM
当任务量足够大,超过队列。交由额外线程处理。就会创建过多线程 。
可能存在问题:特殊场景下,线程过多可能会导致系统奔溃。cpu负载过高。
1具体解决方案 根据业务系统而定:
华瑞批量查证举例:定时任务CZJZRW001每隔2min 轮询一次 会从业务表verifycationTask 中 查询出待处理和处理中的状态的任务 根据表中的查证类型 分流到具体的 反欺诈异步查证 ,还款查证, 充值查证,贷款查证 。 具体查证根据处理结果更新verifycationTask表查证状态。处理成功 或者失败的定时任务无法再次轮询。这样就不需要考虑以上场景。使用线程池的情况下核心线程,额外线程处理不过来且队列已满使用DiscardPolicy拒绝不抛异常策略 ,即可满足该业务场景。类结构如下图
2思路
可以实现 RejectedExecutionHandler接口 自定义拒绝策略 将被拒绝的任务信息缓存到磁盘,等待线程池负载较低 从磁盘读取重新提交到任务里面去执行
ThreadPoolExecutor提供了四个构造方法:
我们以最后一个构造方法(参数最多的那个),对其参数进行解释:
如果对这些参数作用有疑惑的请看 ThreadPoolExecutor概述 。
知道了各个参数的作用后,我们开始构造符合我们期待的线程池。首先看JDK给我们预定义的几种线程池:
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。
咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:
对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。 因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
对于ScheduledThreadPool本文不做描述,其特性请关注后续篇章。
以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:
输出结果如下:
其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。
多线程系列目录(不断更新中):
线程启动原理
线程中断机制
多线程实现方式
FutureTask实现原理
线程池之ThreadPoolExecutor概述
线程池之ThreadPoolExecutor使用
线程池之ThreadPoolExecutor状态控制
线程池之ThreadPoolExecutor执行原理
线程池之ScheduledThreadPoolExecutor概述
线程池的优雅关闭实践
主要内容:
进程是资源分配的最小单位,每个进程都有独立的代码和数据空间,一个进程包含 1 到 n 个线程。线程是 CPU 调度的最小单位,每个线程有独立的运行栈和程序计数器,线程切换开销小。
Java 程序总是从主类的 main 方法开始执行,main 方法就是 Java 程序默认的主线程,而在 main 方法中再创建的线程就是其他线程。在 Java 中,每次程序启动至少启动 2 个线程。一个是 main 线程,一个是垃圾收集线程。每次使用 Java 命令启动一个 Java 程序,就相当于启动一个 JVM 实例,而每个 JVM 实例就是在 *** 作系统中启动的一个进程。
多线程可以通过继承或实现接口的方式创建。
Thread 类是 JDK 中定义的用于控制线程对象的类,该类中封装了线程执行体 run() 方法。需要强调的一点是,线程执行先后与创建顺序无关。
通过 Runnable 方式创建线程相比通过继承 Thread 类创建线程的优势是避免了单继承的局限性。若一个 boy 类继承了 person 类,boy 类就无法通过继承 Thread 类的方式来实现多线程。
使用 Runnable 接口创建线程的过程:先是创建对象实例 MyRunnable,然后将对象 My Runnable 作为 Thread 构造方法的入参,来构造出线程。对于 new Thread(Runnable target) 创建的使用同一入参目标对象的线程,可以共享该入参目标对象 MyRunnable 的成员变量和方法,但 run() 方法中的局部变量相互独立,互不干扰。
上面代码是 new 了三个不同的 My Runnable 对象,如果只想使用同一个对象,可以只 new 一个 MyRunnable 对象给三个 new Thread 使用。
实现 Runnable 接口比继承 Thread 类所具有的优势:
线程有新建、可运行、阻塞、等待、定时等待、死亡 6 种状态。一个具有生命的线程,总是处于这 6 种状态之一。 每个线程可以独立于其他线程运行,也可和其他线程协同运行。线程被创建后,调用 start() 方法启动线程,该线程便从新建态进入就绪状态。
NEW 状态(新建状态) 实例化一个线程之后,并且这个线程没有开始执行,这个时候的状态就是 NEW 状态:
RUNNABLE 状态(就绪状态):
阻塞状态有 3 种:
如果一个线程调用了一个对象的 wait 方法, 那么这个线程就会处于等待状态(waiting 状态)直到另外一个线程调用这个对象的 notify 或者 notifyAll 方法后才会解除这个状态。
run() 里的代码执行完毕后,线程进入终结状态(TERMINATED 状态)。
线程状态有 6 种:新建、可运行、阻塞、等待、定时等待、死亡。
我们看下 join 方法的使用:
运行结果:
我们来看下 yield 方法的使用:
运行结果:
线程与线程之间是无法直接通信的,A 线程无法直接通知 B 线程,Java 中线程之间交换信息是通过共享的内存来实现的,控制共享资源的读写的访问,使得多个线程轮流执行对共享数据的 *** 作,线程之间通信是通过对共享资源上锁或释放锁来实现的。线程排队轮流执行共享资源,这称为线程的同步。
Java 提供了很多同步 *** 作(也就是线程间的通信方式),同步可使用 synchronized 关键字、Object 类的 wait/notifyAll 方法、ReentrantLock 锁、无锁同步 CAS 等方式来实现。
ReentrantLock 是 JDK 内置的一个锁对象,用于线程同步(线程通信),需要用户手动释放锁。
运行结果:
这表明同一时间段只能有 1 个线程执行 work 方法,因为 work 方法里的代码需要获取到锁才能执行,这就实现了多个线程间的通信,线程 0 获取锁,先执行,线程 1 等待,线程 0 释放锁,线程 1 继续执行。
synchronized 是一种语法级别的同步方式,称为内置锁。该锁会在代码执行完毕后由 JVM 释放。
输出结果跟 ReentrantLock 一样。
Java 中的 Object 类默认是所有类的父类,该类拥有 wait、 notify、notifyAll 方法,其他对象会自动继承 Object 类,可调用 Object 类的这些方法实现线程间的通信。
除了可以通过锁的方式来实现通信,还可通过无锁的方式来实现,无锁同 CAS(Compare-and-Swap,比较和交换)的实现,需要有 3 个 *** 作数:内存地址 V,旧的预期值 A,即将要更新的目标值 B,当且仅当内存地址 V 的值与预期值 A 相等时,将内存地址 V 的值修改为目标值 B,否则就什么都不做。
我们通过计算器的案例来演示无锁同步 CAS 的实现方式,非线程安全的计数方式如下:
线程安全的计数方式如下:
运行结果:
线程安全累加的结果才是正确的,非线程安全会出现少计算值的情况。JDK 15 开始,并发包里提供了原子 *** 作的类,AtomicBoolean 用原子方式更新的 boolean 值,AtomicInteger 用原子方式更新 int 值,AtomicLong 用原子方式更新 long 值。 AtomicInteger 和 AtomicLong 还提供了用原子方式将当前值自增 1 或自减 1 的方法,在多线程程序中,诸如 ++i 或 i++ 等运算不具有原子性,是不安全的线程 *** 作之一。 通常我们使用 synchronized 将该 *** 作变成一个原子 *** 作,但 JVM 为此种 *** 作提供了原子 *** 作的同步类 Atomic,使用 AtomicInteger 做自增运算的性能是 ReentantLock 的好几倍。
上面我们都是使用底层的方式实现线程间的通信的,但在实际的开发中,我们应该尽量远离底层结构,使用封装好的 API,例如 JUC 包(javautilconcurrent,又称并发包)下的工具类 CountDownLath、CyclicBarrier、Semaphore,来实现线程通信,协调线程执行。
CountDownLatch 能够实现线程之间的等待,CountDownLatch 用于某一个线程等待若干个其他线程执行完任务之后,它才开始执行。
CountDownLatch 类只提供了一个构造器:
CountDownLatch 类中常用的 3 个方法:
运行结果:
CyclicBarrier 字面意思循环栅栏,通过它可以让一组线程等待至某个状态之后再全部同时执行。当所有等待线程都被释放以后,CyclicBarrier 可以被重复使用,所以有循环之意。
相比 CountDownLatch,CyclicBarrier 可以被循环使用,而且如果遇到线程中断等情况时,可以利用 reset() 方法,重置计数器,CyclicBarrier 会比 CountDownLatch 更加灵活。
CyclicBarrier 提供 2 个构造器:
上面的方法中,参数 parties 指让多少个线程或者任务等待至 barrier 状态;参数 barrierAction 为当这些线程都达到 barrier 状态时会执行的内容。
CyclicBarrier 中最重要的方法 await 方法,它有 2 个重载版本。下面方法用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任务。
而下面的方法则是让这些线程等待至一定的时间,如果还有线程没有到达 barrier 状态就直接让到达 barrier 的线程执行任务。
运行结果:
CyclicBarrier 用于一组线程互相等待至某个状态,然后这一组线程再同时执行,CountDownLatch 是不能重用的,而 CyclicBarrier 可以重用。
Semaphore 类是一个计数信号量,它可以设定一个阈值,多个线程竞争获取许可信号,执行完任务后归还,超过阈值后,线程申请许可信号时将会被阻塞。Semaphore 可以用来 构建对象池,资源池,比如数据库连接池。
假如在服务器上运行着若干个客户端请求的线程。这些线程需要连接到同一数据库,但任一时刻只能获得一定数目的数据库连接。要怎样才能够有效地将这些固定数目的数据库连接分配给大量的线程呢?
给方法加同步锁,保证同一时刻只能有一个线程去调用此方法,其他所有线程排队等待,但若有 10 个数据库连接,也只有一个能被使用,效率太低。另外一种方法,使用信号量,让信号量许可与数据库可用连接数为相同数量,10 个数据库连接都能被使用,大大提高性能。
上面三个工具类是 JUC 包的核心类,JUC 包的全景图就比较复杂了:
JUC 包(javautilconcurrent)中的高层类(Lock、同步器、阻塞队列、Executor、并发容器)依赖基础类(AQS、非阻塞数据结构、原子变量类),而基础类是通过 CAS 和 volatile 来实现的。我们尽量使用顶层的类,避免使用基础类 CAS 和 volatile 来协调线程的执行。JUC 包其他的内容,在其他的篇章会有相应的讲解。
Future 是一种异步执行的设计模式,类似 ajax 异步请求,不需要同步等待返回结果,可继续执行代码。使 Runnable(无返回值不支持上报异常)或 Callable(有返回值支持上报异常)均可开启线程执行任务。但是如果需要异步获取线程的返回结果,就需要通过 Future 来实现了。
Future 是位于 javautilconcurrent 包下的一个接口,Future 接口封装了取消任务,获取任务结果的方法。
在 Java 中,一般是通过继承 Thread 类或者实现 Runnable 接口来创建多线程, Runnable 接口不能返回结果,JDK 15 之后,Java 提供了 Callable 接口来封装子任务,Callable 接口可以获取返回结果。我们使用线程池提交 Callable 接口任务,将返回 Future 接口添加进 ArrayList 数组,最后遍历 FutureList,实现异步获取返回值。
运行结果:
上面就是异步线程执行的调用过程,实际开发中用得更多的是使用现成的异步框架来实现异步编程,如 RxJava,有兴趣的可以继续去了解,通常异步框架都是结合远程 >
最近写小玩具的时候用到了 CountDownLatch 计数器,然后顺便想了想判断线程池全部结束有多少种方法。
在网上搜了下,可能有些没找到,但是我找到的有(所有方法都是在 ThreadPoolExecutor 线程池方法下测试的):
好嘞,现在开始一个一个介绍优缺点和简要原理;
先创建一个 static 线程池,后面好几个例子就不一一创建了,全部用这个就行了:
然后再准备一个通用的睡眠方法:
这个方法就是为了测试的时候区分线程执行完毕的下顺序而已。
好嘞,准备完毕,现在开始。
首先贴上测试代码:
这一种方式就是在主线程中进行循环判断,全部任务是否已经完成。
这里有两个主要方法:
通俗点讲,就是在执行全部任务后,对线程池进行 shutdown() 有序关闭,然后循环判断 isTerminated() ,线程池是否全部完成。
类似方法扩展:
还是一样,贴上代码:
还是一样在主线程循环判断,主要就两个方法:
这个好理解,总任务数等于已完成任务数,就表示全部执行完毕。
其他 :
最后扯两句,因为我用 main 方法运行的,跑完后 main 没有结束,是因为非守护线程如果不终止,程序是不会结束的。而线程池 Worker 线程里写了一个死循环,而且被设置成了非守护线程。
这种方法是我比较常用的方法,先看代码:
这种方法,呃,应该是看起来比较高级的,我也不知道别的大佬怎么写的,反正我就用这个。
这个方法需要介绍下这个工具类 CountDownLatch 。先把这种方式的优缺点写了,后面再详细介绍这个类。
CountDownLatch 是 JDK 提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组 *** 作。
常用的方法有 countDown 方法和 await 方法, CountDownLatch 在初始化时,需要指定用给定一个整数作为计数器。
当调用 countDown 方法时,计数器会被减1;当调用 await 方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被 countDown 方法减到0时,线程才会继续执行。
计数器是无法重置的,当计数器被减到0时,调用 await 方法都会直接返回。
这种方式其实和 CountDownLatch 原理类似。
先维护一个静态变量
然后在线程任务结束时,进行静态变量 *** 作:
其实就是加锁计数,循环判断。
Future 是用来装载线程结果的,不过,用这个来进行判断写代码总感觉怪怪的。
因为 Future 只能装载一条线程的返回结果,多条线程总不能用 List 在接收 Future 。
这里就开一个线程做个演示:
这种方式就不写优缺点了,因为 Future 的主要使用场景并不是用于判断任务执行状态。
一个一个回答你
1、其中pool-1-thread-18代表的是当前线程的名字吗?
答:是线程名,这个名字是它自己命的名,不用管
2、18代表的是什么?
答:就是排在18号位置上得线程
3、为什么我线程池最大数量为30,却可以得到pool-1-thread-168这样的输出结果?
答:比如说你最开始有1到30号的线程,你1用了之后关闭了,1就木有了,那下一个线程就叫31了,虽然线程池里面还是只有30个,但是你却有了31号。也就是说,数字只是代表了它产生的顺序,却不表示当前的最大
以上就是关于线程池工作原理全部的内容,包括:线程池工作原理、线程池之ThreadPoolExecutor使用、并发编程解惑之线程等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)