多线程(七)原子 *** 作、阻塞队列

多线程(七)原子 *** 作、阻塞队列,第1张

文章目录
    • 一、原子类
      • 1.1 基本类型原子类
        • 1.1.1 AtomicInteger常用API
        • 1.1.2 AtomicBoolean常用API
        • 1.1.3 原子类实现源码
      • 1.2 数组类型原子类
      • 1.3 引用类型原子类
      • 1.4 字段类型原子类
      • 1.5 原子 *** 作的实现原理
        • 1.5.1 处理器如何实现原子 *** 作
        • 1.5.2 Java如何实现原子 *** 作
        • 1.5.3 CAS实现原子 *** 作的三大问题
      • 1.6 原子类相关问题
        • 1.6.1 Atomic的原理
        • 1.6.2 volatile变量和Atomic变量的区别
    • 二、BlockingQueue
      • 2.1 阻塞队列的基本 *** 作
        • 2.1.1 从Queue继承的基本接口
        • 2.1.2 BlockingQueue的特有接口
      • 2.2 BlockingQueue实现类
      • 2.3 ArrayBlockingQueue实现原理
        • 2.3.1 成员变量
        • 2.3.2 元素入队
        • 2.3.3 元素出队
      • 2.4 LinkedBlockingQueue实现原理
        • 2.4.1 成员变量
        • 2.4.2 元素入队
        • 2.4.3 元素出队

本系列文章:
  多线程(一)线程与进程、Thread
  多线程(二)Java内存模型、同步关键字
  多线程(三)线程池
  多线程(四)显式锁、队列同步器
  多线程(五)可重入锁、读写锁
  多线程(六)线程间通信机制
  多线程(七)原子 *** 作、阻塞队列
  多线程(八)并发容器
  多线程(九)并发工具类
  多线程(十)多线程编程示例

一、原子类

  原子 *** 作指不可被中断的一个或一系列 *** 作
  Java从JDK 1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),Atomic包里面提供了一组原子类,它们可以自动的保证对于他们的 *** 作是原子的并且不需要使用同步。
  Atomic包大致可以属于4种类型的原子更新方式,分别是:原子更新基本类型、原子更新数组、原子更新引用、原子更新属性。
  Atomic包提高原子更新基本类型的工具类,主要有:AtomicInteger(以原子更新的方式更新Integer)、AtomicBoolean(以原子更新的方式更新boolean)、AtomicLong(以原子更新的方式更新Long)。3个类提供的方法几乎一模一样。

1.1 基本类型原子类 1.1.1 AtomicInteger常用API
  • 1、AtomicInteger的创建和获取值
      AtomicInteger的创建分为两种:1、无参的,默认值0;有参的,指定默认值。
      get():用于获取当前值,该方法不需要锁。
      示例:
	AtomicInteger i = new AtomicInteger();
	System.out.println(i.get()); //0
	AtomicInteger j = new AtomicInteger(10);
	System.out.println(j.get()); //10
  • 2、设置值
      示例:
	AtomicInteger i = new AtomicInteger();
	i.set(12);
	System.out.println(i.get()); //12
  • 3、先取值,再设置值
      示例:
	AtomicInteger i = new AtomicInteger();
	int result = i.getAndSet(10);
	System.out.println(result); //0
	System.out.println(i.get()); //10
  • 4、先取值并且后加上指定的值
      示例:
	AtomicInteger i = new AtomicInteger(10);
	int result = i.getAndAdd(10);
	System.out.println(result); //10
	System.out.println(i.get()); //20
  • 5、先加上指定的值再取值
      示例:
	AtomicInteger i = new AtomicInteger(10);
	int result = i.addAndGet(10);
	System.out.println(result);//输出20
	System.out.println(i.get());//输出20
  • 6、先取值然后再+1
      示例:
	AtomicInteger i = new AtomicInteger();
	int result = i.getAndIncrement();
	System.out.println(result); //0
	System.out.println(i.get()); //1
  • 7、先+1再取值
      示例:
	AtomicInteger i = new AtomicInteger();
	int result = i.incrementAndGet();
	System.out.println(result); //1
	System.out.println(i.get()); //1
  • 8、先取值再-1
      示例:
	AtomicInteger i = new AtomicInteger(10);
	int result = i.getAndDecrement();
	System.out.println(result); //10
	System.out.println(i.get()); //9
  • 9、先-1再取值
      示例:
	AtomicInteger i = new AtomicInteger();
	int result = i.decrementAndGet();
	System.out.println(result); //9
	System.out.println(i.get()); //9
  • 10、快速失败(CAS)策略
      是用于判断期望值是否与变量实际值相等,如果相等则将update赋值给变量,否则失败。示例:
	//成功案例
	AtomicInteger atomicInteger = new AtomicInteger(10);
	boolean result = atomicInteger.compareAndSet(10, 12);
	System.out.println(result); //true
	System.out.println(atomicInteger.get()); //12
		
	//失败案例
	AtomicInteger atomicInteger1 = new AtomicInteger(10);
	boolean result1 = atomicInteger1.compareAndSet(11, 12);
	System.out.println(result1); //false
	System.out.println(atomicInteger1.get()); //10
