关于AQS的一点体会 一独占模式

关于AQS的一点体会 一独占模式,第1张

      我们都知道在Java并发场景下JUC提供很多工具类,但是AbstrctQueuedSynchronizer这个抽象同步队列绝对是核心和基石。在理解这个框架运行机制中,很多抽象的逻辑让我们理解起来很困难,所以想分享一些自己在理解过程的心得,跟大家交流探讨,有不对的地方望各位大神指正,谢谢!

      实现独占模式的工具类有ReentrantLock,无非就是重写了tryAcquire() 和tryRelease()方法,以及公平与不公平获得锁机制的分别实现,ReentrantLock默认实现非公平获得锁机制。关于管理队列的任务还是由AQS内部来完成。我们一步一步来分析。

       首先获取尝试同步状态state,AQS同步队列 通过CAS方式来维护state这个字段,来保证原子性。默认初始化为0,当state=1时表示该node要被删除,state=-1表示该node对应的线程release时,要唤醒后继节点,state=-2时,说明该node在条件队列中,state=-3,表示向后继节点传播,在共享锁中才会使用到。


//acquire方式就是获取锁的方法调入口,所有获取独占锁的实现机制都是从该方法进入
public final void acquire(int arg) {
//tryAcquire()方法会被其它类根据业务需要被重写,它仅仅只是尝试获取同步状态state,不涉及任何关于队
//列的 *** 作,
        if (!tryAcquire(arg) &&
/**
*当无法直接获取同步状态后,会通过acquireQueued()方法直接被安排进入同步队列中
*/
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}



/**该方法为当前线程获取同步状态失败后,进入同步队列的准备工作,生成线程对应的node,并插入到同步队*列的尾部,
*/
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {

/**该步骤是维护整个队列的核心,在执行CAS方法设置尾节点时提前设置好node的pred前置节点,以此来保证*同步队列的顺序性和有效性,因为cas设置好尾节点后因该立刻执行的pred.next = node无法确定执行时间,*所以提前设置好node的pred至关重要。
*这一步也是 unparkSuccessor(h)的关键。即使 pred.next = node延后执行也不影响通过循环遍历
*获取最先进入同步队列的node节点,来唤醒对应的线程
*/
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                 pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
}

/**
*通过CAS自旋的方式将node节点以插入尾部的方式加入到同步队列
*同步队列设置了一个空的node当作哨兵,来代表已经获得同步状态的线程。可以通过对哨兵节点的字段*waitStatus进行设置,来告知该线程release时同步队列中是否有需要被唤醒的节点。如果有,则唤醒。
*/
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}






/** 该方法为进入同步队列的入口,通过for循环来实现cas自旋 获取同步状态方式,方法参数中node已经加入
*等待队列后进行 *** 作,当node的前节点pred 是head头节点时,首先尝试一下获取同步状态,这里考虑的点
*主要是1、提高响应效率,直接将当前线程挂起,然后等待唤醒浪费资源
*2、极有可能出现获取同步状态的线程在此时已经release,而当前线程进入挂起状态时没有pred节点可以
*唤醒当前线程,然后进入无限阻塞中
*/
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
/**无法获取同步状态后开始准备进入等待状态,提前完成两个任务:1、需要设置好通知信号,这个任务由*pred节点完成,当pred节点的waitStatus状态设置成-1(node.singal)时表示它对应的线程release释放
*同步状态后会调用unparkSuccessor(h)唤醒后继节点
*2、维护这个双向同步队列,删除无效节点(waitStatus=1),根据gc内存回收机制,无效节点的前后pred 
*和next节点脱钩后,没有直接或者间接被对象引用时,会被直接清理,所以利用这个机制只需要将
*无效节点脱钩就行,即可完成同步队列的维护工作。这里不用担心线程安全问题,因为进入同步队列的node节*点会严格按照进入队列的时机(也就是执行compareAndSetTail(pred, node)方法的顺序)排好顺序,node的*先后顺序始终会维持不变,直到被删除或者被唤醒执行,也是公平机制的实现。
*/
                if (shouldParkAfterFailedAcquire(p, node) &&
/**当设置好通知信号和同步队列维护后,调用JNI本地方法unsafe.park(this)将当前线程在此处挂起,
*此时可以安心挂起,因为就算前节点提前release(),调用unsafe.unpark()方法来唤醒next对应的当前线程
*设置了启动标志,当当前执行到unsafe.park()时,也会结束挂起状态,直接被唤醒。
*到这一步也就完成了入队 *** 作,等待perd前节点唤醒。
*/
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
/**
*在finally方式是处理node进入CAS自旋获得同步状态失败后的处理,看整个CAS自旋过程可能出现异常的步骤
*1、tryAcquire(arg),这个肯定不会,因为要是出现异常,早在自旋前acquire()方法获取同步状态时就已
*经出现异常,所以这个排除。
*2、 parkAndCheckInterrupt(),这个方法也不可能,因为本地方法unsafe.park(),虽然可以被中断唤醒,*但是不响应中断,不会抛出异常。
*3、shouldParkAfterFailedAcquire(p, node),这个方法也不可能,它仅仅是在进入阻塞前确保perd前置*节点已经设置好了通知标志(signal)没有抛出异常的可能
*当跳出acquire()方法再去看这个方法是,就有了抛出空指针异常的可能,该方法并非private修饰,所以可
*以在子类中被调用,当参数node并非已加入同步队列的节点时 final Node p = node.predecessor()就可
*能报出空指针异常,所以个人认为设计者的设计思路在于此。
*/
            if (failed)
                cancelAcquire(node);
        }
}


