特立独行的ReentrantLock

特立独行的ReentrantLock,第1张

问:“ 如果有一间外表很普通,内心很迷的厕所,但是每次只允许进入一人,面对一群人的抢占资源的情况时,如果你是负责看守的管理员,你会怎么解决呢?”
答:“这还不简单?派一个监管人,每放行一人就在门上会加锁,防止其他人进入,当使用者出来时,释放锁,招呼后面排队等待的人不就好了”
问:“如果成功进入,打开发现里面嵌套了很多房间,需要层层打开才会找到,你又会怎么解决呢?”
答:“我会在每个子房间内都派一个监守员来保证一种独占模式,判断房间内是否有人,没人则放行,有人则继续等待”
(估计进去的人看到这个解决方法会疯掉。。。)

根据上述场景提出问题:
1. 面对多人抢占资源采取什么解决措施
2. 面对层层房间校验,有什么更好的解决方式

在多线程并发场景也会发生这样的问题,假设很多线程都访问变量(临界资源),如果所有线程都是“读” *** 作,其实并不会发生任何问题,如果其中有线程尝试对临界资源执行“写” *** 作,就会发生问题,就要思考要不要对临界资源加锁来保证线程之间安全问题,随着技术的迭代更新,Doug Lea老先生在jdk1.5引进了ReentrantLock(重入锁)来解决多线程之间的线程安全问题,下面让我们一起来看看ReentrantLock的内心世界吧!

首先,我们先构建一把独占模式的锁

ReentrantLock reentrantLock = new ReentrantLock();

点开它的构造方法

   public ReentrantLock() {
        sync = new NonfairSync();
    }

可以看出默认生成NonfairSync(是非公平锁)
下面我们就一起来看看非公平锁的lock()方法加锁逻辑吧

 final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

1、compareAndSetState(0,1) : 共享变量首先会存在主内存中,如果有线程想 *** 作共享变量时,会先从主内从拷贝一份副本数据存在寄存器中,之后比对寄存器中的值和主内存的值是否相同,如果不同,证明有其他线程尝试更改了共享变量的值,如果相同,证明没有其他线程尝试更新,并且抢占资源成功,并更改主内存的值,防止其他线程 *** 作,达到加锁的目的。
2、setExclusiveOwnerThread:设置独占线程,代表标记一下抢占成功的线程
3、acquire:对于抢占失败的线程会执行

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1、可以看出acquire函数包含了很多行为,首先会再让抢占失败的线程执行一次tryAcquire,尝试获取锁,这样设计的原因是因为如果抢占成功的线程执行的业务逻辑非常简单,立刻释放了锁,这样后面抢占失败的线程再次尝试抢占锁,就有可能抢占成功。
2、acquireQueued方法包含了addWaiter方法,由方法名称可知,对于第二次尝试获取锁失败的线程,会执行入队 *** 作。

 protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

这个方法是AbstractQueneSynchronizier(AQS)提供,具体还要看ReentrantLock对此方法的实现。

   protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
   final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

现在我们来分析一下对于第二次尝试获取锁的逻辑
1、获取当前线程,并且获取state的值,如果state的值为0,证明当前没有线程持有锁,可以尝试CAS *** 作获取锁,获取锁成功后设置独占线程。
2、如果state的值不为0,证明当前是有持有锁的线程未释放,如果当前持有线程和之前独占线程是一样的,是不需要再次尝试获取锁的,在旧值state的基础上加1,此处就是ReentrantLock对可重入性的具体实现,当溢出时则抛出异常,也就是说最大可重入次数为 (1<<31)-1次。
3、如果state的值不为0,并且当前线程不是之前所绑定的独占线程,就要执行下面的入队 *** 作acquireQueued方法,在执行acquireQueued方法之前,首先执行addWaiter方法。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

参数为当前获取锁失败的线程,首先会生成一个Node节点,传入当前线程,此时tail尾结点是空,因为队列未存在,执行enq方法初始化队列。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

1、因为尾结点为null,队列不存在,所以会执行第一个if中的逻辑,采用CAS将新生成的Node节点设置为头节点,并且把尾结点tail执行head,采用CAS是为了避免多线程时出现的问题,如果没有采用CAS来设置头,会出现如下问题:
*(1)*线程1执行到if中,成功设置了新节点为头节点,但是并没有设置tail尾结点指向头节点head,现在的tail依然为null,碰巧发生了上下文切换,时间片分给了线程2,因为tail为null,所以依然会执行enq方法进入队列的初始化
*(2)*线程2也会新创建一个节点,并且设置也成功设置了头节点,原来head指向的线程1生成的Node节点,现在变成head指向线程2生成的Node节点,这导致了获取锁失败的线程没有成功进入统一的队列中。
2、成功设置头节点后,因为外层是无条件for循环,由于上一步队列的初始化导致head和tail都指向了新生成的空Node节点,所以tail不再为null,执行else的逻辑,node.pre = t 让即将加入队列的Node节点的pre引用,指向了t,而t就是tail,tail又指向了队列头节点,最终node的前驱pre指向了头节点,之后通过CAS *** 作设置新加入的节点为尾结点,并且使新加入节点的前驱节点(此时是头节点)的next引用指向自己,最终返回尾结点的前驱节点。
相同,设置尾结点如果未使用CAS来设置也会发生如设置头节点类似的问题。