1.1.2 AtomicBoolean常用API
  • 1、AtomicBoolean的创建和获取值
      AtomicBoolean的创建分为两种:1、无参的,默认值false;有参的,指定默认值。
      示例:
	AtomicBoolean bool = new AtomicBoolean();
	System.out.println(bool.get()); //false
	AtomicBoolean bool2 = new AtomicBoolean(true);
	System.out.println(bool2.get()); //true
  • 2、设置值
      示例:
	AtomicBoolean bool = new AtomicBoolean();
	bool.set(true);
	System.out.println(bool.get()); //true
  • 3、先取值,再设置值
      示例:
	AtomicBoolean bool = new AtomicBoolean(true);
	boolean result = bool.getAndSet(false);
	System.out.println(result);  //true
	System.out.println(bool.get());  //false
  • 4、快速失败(CAS)策略
      是用于判断期望值是否与变量实际值相等,如果相等则将update赋值给变量,否则失败。示例:
	//成功案例
	AtomicBoolean bool = new AtomicBoolean(true);
	boolean result = bool.compareAndSet(true, false);
	System.out.println(result); //true
	System.out.println(bool.get()); //false
		
	//失败案例
	AtomicBoolean bool1 = new AtomicBoolean(true);
	boolean result1 = bool1.compareAndSet(false, true);
	System.out.println(result1); //false
	System.out.println(bool1.get()); //true

  AtomicBoolean可以当做多线程中的开关flag,从而来代替synchronized这样比较重的锁

1.1.3 原子类实现源码

  原子类的用法基本一致,以AtomicInteger中的getAndIncrement方法为例,其源码:

	public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

  unsafe实例是通过UnSafe类的静态方法getUnsafe获取:

	private static final Unsafe unsafe = Unsafe.getUnsafe();

  valueOffset是由AtomicInteger类中的变量转化而来,源码:

    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

  Unsafe类提供了一些底层 *** 作,Atomic包下的原子 *** 作类的也主要是通过 Unsafe类提供的compareAndSwapInt、compareAndSwapLong等一系列提供CAS *** 作的方法来进行实现。CAS *** 作能够保证数据更新的时候是线程安全的,并且由于CAS是采用乐观锁策略,因此,这种数据更新的方法也具有高效性。
  示例:

	private static AtomicInteger atomicInteger = new AtomicInteger(1);
    public static void main(String[] args) {
        System.out.println(atomicInteger.getAndIncrement()); //1
        System.out.println(atomicInteger.get());  //2
    }

  AtomicLong的实现原理和AtomicInteger 一致,只不过一个针对的是long变量,一个针对的是int变量。AtomicBoolean稍有不同,看下AtomicBoolean的compareAndSet方法:

	public final boolean compareAndSet(boolean expect, boolean update) {
	    int e = expect ? 1 : 0;
	    int u = update ? 1 : 0;
	    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
	}

  上面的方法的实际上也是先转换成0、1的整型变量,然后是通过针对int型变量的原子更新方法compareAndSwapInt来实现的。

1.2 数组类型原子类

  Atomic包下提供能原子更新数组中元素的类有:

AtomicIntegerArray:原子更新整型数组中的元素;
AtomicLongArray:原子更新长整型数组中的元素;
AtomicReferenceArray:原子更新引用类型数组中的元素。

  他们的用法基本一致,以AtomicIntegerArray来介绍下常用的方法:

	//以原子更新的方式将数组中索引为i的元素与输入值相加
	public final int addAndGet(int i, int delta)
	//以原子更新的方式将数组中索引为i的元素自增加1
	public final int getAndIncrement(int i)
	//将数组中索引为i的位置的元素进行更新
	public final boolean compareAndSet(int i, int expect, int update)

  AtomicIntegerArray与AtomicInteger的方法基本一致,只不过在 AtomicIntegerArray的方法中会多一个指定数组索引位 i。示例:

    private static int[] value = new int[]{1, 2, 3};
    private static AtomicIntegerArray integerArray = new AtomicIntegerArray(value);
    
	public static void main(String[] args) {
	    //对数组中索引为1的位置的元素加5
	    int result = integerArray.getAndAdd(1, 5);
	    System.out.println(integerArray.get(1));  //7
	    System.out.println(result);  //2
	}