/**
*该静态方法仅仅只是为了给当前线程一个中断标志,好像有点多余,直接方法中使用*Thread.currentThread().interrupt()也可以实现同样的功能
*个人理解这是AQS框架预留的接口,没有使用final修饰该方法,可以在子类重写,实现更多对中断的响应需求
*而在doAcquireInterruptibl()可中断方法中就是抛出异常,所以本方法也预留了扩展的余地
*/
static void selfInterrupt() {
        Thread.currentThread().interrupt();
}


/** 
*该方法是线程释放同步状态的入口,tryRelease(arg)方法由实现子类重写,当顺利释放同步状态后
*如果同步队列中有等待node且head节点中waitStatus=1时(也就是说明有后继节点在等待唤醒中),那么执行*unparkSuccessor(h)方法来唤醒相对应的线程
*/
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

/**
*该方法时唤醒等待队列中的node
*/
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
/**s==null时需要从tail节点开始向前遍历,找到最新进入队列的等待node,说明node.next 节点入同步队
*列时 pred.next = node还没来得及设置所导致的。
*/
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
}
以上就是线程获取同步状态,以及获取失败后进入同步队列等待被唤醒的整个过程。

关于独占模式下还有另外一个实现方式就是条件队列。这个队列由ConditionObject对象来维护。

/**
*该方法时当前线程在获取同步状态的前提下,释放同步状态后进入条件队列的过程,并阻塞等待被唤醒的过程
*/ 

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
//生成当前线程对应的ConditionWaiter,并进入条件队列,
//因为condition作用在独占模式下,只有获得同步状态的前提下才能执行该方法,所以是线程安全的
//故在插入条件队列时不需要使用CAS方法 *** 作
            Node node = addConditionWaiter();
//完全释放同步状态:因为有可能重入,所以需要一次全释放干净,释放后会唤醒同步队列中的等待节点
            int savedState = fullyRelease(node);
            int interruptMode = 0;
/**
*在这个死循环体中当首次执行时因为对应的node还在条件队列中,所以直接被挂起。
*当线程被唤醒时首先要判断当前线程是是否被thread.interrupt()方法唤醒,因为unsafe.park()方法是可*以被thread.interrupt方法中断的,如果是说明取消等待,则将当前node转移到同步队列中,直接跳出死循*环。
*如果不是,则要判断当前线程对应的node是否被转移到同步队列中,如果没有,继续挂起,等待唤醒。
*/
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
/**
*唤醒后开始请求同步状态,上文已经讲到acquireQueued()方法(上文已经讲到),直至请求成功
*然后获取中断标志
*/
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
//这一步是维护整个条件等待队列,清理被删除的node节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
  }


//该方法是通知还在条件队列中的node,将其转移到同步队列中并唤醒对应的线程
public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
}
以上就是条件队列的执行机制。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/734007.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存