总结:

此函数只会调用一次,初始化一次队列,之后将节点加入到队列尾部,设置节点前驱引用,以及前驱节点的后继引用。虽然函数返回了尾结点的前驱节点,但是调用方并没有用到此返回值。函数使用两次CAS *** 作以及for(;;) CPU空转最终可以实现,加入的节点一定会成功加入到队列中,以及head和tail都能够成功设置,不会存在多线程问题导致head和tail设置不准确的问题。


    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

继续回到addWaiter方法来看,如果第二个抢占锁失败的节点执行加入队列的 *** 作,会执行if方法里面的逻辑,因为第一个节点的加入已经让队列初始化完毕,所以尾结点不再是null。
1、新加入的节点的前驱引用pre指向尾结点,通过CAS *** 作设置新节点为尾结点,并且前驱节点的next引用指向新节点,最终将新节点返回。


总结

新节点成功入队,并且成功设置尾结点,将尾结点返回
注意:
CAS设置尾结点后,前驱节点的next指向当前节点,会不会发生问题,当设置成功尾结点后,还没有设置next时发生了上下文切换,会导致从头结点开始通过next引用往下面找时,发现next为null,认为头结点后面没有其他节点了,但事实上,队列已经加入了很多节点。



 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

节点成功加入到队列中,就要调用acquireQueued方法
1、首先通过predecessor方法获取节点的前驱节点
2、判断前驱节点是否是头节点,如果是头节点,证明此时的节点处于排队等待的第一位,会进行一次尝试获取锁的方法,如果不是头节点,就要调用shouldparkAfterFailedAcquire方法,我们先来看一下这个方法做了什么。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

在每一个节点中维护了一个waitStatus的字段(等待状态)
(1)waitStatus = 0 初始化状态(默认),一种中间状态
(2)waitStatus = -1 (SIGNAL),代表它后面需要有被唤醒的节点等待它去唤醒。
(3)waitStatus = 1 (CANCELLED), 代表此节点处于取消状态,是不参与队列排队的,线程在等待中发现不想等了,而“取消” 的一种状态表现。
(4)waitStatus = -2 (CONDITION) 条件等待队列(暂时没用到)
(5)waitStatus = -3 (PROPAGATE)传播(暂时没用到)
首先要搞清楚,传进来的参数是前驱节点以及新节点
1、首先判断前驱节点的waitStatus是否为SIGNAL,如果是的话直接返回
2、如果waitStatus大于0,也就是处于“取消”状态,意味着此节点为无效节点,需要我们跳过这些无效节点,寻找有效的节点,修改pre和next引用,将这些无效节点剔除队列。
3、pre = pre.pre代表pre指向前驱节点的pre引用,意味向前移动一个(新前驱),并且把新节点的pre引用指向它,当新前驱的waitStatus小于等于0时才可以跳出循环,如果不是,证明新前驱也是“取消”状态的节点,继续重复此 *** 作向前寻找有效节点,如果找到,则跳出循环,把新前驱的next引用指向新节点,这样整个队列就会跳过无效节点。
4、如果以上条件都不符合,则会进入else的逻辑,只有一个CAS *** 作,把前驱节点waitStatus设置为SIGNAL状态


总结:

可以看出此函数的目的就是想把前驱节点的waitStatus设置为SIGNAL状态,作者当时设置的意图时,如果节点加入到队列中,又不是头节点时,就该执行阻塞 *** 作,而阻塞前必须保证有人可以来唤醒自己,而这个人就是前驱节点,前驱节点为SIGNAL状态意味着要唤醒后继节点。


  if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
	}

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

shouldParkAfterFailedAcquire方法其实就是调用parkAndCheckInterrupt方法的提前准备工作,前驱节点已经成功被设置为SIGNAL状态,此节点就可以放心的被阻塞了,我们来看看parkAndCheckInterrupt方法做了什么吧。

   private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

其实就是调用了park方法来阻塞当前线程。

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

我们再回到这里,刚刚讲述的是前驱节点不是头节点的逻辑,如果前驱节点是头结点,就会再次尝试tryAcquire来获取锁,如果获取锁成功
1、将当前节点设置为头节点,因为tryAcquire保证了只有一个线程获取成功,所以这里的setHead不需要CAS,不会有线程安全问题。
2、清空旧head节点的next引用,脱离队伍,帮助gc。
最后来看下finally中的逻辑

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

