【并发编程】基于数组结构实现的一个有界阻塞队列ArrayBlockingQueue

【并发编程】基于数组结构实现的一个有界阻塞队列ArrayBlockingQueue,第1张

【并发编程】基于数组结构实现的一个有界阻塞队列ArrayBlockingQueue ArrayBlockingQueue是什么

ArrayBlockingQueue是最典型的有界阻塞队列。内部使用数组存储元素!初始化时需要指定容量大小。利用 ReentrantLock 实现线程安全! ArrayBlockingQueue的适用场景

在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,可以使用ArrayBlockingQueue。当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。 ArrayBlockingQueue的实现原理

使用独占锁ReentrantLock实现线程安全,入队和出队 *** 作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队 *** 作;意味着生产者和消费者无法并行 *** 作,在高并发场景下会成为性能瓶颈。 ArrayBlockingQueue的特点

有界队列!先进先出!存取互相排斥!使用的数据结构是静态数组:容量固定,没有扩容机制;没有元素的位置也占用空间,被 null 占位;使用ReentrantLock锁:存取是同一把锁, *** 作的是同一个数组对象,存取互相排斥。 ArrayBlockingQueue的入队出队 *** 作

两个指针都是从队首向队尾移动,保证队列的先进先出原则!入队阻塞对象notFull:队列count=length,放不进去元素时,阻塞在该对象上。出队阻塞对象notEmpty:队列count=0,无元素可取时,阻塞在该对象上。入队 *** 作:从队首开始添加元素,记录putIndex(到队尾时设置为0),唤醒notEmpty。出队 *** 作:从队首开始取出元素,记录takeIndex(到队尾时设置为0),唤醒notFull。 ArrayBlockingQueue的使用方式

// 定义同步队列
BlockingQueue blockingQueue = new ArrayBlockingQueue(1000);
// 放入元素
System.out.println(blockingQueue.add(9));
blockingQueue.put(10);
// 取出元素
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
ArrayBlockingQueue的数据结构源码分析
// 数据元素数组
final Object[] items;


// 下一个待取出元素索引
int takeIndex;


// 下一个待添加元素索引
int putIndex;


// 元素个数
int count;


// 内部使用的锁
final ReentrantLock lock;


// 消费者条件队列
private final Condition notEmpty;


// 生产者条件队列
private final Condition notFull;
ArrayBlockingQueue的构造方法源码分析
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}


public ArrayBlockingQueue(int capacity, boolean fair) {
    // 传入数组的长度小于0,直接抛出异常
    if (capacity <= 0)
        throw new IllegalArgumentException();
    // 初始化数组
    this.items = new Object[capacity];
    // 初始化锁
    lock = new ReentrantLock(fair);
    // 初始化消费者的条件队列
    notEmpty = lock.newCondition();
    // 初始化生产者的条件队列
    notFull =  lock.newCondition();
}


public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection c) {
    // 调用俩个参数的构造方法
    this(capacity, fair);

    // 得到当前队列的lock锁
    final ReentrantLock lock = this.lock;
    // 加锁 *** 作:这里加锁是防止由于指令重排序导致的可见性问题。
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        // 定义一个数组元素的临时角标
        int i = 0;
        try {
            // 循环每个元素,放入到ArrayBlockingQueue的数组中
            for (E e : c) {
                // 元素为NULL抛出空指针异常
                checkNotNull(e);
                // 将元素放入ArrayBlockingQueue的数组中
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // 传入数组长度大于给定的capacity的长度,会抛出异常
            throw new IllegalArgumentException();
        }
        // 赋值元素的数量到count上
        count = i;
        // 数组中元素满了,插入的计数器从0开始
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

ArrayBlockingQueue的入队方法:put(E e) 源码分析
public void put(E e) throws InterruptedException {
    // 如果元素是NULL,抛出异常
    checkNotNull(e);
    // 得到当前队列的lock锁
    final ReentrantLock lock = this.lock;
    // 尝试去获取锁
    lock.lockInterruptibly();
    try {
        // 数量满了的时候,生产者队列等待
        while (count == items.length)
            notFull.await();
        // 入队
        enqueue(e);
    } finally {
        // 唤醒消费者线程
        lock.unlock();
    }
}


private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    // 获取到当前的元素数组
    final Object[] items = this.items;
    // 在该添加的位置放入当前的元素
    items[putIndex] = x;
    // 数组中元素满了,插入的计数器从0开始。
    // 这里进行了一次加一 *** 作,将putIndex指向下个要插入的位置
    if (++putIndex == items.length)
        putIndex = 0;
    // 元素的数量加一
    count++;
    // 准备唤醒消费者条件队列
    notEmpty.signal();
}


public void lockInterruptibly() throws InterruptedException {
    // 直接调用AQS的acquireInterruptibly方法
    sync.acquireInterruptibly(1);
}


public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 如果线程被中断了,抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取锁。tryAcquire在AQS中,与ReentrantLock的实现方式一致
    if (!tryAcquire(arg))
        // 循环的获取锁,优先考虑中断
        doAcquireInterruptibly(arg);
}


