【并发编程】通俗易懂的来学ReentrantLock锁源码

【并发编程】通俗易懂的来学ReentrantLock锁源码,第1张

【并发编程】通俗易懂的来学ReentrantLock锁源码 ReentrantLock实例化方法

构造方法有俩个,默认不传值的时候使用的是非公平模式!性能在大量竞争的场景下,性能会更高一点。构造方法如果传一个Boolean值,true为公平锁,false为非公平锁

    
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

加锁:lock()方法跟踪

调用一个内部类的lock()方法,实际调用的是内部类FairSync或者内部类NonfairSync的lock方法

    public void lock() {
        // 调用一个内部类的lock()方法
        sync.lock();
    }
公平锁与非公平锁的lock方法
// 公平锁的lock方法
final void lock() {
    // 尝试入队
    acquire(1);
}

// 非公平锁的lock方法
final void lock() {
    // 通过CAS把状态设置为1
    if (compareAndSetState(0, 1))
        // 设置当前的执行现场是当前线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 尝试入队
        acquire(1);
}

可以从源码中看出:公平锁与非公平锁的区别是非公平锁多了一次cas *** 作,在cas失败才会添加到队列中。简单理解公平锁就是每次新的线程都会去排队!简单理解非公平锁就是每次新的线程过来都会尝试获取锁,获取不到锁采取排队! 入队方法详解:acquire(1)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

入队方法调用的第一个方法:tryAcquire(arg) 尝试去执行当前线程,他有俩种实现方式

protected final boolean tryAcquire(int acquires) {
    // 得到当前线程
    final Thread current = Thread.currentThread();
    // 获取当前线程的状态:线程有5种,AQS种有详细介绍
    int c = getState();
    // 初始化状态,表示当前节点在sync队列中,等待着获取锁。
    if (c == 0) {
        // 判断没有排队的对象,并且可以将当前的线程状态CAS到1。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 设置运行线程为当前的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 重入的逻辑判断当前线程和现在持有锁的线程是同一个线程
    else if (current == getExclusiveOwnerThread()) {
        // 重入了几次
        int nextc = c + acquires;
        // 重入的太多了,超过int的最大值,再次进行增加会变为负数,抛出异常
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 由于当前锁获取线程,直接可以设置状态
        setState(nextc);
        return true;
    }
    return false;
}


final boolean nonfairTryAcquire(int acquires) {
    // 得到当前线程
    final Thread current = Thread.currentThread();
    // 获取当前线程的状态:线程有5种,AQS种有详细介绍
    int c = getState();
    // 初始化状态,表示当前节点在sync队列中,等待着获取锁。
    if (c == 0) {
        // 并且可以将当前的线程状态CAS到1。
        if (compareAndSetState(0, acquires)) {
            // 设置运行线程为当前的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 重入的逻辑判断当前线程和现在持有锁的线程是同一个线程
    else if (current == getExclusiveOwnerThread()) {
        // 重入了几次
        int nextc = c + acquires;
        // 重入的太多了,超过int的最大值,再次进行增加会变为负数,抛出异常
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 由于当前锁获取线程,直接可以设置状态
        setState(nextc);
        return true;
    }
    return false;
}

公平锁比非公平锁多了一个校验:!hasQueuedPredecessors()

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // 未初始化的时候,队列头部尾部的值为null,相等----不满足
        // 只有一个的时候,头尾相等----不满足
        // 头部的下一个为null,说明只有一个----满足
        // 下一个的线程是当前线程,重入了----满足
        // 简单说:链表中只有一个或者链表是空的,返回false。链表中有多个,不满足重入的机制,返回false。只有链表中有多个数据并且持有线程是当前线程的时候才会返回true!
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
入队方法调用的第二个内部参数方法:addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    // 创建一个当前线程的Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 得到之前的尾节点
    Node pred = tail;
    // 尾节点不为空,说明队列存在
    if (pred != null) {
        // 设置当前线程节点的上一个节点是之前的尾节点
        node.prev = pred;
        // cas尝试将尾节点设置为当前线程的节点
        if (compareAndSetTail(pred, node)) {
            // 设置之前尾节点,现在的倒数第二个节点的下一个节点是当前线程节点
            pred.next = node;
            // 返回当前节点
            return node;
        }
    }
    // CAS失败或者节点没有创建,会执行这入队的 *** 作。详细请看下面的代码
    enq(node);
    // 入队成功后,返回当前节点
    return node;
}


private Node enq(final Node node) {
    for (;;) {
        // 得到之前的尾节点
        Node t = tail;
        // 之前的尾节点为空,需要进行初始化队列
        if (t == null) { // Must initialize
            // 通过CAS的方式将头节点设置为当前节点
            if (compareAndSetHead(new Node()))
                // 头结点设置成功后,复制给尾节点。只有一个节点的状态
                tail = head;
        } else {
            // 设置当前线程节点的上一个节点是之前的尾节点
            node.prev = t;
            // cas尝试将尾节点设置为当前线程的节点
            if (compareAndSetTail(t, node)) {
                // 设置之前尾节点,现在的倒数第二个节点的下一个节点是当前线程节点
                t.next = node;
                // 返回当前节点!注意:这里是这个方法唯一返回的地方!也就是说初始化后还会继续循环一次来设置上一个下一个节点,然后进行返回。
                return t;
            }
        }
    }
}

入队方法调用的第二个方法:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) {
    // 设置失败的标志位为true
    boolean failed = true;
    try {
        // 设置中断的标记位为false
        boolean interrupted = false;
        for (;;) {
            // 得到当前节点的上一个(前置)节点,前置节点为null,会抛出空指针异常
            final Node p = node.predecessor();
            // 如果前置节点是头结点,并且尝试执行当前线程成功
            if (p == head && tryAcquire(arg)) {
                // 可以执行了,就把当前节点设置为头结点
                setHead(node);
                // 前置节点去掉引用,方便GC去回收
                p.next = null; // help GC
                // 设置失败的标志位为false
                failed = false;
                // 返回中断的标记位:false
                return interrupted;
            }
            // 代码执行到这里,说明尝试获取锁,但是获取锁失败了。
            // 阻塞前的准备工作 *** 作成功(状态是-1的时候成功)
            // 将线程阻塞,等待他去唤醒。唤醒后返回线程的中断状态!
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 设置中断的标记位:false
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱当前节点的等待状态
    int ws = pred.waitStatus;
    // 状态为-1的时候:当前节点的后继节点包含的线程需要运行,也就是unpark;
    if (ws == Node.SIGNAL)
        
        // 前驱当前节点状态已经设置为SIGNAL,可以进行安全的阻塞
        return true;
    // 大于0(CANCELLED状态):表示当前的线程被取消;
    if (ws > 0) {
        
        // 前驱节点已经因为超时或响应了中断,需要跳过这些状态大于0的节点,直到找到一个状态不是大于0的。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 跳过中断的线程后,设置前驱节点的下一个节点为当前节点。
        pred.next = node;
    } else {
        
        // 针对于ReentrantLock,到这里的状态只能为0或者PROPAGATE(-3)
        // 通过CAS将前置节点的状态设置为SIGNAL(-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    // 阻塞
    LockSupport.park(this);
    // unpark之后,返回当前的中断状态,并清除中断标志位
    return Thread.interrupted();
}


private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    // 忽略节点不存在的时候
    if (node == null)
        return;

    // 设置当前节点的线程为null
    node.thread = null;

    // Skip cancelled predecessors:有前驱节点被取消,跳过所有被取消的
    // 得到前驱节点
    Node pred = node.prev;
    // 前驱节点的状态大于0,被取消
    while (pred.waitStatus > 0)
        // 将前驱结点的前驱结点设置为当前节点的前驱结点。简单理解就是将当前节点的前驱节点设置为第一个找到的正常状态(<=0)的前驱节点
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 获取当前节点的下一个节点
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 将当前节点状态设置为1(取消状态)。这里不用CAS的原因是这个执行完其他线程会跳过取消状态,这个执行前无其他线程在执行!
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 如果当前节点是尾节点,将尾节点设置为上一个节点。简单理解就是移除当前节点。
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // 进入else说明node不是队尾(或者是队尾但是cas队尾失败(其实结果也不是队尾,因为被别的线程抢先了))
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        // 定义一个状态标识
        int ws;
        // 筛选后的前驱节点不是头结点
        // 并且当前节点状态为-1(等待唤醒)或者(当前节点不在运行或者不被取消(<= 0)并且可以将当前节点CAS到-1状态(等待唤醒))
        // 并且前驱节点有线程持有!
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 得到当前节点的下一个节点
            Node next = node.next;
            // 下一个节点不为空,并且下一个节点没有被取消
            if (next != null && next.waitStatus <= 0)
                // CAS将前一个节点与下一个节点连接。简单理解就是跳过(取消)当前节点!
                compareAndSetNext(pred, predNext, next);
        } else {
            // 唤醒下一个不被取消的节点!
            unparkSuccessor(node);
        }
        // 当前节点的下一个节点取消指向
        node.next = node; // help GC
    }
}
上面所有方法返回true后,调用selfInterrupt();
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

解锁:unlock()方法跟踪
public void unlock() {
    sync.release(1);
}


public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        // 释放锁成功,获取头结点
        Node h = head;
        // 头结点不为null并且当前节点的状态不在初始化状态
        if (h != null && h.waitStatus != 0)
            // unpark去唤醒队列中的下一个线程
            unparkSuccessor(h);
        // 返回解锁成功
        return true;
    }
    // 返回解锁失败
    return false;
}


