常用并发工具类(并发集合类)

常用并发工具类(并发集合类),第1张

常用并发工具类(并发集合类)

文章目录
  • 概述
  • BlockingQueue
  • ArrayBlockingQueue
    • 数据存储相关属性
    • 阻塞特性相关属性
    • 主要方法
  • linkedBlockingQueue
    • linkedBlockingQueue 主要属性
    • linkedBlockingQueue 设计思想
  • ConcurrentlinkedQueue
  • PriorityBlockingQueue
    • PriorityBlockingQueue 主要属性
    • PriorityBlockingQueue 设计思想
  • SynchronousQueue
  • linkedBlockingDeque
  • DelayQueue
  • DelayQueue 主要属性
    • DelayQueue 主要设计思想
  • CopyOnWrite
  • CopyOnWriteArrayList
    • CopyonWriteArrayList 的主要属性
    • CopyonWriteArrayList 主要设计思想
    • CopyOnWriteArraySet

概述

常用的并发集合类,主要有 ArrayBlockingQueue,ConcurrentlinkedQueue,linkedBlockingQueue,PriorityBlockingQueue,DelayQueue,SynchronousQueue,linkedBlockingDeque,以及 COW 系列的集合,如 CopyOnWriteArrayList,CopyOnWriteArraySet 等

BlockingQueue

BlockingQueue 描述了一个阻塞队列应该有的行为,例如:

  • 阻塞读方法:take()
  • 阻塞写方法:put()

阻塞的读写方法的函数签名上都会抛出 InterruptedException,具体的阻塞逻辑由实现类自行实现。

ArrayBlockingQueue

是一个基于数组实现的阻塞队列,创建对象时必须指定容量

数据存储相关属性

内置了一个循环队列用于存储数据,可以更高效地利用内存空间

    
    final Object[] items;

    
    int takeIndex;

    
    int putIndex;

    
    int count;
阻塞特性相关属性

内置一个 ReentrantLock 锁对象,一个队列是否为空的 Condition 对象,以及一个队列是否满的 Condition 对象

    
    final ReentrantLock lock;

    
    private final Condition notEmpty;

    
    private final Condition notFull;

其中,notEmpty 条件和 notFull 条件都是由 lock 对象创建出来的

主要方法
  • 构造方法中必须传入容量,可以通过 public ArrayBlockingQueue(int capacity, boolean fair) 中的 fair 参数来指定公平/非公平模式
    • fair 参数最终会用于构造 ReentrantLock 锁对象
  • put 方法会在队列满的时候((队尾下标+1)%队列长度=队头下标))调用 notEmpty 条件进行等待,等队列再次有空位的时候(有元素出队时将会被唤醒)将元素放至队尾
  • take 会在队列为空的时候(队尾下标=队头下标)调用 notFull 条件进行等待,当队列不为空时,将队头的元素返回,并删除队头元素
linkedBlockingQueue

linkedBlockingQueue 是一个基于单向链表实现的阻塞队列,其容量为:

  • 如果在构造函数中指定了容量,那么容量就是队列长度的最大边界限制
  • 如果没有在构造函数中指定容量,容量将会默认设置成 Integer.MAX_VALUE,近似无界

所以,linkedBlockingQueue 在不传入容量的情况下,最大容量会被设置为 Integer.MAX_VALUE

linkedBlockingQueue 主要属性
    
    private final int capacity;

    
    private final AtomicInteger count = new AtomicInteger();

    
    transient Node head;

    
    private transient Node last;

    
    private final ReentrantLock takeLock = new ReentrantLock();

    
    private final Condition notEmpty = takeLock.newCondition();

    
    private final ReentrantLock putLock = new ReentrantLock();

    
    private final Condition notFull = putLock.newCondition();
  • linkedBlockingQueue 底层的数据结构是单向链表
  • 内置一个读取锁,ReentrantLock takeLock ,以及由这把锁创建出来的非空条件 Condition notEmpty
  • 内置一个写入锁,ReentrantLock putLock,以及由这把锁创建出来的非空条件 Condition notFull