1.3 引用类型原子类

  Atomic包下相关的原子引用类:

AtomicReference:原子更新引用类型;
AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
AtomicMarkableReference:原子更新带有标记位的引用类型。

  这几个类的使用方法也是基本一样的,以AtomicReference为例,示例:

    private static AtomicReference reference = new AtomicReference();
    public static void main(String[] args) {
        User user1 = new User("a", 1);
        reference.set(user1);
        User user2 = new User("b",2);
        User user = (User) reference.getAndSet(user2);
        System.out.println(user); //User{userName='a', age=1}
        System.out.println(reference.get()); //User{userName='b', age=2}
    }
    
    static class User {
        private String userName;
        private int age;
        
        public User(String userName, int age) {
        	this.userName = userName;
        	this.age = age;
        }

        @Override
        public String toString() {
        	return "User{" + "userName='" + userName + '\'' + ", age=" + age + '}';
        }
    }
1.4 字段类型原子类

  Atomic包下相关的原子字段类:

AtomicIntegeFieldUpdater:原子更新整型字段类;
AtomicLongFieldUpdater:原子更新长整型字段类;
AtomicStampedReference:原子更新引用类型,这种更新方式会带有版本号,是为了解决 CAS 的 ABA 问题。

  使用原子更新字段需要两步 *** 作:

  1. 原子更新字段类都是抽象类,只能通过静态方法newUpdater来创建一个更新器,并且需要设置想要更新的类和属性;
  2. 更新类的属性必须使用public volatile进行修饰。

  这几个类提供的方法基本一致,以AtomicIntegerFieldUpdater为例,看下其使用:

	private static AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
	public static void main(String[] args) {
		User user = new User("a", 1);
		int oldValue = updater.getAndAdd(user, 5);
		System.out.println(oldValue); //1
		System.out.println(updater.get(user)); //6
	}

	static class User {
		private String userName;
		public volatile int age;

		public User(String userName, int age) {
		    this.userName = userName;
		    this.age = age;
		}

		@Override
		public String toString() {
		    return "User{" + "userName='" + userName + '\'' + ", age=" + age + '}';
		}
	}
1.5 原子 *** 作的实现原理

  原子 *** 作相关的术语:

1.5.1 处理器如何实现原子 *** 作

  处理器提供总线锁定和缓存锁定两个机制来保证复杂内存 *** 作的原子性。

  • 1、使用总线锁保证原子性
      第一个机制是通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写 *** 作(i++就是经典的读改写 *** 作),那么共享变量就会被多个处理器同时进行 *** 作,这样读改写 *** 作就不是原子的, *** 作完之后共享变量的值会和期望的不一致。举个例子,如果i=1,我们进行两次i++ *** 作,我们期望的结果是3,但是有可能结果是2。示例:

      原因可能是多个处理器同时从各自的缓存中读取变量i,分别进行加1 *** 作,然后分别写入系统内存中。那么,想要保证读改写共享变量的 *** 作是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能 *** 作缓存了该共享变量内存地址的缓存。
      处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。
  • 2、使用缓存锁保证原子性
      第二个机制是通过缓存锁定来保证原子性。在同一时刻,我们只需保证对某个内存地址的 *** 作是原子性即可,但总线锁定把CPU和内存之间的通信锁住了,这使得锁定期间,其他处理器不能 *** 作其他内存地址的数据,所以总线锁定的开销比较大,目前处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。
      频繁使用的内存会缓存在处理器的L1、L2和L3高速缓存里,那么原子 *** 作就可以直接在处理器内部缓存中进行,并不需要声明总线锁,在Pentium 6和目前的处理器中可以使用“缓存锁定”的方式来实现复杂的原子性。所谓“缓存锁定”是指内存区域如果被缓存在处理器的缓存行中,并且在Lock *** 作期间被锁定,那么当它执行锁 *** 作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证 *** 作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时,会使缓存行无效,在如上图所示的例子中,当CPU1修改缓存行中的i时使用了缓存锁定,那么CPU2就不能同时缓存i的缓存行。

  有两种情况下处理器不会使用缓存锁定:

  1. 当 *** 作的数据不能被缓存在处理器内部,或 *** 作的数据跨多个缓存行(cache line)时,则处理器会调用总线锁定。
  2. 有些处理器不支持缓存锁定。对于Intel 486和Pentium处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。
