【学习笔记】抽象队列同步器AQS应用之BlockingQueue详解

【学习笔记】抽象队列同步器AQS应用之BlockingQueue详解,第1张

【学习笔记】抽象队列同步器AQS应用之BlockingQueue详解

文章目录
  • 什么是AQS框架
    • Aqs核心源码
    • 基于aqs实现的锁
  • BlockingQueue
    • ArrayBlockingQueue
    • linkedBlockingQueue
    • DelayQueue
    • BlockingQueue API
    • 多线程生产者-消费者示例

什么是AQS框架

1、AQS是一个JAVA线程同步的框架。是JDK中很多锁工具的核心实现框架。
2、在AQS中,维护了一个信号量 state和一个线程组成的双向链表队列。其中,这个线程队列,是用来给线程排队的,而state就像是一个红绿灯,用来控制线程排队或放行的。在不同的场景下,有不用的意义。
在可重入锁这个场景下, states就用来表示加的次数。0标识无锁,每加一次锁, states就加1.释放锁 state就减1

简单说一下AQS,AQS全称为AbstractQueuedSychronizer,翻译过来应该是抽象队列同步器。如果说java.util.concurrent的基础是CAS的话,那么AQS就是整个Java并发包的核心了,ReentrantLock、CountDownLatch、Semaphore等等都用到了它。
AQS实际上以双向队列的形式连接所有的Entry,比方说ReentrantLock,所有等待的线程都被放在一个Entry中并连成双向队列,前面一个线程使用ReentrantLock好了,则双向队列实际上的第一个Entry开始运行。
AQS定义了对双向队列所有的 *** 作,而只开放了tryLock和tryRelease方法给开发者使用,开发者可以根据自己的实现重写tryLock和tryRelease方法,以实现自己的并发功能。

AQS内部用一个volatile修饰的int类型的成员变量state来控制同步状态。

  • state = 0:表示没有线程正在独占共享资源的锁。
  • state = 1:表示有线程正在共享资源的锁。

AQS虽说是一个抽象类,但是其内部没有一个方法是抽象方法,因为AQS只是基础的组件,作者并不希望使用者对其直接进行 *** 作,更倾向于其作为基础组件,为其实现类提供基础的帮助。

AQS采用的是模板方法模式,其内部除了提供并发的 *** 作核心方法以及同步队列的 *** 作之外,还提供了一些模板方法让子类自己实现,如加锁解锁。

AQS作为基础的组件,封装的都是核心的并发 *** 作,实际上还分为两种模式,共享模式和独占模式,如Reentrantlock,ReentrantReadWriteLock(写锁部分)都是独占锁,ReentrantReadWriteLock(读锁部分)就是共享锁。
这两种模式的解锁和加锁逻辑都不一样,但是AQS只关注内部的公共方法的实现,不关心外部的具体实现,所以提供了模板方法给子类。

要实现独占模式,则需要实现tryAcquire(加锁)和tryRelease(解锁),而实现共享模式则需要实现tryAcquireShared(加锁)和tryReleaseShared(解锁),无论是共享模式还是独占模式,其底层实现都是同一个AQS,只是加锁和解锁逻辑不一样,所以,根据自己的需求自定义锁也就变得简单。
aqs类:

看看AQS提供的5个模板方法:

AQS在内部定义了一个volatile int state变量,表示同步状态:当线程调用lock方法时,如果state=0,说明没有任何线程占有共享资源的锁,可以获得锁并将state=1,如果state=1说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待

aqs通过node内部类构成一个双向链表结构的同步队列来完成线程获取锁的排队动作,当有线程获取锁失败以后,就被添加到队列末尾。AQS通过内部类conditionobject构建等待队列,当condition调用wait()方法后,线程会加入等待队列中,而当condition调用signal()方法后,线程将从等待队列转移到同步队列中进行锁竞争。

【conditionobject类】
ConditionObject是AQS中的内部类,提供了条件锁的同步实现,实现了Condition接口,并且实现了其中的await(),signal(),signalALL()等方法。