private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    // 得到当前的节点
    final Node node = addWaiter(Node.EXCLUSIVE);
    // 定义失败标志位true
    boolean failed = true;
    try {
        for (;;) {
            // 得到当前节点的上一个(前置)节点,前置节点为null,会抛出空指针异常
            final Node p = node.predecessor();
            // 如果前置节点是头结点,并且尝试获取锁成功
            if (p == head && tryAcquire(arg)) {
                // 把当前的节点设置为头结点
                setHead(node);
                // 去掉前驱节点的指向,方便GC去回收线程
                p.next = null; // help GC
                // 变更失败标志位false
                failed = false;
                // 跳出循环
                return;
            }
            // 代码执行到这里,说明尝试获取锁,但是获取锁失败了。
            // 阻塞前的准备工作 *** 作成功(状态是-1的时候成功)
            // 将线程阻塞,等待他去唤醒。唤醒后返回线程的中断状态!
            // 这里的代码在AQS中,与ReentrantLock的实现方式一致
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 上面代码抛出异常的时候,会执行这里的逻辑
        if (failed)
            // 取消获取锁的逻辑。cancelAcquire在AQS中,与ReentrantLock的实现方式一致
            cancelAcquire(node);
    }
}


public final void signal() {
    // 不是当前线程,直接抛出异常!
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取条件队列的头结点
    Node first = firstWaiter;
    // 条件队列的头结点不是null,尝试去唤醒头结点
    if (first != null)
        doSignal(first);
}


private void doSignal(Node first) {
    do {
        // 如果当前节点的下一个节点是null。说明唤醒这个就没有其他节点了。
        if ( (firstWaiter = first.nextWaiter) == null)
            // 无节点的时候设置尾结点为null
            lastWaiter = null;
        // 将当前节点的指向情况,方便GC去回收
        first.nextWaiter = null;
    // 这里的循环条件为条件队列转同步队列,transferForSignal在AQS中,与CyclicBarrier的实现方式一致
    // 转同步队列失败并且节点存在会一直循环
    // 转同步队列成功或者条件队列中没有节点,跳出循环!
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
ArrayBlockingQueue的出队方法:take() 源码分析
public E take() throws InterruptedException {
    // 得到当前队列的lock锁
    final ReentrantLock lock = this.lock;
    // 尝试去获取锁
    lock.lockInterruptibly();
    try {
        // 队列中无数据的时候,消费者队列等待
        while (count == 0)
            // 消费者队列等待
            notEmpty.await();
        // 返回出队的结果
        return dequeue();
    } finally {
        // 唤醒生产者线程
        lock.unlock();
    }
}


private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 获取到当前要出队的元素
    E x = (E) items[takeIndex];
    // 将要出队位置的元素变为null方便GC去回收
    items[takeIndex] = null;
    // 出队到最后一个,下一个出队的计数器从0开始。
    // 这里进行了一次加一 *** 作,将takeIndex指向下个要出队的位置
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 元素总数量减一
    count--;
    // 迭代器不为空的时候
    if (itrs != null)
        // 这里的逻辑主要是头结点为空的时候,清空所有迭代器!
        // 迭代器需要去重写:iterator()定义
        itrs.elementDequeued();
    notFull.signal();
    // 返回当前要出队的元素
    return x;
}

结束语

获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!关注公众号,可以让你对MySQL有非常深入的了解关注公众号,每天持续高效的了解并发编程!关注公众号,后续持续高效的了解spring源码!这个公众号,无广告!!!每日更新!!!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5721968.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存