1.5.2 Java如何实现原子 *** 作

  在Java中可以通过锁和循环CAS的方式来实现原子 *** 作。

  • 1、使用循环CAS实现原子 *** 作
      JVM中的CAS *** 作正是利用了处理器提供的CMPXCHG指令实现的。自旋CAS实现的基本思路就是循环进行CAS *** 作直到成功为止。
      从JDK1.5开始,JUC里提供了一些类来支持原子 *** 作,如AtomicBoolean、AtomicInteger和AtomicLong。
  • 2、使用锁机制实现原子 *** 作
      锁机制保证了只有获得锁的线程才能够 *** 作锁定的内存区域。JVM内部实现了很多种锁机制,有偏向锁、轻量级锁和互斥锁。有意思的是除了偏向锁,JVM实现锁的方式都用了循环CAS,即当一个线程想进入同步块的时候使用循环CAS的方式来获取锁,当它退出同步块的时候使用循环CAS释放锁。
1.5.3 CAS实现原子 *** 作的三大问题
  • 1、ABA问题
      因为CAS需要在 *** 作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。
      从JDK1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
  • 2、循环时间长开销大
      自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令,那么效率会有一定的提升。pause指令有两个作用:第一,它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;第二,它可以避免在退出循环的时候因内存顺序冲突(Memory Order Violation)而引起CPU流水线被清空(CPU Pipeline Flush),从而提高CPU的执行效率。
  • 3、只能保证一个共享变量的原子 *** 作
      当对一个共享变量执行 *** 作时,我们可以使用循环CAS的方式来保证原子 *** 作,但是对多个共享变量 *** 作时,循环CAS就无法保证 *** 作的原子性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来 *** 作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来 *** 作ij。从JDK1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS *** 作。
1.6 原子类相关问题 1.6.1 Atomic的原理

  Atomic包中的类基本的特性就是在多线程环境下,当有多个线程同时对单个(包括基本类型及引用类型)变量进行 *** 作时,具有排他性,即当多个线程同时对该变量的值进行更新时,仅有一个线程能成功,而未成功的线程可以向自旋锁一样,继续尝试,一直等到执行成功。
  AtomicInteger类主要利用CAS + volatile和native方法来保证原子 *** 作,从而避免synchronized的高开销,提升执行效率。
  CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址,返回值是 valueOffset。value 是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。

1.6.2 volatile变量和Atomic变量的区别

  volatile常见的功能是保证其修饰的变量在不同线程之间的可见性和禁止重排序, 但它并不能保证原子性。例如用volatile修饰count变量,那么count++ *** 作就不是原子性的。
  Atomic变量提供的方法可以让类似count++的 *** 作具有原子性。如AtomicInteger类中的getAndIncrement()方法会原子性的进行增量 *** 作把当前值加1,其它数据类型和引用变量也可以进行相似 *** 作。

二、BlockingQueue

  在经典的生产者消费者问题中,阻塞队列常常被用到。因为BlockingQueue 提供了可阻塞的插入和移除的方法。即:当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

  阻塞队列(BlockingQueue)是一个支持两个附加 *** 作的队列。这两个附加的 *** 作是:

  • 在队列空时,获取元素的线程会阻塞;
  • 当队列满时,存储元素的线程会阻塞。

  阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

2.1 阻塞队列的基本 *** 作

  BlockingQueue基本 *** 作:

抛异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e,time,unit)
删除remove()poll()take()poll(time,unit)
获取element()peek()
2.1.1 从Queue继承的基本接口

  BlockingQueue继承于Queue接口,对数据元素的基本接口有:

  • 1、插入元素
	//往队列插入数据
	boolean add(E e);
	//当往队列插入数据时,插入成功返回true,否则则返回false
	boolean offer(E e);
  • 2、删除元素
	//从队列中删除数据,成功则返回true,否则为false
	boolean remove(Object o);
	//删除数据,当队列为空时,返回null
	E poll(long timeout, TimeUnit unit) throws InterruptedException;
  • 3、查找元素
	//获取队首元素
	E element();
	//获取队首元素
	E peek();