ConditionObject分析
 使用方式如下:

Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();//创建和该锁关联的条件锁
    
    public void conditionWait() throws InterruptedException{
        lock.lock();
        try {
            condition.await();
        }finally {
            lock.unlock();
        }
    }
    public void ConditionSignal() throws InterruptedException{
        lock.lock();
        try {
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

AQS和conditon各自维护了不同的队列,在使用lock和condition的时候,其实就是两个队列的互相移动。
aqs中的类:

⨠⨠⨠AbstractOwnableSynchronizer:抽象类,定义了存储独占当前线程的属性和设置,获取当前线程的方法。
⨠⨠⨠AbstractQueuenSynchronize:抽象类,AQS框架核心类,内部以虚拟队列的方式管理线程的锁获取与锁释放,其中获取锁(tryAcquire方法)和释放锁(tryRelease方法)并没有提供默认的实现,需要子类重写方法的具体逻辑,目的是为了使开发人可以自定义获取锁和释放锁的方式。
⨠⨠⨠Node:AbstractQueuenSynchronize的内部类,用于构建虚拟队列(双向链表),为每个进入同步队列的线程封装成Node对象加入队列,管理需要获取锁的线程。
⨠⨠⨠Sync:抽象类,是ReentrantLock的内部类,继承了AbstractQueuenSynchronize,实现了tryRelease方法,并提供抽象方法lock,供子类实现
⨠⨠⨠NonfairSync:Reentrantlock的内部类,继承Sync,非公平锁的实现类
⨠⨠⨠FairSync:Reentrantlock的内部类,继承Sync,公平锁的实现类
⨠⨠⨠Reentrantlock:实现了Lock接口,创建时默认为非公平锁。

Aqs核心源码

获取锁:

lock:

compareAndSetState 赋值成功则获取锁

ReentrantLock 实现了非公平锁和公平锁,所以在调用 lock.lock(); 时,会有不同的实现类:

  1. 非公平锁,会直接使用 CAS 进行抢占,修改变量 state 值。如果成功则直接把自己的线程设置到 exclusiveOwnerThread ,也就是获得锁成功。 不成功后续分析
  2. 公平锁,则不会进行抢占,而是规规矩矩的进行排队。 老实人

(AQS)acquire

public final void acquire(int arg)

 { 

if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
    selfInterrupt();

}

整个这块代码里面包含了四个方法的调用,如下:

  1. tryAcquire ,分别由继承 AQS 的公平锁( FairSync )、非公平锁 NonfairSync )实现。
  2. addWaiter ,该方法是 AQS 的私有方法,主要用途是方法 tryAcquire 返回 false以后,也就是获取锁失败以后,把当前请求锁的线程添加到队列中,并返回 Node节点。
  3. acquireQueued ,负责把 addWaiter 返回的 Node 节点添加到队列结尾,并会执行获取锁 *** 作以及判断是否把当前线程挂起。
  4. selfInterrupt ,是 AQS 中的 Thread.currentThread().interrupt() 方
    法调用,它的主要作用是在执行完 acquire 之前自己执行中断 *** 作。

addwaiter方法

private Node addWaiter(Node mode) { 
    Node node = new Node(Thread.currentThread(), mode); 
    Node pred = tail; // 如果队列不为空, 使用 CAS 方式将当前节点设为尾节点
    if (pred != null) { 
        node.prev = pred; 
        if (compareAndSetTail(pred, node)) 
    { 
        pred.next = node; 
        return node; 
    }
        } // 队列为空、CAS失败,将节点插入队列 
    enq(node); 
    return node; 
}

当执行方法 addWaiter ,那么就是 !tryAcquire = true ,也就是
tryAcq uire 获取锁失败了。
 接下来就是把当前线程封装到 Node 节点中,加入到 FIFO 队列中。 因为先进先
出,所以后来的队列加入到队尾
 compareAndSetTail 不一定一定成功,因为在并发场景下,可能会出现 *** 作
失败。那么失败后,则需要调用 enq 方法,该方法会自旋 *** 作,把节点入队列。

1、AQS是一个JAVA线程同步的框架。是JDK中很多锁工具的核心实现框架。
2、在AQS中,维护了一个信号量 state和一个线程组成的双向链表队列。其中,这个线程队列,是用来给线程排队的,而state就像是一个红绿灯,用来控制线程排队或放行的。在不同的场景下,有不用的意义。
在可重入锁这个场景下, states就用来表示加的次数。0标识无锁,每加一次锁, states就加1.释放锁 state就减1

基于aqs实现的锁


Semaphore ,信号量锁。主要用于控制流量,它的作用是限制某段代码块的并发数。Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。由此可以看出如果Semaphore构造函数中传入的int型整数n=1,相当于变成了一个synchronized了。比如:数据库连接池给你分配 10个链接,那么让你来一个连一个,连到 10 个还没有人释放,那你就等等。
CountDownLatch ,闭锁。 Latch 门闩的意思,比如:说四个人一个漂流艇,坐满了就推下水。

BlockingQueue

概要:BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put *** 作,并且BlockingQueue提供了超时return null的机制,在许多生产场景里都可以看到这个工具的身影。
队列类型
无限队列 (unbounded queue ) - 几乎可以无限增长
有限队列 ( bounded queue ) - 定义了最大容量
队列数据结构
队列实质就是一种存储数据的结构
通常用链表或者数组实现
一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
主要 *** 作:入队(EnQueue)与出队(Dequeue)

ArrayBlockingQueue

队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好
数据结构如下图

BlockingQueue blockingQueue = new ArrayBlockingQueue<>();

应用场景
在线程池中有比较多的应用,生产者消费者场景
工作原理
基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞

linkedBlockingQueue

是一个基于链表的无界队列(理论上有界)

BlockingQueue blockingQueue = new linkedBlockingQueue<>();

上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE 。
向无限队列添加元素的所有 *** 作都将永远不会阻塞,[注意这里不是说不会加锁保证线程安全],因此它可以增长到非常大的容量。
使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。

DelayQueue

由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。
队列创建:

BlockingQueue blockingQueue = new DelayQueue();

要求
入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口
应用场景
电影票
工作原理:
队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

BlockingQueue API

BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

在构建生产者 - 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。

多线程生产者-消费者示例

接下来我们创建一个由两部分组成的程序 - 生产者 ( Producer ) 和消费者 ( Consumer ) 。
生产者将生成一个 0 到 100 的随机数(十全大补丸的编号),并将该数字放在 BlockingQueue 中。我们将创建 16 个线程(潘金莲)用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。
需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。
从生产者(潘金莲)向消费者(武大郎)发出信号的好方法是,不需要处理消息,而是发送称为毒 ( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽可能多的毒 ( poison ) 丸 ( pill ) ,因为我们有消费者(武大郎)。然后当消费者从队列中获取特殊的毒 ( poison ) 丸 ( pill )消息时,它将优雅地完成执行。
以下生产者的代码:

@Slf4j
public class NumbersProducer implements Runnable {
    private BlockingQueue numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            log.info("潘金莲-{}号,给武大郎的泡药!",Thread.currentThread().getId());
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            log.info("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!",Thread.currentThread().getId(),j+1);
        }
    }
}

我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理。我们看到方法 generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。它还需要有毒 ( poison ) 丸 ( pill ) (潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。该消息需要将 poisonPillPerProducer 次放入队列中。
每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer 后,它会检查该消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金莲有没有下毒) ,如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。

消费者:

@Slf4j
public class NumbersConsumer implements Runnable {
    private BlockingQueue queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                log.info("武大郎-{}号,喝药-编号:{}",Thread.currentThread().getId(),number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。
既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。
我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:

public class Main {

    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_ConSUMERS = Runtime.getRuntime().availableProcessors();
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_ConSUMERS / N_PRODUCERS;
        int mod = N_ConSUMERS % N_PRODUCERS;

        BlockingQueue queue = new linkedBlockingQueue<>(BOUND);
        //潘金莲给武大郎熬药
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }
        //武大郎开始喝药
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }
        //潘金莲开始投毒,武大郎喝完毒药GG
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }

}

BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者(武大郎)。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5686820.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存