如果一个节点将要被cancel,那么它就应该移除出队列,通过更改pre和next的值来 *** 作。
1、执行while循环来剔除掉node前面状态是cancel(无效)的节点。
2、此时的pred应该指向的是一个有效的节点(waitStatus<=0)。
3、记录pred节点的后继节点,并且设置当前节点的waitStatus为cancel。
4、判断当前节点是否是尾结点,如果是就把pred设置为尾结点,并且设置pre的next引用为null,这样当前节点就成功被剔除队列。
5、如果当前节点不是尾结点
(1)pred不是头节点
(2)并且pre的waitStatus是SIGNAL状态,或者是一个有效节点但是非SIGNAL状态的值,并且可以成功CAS成SIGNAL状态(换句话说只要pred是个有效节点就OK)
(3)pred.thread不为空,一般不会为空,thread存储的是线程变量
满足以上三点并且cancle节点的后继不为空,通过调整pred.next = cancle的后继来将cancle节点剔除。
思考一下,为什么要满足这些条件,cancle节点如果后面还有节点的话,并且是抢占锁失败进行阻塞的节点时,cancle节点需要唤醒它,所以cancle节点的waitStatus一定是SIGNAL状态才可以,此时cancle节点即将退出队列,帮它完成这项任务的就是pred,所以必须保证pred的waitStatus是SIGNAL才可以继续往后面唤醒。
(4)如果pred是头结点
回顾一下进入队列后的逻辑,首先会判断节点的前驱是否是头节点,如果是的话需要进行尝试获取锁 *** 作,所以在这里如果pred是头节点的话,那么后续节点是要被唤醒的,由于cancle的退出,必须要调用unparkSuccessor来唤醒cancle的后继节点。

至此获取锁的逻辑已经完成(非公平锁)
我们来看看如果抢占到资源的线程释放锁,会有什么 *** 作吧。

 public void unlock() {
        sync.release(1);
    }
 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release做的事情分为两步
1、释放锁资源
2、唤醒后继节点
先来看一下tryRelease是如何释放锁资源

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

1、当前state的值减1
2、判断当前线程和独占线程是否一致,不一致抛出异常
3、如果state的值减到0,说明锁释放成功,如果一个线程多次重入,就要执行多次释放锁逻辑,state不为0,其他线程依然抢不到锁。


总结:

和加锁逻辑一样,加锁成功时state会加1,同时设置独占线程属性,释放锁时也要还原这些值。


   public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

继续来看唤醒逻辑

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

参数:头节点
获取头节点的waitStatus的值,通过CAS强制设置为中间状态0,这都好理解,释放锁之后,自然状态处于中间位。
首先看第一个if语句中的逻辑
1、先看for循环,从尾结点开始,不断向前开始遍历,到头节点结束,如果节点的waitStatus<=0(有效节点),s记录此时的节点。
作用:找到一个离头节点最近且有效的节点
2、找到后,调用uppark方法来阻塞该线程。

问题: (1)既然为了找到离头节点最近的有效节点,从头节点遍历岂不是更快可以找到,为什么从尾结点开始遍历?
(2)遍历的条件是 头节点的下一个节点为null(s==null)或者下一个节点的waitStatus>0(s.waitStatus>0),下一个节点的状态大于0,这个号理解,因为waitStatus大于0,说明处于“取消”状态,非有效节点,所以要往后遍历找到一个有效的节点开始唤醒,可是下一个节点为null,为什么也满足条件?作者是要解决什么问题?

   private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

是否还记得分析这个入队 *** 作时可能会发生的问题以及队列图?不记得的朋友可以往上翻看一下,下面分析一下上述的问题。

答: 这两个问题其实归为一个问题来回答,在addWaiter入队 *** 作时,CAS设置尾结点以及前驱结点的next引用,这两个 *** 作并不是原子性,只可保证安全的设置尾结点,很有可能在多线程情况下发生pre.next还没有来得及设置就发生上下文切换,导致pre.next
= null,如果按照这样的情况来分析,就会导致从头节点开始遍历,next都是null,导致无法找到后续节点。

下面我们来看看获取公平锁的实现逻辑

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可以看出它和非公平锁的不同在于,多了hasQueuedPredecessors判断是否有前驱节点,非公平锁不需要判断此逻辑,直接可以进行一次获取锁逻辑,失败才会入队。

  public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

1、复制一份头节点和尾结点的副本
2、如果头结点不等于尾结点,说明队列中除了头节点,还有其他节点存在
3、h.next获取头节点后的节点,如果是null,或者当前时间片分到的线程不是head.next节点,也就是说当前线程不是排在队伍的第二个,说明前面是有前驱节点的,这样此线程就不能尝试执行获取锁逻辑。
4、相反,没有前驱节点,则可以尝试获取锁。
至此ReentranckLock所有逻辑都已分析完。


总结

Reentrantlock关于入队和出队 *** 作都是AQS提供的,而获取锁释放锁的逻辑是它特有的方法,如果想实现自己的锁,只需要继承AQS并且重写tryAcquire方法即可。
在重入锁方法的实现中,作者将其放到了tryAcquire方法里面,而且是else中,可能作者认为它的优先级是比较低的,而我觉得如果将其逻辑放到lock方法中的CAS前会不会更好一些,这样重入锁可以省去一次CAS的失败。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/736212.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存