构造方法有俩个,默认不传值的时候使用的是非公平模式!性能在大量竞争的场景下,性能会更高一点。构造方法如果传一个Boolean值,true为公平锁,false为非公平锁
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }加锁:lock()方法跟踪
调用一个内部类的lock()方法,实际调用的是内部类FairSync或者内部类NonfairSync的lock方法
public void lock() { // 调用一个内部类的lock()方法 sync.lock(); }公平锁与非公平锁的lock方法
// 公平锁的lock方法 final void lock() { // 尝试入队 acquire(1); } // 非公平锁的lock方法 final void lock() { // 通过CAS把状态设置为1 if (compareAndSetState(0, 1)) // 设置当前的执行现场是当前线程 setExclusiveOwnerThread(Thread.currentThread()); else // 尝试入队 acquire(1); }
可以从源码中看出:公平锁与非公平锁的区别是非公平锁多了一次cas *** 作,在cas失败才会添加到队列中。简单理解公平锁就是每次新的线程都会去排队!简单理解非公平锁就是每次新的线程过来都会尝试获取锁,获取不到锁采取排队! 入队方法详解:acquire(1)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }入队方法调用的第一个方法:tryAcquire(arg) 尝试去执行当前线程,他有俩种实现方式
protected final boolean tryAcquire(int acquires) { // 得到当前线程 final Thread current = Thread.currentThread(); // 获取当前线程的状态:线程有5种,AQS种有详细介绍 int c = getState(); // 初始化状态,表示当前节点在sync队列中,等待着获取锁。 if (c == 0) { // 判断没有排队的对象,并且可以将当前的线程状态CAS到1。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 设置运行线程为当前的线程 setExclusiveOwnerThread(current); return true; } } // 重入的逻辑判断当前线程和现在持有锁的线程是同一个线程 else if (current == getExclusiveOwnerThread()) { // 重入了几次 int nextc = c + acquires; // 重入的太多了,超过int的最大值,再次进行增加会变为负数,抛出异常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 由于当前锁获取线程,直接可以设置状态 setState(nextc); return true; } return false; } final boolean nonfairTryAcquire(int acquires) { // 得到当前线程 final Thread current = Thread.currentThread(); // 获取当前线程的状态:线程有5种,AQS种有详细介绍 int c = getState(); // 初始化状态,表示当前节点在sync队列中,等待着获取锁。 if (c == 0) { // 并且可以将当前的线程状态CAS到1。 if (compareAndSetState(0, acquires)) { // 设置运行线程为当前的线程 setExclusiveOwnerThread(current); return true; } } // 重入的逻辑判断当前线程和现在持有锁的线程是同一个线程 else if (current == getExclusiveOwnerThread()) { // 重入了几次 int nextc = c + acquires; // 重入的太多了,超过int的最大值,再次进行增加会变为负数,抛出异常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 由于当前锁获取线程,直接可以设置状态 setState(nextc); return true; } return false; }
公平锁比非公平锁多了一个校验:!hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // 未初始化的时候,队列头部尾部的值为null,相等----不满足 // 只有一个的时候,头尾相等----不满足 // 头部的下一个为null,说明只有一个----满足 // 下一个的线程是当前线程,重入了----满足 // 简单说:链表中只有一个或者链表是空的,返回false。链表中有多个,不满足重入的机制,返回false。只有链表中有多个数据并且持有线程是当前线程的时候才会返回true! return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }入队方法调用的第二个内部参数方法:addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) { // 创建一个当前线程的Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 得到之前的尾节点 Node pred = tail; // 尾节点不为空,说明队列存在 if (pred != null) { // 设置当前线程节点的上一个节点是之前的尾节点 node.prev = pred; // cas尝试将尾节点设置为当前线程的节点 if (compareAndSetTail(pred, node)) { // 设置之前尾节点,现在的倒数第二个节点的下一个节点是当前线程节点 pred.next = node; // 返回当前节点 return node; } } // CAS失败或者节点没有创建,会执行这入队的 *** 作。详细请看下面的代码 enq(node); // 入队成功后,返回当前节点 return node; } private Node enq(final Node node) { for (;;) { // 得到之前的尾节点 Node t = tail; // 之前的尾节点为空,需要进行初始化队列 if (t == null) { // Must initialize // 通过CAS的方式将头节点设置为当前节点 if (compareAndSetHead(new Node())) // 头结点设置成功后,复制给尾节点。只有一个节点的状态 tail = head; } else { // 设置当前线程节点的上一个节点是之前的尾节点 node.prev = t; // cas尝试将尾节点设置为当前线程的节点 if (compareAndSetTail(t, node)) { // 设置之前尾节点,现在的倒数第二个节点的下一个节点是当前线程节点 t.next = node; // 返回当前节点!注意:这里是这个方法唯一返回的地方!也就是说初始化后还会继续循环一次来设置上一个下一个节点,然后进行返回。 return t; } } } }入队方法调用的第二个方法:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) { // 设置失败的标志位为true boolean failed = true; try { // 设置中断的标记位为false boolean interrupted = false; for (;;) { // 得到当前节点的上一个(前置)节点,前置节点为null,会抛出空指针异常 final Node p = node.predecessor(); // 如果前置节点是头结点,并且尝试执行当前线程成功 if (p == head && tryAcquire(arg)) { // 可以执行了,就把当前节点设置为头结点 setHead(node); // 前置节点去掉引用,方便GC去回收 p.next = null; // help GC // 设置失败的标志位为false failed = false; // 返回中断的标记位:false return interrupted; } // 代码执行到这里,说明尝试获取锁,但是获取锁失败了。 // 阻塞前的准备工作 *** 作成功(状态是-1的时候成功) // 将线程阻塞,等待他去唤醒。唤醒后返回线程的中断状态! if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 设置中断的标记位:false interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱当前节点的等待状态 int ws = pred.waitStatus; // 状态为-1的时候:当前节点的后继节点包含的线程需要运行,也就是unpark; if (ws == Node.SIGNAL) // 前驱当前节点状态已经设置为SIGNAL,可以进行安全的阻塞 return true; // 大于0(CANCELLED状态):表示当前的线程被取消; if (ws > 0) { // 前驱节点已经因为超时或响应了中断,需要跳过这些状态大于0的节点,直到找到一个状态不是大于0的。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 跳过中断的线程后,设置前驱节点的下一个节点为当前节点。 pred.next = node; } else { // 针对于ReentrantLock,到这里的状态只能为0或者PROPAGATE(-3) // 通过CAS将前置节点的状态设置为SIGNAL(-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 阻塞 LockSupport.park(this); // unpark之后,返回当前的中断状态,并清除中断标志位 return Thread.interrupted(); } private void cancelAcquire(Node node) { // Ignore if node doesn't exist // 忽略节点不存在的时候 if (node == null) return; // 设置当前节点的线程为null node.thread = null; // Skip cancelled predecessors:有前驱节点被取消,跳过所有被取消的 // 得到前驱节点 Node pred = node.prev; // 前驱节点的状态大于0,被取消 while (pred.waitStatus > 0) // 将前驱结点的前驱结点设置为当前节点的前驱结点。简单理解就是将当前节点的前驱节点设置为第一个找到的正常状态(<=0)的前驱节点 node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 获取当前节点的下一个节点 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 将当前节点状态设置为1(取消状态)。这里不用CAS的原因是这个执行完其他线程会跳过取消状态,这个执行前无其他线程在执行! node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. // 如果当前节点是尾节点,将尾节点设置为上一个节点。简单理解就是移除当前节点。 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 进入else说明node不是队尾(或者是队尾但是cas队尾失败(其实结果也不是队尾,因为被别的线程抢先了)) // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. // 定义一个状态标识 int ws; // 筛选后的前驱节点不是头结点 // 并且当前节点状态为-1(等待唤醒)或者(当前节点不在运行或者不被取消(<= 0)并且可以将当前节点CAS到-1状态(等待唤醒)) // 并且前驱节点有线程持有! if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 得到当前节点的下一个节点 Node next = node.next; // 下一个节点不为空,并且下一个节点没有被取消 if (next != null && next.waitStatus <= 0) // CAS将前一个节点与下一个节点连接。简单理解就是跳过(取消)当前节点! compareAndSetNext(pred, predNext, next); } else { // 唤醒下一个不被取消的节点! unparkSuccessor(node); } // 当前节点的下一个节点取消指向 node.next = node; // help GC } }上面所有方法返回true后,调用selfInterrupt();
static void selfInterrupt() { Thread.currentThread().interrupt(); }解锁:unlock()方法跟踪
public void unlock() { sync.release(1); } public final boolean release(int arg) { // 尝试释放锁 if (tryRelease(arg)) { // 释放锁成功,获取头结点 Node h = head; // 头结点不为null并且当前节点的状态不在初始化状态 if (h != null && h.waitStatus != 0) // unpark去唤醒队列中的下一个线程 unparkSuccessor(h); // 返回解锁成功 return true; } // 返回解锁失败 return false; } protected final boolean tryRelease(int releases) { // 获取当前状态的state值与传入的releases。对应减去前面的重入次数 int c = getState() - releases; // 当前线程不是现在获取锁的线程,抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 定义可以释放锁的标记位 boolean free = false; // 回到了初始状态 if (c == 0) { // 释放锁的标记位变为0 free = true; // 设置持有锁的线程为null setExclusiveOwnerThread(null); } // 设置AQS的状态 setState(c); // 返回锁的标记位 return free; } private void unparkSuccessor(Node node) { // 获取当前节点的线程状态 int ws = node.waitStatus; // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark; // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中; // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行; // 对于ReentrantLock,只能出现-1的状态 if (ws < 0) // 通过CAS的方式,设置线程状态为0 compareAndSetWaitStatus(node, ws, 0); // 获取下一个节点 Node s = node.next; // 下一个节点为空或者下一个节点的状态大于0(CANCELLED,值为1,表示当前的线程被取消;) if (s == null || s.waitStatus > 0) { // 下一个节点直接置为null s = null; // 从尾部开始向前遍历,找到最前的一个处于正常阻塞状态的结点,直到节点重合 // 从尾部遍历的原因是为了防止在高并发场景下漏掉线程 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 过滤后的下一个节点不为null,唤醒他 if (s != null) // 唤醒下一个节点 LockSupport.unpark(s.thread); }结束语
获取更多有价值的文章,让我们一起成为架构师!关注公众号,可以让你对MySQL有非常深入的了解关注公众号,每天持续高效的了解并发编程!关注公众号,后续持续高效的了解spring源码!这个公众号,无广告!!!每日更新!!!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)