<-----------------------------------------廖雪峰学Java----------------------------------------->
- 1. 多线程基础知识
- 2. 创建多线程
- 3. 线程状态
- 4. 中断线程
- 5. 守护线程
- 6. 线程同步
- 7. 同步方法
- 8. 死锁
- 9. 使用wait和notify
- 10. 使用ReentrantLock
- 11. 使用Condition
- 12. 使用ReadWriteLock
- 13. 使用StampedLock
- 14. 使用Concurrent集合
- 15. 使用Atomic
- 16. 使用线程池
- 17. 使用Future
- 18. 使用CompletableFuture
- 19. 使用ForkJoin
- 20. 使用ThreadLocal
- 计算机把一个任务称为一个进程,浏览器就是一个进程,视频播放器是另一个进程,类似的,音乐播放器和Word都是进程
- 一个进程中的子任务称为线程
- 进程和线程的关系:一个进程可以包含一个或多个线程,但至少会有一个线程
- *** 作系统调度的最小任务单位是线程。常用的Windows、Linux等 *** 作系统都采用抢占式多任务,如何调度线程完全由 *** 作系统决定,程序自己不能决定什么时候执行,以及执行多长时间
- 同一个应用程序既可以有多个进程也可以有多个线程,实现多任务的方法,有以下几种:
1. 多进程模式(每个进程中只有一个线程) 2. 多线程模式(一个进程中有多个线程) 3. 多进程+多线程(多个进程且每个进程中可以有一个或多个线程)
- 相较与多线程,多进程的缺点是:
1. 创建进程比创建线程开销大,尤其是在Windows系统上; 2. 进程间通信比线程间通信要慢,因为线程间通信就是读写同一个变量,速度很快。
- 相较与多线程,多进程的优势是:
多进程稳定性比多线程高,因为在多进程的情况下,一个进程崩溃不会影响其他进程,而在多线程的情况下,任何一个线程崩溃会直接导致整个进程崩溃
- Java语言内置了多线程支持:一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()方法,在main()方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等
- 创建一个新线程只需要实例化一个Thread,然后调用它的**start()**方法
- 为线程指派任务的方式:
public class Main { public static void main(String[] args) { Thread t = new MyThread(); t.start(); // 启动新线程 } } class MyThread extends Thread { @Override public void run() { System.out.println("start new thread!"); } }
方式二:创建Thread实例时,传入一个Runnable实例
public class Main { public static void main(String[] args) { Thread t = new Thread(new MyRunnable()); t.start(); // 启动新线程 } } class MyRunnable implements Runnable { @Override public void run() { System.out.println("start new thread!"); } } //或者用Java8引入的lambda语法进一步简写为 public class Main { public static void main(String[] args) { Thread t = new Thread(() -> { System.out.println("start new thread!"); }); t.start(); // 启动新线程 } }
- 调用Thread.sleep(),强迫当前线程暂停一段时间;sleep()传入的参数是毫秒
- 直接调用Thread实例的run()方法是无效的:相当于在main线程中执行了Thread对象的run方法,不会创建新的线程
- start()方法内部调用了一个private native void start0()方法,native修饰符表示这个方法是由JVM虚拟机内部的C代码实现的,不是由Java代码实现的
- 对线程设定优先级,设定优先级的方法是:Thread.setPriority(int n) // 1~10, 默认值5
- Java线程的状态有以下几种:
1. New:新创建的线程,尚未执行; 2. Runnable:运行中的线程,正在执行run()方法的Java代码; 3. Blocked:运行中的线程,因为某些 *** 作被阻塞而挂起; 4. Waiting:运行中的线程,因为某些 *** 作在等待中; 5. Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待; 6. Terminated:线程已终止,因为run()方法执行完毕。
- 线程终止的原因有:
1. 线程正常终止:run()方法执行到return语句返回; 2. 线程意外终止:run()方法因为未捕获的异常导致线程终止; 3. 对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。
- 线程可以等待另一个线程(当前线程暂停执行)直到该线程运行结束:Thread.join()方法
public class Main { public static void main(String[] args) throws InterruptedException { Thread t = new Thread(() -> { System.out.println("hello"); }); System.out.println("start"); t.start(); t.join(); System.out.println("end"); } }
- 获取当前线程:通过Thread类静态方法获得
Thread t = Thread.currentThread();4. 中断线程
- 中断线程就是其他线程给该线程发一个信号,该线程收到信号后结束执行run()方法,使得自身线程能立刻结束运行
- 在其他线程中对目标线程调用interrupt()方法,目标线程需要反复检测自身状态是否是interrupted(在被通知的线程中调用isInterrupted() 方法)状态,如果是,就立刻结束运行:
public class Main { public static void main(String[] args) throws InterruptedException { Thread t = new MyThread(); t.start(); Thread.sleep(1); // 暂停1毫秒 t.interrupt(); // 中断t线程 t.join(); // 等待t线程结束 System.out.println("end"); } } class MyThread extends Thread { public void run() { int n = 0; while (! isInterrupted()) { n ++; System.out.println(n + " hello!"); } } }
注意:如果在子线程中不显示调用isInterrupted方法,那么即使别的线程发送了interrupt通知,该线程也不会停止
- 如果一个线程处于等待状态,例如,t.join()会让main线程进入等待状态,此时,如果对main线程调用interrupt(),join()方法会立刻抛出InterruptedException,因此,目标线程只要捕获到join()方法抛出的InterruptedException,就说明有其他线程对其调用了interrupt()方法,通常情况下该线程应该立刻结束运行
public class MulThread_03 { public static void main(String[] args) throws InterruptedException { Thread t = new MyThread(); t.start(); Thread.sleep(1000); t.interrupt(); // 中断t线程 t.join(); // 等待t线程结束 System.out.println("end from main Thread"); } } class MyThread extends Thread { public void run() { Thread hello = new HelloThread(); hello.start(); // 启动hello线程 try { hello.join(); // 等待hello线程结束 } catch (InterruptedException e) { System.out.println("interrupted! from MyThread"); } hello.interrupt(); } } class HelloThread extends Thread { public void run() { int n = 0; while (!isInterrupted()) { n++; System.out.println(n + " hello!"); try { Thread.sleep(100); } catch (InterruptedException e) { System.out.println("!!!!"); break; } } } }
注意:父进程在join与interrupt组合下会强制join方法报出InterruptedException,这个错误是由子进程负责感知,而子进程可以配置不同的响应方式
- 另一个常用的中断线程的方法是设置标志位。我们通常会用一个running标志位来标识线程是否应该继续运行,在外部线程中,通过把HelloThread.running置为false,就可以让线程结束:
public class Main { public static void main(String[] args) throws InterruptedException { HelloThread t = new HelloThread(); t.start(); Thread.sleep(1); t.running = false; // 标志位置为false } } class HelloThread extends Thread { public volatile boolean running = true; public void run() { int n = 0; while (running) { n ++; System.out.println(n + " hello!"); } System.out.println("end!"); } }
- 注意标志位boolean running是一个线程间共享的变量。线程间共享变量需要使用volatile关键字标记,确保每个线程都能读取到更新后的变量值
- 为什么要对线程间共享的变量用关键字volatile声明?
在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的
volatile关键字解决的是**可见性问题**:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值
- Java程序入口就是由JVM启动main线程,main线程又可以启动其他线程。当所有线程都运行结束时,JVM退出,进程结束
- 如果有一个线程没有退出,JVM进程就不会退出,所以需要保证所有的线程都能结束执行
- 有一种线程的目的就是无限循环,例如,一个定时触发任务的线程:
class TimerThread extends Thread { @Override public void run() { while (true) { System.out.println(LocalTime.now()); try { Thread.sleep(1000); } catch (InterruptedException e) { break; } } } }
- 谁负责结束这个线程?
答案是使用守护线程(Daemon Thread)。守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。 - 如何创建守护线程?
在调用start()方法前,调用setDaemon(true)把该线程标记为守护线程
在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失
- 多线程同时执行时,有一个单线程模型下不存在的问题就来了:如果多个线程同时读写共享变量,会出现数据不一致的问题
数据一致性问题!! - 以下面的伪代码为例:
main{ static int count = 0 addThread.start // 该线程负责对共享变量进行加 *** 作100次 subThread.start // 该线程复杂对贡献变量进行减 *** 作100次 addThread.join subThread.join sout(count) // ? ? ? }
如果不存在数据一致性问题,那么答案显而易见,最终输出的count为0,但事实上输出的结果总是发生变化,为什么?
首先补充基础知识,一条简单的加指令(add = add + 1),在我们看来为单一指令,但是对于底层而言,该指令将会被分解为多条指令(load:加载变量;add:执行加 *** 作;store:存储变量值);需要意识到的是,一个线程可能在执行完任何一条指令后(当不存在原子 *** 作或其他锁 *** 作时)被 *** 作系统替换,那么在这种情况下就可能出现下面图示的这种情况:
即一条加指令在执行执行的过程中被替换,此时其加 *** 作还没有完成;而减线程已经为替换执行,其读取共享变量值时由于加线程的上一次 *** 作还没有完成,变量值还没有更新,因此读取到的是上一次的值,这样就产生了数据不一致的问题。
-
要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:即某一个线程执行时,其他线程必须等待:
通过加锁和解锁的 *** 作,就能保证3条指令总是在一个线程执行期间,不会有其他线程会进入此指令区间。即使在执行期线程被 *** 作系统中断执行,其他线程也会因为无法获得锁导致无法进入此指令区间。只有执行线程将锁释放后,其他线程才有机会获得锁并执行。这种加锁和解锁之间的代码块我们称之为临界区(Critical Section),任何时候临界区最多只有一个线程能执行 -
Java程序使用synchronized关键字对一个对象进行加锁:synchronized(对象) {......};注意在进入synchronized代码块时,获取指定对象的锁,在退出synchronized代码块时就会释放对象的锁:
public class Main { public static void main(String[] args) throws Exception { var add = new AddThread(); var dec = new DecThread(); add.start(); dec.start(); add.join(); dec.join(); System.out.println(Counter.count); } } class Counter { public static final Object lock = new Object(); // 创建一个锁对象 public static int count = 0; } class AddThread extends Thread { public void run() { for (int i=0; i<10000; i++) { synchronized(Counter.lock) { Counter.count += 1; } } } } class DecThread extends Thread { public void run() { for (int i=0; i<10000; i++) { synchronized(Counter.lock) { Counter.count -= 1; } } } }
- 因为在经过synchronized语句块时只能串行运行,因此synchronized会降低程序的执行效率
- 不需要synchronized的 *** 作:
JVM规范定义了几种原子 *** 作: 基本类型(long和double除外)赋值,例如:int n = m; 引用类型赋值,例如:Listlist = anotherList。 //long和double是64位数据,JVM没有明确规定64位赋值 *** 作是不是一个原子 *** 作,不过在x64平台的JVM是把long和double的赋值作为原子 *** 作实现的
注意:单行赋值语句是不需要用synchronized来保证数据同步的,但是如果是多行赋值语句则仍然需要使用人为保证数据的一致性,但是可以通过一定的方法转换使得赋值语句具有原子性:
class Pair { // 改造前 int first; int last; public void set(int first, int last) { synchronized(this) { this.first = first; this.last = last; } } } class Pair { // 改造后 int[] pair; public void set(int first, int last) { int[] ps = new int[] { first, last }; this.pair = ps; } }7. 同步方法
- 上面的例子中,我们将获取锁的权限赋予给了线程(即由线程决定获取谁的锁),这样 *** 作复杂而且容易造成逻辑的混乱
- 更好的方法是把synchronized逻辑封装起来(将锁逻辑放在 *** 作对象的执行方法当中,线程不需要主动地去获取锁权限):
public class Counter { private int count = 0; public void add(int n) { synchronized(this) { count += n; } } public void dec(int n) { synchronized(this) { count -= n; } } public int get() { return count; } }
- 如果一个类被设计为允许多线程正确访问,我们就说这个类就是**“线程安全”的(thread-safe)**
- 一些不变类,例如String,Integer,LocalDate,它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的
- 类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的
- 没有特殊说明时,一个类默认是非线程安全的。
- 当我们锁住的是this实例时,实际上可以用synchronized修饰这个方法。下面两种写法是等价的:
// 写法一 public void add(int n) { synchronized(this) { // 锁住this count += n; } // 解锁 } // 写法二 public synchronized void add(int n) { // 锁住this count += n; } // 解锁
- 用synchronized修饰的方法就是同步方法,它表示整个方法都必须用this实例加锁
- 任何一个类都有一个由JVM自动创建的Class实例,因此,对static方法添加synchronized,锁住的是该类的Class实例,例如:
public synchronized static void test(int n) { ... } // 等价于=====================> public class Counter { public static void test(int n) { synchronized(Counter.class) { ... } } }8. 死锁
- Java的线程锁是可重入的锁:JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁
- 由于Java的线程锁是可重入锁,所以,获取锁的时候,不但要判断是否是第一次获取,还要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized块,记录-1,减到0的时候,才会真正释放锁
- 在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁,例如下面在这种情况:
public void add(int m) { synchronized(lockA) { // 获得lockA的锁 this.value += m; synchronized(lockB) { // 获得lockB的锁 this.another += m; } // 释放lockB的锁 } // 释放lockA的锁 } public void dec(int m) { synchronized(lockB) { // 获得lockB的锁 this.another -= m; synchronized(lockA) { // 获得lockA的锁 this.value -= m; } // 释放lockA的锁 } // 释放lockB的锁 }
- 死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程
- 多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务
- wait()方法:
1. wait()方法只能在synchronized(lock)语句块中使用 2. wait()方法只能由锁对象来调用,如上面的lock.wait(),又如this.wait()(如果锁定对象为this) 3. 如果能执行wait(),那么说明当前线程已经获取了指定的锁对象,而其他所有需求该锁对象的代码块都无法执行 4. wait()方法的效果是:当前线程放弃该锁对象,并进入等待状态 5. wait()方法没有返回值,只有当其他线程释放了该对象的锁,并“唤醒”该线程时,该线程才能够重新得到锁对象并继续执行 6. wait()方法的底层实现为native方法(c方法)
- notify()方法:
7. notify()方法只能在synchronized(lock)语句块中使用 8. notify()方法只能由锁对象来调用,如lock.notify(),又如this.notify() 9. notify()方法执行之后会唤醒因该锁对象而处于等待状态的线程,并归还锁对象给等待线程 10.如果没有因为该锁对象而处于等待状态的线程,那么没有执行效果
- 示例:
public class Main { public static void main(String[] args) throws InterruptedException { var q = new TaskQueue(); var ts = new ArrayList10. 使用ReentrantLock(); for (int i=0; i<5; i++) { var t = new Thread() { public void run() { // 执行task: while (true) { try { String s = q.getTask(); System.out.println("execute task: " + s); } catch (InterruptedException e) { return; } } } }; t.start(); ts.add(t); } var add = new Thread(() -> { for (int i=0; i<10; i++) { // 放入task: String s = "t-" + Math.random(); System.out.println("add task: " + s); q.addTask(s); try { Thread.sleep(100); } catch(InterruptedException e) {} } }); add.start(); add.join(); Thread.sleep(100); for (var t : ts) { t.interrupt(); } } } class TaskQueue { Queue queue = new linkedList<>(); public synchronized void addTask(String s) { this.queue.add(s); this.notifyAll(); } public synchronized String getTask() throws InterruptedException { while (queue.isEmpty()) { this.wait(); } return queue.remove(); } }
- Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写
- Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制
- java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁
- 代码改造:(基于ReentrantLock实现synchronized相同功能):
public class Counter { private final Lock lock = new ReentrantLock(); private int count; public void add(int n) { lock.lock(); try { count += n; } finally { lock.unlock(); } } }
- synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁
- ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁
- ReentrantLock可以尝试获取锁:
if (lock.tryLock(1, TimeUnit.SECONDS)) { //参数一表示最多等待的时间单位数,参数二表示时间单位长度 try { ... } finally { lock.unlock(); } }
上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去
- 使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁
- 这里再附上一小段测试代码,感兴趣的可以通过修改等待时间来查看对应的结果:
public class MulThread_05 { public static ReentrantLock lock = new ReentrantLock(); // 创建锁对象 public static void main(String[] args) throws InterruptedException { lock.lock(); // 主线程获取锁对象 System.out.println("main thread: I get the lock !!"); new MyThread_1().start(); Thread.sleep(3000); lock.unlock(); System.out.println("main thread: ok let it go !!"); System.out.println("main thread end !!"); } } class MyThread_1 extends Thread { @Override public void run() { try { if(MulThread_05.lock.tryLock(4, TimeUnit.SECONDS)) { try { System.out.println("this thread: I get the lock !!!"); }finally { MulThread_05.lock.unlock(); } }else { System.out.println("this thread: whatever I give it up !!"); } } catch (InterruptedException e) { e.printStackTrace(); } } }11. 使用Condition
- Condition对象可以配合ReentrantLock实现wait以及notify功能
- 使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。也就是说一个Condition对象与Lock对象时绑定关系,一个Lock可以创建多个Condition对象,但是Condition对象必须依附于Lock对象存在
- Condition提供的**await()、signal()、signalAll()**原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:
1. await()会释放当前锁,进入等待状态; 2. signal()会唤醒某个等待线程; 3. signalAll()会唤醒所有等待线程; 4. 唤醒线程从await()返回后需要重新获得锁。
- 和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:
if (condition.await(1, TimeUnit.SECOND)) { // 被其他线程唤醒 } else { // 指定时间内没有被其他线程唤醒 }12. 使用ReadWriteLock
- 一些情况下ReentrantLock的保护过于激进:(只允许一个线程进入临界区)对于不会修改对象内容的读方法其实可以不做保护,这时RenentrantLock就不能满足使用要求
- 想要实现的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待,使用ReadWriteLock可以解决这个问题
- ReadWriteLock保证:
1. 只允许一个线程写入(其他线程既不能写入也不能读取); 2. 没有写入时,多个线程允许同时读(提高性能)。
- ReadWriteLock功能功能实现十分容易。我们需要创建一个ReadWriteLock实例,然后分别获取读锁和写锁:
public class Counter { private final ReadWriteLock rwlock = new ReentrantReadWriteLock(); private final Lock rlock = rwlock.readLock(); private final Lock wlock = rwlock.writeLock(); private int[] counts = new int[10]; public void inc(int index) { wlock.lock(); // 加写锁 try { counts[index] += 1; } finally { wlock.unlock(); // 释放写锁 } } public int[] get() { rlock.lock(); // 加读锁 try { return Arrays.copyOf(counts, counts.length); } finally { rlock.unlock(); // 释放读锁 } } }
- 使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改
- ReadWriteLock有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁
- Java 8引入了新的读写锁:StampedLock
- StampedLock和ReadWriteLock相比,改进之处在于: 读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁
- 乐观锁: 乐观地估计读的过程中大概率不会有写入。悲观锁: 读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,并且重读
- 示例:
public class Point { private final StampedLock stampedLock = new StampedLock(); private double x; private double y; public void move(double deltaX, double deltaY) { long stamp = stampedLock.writeLock(); // 获取写锁 try { x += deltaX; y += deltaY; } finally { stampedLock.unlockWrite(stamp); // 释放写锁 } } public double distanceFromOrigin() { long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁 // 注意下面两行代码不是原子 *** 作 // 假设x,y = (100,200) double currentX = x; // 此处已读取到x=100,但x,y可能被写线程修改为(300,400) double currentY = y; // 此处已读取到y,如果没有写入,读取是正确的(100,200) // 如果有写入,读取是错误的(100,400) if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生 stamp = stampedLock.readLock(); // 获取一个悲观读锁 try { currentX = x; currentY = y; } finally { stampedLock.unlockRead(stamp); // 释放悲观读锁 } } return Math.sqrt(currentX * currentX + currentY * currentY); } }
- 和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号(stamp)。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续 *** 作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据
- stampedLock的代价:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁
- StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写
- java.util.concurrent包提供了多种线程安全的集合,如ArrayBlockingQueue、List、Map、Set、Deque等
- java.util.concurrent包也提供了对应的并发集合类:
- 使用这些并发集合与使用非线程安全的集合类完全相同
- java.util.Collections工具类还提供了一个旧的线程安全集合转换器将线程不安全的集合转换为线程安全的集合,可以这么用:
Map unsafeMap = new HashMap(); Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用
15. 使用Atomic- Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子 *** 作的封装类,它们位于java.util.concurrent.atomic包
- 以AtomicInteger为例,它提供的主要 *** 作有:
1. 增加值并返回新值:int addAndGet(int delta) 2. 加1后返回新值:int incrementAndGet() 3. 获取当前值:int get() 4. 用CAS方式设置:int compareAndSet(int expect, int update)
- Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set(系统支持)
- 自己通过CAS编写incrementAndGet(),它大概长这样:
public int incrementAndGet(AtomicInteger var) { int prev, next; do { prev = var.get(); // 获取当前值 next = prev + 1; // 获取当前值+1 } while ( ! var.compareAndSet(prev, next)); // 如果var的值等于prev就将var的值设置为next,返回true; //否则放回false,重新在循环中获取var的当前值(保证实现对var加一 *** 作的正确性) return next; }
CAS是指,在这个 *** 作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS *** 作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的
16. 使用线程池- Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要 *** 作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间
- 如果可以复用一组线程,那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池
- 线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理
- Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:
// 创建固定大小的线程池: ExecutorService executor = Executors.newFixedThreadPool(3); // 提交任务: executor.submit(task1); executor.submit(task2); executor.submit(task3); executor.submit(task4); executor.submit(task5);
- ExecutorService只是接口,Java标准库提供的几个常用实现类有:
1. FixedThreadPool:线程数固定的线程池; 2. CachedThreadPool:线程数根据任务动态调整的线程池; 3. SingleThreadExecutor:仅单线程执行的线程池。
创建这些线程池的方法都被封装到Executors这个类中
- 以FixedThreadPool为例,看看线程池的执行逻辑:
import java.util.concurrent.*; public class Main { public static void main(String[] args) { // 创建一个固定大小的线程池: ExecutorService es = Executors.newFixedThreadPool(4); for (int i = 0; i < 6; i++) { es.submit(new Task("" + i)); } // 关闭线程池: es.shutdown(); } } class Task implements Runnable { private final String name; public Task(String name) { this.name = name; } @Override public void run() { System.out.println("start task " + name); try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("end task " + name); } }
执行结果:
start task 0 start task 1 start task 2 start task 3 end task 1 start task 4 end task 0 start task 5 end task 2 end task 3 end task 5 end task 4
- 线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭
- 创建一个线程数量维持在4~10的动态调节的线程池:
// Executors.newCachedThreadPool()方法的源码 public static ExecutorService newCachedThreadPool() { // 实际上返回的是一个ThreadPoolExecutor对象 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } // 因此我们可以这样创建: int min = 4; int max = 10; ExecutorService es = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new SynchronousQueue ());
- 还有一种需要定期反复执行的任务,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行
- 创建一个ScheduledThreadPool仍然是通过Executors类:
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
- 提交一次性任务,它会在指定延迟后只执行一次:
// 1秒后执行一次性任务: ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
- 任务以固定的每3秒执行:
// 2秒后开始执行定时任务,每3秒执行: ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
- 任务以固定的3秒为间隔执行:
// 2秒后开始执行定时任务,以3秒为间隔执行: ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
-
FixedRate和FixedDelay的区别:FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间;FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务
-
Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer
注意:这里的多调度指的是一个ScheduledThreadPool可以创建多个执行队列,一个队列中的定时任务不是并发执行的(具体请看下一点的解析) -
回答两个问题:
问题一:在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?
答:不会并发执行
问题二:如果任务抛出了异常,后续任务是否继续执行?
答:如果当前任务抛出了异常,后续任务停止执行
测试代码如下:
public class MulThread_08 { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService ses = Executors.newScheduledThreadPool(4); // 创建一个大小为4的线程池 // 为线程池指定固定周期的任务,即无论当前任务是否执行完毕,在指定的周期(period)之后开始执行下一次任务 ses.scheduleAtFixedRate(new MyTask(), 1, 1, TimeUnit.SECONDS); ses.scheduleAtFixedRate(new MyTask2(), 1, 1, TimeUnit.SECONDS); boolean b = ses.awaitTermination(20, TimeUnit.SECONDS); } } class MyTask implements Runnable { @Override public void run() { try { int id = idGetter.get_id(); System.out.println("Thread " + id + " start !!"); Thread.sleep(5000); System.out.println("Thread " + id + " end !!"); } catch (InterruptedException e) { e.printStackTrace(); } } } class MyTask2 implements Runnable { @Override public void run() { try { System.out.println("Task2" + " start !!"); Thread.sleep(1); System.out.println("Task2" + " end !!"); } catch (InterruptedException e) { e.printStackTrace(); } } } class idGetter { static AtomicLong id = new AtomicLong(0); public static int get_id() { return (int) id.incrementAndGet(); } }17. 使用Future
- Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:
class Task implements Callable{ public String call() throws Exception { return longTimeCalculation(); } }
Callable接口是一个泛型接口,可以返回指定类型的结果
- 如何获得异步执行的结果?
ExecutorService.submit()方法返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象
ExecutorService executor = Executors.newFixedThreadPool(4); // 定义任务: Callabletask = new Task(); // 提交任务并获得Future: Future future = executor.submit(task); // 从Future获取异步执行返回的结果: String result = future.get(); // 可能阻塞
- 当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。 在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果
- 一个Future接口表示一个未来可能会返回的结果,它定义的方法有:
get():获取结果(可能会等待) get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间; cancel(boolean mayInterruptIfRunning):取消当前任务; isDone():判断任务是否已完成。18. 使用CompletableFuture
- 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待
- Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
- 以获取股票价格为例,看看如何使用CompletableFuture:
public class Main { public static void main(String[] args) throws Exception { // 创建异步执行任务: CompletableFuturecf = CompletableFuture.supplyAsync(Main::fetchPrice); // 如果执行成功: cf.thenAccept((result) -> { System.out.println("price: " + result); }); // 如果执行异常: cf.exceptionally((e) -> { e.printStackTrace(); return null; }); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: Thread.sleep(200); } static Double fetchPrice() { try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() < 0.3) { throw new RuntimeException("fetch price failed!"); } return 5 + Math.random() * 20; } }
- 创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:
public interface Supplier{ T get(); }
这里用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)
- CompletableFuture创建完成后已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例
- 完成时,CompletableFuture会调用Consumer对象:
public interface Consumer{ void accept(T t); }
- 异常时,CompletableFuture会调用Function对象:
public interface Function{ R apply(T t); }
这里都用lambda语法简化了代码
- CompletableFuture的优点是:
1. 异步任务结束时,会自动回调某个对象的方法; 2. 异步任务出错时,会自动回调某个对象的方法; 3. 主线程设置好回调后,不再关心异步任务的执行。
- CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格
public class Main { public static void main(String[] args) throws Exception { // 第一个任务: CompletableFuturecfQuery = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油"); }); // cfQuery成功后继续执行下一个任务: CompletableFuture cfFetch = cfQuery.thenApplyAsync((code) -> { return fetchPrice(code); }); // cfFetch成功后打印结果: cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: Thread.sleep(2000); } static String queryCode(String name) { try { Thread.sleep(100); } catch (InterruptedException e) { } return "601857"; } static Double fetchPrice(String code) { try { Thread.sleep(100); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
- 除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成 *** 作
public class Main { public static void main(String[] args) throws Exception { // 两个CompletableFuture执行异步查询: CompletableFuturecfQueryFromSina = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://finance.sina.com.cn/code/"); }); CompletableFuture cfQueryFrom163 = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://money.163.com/code/"); }); // 用anyOf合并为一个新的CompletableFuture: CompletableFuture
- anyOf() 可以实现“任意个CompletableFuture只要一个成功”,allOf() 可以实现“所有CompletableFuture都必须成功”,这些组合 *** 作可以实现非常复杂的异步流程控制
- 注意CompletableFuture的命名规则:
1. xxx():表示该方法将继续在已有的线程中执行; 2. xxxAsync():表示将异步在线程池中执行。19. 使用ForkJoin
- Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行
- Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务
- 示例:
public class Main { public static void main(String[] args) throws Exception { // 创建2000个随机数组成的数组: long[] array = new long[2000]; long expectedSum = 0; for (int i = 0; i < array.length; i++) { array[i] = random(); expectedSum += array[i]; } System.out.println("Expected sum: " + expectedSum); // fork/join: ForkJoinTasktask = new SumTask(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = ForkJoinPool.commonPool().invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); } static Random random = new Random(0); static long random() { return random.nextInt(10000); } } class SumTask extends RecursiveTask { static final int THRESHOLD = 500; long[] array; int start; int end; SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { if (end - start <= THRESHOLD) { // 如果任务足够小,直接计算: long sum = 0; for (int i = start; i < end; i++) { sum += this.array[i]; // 故意放慢计算速度: try { Thread.sleep(1); } catch (InterruptedException e) { } } return sum; } // 任务太大,一分为二: int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } }
- Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度
- 背景:在真实的业务中,基于多线程的实现方法,我们会让一个线程执行一个主方法,可以想见,在业务逻辑复杂的情况下,该方法中可能会调用其他格式方法;一种方案是:在方法调用时直接将主方法参数赋予各个字方法,但是这种方式会造成一个严重的问题:同一份数据被多次“拷贝”,浪费严重
- 在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等
- 给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,context就无法进行传送了
- Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象
- ThreadLocal实例通常总是以静态字段初始化如下:
static ThreadLocalthreadLocalUser = new ThreadLocal<>();
- 典型用法如下:
void processUser(user) { try { threadLocalUser.set(user); step1(); step2(); } finally { threadLocalUser.remove(); } }
通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:
void step1() { User u = threadLocalUser.get(); log(); printUser(); } void log() { User u = threadLocalUser.get(); println(u.name); } void step2() { User u = threadLocalUser.get(); checkUser(u.id); }
注意到普通的方法调用一定是同一个线程执行的,所以,step1()、step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例
- 可以把ThreadLocal看成一个全局Map
:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:
Object threadLocalValue = threadLocalMap.get(Thread.currentThread());
ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰(只需要一个ThreadLocal对象,同一个ThreadLocal对象可以为多个线程存储context,是一对多的关系)
-
特别注意ThreadLocal一定要在finally中清除
因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。 -
为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {…}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:
public class UserContext implements AutoCloseable { static final ThreadLocalctx = new ThreadLocal<>(); public UserContext(String user) { ctx.set(user); } public static String currentUser() { return ctx.get(); } @Override public void close() { ctx.remove(); } }
使用的时候,我们借助try (resource) {...}结构,可以这么写:
try (var ctx = new UserContext("Bob")) { // 可任意调用UserContext.currentUser(): String currentUser = UserContext.currentUser(); } // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象
这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {...}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)