linkedBlockingQueue 设计思想
  • 执行 take() 方法的时候,会从队列的头部取出元素,所以必须要获取读取锁 takLock,从而确保同时只有一个线程可以 *** 作链表的头部
    • 当执行 take() 方法的时候如果队列为空,那么当前线程将会调用 notEmpty.await() 方法进行阻塞,一直到有元素放入事件发生(如调用 put() 方法)
  • 执行 put() 方法的时候,会从队列的尾部添加元素,所以必须要获取到写入锁 putLock,从而确保同时只有一个线程可以 *** 作链表的尾部
    • 当执行 put() 方法的时候如果队列满了,那么当前线程将会调用 notFull.await() 方法进行阻塞,一直到有元素取出事件发生(如调用 take() 方法)
  • 当执行 remove()、contains() 等方法时,将会同时获取读、写两把锁,在方法结束后同时释放两把锁
    • 这样做的目的是因为 remove() 等方法不能确定在链表的某个位置 *** 作(读取或者新增、删除)元素,所以需要同时对队头和队尾都加锁

总的来说,linkedBlockingQueue 就是通过控制链表同一方向的 *** 作,来实现线程安全和读写条件阻塞;当不能确定 *** 作元素的位置时,将进行双重加锁

ConcurrentlinkedQueue

ConcurrentlinkedQueue 是一个基于单向链表实现的无界线程安全队列,基于 CAS + 自旋来保证线程安全

PriorityBlockingQueue

PriorityBlockingQueue 是一个基于优先级的无界阻塞队列,底层的数据结构是堆。

PriorityBlockingQueue 主要属性
    
    private transient Object[] queue;

    
    private transient int size;

    
    private transient Comparator comparator;

    
    private final ReentrantLock lock;

    
    private final Condition notEmpty;

    
    private transient volatile int allocationSpinLock;
  • PriorityBlockingQueue 底层有一个可扩容的对象数组 Object[] queue,结构是堆
  • 内置一个 ReentrantLock 独占锁,以及由这个锁构造出来的 Condition notEmpty 条件
  • 一个 volatile 关键字修饰的整型变量 volatile int allocationSpinLock
PriorityBlockingQueue 设计思想
  • 所有的存取方法都需要获取锁后才能执行,包括 size() 方法
  • 当数组的长度大于等于容量时,需要进行扩容 *** 作
    • 扩容之前先要释放锁
    • 首先要尝试使用 CAS 尝试把 allocationSpinLock 变量的值修改为 1
    • 当前容量小于 64 时,增长为原来的 2 倍 + 2;否则增长为原来的 1.5 倍
    • 还原 allocationSpinLock 的值
    • 扩容完成之后,再次获取锁
  • 插入元素时,会按照指定的 Comparator 的比较方法进行插入;否则将会使用元素自身的 Comparable.compareTo() 方法进行比较
  • 调用 take() 方法时如果队列为空时,当前线程将会调用 notEmpty.await() 进行阻塞,直到有新的元素入队后被唤醒
  • 出队的元素始终是堆顶元素
SynchronousQueue

一个不存储元素的阻塞队列,每一个入队 *** 作必须对应一个出队 *** 作,每一个出队 *** 作必须对应一个入队 *** 作。

linkedBlockingDeque

linkedBlockingQueue 是一个双端阻塞队列,底层数据结构是双向链表。

  • 内置一个 ReentrantLock 独占锁,以及由这个锁构造出来的 Condition notEmpty(队列为空,等待放入) 和 Condition notFull(队列已满,等待取出)条件
  • 当队列为空时调用阻塞取出相关方法(take())时,将会调用 notEmpty.await() 方法进行阻塞,直到有元素被放入到队列中
  • 当队列已满时调用阻塞存入相关方法(put())时,将会调用 notFull.await() 方法进行阻塞,直到有元素被取出
  • 可以从队列的两端(头部/尾部)进行元素 *** 作(存入/取出)元素,意味着可以使用 linkeBlockingQueue 自由实现栈或者队列