2.1.2 BlockingQueue的特有接口
  • 1、插入数据
	//当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,
	//直至阻塞队列已经有空余的容量可供使用
	void put(E e) throws InterruptedException;
	//若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列
	//已经有空余的地方,与put方法不同的是,该方法会有一个超时时
	//间,若超过当前给定的超时时间,插入数据的线程会退出
	boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
  • 2、删除数据
	//获取队首数据,当阻塞队列为空时,获取队头数据的线程会被阻塞
	E take() throws InterruptedException;
	//当阻塞队列为空时,获取数据的线程会被阻塞;如果被阻塞的线程
	//超过了指定时间,该线程会退出
	E poll(long timeout, TimeUnit unit) throws InterruptedException;
2.2 BlockingQueue实现类

  JDK1.7提供了7个阻塞队列:

ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

  • 1、ArrayBlockingQueue
      ArrayBlockingQueue是由数组实现的有界(一旦创建,容量不能改变)阻塞队列。该队列中的元素FIFO(先进先出)。因此,队头元素时队列中存在时间最长的数据元素,而队尾数据则是当前队列最新的数据元素。ArrayBlockingQueue可作为“有界数据缓冲区”,生产者插入数据到队列容器中,并由消费者提取。
      当队列容量满时,尝试将元素放入队列将导致 *** 作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
      ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。
      如果保证公平性,通常会降低吞吐量。获得公平性的ArrayBlockingQueue示例:
	BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
  • 2、LinkedBlockingQueue
      LinkedBlockingQueue是用链表实现的有界阻塞队列,同样满足FIFO的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue容量迅速增,损耗大量内存
      通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE。
  • 3、PriorityBlockingQueue
      PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。需要注意的是不能保证同优先级元素的顺序。
  • 4、SynchronousQueue
      SynchronousQueue每个插入 *** 作必须等待另一个线程进行相应的删除 *** 作,因此,SynchronousQueue实际上没有存储任何数据元素,因为只有线程在删除数据时,其他线程才能插入数据,同样的,如果当前有线程在插入数据时,线程才能删除数据。SynchronousQueue也可以通过构造器参数来为其指定公平性。
2.3 ArrayBlockingQueue实现原理

  阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素

  当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。

2.3.1 成员变量

  ArrayBlockingQueue的主要属性:

	//数据数组
	final Object[] items;

	//头节点下标
	int takeIndex;

	//尾节点下标
	int putIndex;

	//元素个数
	int count;

	//独占锁,入队和出队公用一个lock,说明不能同时出队和入队
	final ReentrantLock lock;

	//出队等待条件队列
	private final Condition notEmpty;

	//入队等待条件队列
	private final Condition notFull;

  可以看出ArrayBlockingQueue内部是采用数组进行数据存储的,为了保证线程安全,采用的是ReentrantLock。为了保证可阻塞式的插入删除数据利用的是Condition,当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。
  notEmpty和notFull等重要属性在构造方法中进行创建:

	public ArrayBlockingQueue(int capacity) {
    	//false默认lock为非公平锁
   	 	this(capacity, false);
	}

	public ArrayBlockingQueue(int capacity, boolean fair) {
	    if (capacity <= 0)
	        throw new IllegalArgumentException();
	    this.items = new Object[capacity];
	    lock = new ReentrantLock(fair);
	    notEmpty = lock.newCondition();
	    notFull =  lock.newCondition();
	}
2.3.2 元素入队

  入队方法有3种:put入队,满则等待;offer入队,满则返回;add入队,满则抛异常。

  • offer(E e)
	public boolean offer(E e) {
   	 	checkNotNull(e);
    	//获取独占锁
    	final ReentrantLock lock = this.lock;
    	lock.lock();
    	try {
        	//如果队列满了,返回false
        	if (count == items.length)
            	return false;
        	else {
            	//入队
            	enqueue(e);
            	return true;
        	}
    	} finally {
        	lock.unlock();
    	}
	}
  • put(E e)
	public void put(E e) throws InterruptedException {
	    checkNotNull(e);
	    final ReentrantLock lock = this.lock;
	    lock.lockInterruptibly();
	    try {
			//如果当前队列已满,将线程移入到notFull等待队列中
	        while (count == items.length)
	            notFull.await();
			//满足插入数据的要求,直接进行入队 *** 作
	        enqueue(e);
	    } finally {
	        lock.unlock();
	    }
	}

  put方法的逻辑很简单,当队列已满时(count == items.length)将线程移入到notFull等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)插入数据元素。

  • enqueue(E x)
