ReentrantLock为可重入锁,有公平锁实现和非公平锁实现。默认为非公平锁实现。具体实现是通过3个静态内部类来实现的。
分别为Sync 和 NoFairSync 以及 FairSync
先来看看常用的方法
public void lock() { sync.lock();}
public boolean tryLock() { return sync.tryLock();}public void unlock() { sync.release(1);}public Condition newCondition() { return sync.newCondition();}
可以看出都是交由内部类的实例来完成
private final Sync sync;
@ReservedStackAccessfinal void lock() { if (!initialTryLock()) acquire(1);}
看看lock的具体实现,这里是个模板模式initialTryLock由Sync的子类公平锁和非公平锁实现
//非公平 final boolean initialTryLock() { Thread current = Thread.currentThread(); if (compareAndSetState(0, 1)) { // first attempt is unguarded setExclusiveOwnerThread(current); return true; } else if (getExclusiveOwnerThread() == current) { int c = getState() + 1; if (c < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(c); return true; } else return false; }
//公平 final boolean initialTryLock() { Thread current = Thread.currentThread();//获得当前线程 int c = getState();//得到锁状态 if (c == 0) {//为0说明无锁 if (!hasQueuedThreads() && compareAndSetState(0, 1)) {//如果没有其他线程在等待,cas尝试获得锁 setExclusiveOwnerThread(current);//获得成功,设置当前线程为拥有锁的线程 return true; } } else if (getExclusiveOwnerThread() == current) { //进行重入判断 if (++c < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(c);//重入就将state++表明重入次数 return true; } //获得锁失败 return false; }
可以看到与非公平锁不同,公平锁不直接CAS,而是在指导当前没线程得到锁时,先判断有无线程在等待队列,没有才CAS尝试获得锁.也就是公平和非公平的区别,
再来看Sync中的lock,发现获得锁失败后,执行了个acquire方法,
@ReservedStackAccess final void lock() { if (!initialTryLock()) acquire(1); }
这是继承的AQS的方法,再来看看AQS(AbstractQueuedSynchronizer)
AQS中定义了一个内部类Node
abstract static class Node { volatile Node prev; // initially attached via casTail volatile Node next; // visibly nonnull when signallable Thread waiter; // visibly nonnull when enqueued volatile int status; // written by owner, atomic bit ops by others // methods for atomic operations final boolean casPrev(Node c, Node v) { // for cleanQueue return U.weakCompareAndSetReference(this, PREV, c, v); } final boolean casNext(Node c, Node v) { // for cleanQueue return U.weakCompareAndSetReference(this, NEXT, c, v); } final int getAndUnsetStatus(int v) { // for signalling return U.getAndBitwiseAndInt(this, STATUS, ~v); } final void setPrevRelaxed(Node p) { // for off-queue assignment U.putReference(this, PREV, p); } final void setStatusRelaxed(int s) { // for off-queue assignment U.putInt(this, STATUS, s); } final void clearStatus() { // for reducing unneeded signals U.putIntOpaque(this, STATUS, 0); } private static final long STATUS = U.objectFieldOffset(Node.class, "status"); private static final long NEXT = U.objectFieldOffset(Node.class, "next"); private static final long PREV = U.objectFieldOffset(Node.class, "prev"); }
可以看出这个节点有前驱和后继指针,那么这个节点形成的链表将会是双向的
在AQS中又有头节点和尾节点的定义
private transient volatile Node head; private transient volatile Node tail;
结合AQS的名字,AQS就是一个同步队列,队列通过链表实现,节点就是内部类的节点。继续看acquire
public final void acquire(int arg) { if (!tryAcquire(arg)) acquire(null, arg, false, false, false, 0L); }
首先再次尝试获得锁,tryAcquire(arg).这里就不看了先。
然后调用AQS的另一个acquire方法,接下来就是究极折磨,看这段头都大了
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { //获得当前线程 Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false; Node pred = null; // predecessor of node when enqueued for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }
一大段代码,看着头大,先跳着看。之前知道AQS是存放了Node的同步队列。那么这里Node在哪创建了呢。
if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); }
发现此时创建是根据是共享还是独占创建的节点。ReentrantLock使用的是独占节点ExclusiveNode.创建完成后重新循序,进入了
else if (pred == null) { // try to enqueue node.waiter = current; //将node的waiter设置为当前线程 Node t = tail;//然后得到尾节点 //将尾节点设置为node的前驱节点此处调用的Unsafe类的设置方法,不去深究 node.setPrevRelaxed(t); // avoid unnecessary fenc //如果尾巴节点为空,说明还没初始化等待队列,尝试建立头结点 if (t == null) tryInitializeHead(); //否则通过cas将尾巴节点设置为node else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out cas失败 退出 这里有点搞不懂 else t.next = node; //cas成功 设置前尾节点的后继节点为node 形成双向 }
到这里线程就被放入了这个同步等待队列了。这时可以看到 循环还没有退出,暂时不深究
看看Sync的 release方法
if (tryRelease(arg)) { signalNext(head); return true; } return false;
@ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases;//释放设置的数量 if (getExclusiveOwnerThread() != Thread.currentThread()) throw new IllegalMonitorStateException(); boolean free = (c == 0);//占有数为0 if (free)//释放成功 setExclusiveOwnerThread(null);//独占的锁拥有者为null setState(c);//改变锁状态 return free; }
释放成功后这里又调用了AQS的方法 signalNext
private static void signalNext(Node h) { Node s; //如果有头节点,获得头结点的下一节点后,如果节点状态不为0且不为null if (h != null && (s = h.next) != null && s.status != 0) { s.getAndUnsetStatus(WAITING);//改变节点状态 LockSupport.unpark(s.waiter);//唤醒节点中的线程 } }
当节点被唤醒后,执行下列
if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); //再次尝试获得锁 } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { //获得锁成功 if (first) { //是等待队列中第一个节点 node.prev = null; //去除前置节点 head = node; //当前节点当做头节点头节点 pred.next = null;//去除前头节点后继 node.waiter = null;//将当前线程从节点中移除 if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; //到此,节点中进程被唤醒后获得锁,当前节点当做新的头节点用于唤醒下一节点 } }
到此就是ReentrantLock的上锁,下锁原理
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)