RentrantLock与AQS详解

RentrantLock与AQS详解,第1张

RentrantLock与AQS详解 基本概念

对应源码里面的类,可以理解这些基本概念,如下:

队列同步器(同步器):AbstractQueuedSynchronizer(AQS)

自定义同步器:java.util.concurrent.Semaphore.Sync、java.util.concurrent.locks.ReentrantLock.Sync等

同步组件:ReentrantLock、Semaphore、CountDownLatch等

如何实现ReentrantLock?

ReentrantLock实现了Lock接口。它提供了这些获取的方法。

我们继续看它的构造方法:

public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

可以看出,通过构造方法,调用不同的自定义同步器,默认是获取的公平锁。

继续看NonfairSync和FairSync是如何实现的。

从上面的类定义可以看出,NonfairSync和FairSync 的实现是继承了Sync这个自定义同步器。这里面就是核心。Sync这个自定义同步器是继承自AQS,AQS里面提供了一些可以被重写的方法:

以上这些方法的被推迟到子类去,也就是Sync中实现。而他又提供了一些模板方法,这些模板方法如下:

我们回到ReentrantLock里面看,到底是如何实现,我们可以看到再Sync里面定义了一个抽象方法Lock,而Lock接口提供的lock方法调用了自定义同步器Sync的lock方法。

public void lock() {
    sync.lock();
}

继续往Sync里面看,发现他里面定义了一个抽象方法。那也就是再Sync的子类中有具体实现。

/**
 * Performs {@link Lock#lock}. The main reason for subclassing
 * is to allow fast path for nonfair version.
 */
abstract void lock();

我们看下他的具体实现,可以看出它调用了acquire()方法。acquire方法正是我们提到的AQS里面提供的模板方法。

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

我们继续看AQS里面accuire是如何实现的,可以看出它调用了tryAcquire方法。而这个tryAcquire在AQS里面没有具体实现,他的实现在子类Sync里面。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我们看看Sync里面的tryAcquire的具体实现吧,可以看出它调用了Sync子类的的一个非公平锁获取的方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

这个非公平锁获取的定义如下:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

以上基本把ReentrantLock的实现给串起来了。其核心就是锁是给用户的,屏蔽了具体实现,也就是接口那些方法。而同步器则是锁的具体实现者。这个实现者指的就是自定义同步器Sync提供的方法。这些方法只关注状态设置。具体AQS有关队列的细节都不需要关注。

AQS源码解析
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

compareAndSetState(0, 1)是CAS *** 作,也就是检查内存里面的值是不是期望值0,如果是0的话,则将状态修改为更新值1.成功的话返回true。之后 setExclusiveOwnerThread(Thread.currentThread());也就是当前线程拿到锁,进入同步状态。compareAndSetState方法如下,可以看见是用了CAS *** 作。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

stateOffset则是内存中的地址。该方法参数分别解释如下,var2,var4,var5对应CAS *** 作的三个值:

  • val1:对象本身,这里是AbstractQueuedSynchronizer(AQS)

  • var2:该对象值得引用地址

    stateOffset = unsafe.objectFieldOffset
        (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
    
  • var4:期望值

  • var5:要更新的值

如果上面没有返回ture,说明当前线程无法同步,则会进入acquire方法。acquire方法是定义在AQS中的一个模板方法。如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法首先调用tryAcquire方法,看一下它是如何实现的。它的实现在Sync这个自定义同步器中。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //检查状态,如果是0的话,再次尝试获取锁
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果没有获取锁,获取当前的线程,看是否等于获取锁的线程
    else if (current == getExclusiveOwnerThread()) {
        // 更新nextc
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //重新设置状态
        setState(nextc);
        return true;
    }
    return false;
}

有个疑问这里为什么会判断两次。总之是还是判断当前线程有没有获取到锁。如果没有返回true,则进入

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

传的参数是static final Node EXCLUSIVE = null;说明当前线程没有获取锁。继续看addWaiter方法

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //将Node的尾节点设置为前驱节点
    Node pred = tail;
    if (pred != null) {
        //将前驱节点设置为当前节点的前驱节点
        node.prev = pred;
        //csa设置尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter中可以看出新建了一个Node节点,将当前线程加入Node中,compareAndSetTail会将新建节点加入链表中,返回当前节点。如果第一次没有加入成功,会进入enq方法中,循环尝试加入尾节点,知道成功为止。

private Node enq(final Node node) {
    for (;;) {
        //将节点t指向尾节点
        Node t = tail;
        if (t == null) { // Must initialize
            //t为空的话,就设置头节点
            if (compareAndSetHead(new Node()))
                //将尾节点指向头结点
                tail = head;
        } else {
            //t如果不为空的话,设置t为前驱节点
            node.prev = t;
            //设置尾节点,如果成功的话
            if (compareAndSetTail(t, node)) {
                //将头节点和next节点连起来
                t.next = node;
                //返回节点
                return t;
            }
        }
    }
}

之后节点会进入acquireQueued方法,等待获取同步状态,我们进这个方法看一下,可以看出同步队列一直在自旋获取同步。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取前驱节点
            final Node p = node.predecessor();
            //如果前驱是头节点,并能同步的话
            if (p == head && tryAcquire(arg)) {
                //设置当前节点为头结点
                setHead(node);
                //将p和下一个节点断开,也就是p进入了同步状态
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

总结:在获取同步状态时,同步器维护一个人同步队列,获取状态失败的线程会加入到队列中并在队列中自旋,指导前驱节点为头结点并获取同步状态。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/800171.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存