private void enqueue(E x) {
    final Object[] items = this.items;
	//插入数据
    items[putIndex] = x;
    //如果putIndex超出数组范围了,就置为0
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	//通知消费者线程,当前队列中有数据可供消费
    notEmpty.signal();
}

  enqueue方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal())。

  • offer(E e, long timeout, TimeUnit unit)
	public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

    	checkNotNull(e);
    	long nanos = unit.toNanos(timeout);
    	final ReentrantLock lock = this.lock;
    	lock.lockInterruptibly();
    	try {
       	 	while (count == items.length) {
            	if (nanos <= 0)
                	return false;
            	nanos = notFull.awaitNanos(nanos);
        	}
        	enqueue(e);
        	return true;
    	} finally {
        	lock.unlock();
    	}
	}

  有的入队方法中中lock.lockInterruptibly()。ReentrantLock的中断和非中断加锁模式的区别在于:线程尝试获取锁 *** 作失败后,在等待过程中,如果该线程被其他线程中断了,它是如何响应中断请求的。lock方法会忽略中断请求,继续获取锁直到成功;而lockInterruptibly则直接抛出中断异常来立即响应中断,由上层调用者处理中断。

2.3.3 元素出队
  • take()
	public E take() throws InterruptedException {
	    final ReentrantLock lock = this.lock;
	    lock.lockInterruptibly();
	    try {
			//如果队列为空,没有数据,将消费者线程移入等待队列中
	        while (count == 0)
	            notEmpty.await();
			//获取数据
	        return dequeue();
	    } finally {
	        lock.unlock();
	    }
	}

  take方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队 *** 作dequeue。

  • dequeue()
	private E dequeue() {
	    final Object[] items = this.items;
	    @SuppressWarnings("unchecked")
		//获取数据
	    E x = (E) items[takeIndex];
	    items[takeIndex] = null;
	    //如果takeIndex等于items.length,将takeIndex = 0
	    if (++takeIndex == items.length)
	        takeIndex = 0;
	    count--;
	    if (itrs != null)
	        itrs.elementDequeued();
	    //通知被阻塞的生产者线程
		notFull.signal();
	    return x;
	}

  dequeue方法也主要做了两件事情:

  1. 获取队列中的数据,即获取数组中的数据元素((E) items[takeIndex]);
  2. 通知notFull等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得lock,并执行完成功退出。

  可以看出put和take方法主要是通过condition的通知机制来完成可阻塞式的插入数据和获取数据。

2.4 LinkedBlockingQueue实现原理

  LinkedBlockingQueue是用链表实现的有界阻塞队列,当构造对象时未指定队列大小时,队列默认大小为Integer.MAX_VALUE。从它的构造方法可以看出:

	public LinkedBlockingQueue() {
	    this(Integer.MAX_VALUE);
	}

  具有两把锁takeLock、putLock。takeLock作为消费线程获取的锁,同时有个对应的notEmpty条件变量用于消费线程的阻塞和唤醒,putLock作为生产线程获取的锁,同时有个对应的notFull条件变量用于生产线程的阻塞和唤醒。

2.4.1 成员变量

  LinkedBlockingQueue的主要属性:

	//阻塞队列的容量,默认为Integer.MAX_VALUE,最大为Integer.MAX_VALUE
	private final int capacity;

	//阻塞队列的元素个数,原子变量
	private final AtomicInteger count = new AtomicInteger();

	//阻塞队列的头结点,并不是真正的头结点
	transient Node<E> head;

	//阻塞队列的尾结点
	private transient Node<E> last;

	//消费线程使用的锁
	private final ReentrantLock takeLock = new ReentrantLock();

	//notEmpty条件对象,当队列为空时用于挂起消费线程
	private final Condition notEmpty = takeLock.newCondition();

	//生产线程使用的锁
	private final ReentrantLock putLock = new ReentrantLock();

	//notFull条件对象,当队列已满时用于挂起生产线程
	private final Condition notFull = putLock.newCondition();

	//链表的结点内部类,用于存储数据
	static class Node<E> {
    	//数据域
    	E item;
    	//后继引用
    	Node<E> next;

    	//构造器
    	Node(E x) {
        	item = x;
    	}
	}

  可以看出与ArrayBlockingQueue主要的区别是,LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的lock(takeLock和putLock)来控制线程安全的,因此,也由这两个lock生成了两个对应的condition(notEmpty和notFull)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,链表中元素就是上面的Node内部类。