DelayQueue

DelayQueue 是一个无界延时阻塞队列。底层的数据结构是优先级队列 PriorityQueue

DelayQueue 主要属性
    private final transient ReentrantLock lock = new ReentrantLock();

    private final PriorityQueue q = new PriorityQueue();

    private Thread leader = null;

     
    private final Condition available = lock.newCondition();
  • 内置一个 ReentrantLock 独占锁,以及一个由锁对象创建出来的条件对象 Condition available
  • 底层数据结构是 PriorityQueue,是由堆来实现的
  • 有一个 Thread leader 属性,代表的是当前第一个尝试等待获取队头元素的线程
DelayQueue 主要设计思想
  • 所有放入到 DelayQueue 中的元素,都要实现 Delayed 接口
  • 当队列为空时调用出队方法,将会调用 available.await() 方法进行阻塞,直到有新的元素被添加到队列中
  • 当队列不为空时调用出队方法,首先判断队头元素的剩余过期时间是否小于等于 0
    • 如果队头元素的剩余过期时间小于等于 0,那么说明元素已经到期,直接把这个元素移除并返回
    • 如果队头元素的剩余过期时间(long delay)大于 0,将会判断当前 leader 是否为空
      • 如果为空,则把自己设置为 leader,并且调用 available.awaitNanos(long delay) 方法,将自己阻塞 long delay 时间
      • 如果不为空,说明已经有其他线程在等待获取元素了,那么将调用 available.await() 方法进行无限期等待
    • 在阻塞了 long delay 时间后,重新将 leader 置为 null,并且执行上面的逻辑
  • 所有存取方法调用前都需要获取锁
CopyonWrite

CopyOnWrite,顾名思义,就是写时复制的意思。
主要的设计思想就是在修改数据的时候,并不直接在原数据上进行修改,而是先拷贝一份原数据的副本,后续对于数据的修改 *** 作在拷贝出来的副本上进行,最后将副本替换掉原数据。
这样做的好处就是,在读取数据的时候,是不用采取任何并发控制手段的,直接访问源数据就行,所以写时复制适用于读多写少的场景。

CopyonWriteArrayList

CopyOnWriteArrayList 是一个并发安全的动态数组,底层数据结构是数组。

CopyonWriteArrayList 的主要属性
    
    final transient ReentrantLock lock = new ReentrantLock();

    
    private transient volatile Object[] array;
  • 内置一个 ReentrantLock 独占锁
  • 一个用 volatile 关键字修饰的对象数组 array
CopyonWriteArrayList 主要设计思想
  • 在所有的修改方法执行之前,首先要获取锁
  • 在使用元素添加方法时,每次会拷贝出一个长度 +1 的新数组,同时把要添加的元素放入到新数组的最后一个位置上,最后把旧数组替换成新数组
  • 在使用元素移除方法时,每次会拷贝出一个长度 -1 的新数组,最后根据要删除的元素位置,进行数组拷贝,最后把旧数组替换成新数组
  • 所有读取方法都没有采取任何同步措施,包括获取迭代器
    • 获取迭代器的方法,并没有像很多文章中说的对当前数组进行了一个拷贝,而是直接把当前数组的引用传递给了迭代器
    • 之所以不用对当前数组进行拷贝,是因为每次的修改方法都会创建出一个新数组,所以 CopyOnWriteArrayList 迭代器根本不会,也没有必要去拷贝当前数组
CopyonWriteArraySet

CopyOnWriteArraySet 是一个并发安全的无重复集合,底层是基于 CopyOnWriteArrayList 来实现的

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5582614.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存