对应源码里面的类,可以理解这些基本概念,如下:
队列同步器(同步器):AbstractQueuedSynchronizer(AQS)
自定义同步器:java.util.concurrent.Semaphore.Sync、java.util.concurrent.locks.ReentrantLock.Sync等
同步组件:ReentrantLock、Semaphore、CountDownLatch等
如何实现ReentrantLock?ReentrantLock实现了Lock接口。它提供了这些获取锁的方法。
我们继续看它的构造方法:
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可以看出,通过构造方法,调用不同的自定义同步器,默认是获取的公平锁。
继续看NonfairSync和FairSync是如何实现的。
从上面的类定义可以看出,NonfairSync和FairSync 的实现是继承了Sync这个自定义同步器。这里面就是核心。Sync这个自定义同步器是继承自AQS,AQS里面提供了一些可以被重写的方法:
以上这些方法的被推迟到子类去,也就是Sync中实现。而他又提供了一些模板方法,这些模板方法如下:
我们回到ReentrantLock里面看,到底是如何实现,我们可以看到再Sync里面定义了一个抽象方法Lock,而Lock接口提供的lock方法调用了自定义同步器Sync的lock方法。
public void lock() {
sync.lock();
}
继续往Sync里面看,发现他里面定义了一个抽象方法。那也就是再Sync的子类中有具体实现。
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
我们看下他的具体实现,可以看出它调用了acquire()方法。acquire方法正是我们提到的AQS里面提供的模板方法。
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
我们继续看AQS里面accuire是如何实现的,可以看出它调用了tryAcquire方法。而这个tryAcquire在AQS里面没有具体实现,他的实现在子类Sync里面。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们看看Sync里面的tryAcquire的具体实现吧,可以看出它调用了Sync子类的的一个非公平锁获取的方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
这个非公平锁获取的定义如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
以上基本把ReentrantLock的实现给串起来了。其核心就是锁是给用户的,屏蔽了具体实现,也就是接口那些方法。而同步器则是锁的具体实现者。这个实现者指的就是自定义同步器Sync提供的方法。这些方法只关注状态设置。具体AQS有关队列的细节都不需要关注。
AQS源码解析final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState(0, 1)是CAS *** 作,也就是检查内存里面的值是不是期望值0,如果是0的话,则将状态修改为更新值1.成功的话返回true。之后 setExclusiveOwnerThread(Thread.currentThread());也就是当前线程拿到锁,进入同步状态。compareAndSetState方法如下,可以看见是用了CAS *** 作。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
stateOffset则是内存中的地址。该方法参数分别解释如下,var2,var4,var5对应CAS *** 作的三个值:
-
val1:对象本身,这里是AbstractQueuedSynchronizer(AQS)
-
var2:该对象值得引用地址
stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
-
var4:期望值
-
var5:要更新的值
如果上面没有返回ture,说明当前线程无法同步,则会进入acquire方法。acquire方法是定义在AQS中的一个模板方法。如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法首先调用tryAcquire方法,看一下它是如何实现的。它的实现在Sync这个自定义同步器中。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//检查状态,如果是0的话,再次尝试获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果没有获取锁,获取当前的线程,看是否等于获取锁的线程
else if (current == getExclusiveOwnerThread()) {
// 更新nextc
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//重新设置状态
setState(nextc);
return true;
}
return false;
}
有个疑问这里为什么会判断两次。总之是还是判断当前线程有没有获取到锁。如果没有返回true,则进入
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
传的参数是static final Node EXCLUSIVE = null;说明当前线程没有获取锁。继续看addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//将Node的尾节点设置为前驱节点
Node pred = tail;
if (pred != null) {
//将前驱节点设置为当前节点的前驱节点
node.prev = pred;
//csa设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter中可以看出新建了一个Node节点,将当前线程加入Node中,compareAndSetTail会将新建节点加入链表中,返回当前节点。如果第一次没有加入成功,会进入enq方法中,循环尝试加入尾节点,知道成功为止。
private Node enq(final Node node) {
for (;;) {
//将节点t指向尾节点
Node t = tail;
if (t == null) { // Must initialize
//t为空的话,就设置头节点
if (compareAndSetHead(new Node()))
//将尾节点指向头结点
tail = head;
} else {
//t如果不为空的话,设置t为前驱节点
node.prev = t;
//设置尾节点,如果成功的话
if (compareAndSetTail(t, node)) {
//将头节点和next节点连起来
t.next = node;
//返回节点
return t;
}
}
}
}
之后节点会进入acquireQueued方法,等待获取同步状态,我们进这个方法看一下,可以看出同步队列一直在自旋获取同步。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//如果前驱是头节点,并能同步的话
if (p == head && tryAcquire(arg)) {
//设置当前节点为头结点
setHead(node);
//将p和下一个节点断开,也就是p进入了同步状态
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结:在获取同步状态时,同步器维护一个人同步队列,获取状态失败的线程会加入到队列中并在队列中自旋,指导前驱节点为头结点并获取同步状态。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)