2.4.2 元素入队

  以put(e)为例,此方法将指定的元素插入此队列的尾部,如果该队列已满,则线程等待。
  如果因为获取不到锁而在同步队列中等待的时候被中断则抛出InterruptedException,即响应中断。如果因为队列满了在条件队列中等待的时候在其他线程调用signal、signalAll方法唤醒该线程之前就因为中断而被唤醒了,也会抛出InterruptedException。另外,如果指定元素为null则抛出NullPointerException异常。
  在ArrayBlockingQueue中,生产(放入数据)线程阻塞的时候,需要消费(移除数据)线程才能唤醒,并且因为它们获取的同一个锁,消费和生产不能并发进行(假设一个线程仅仅从事生产或者消费工作的一种)。在LinkedBlockingQueue中,如果有线程因为获取不到锁或者队列已满而导致生产(放入数据)线程阻塞,那么他可能被后面的消费线程唤醒也可能被后面的生产线程唤醒。因为它内部有两个锁,生产和消费获取不同的锁,可以并行执行生产和消费任务,不仅在消费数据的时候会唤醒阻塞的生产线程,在生产数据的时候如果队列容量还没满,也会唤醒此前阻塞的生产线程继续生产。
  put方法的大概步骤:

  1. 指定元素e的null校验;
  2. 新建结点node,lockInterruptibly可中断的等待获取生产者锁putLock,即响应中断;没有获取到锁就在同步队列中阻塞等待,被中断了则直接中断等待并抛出异常;
  3. 获取到锁之后,while循环判断此时结点数量是否等于容量,即队列是否满了,如果满了,那么该线程在notFull条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断;
  4. 队列没有满,node结点添加到链表尾部成为新的尾结点;
  5. 获取此时计数器的值赋给c,并且计数器值自增1;
  6. 如果c+1小于capacity,说明此时队列未满,还可以入队,那么唤醒一个在notFull条件队列中等待的生产线程;
  7. 释放生产者锁putLock;
  8. 如果前面没有发生异常,那么执行最后的if语句:如果c为0,那么此时队列中还可能有存在1条数据,刚放进去的那么由于刚才队列没有数据,可能此时有消费者线程在等待,这里需要唤醒一个消费者线程。如果此前队列中就有数据没有消费完毕,那么也不必唤醒唤醒消费者。注意这里唤醒消费者线程的时候,必须先获取Condition关联的消费者锁。

  put方法源码:

/**
 * 将指定的元素插入此队列的尾部,如果该队列已满,则线程等待。
 *
 * @param e 指定元素
 * @throws InterruptedException 如果因为获取不到锁而在同步队列中等待的时候被中断则抛出InterruptedException,即响应中断
 *                              如果因为队列满了在条件队列中等待的时候在其他线程调用signal、signalAll方法唤醒该线程之前就因为中断而被唤醒了,也会抛出InterruptedException。
 * @throws NullPointerException 如果指定元素为 null
 */
public void put(E e) throws InterruptedException {
    //e的null校验
    if (e == null) throw new NullPointerException();
    int c = -1;
    //新建结点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //可中断的等待获取生产者锁,即响应中断
    putLock.lockInterruptibly();
    try {
        //while循环判断此时结点数量是否等于容量,即队列是否满了
        while (count.get() == capacity) {
            //如果满了,那么该线程在notFull条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断
            notFull.await();
        }
        // 队列没有满,结点添加到链表尾部
        enqueue(node);
        //获取此时计数器的值赋给c,并且计数器值自增1
        c = count.getAndIncrement();
        //如果c+1小于capacity,说明还可以入队
        if (c + 1 < capacity)
            //唤醒一个在notFull条件队列中等待的生产线程
            notFull.signal();
    } finally {
        //释放生产者锁
        putLock.unlock();
    }
    //如果前面没有抛出异常,那么在finally之后会执行下面的代码
    //如果c为0,那么此时队列中还可能有存在1条数据,刚放进去的
    //那么由于刚才队列没有数据,可能此时有消费者线程在等待,这里需要唤醒一个消费者线程
    //如果此前队列中就有数据没有消费完毕,那么也不必唤醒唤醒消费者
    if (c == 0)
        //获取消费者锁并且尝试唤醒一个消费者线程
        signalNotEmpty();
}

/**
 * 指定结点链接到队列尾部成为新的尾结点,在获取锁之后才会调用该方法
 * @param node 指定结点
 */
private void enqueue(Node<E> node) {
    //很简单,原尾结点的next引用指向node结点,然后last指向最新node结点
    last = last.next = node;
}