protected final boolean tryRelease(int releases) {
    // 获取当前状态的state值与传入的releases。对应减去前面的重入次数
    int c = getState() - releases;
    // 当前线程不是现在获取锁的线程,抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 定义可以释放锁的标记位
    boolean free = false;
    // 回到了初始状态
    if (c == 0) {
        // 释放锁的标记位变为0
        free = true;
        // 设置持有锁的线程为null
        setExclusiveOwnerThread(null);
    }
    // 设置AQS的状态
    setState(c);
    // 返回锁的标记位
    return free;
}


private void unparkSuccessor(Node node) {
    
    // 获取当前节点的线程状态
    int ws = node.waitStatus;
    // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
    // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
    // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
    // 对于ReentrantLock,只能出现-1的状态
    if (ws < 0)
        // 通过CAS的方式,设置线程状态为0
        compareAndSetWaitStatus(node, ws, 0);

    
    // 获取下一个节点
    Node s = node.next;
    // 下一个节点为空或者下一个节点的状态大于0(CANCELLED,值为1,表示当前的线程被取消;)
    if (s == null || s.waitStatus > 0) {
        // 下一个节点直接置为null
        s = null;
        // 从尾部开始向前遍历,找到最前的一个处于正常阻塞状态的结点,直到节点重合
        // 从尾部遍历的原因是为了防止在高并发场景下漏掉线程
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 过滤后的下一个节点不为null,唤醒他
    if (s != null)
        // 唤醒下一个节点
        LockSupport.unpark(s.thread);
}
结束语

获取更多有价值的文章,让我们一起成为架构师!关注公众号,可以让你对MySQL有非常深入的了解关注公众号,每天持续高效的了解并发编程!关注公众号,后续持续高效的了解spring源码!这个公众号,无广告!!!每日更新!!!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716234.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存