CAS 全称是 compare and swap,是一种用于在多线程环境下实现同步功能的机制。CAS *** 作包含三个 *** 作数 -- 内存位置、预期数值和新值。
CAS 的实现逻辑是将内存位置处的数值与预期数值想比较,若相等,则将内存位置处的值替换为新值。若不相等,则不做任何 *** 作。
具体实现,public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); 这个是一个native方法, 第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值 整个方法的作用是如果当前时刻的值等于预期值var4相等,则更新为新的期望值 var5,如果更新成功,则返回true,否则返回false;
Unsafe Unsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;
Unsafe可认为是Java中留下的后门,提供了一些低层次 *** 作,如直接内存访问、线程调度等2、问题
a、ABA问题,就是要维护的变量被替换后,又设置回来。类实例将无法辨别它被替换过。 举个例子,假设有一个变量x:
- 线程1试图用cas把x从A设置为C,所以它先查询x的值。(在这瞬间,线程切换)
- 线程2用cas把x设置为B
- 线程2用cas把x设置为A
- (线程切换回来)线程1查询到x的值为A,于是cas理所当然地把x改为了C。
问题是:线程1在查询x的过程中,x的值已经经历了A->B→A的转变,而线程1对此无所知。这就是ABA问题了。
解决方案:
对于ABA问题,比较有效的方案是引入版本号,内存中的值每发生一次变化,版本号都+1;在进行CAS *** 作时,不仅比较内存中的值,也会比较版本号,只有当二者都没有变化时,CAS才能执行成功。AtomicStampedReference类便是使用版本号来解决ABA问题的。
AtomicStampedReference原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境。
b、性能问题
我们使用时大部分时间使用的是 while true 方式对数据的修改,直到成功为止。优势就是相应极快,但当线程数不停增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。
二、锁 1、锁的类型锁从宏观上分类,分为悲观锁与乐观锁。重量级锁是悲观锁的一种,自旋锁、轻量级锁与偏向锁属于乐观锁
1.1 乐观锁/悲观锁
乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁 *** 作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的 *** 作。
java中的乐观锁基本都是通过CAS *** 作实现的,CAS是一种更新的原子 *** 作,比较当前值跟传入值是否一样,一样则更新,否则失败。
悲观锁是就是悲观思想,即认为写多,遇到并发写的可能性高,每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会block直到拿到锁。java中的悲观锁就是Synchronized,AQS框架下的锁则是先尝试cas乐观锁去获取锁,获取不到,才会转换为悲观锁,如RetreenLock。
1.2 独享锁/共享锁
独享锁是指该锁一次只能被一个线程所持有。
共享锁是指该锁可被多个线程所持有。
对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。
读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。
独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。
对于Synchronized而言,当然是独享锁。
1.3 互斥锁/读写锁
上面讲的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。
互斥锁在Java中的具体实现就是ReentrantLock。
读写锁在Java中的具体实现就是ReadWriteLock。
1.4 可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。
对于Java ReetrantLock而言,从名字就可以看出是一个重入锁,其名字是Re entrant Lock 重新进入锁。
对于Synchronized而言,也是一个可重入锁。可重入锁的一个好处是可一定程度避免死锁。
1.5 公平锁/非公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁。
非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。
对于Java ReetrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。
对于Synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过AQS的来实现线程调度,所以并没有任何办法使其变成公平锁。
1.6 分段锁
分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发 *** 作。
我们以ConcurrentHashMap来说一下分段锁的含义以及设计思想,ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap(JDK7和JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。
当需要put元素的时候,并不是对整个hashmap进行加锁,而是先通过hashcode来知道他要放在哪一个分段中,然后对这个分段进行加锁,所以当多线程put的时候,只要不是放在一个分段中,就实现了真正的并行的插入。
但是,在统计size的时候,可就是获取hashmap全局信息的时候,就需要获取所有的分段锁才能统计。
分段锁的设计目的是细化锁的粒度,当 *** 作不需要更新整个数组的时候,就仅仅针对数组中的一项进行加锁 *** 作。
1.7 偏向锁/轻量级锁/重量级锁
这三种锁是指锁的状态,并且是针对Synchronized。在Java 5通过引入锁升级的机制来实现高效Synchronized。
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让他申请的线程进入阻塞,性能降低。
各种锁的优缺点及适用场景
锁
优点
缺点
适用场景
1.8 自旋锁
在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
三、AQS解析 1、前置资料当state=0时,表示无锁状态
当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁
AQS的Node中有每个Node自己的状态(waitStatus):
SIGNAL(-1) :前面有线程在运行,需要前面线程结束后,调用unpark()方法才能激活自己
CANCELLED(1):因为超时或中断,该线程已经被取消
ConDITION(-2):表明该线程被处于条件队列,就是因为调用了>- Condition.await而被阻塞
PROPAGATE(-3):传播共享锁
2、ReentrantLock
Sync这个类有两个具体的实现,分别是NofairSync(非公平锁),FailSync(公平锁).
ReentrantLock 主要是基于非公平锁的独占锁实现。在获得同步锁时,同步器维护一个FIFO双向同步(CLH)队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;
移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
2.1 NonfairSync.lock
final void lock() {
if (compareAndSetState(0, 1)) //通过cas *** 作来修改state状态,表示争抢锁的 *** 作
setExclusiveOwnerThread(Thread.currentThread());//设置当前获得锁状态的线程
else
acquire(1); //尝试去获取锁
}
2.2 AbstractQueuedSynchronizer.acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && //通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
//acquireQueued Node作为参数,通过自旋去尝试获取锁
selfInterrupt();
}
2.3 NonfairSync.tryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //state=0说明当前是无锁状态
//通过cas *** 作来替换state的值改为1
if (compareAndSetState(0, acquires)) {
//保存当前获得锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; //增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2.4 AbstractQueuedSynchronizer.addWaiter
private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE
//将当前线程封装成Node,并且mode为独占锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
Node pred = tail;
if (pred != null) { //tail不为空的情况,说明队列中存在节点数据
node.prev = pred; //1.将当前线程的Node的prev节点指向tail
if (compareAndSetTail(pred, node)) {//2.通过cas讲node添加到AQS队列
pred.next = node;//3.cas成功,把旧的tail的next指针指向新的tail
return node;
}
}
enq(node); //tail=null,将node添加到同步队列中
return node;
}
2.5 AbstractQueuedSynchronizer.enq
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail; //如果是第一次添加到队列,那么tail=null
if (t == null) { // Must initialize
//CAS的方式创建一个空的Node作为头结点
if (compareAndSetHead(new Node()))
//此时队列中只一个头结点,所以tail也指向它
tail = head;
} else {
node.prev = t;//1. 进行第二次循环时,tail不为null。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
if (compareAndSetTail(t, node)) { //2. t此时指向tail,所以可以CAS成功,将tail重新指向Node。此时t为更新前的tail的值,即指向空的头结点
t.next = node; //3. t目前指向了头结点,将头结点的后续结点指向Node,返回头结点
return t;
}
}
}
}
2.6 AbstractQueuedSynchronizer.acquireQueued
将添加到队列中的Node作为参数传入acquireQueued方法,这里面会做抢占锁的 *** 作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException
if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺
setHead(node); // 获取锁成功后就不需要再进行同步 *** 作了,获取锁成功的线程作为新的head节点
//凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null
p.next = null; // help GC 移除原来的初始化head节点
failed = false; //获取锁成功
return interrupted;
}
//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 阻塞 *** 作,正常情况下,获取不到锁,代码就在该方法停止了,直到被唤醒
interrupted = true;
}
} finally {
if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue) *** 作
cancelAcquire(node);
}
}
2.7 AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire
靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前继节点的状态
if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被park
return true;
如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个 *** 作实际是把队列中CANCELLED的节点剔除掉。
if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2.7 Sync.tryrelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 这里是将锁的数量减1
if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 由于重入的关系,不是每次释放锁c都等于0,直到最后一次释放锁时,才会把当前线程释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.8 AbstractQueuedSynchronizer.unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {//判断后继节点是否为空或者是否是取消状态,
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点,为什么向前遍历?
s = t;
}
//内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁
if (s != null)
LockSupport.unpark(s.thread); //释放许可
}
简要总结一下AQS的流程的一些特性: • 关键获取锁、释放锁 *** 作由AQS子类实现:acquire-release、acquireShared-releaseShared; • 维护了一个FIFO链表结构的队列,通过自旋方式将新结点添加到队尾; • 添加结点时会从前驱结点向前遍历,跳过那些处于CANCELLED状态的结点; • 释放结点时会从队尾向前遍历,踢出CANCELLED状态的结点,然后唤醒后继结点;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)