//唤醒一个在notEmpty条件队列中等待的消费线程,需要先获取消费者锁
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    //阻塞式的获取消费者锁,即不响应中断
    takeLock.lock();
    try {
        //唤醒一个在notEmpty条件队列中等待的消费线程
        //要想调用Condition对象的方法,必须先要获取该Condition对象对应的lock锁
        notEmpty.signal();
    } finally {
        //释放消费者锁
        takeLock.unlock();
    }
}
2.4.3 元素出队

  以take方法为例,该方法的作用:获取并移除此队列的头部,在元素变得可用(队列非空)之前一直等待。
  如果因为获取不到锁而在同步队列中等待的时候被中断则抛出InterruptedException,即响应中断。如果因为队列满了在条件队列中等待的时候在其他线程调用signal、signalAll方法唤醒该线程之前就因为中断而被唤醒了,也会抛出InterruptedException。
  在ArrayBlockingQueue中,消费(移除数据)线程阻塞的时候,需要生产(放入数据)线程才能唤醒,并且因为它们获取的同一个锁,消费和生产不能并发进行(假设一个线程仅仅从事生产或者消费工作的一种)。在LinkedBlockingQueue中,如果有线程因为获取不到消费者锁或者队列已空而导致消费(移除数据)线程阻塞,那么他可能被后面的生产线程唤醒也可能被后面的消费线程唤醒。因为它内部有两个锁,生产和消费获取不同的锁,可以并行执行生产和消费任务,不仅在生产数据的时候会唤醒阻塞的消费线程,在消费数据的时候如果队列容量还没空,也会唤醒此前阻塞的消费线程继续消费。

  take方法大概步骤:

  1. 指定元素e的null校验;
  2. lockInterruptibly可中断的等待获取消费者锁takeLock,即响应中断;没有获取到锁就在同步队列中阻塞等待,被中断了则直接中断等待并抛出异常;
  3. 获取到锁之后,while循环判断此时结点数量是否等于0,即队列是否空了,如果空了,那么该线程在notEmpty条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断;
  4. 队列没有空,调用dequeue方法获取并移除此队列的头部;
  5. 获取此时计数器的值赋给c,并且计数器值自减1;
  6. 如果c大于1,说明此时队列未空,说明还可以出队列,那么唤醒一个在notEmpty条件队列中等待的消费线程;
  7. 释放消费者锁putLock;
  8. 如果前面没有发生异常,那么执行最后的if语句:如果c为capacity,那么此前队列中可能具有满的数据,可能此时有生产者线程在等待,这里需要唤醒一个生产者线程。如果此前队列中的数据没有满,那么也不必唤醒生产者。注意这里唤醒生产者线程的时候,必须先获取Condition关联的生产者锁。

  take方法源码:

/**
 * 获取并移除此队列的头部
 *
 * @return 被移除的队列头部元素
 * @throws InterruptedException 因为获取不到锁而等待的时候被中断
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //可中断的等待获取消费者锁,即响应中断
    takeLock.lockInterruptibly();
    try {
        //while循环判断此时结点数量是否等于0,即队列是否空了
        while (count.get() == 0) {
            //如果空了,那么该线程在notEmpty条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断
            notEmpty.await();
        }
        // 队列没有空,获取并移除此队列的头部
        x = dequeue();
        //获取此时计数器的值赋给c,并且计数器值自减1
        c = count.getAndDecrement();
        //如果c大于1,说明还可以出队列
        if (c > 1)
            //唤醒一个在notEmpty条件队列中等待的消费线程
            notEmpty.signal();
    } finally {
        //释放消费者锁
        takeLock.unlock();
    }
    //如果前面没有抛出异常,那么在finally之后会执行下面的代码
    //如果c为capacity,那么此前队列中可能具有满的数据,可能此时有生产者线程在等待,
    //这里需要唤醒一个生产者线程
    //如果此前队列中的数据没有满,那么也不必唤醒唤醒生产者
    if (c == capacity)
        //获取生产者锁并且尝试唤醒一个生产者线程
        signalNotFull();
    //返回被移除的队列头部元素
    return x;
}

//唤醒一个生产者线程,只会在take/poll方法中被调用
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    //阻塞式的获取生产者锁,即不响应中断
    putLock.lock();
    try {
        //唤醒一个在notFull条件队列中等待的生产线程
        notFull.signal();
    } finally {
        //释放生产者锁
        putLock.unlock();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/924052.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存