CAS与原子类

CAS与原子类,第1张

CAS与原子

CAS 全称 Compare And Swap 中文意思 比较并交换

跟中文意思一样就是一个比较然后交换的思想 比如

if(value == expectedValue)
{value = newValue;}

就像上面的这个代码 只要使用了CAS 当一个线程去内存获取值(称为b)的时候 会把内存的这个值 放到别的变量(称为a)中 当这个线程对b进行了赋值 *** 作 这时会让b跟a做比较 看看b有没有被别的线程进行更改 如果b被更改了那么会返回a的值也就是旧值 如果b没有被更改那么会给b进行赋值然后返回这个新值 这就是cas

cas的使用

在 Java 中,CAS *** 作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS *** 作

compareAndSwapObject

compareAndSwapInt

compareAndSwapLong

它们都是 native 方法,由 Java 虚拟机提供具体实现,这意味着不同的 Java 虚拟机对它们的实现可能会略有不同。
以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS *** 作。

public class CASTest {

public static void main(String[] args) {
    Entity entity = new Entity();

    Unsafe unsafe = UnsafeFactory.getUnsafe();

    long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");

    boolean successful;

    // 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段新值
    successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
    System.out.println(successful + "t" + entity.x);

    successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
    System.out.println(successful + "t" + entity.x);

    successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
    System.out.println(successful + "t" + entity.x);
}
}

public class UnsafeFactory {


public static Unsafe getUnsafe() {
    try {
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        return (Unsafe) field.get(null);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}


public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
    try {
        return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
    } catch (NoSuchFieldException e) {
        throw new Error(e);
    }
}

针对 entity.x 的 3 次 CAS *** 作,分别试图将它从 0 改成 3、从 3 改成 5、从 3 改成 8。

CAS源码分析
Hotspot 虚拟机对compareAndSwapInt 方法的实现如下:

#unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  // 根据偏移量,计算value的地址
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  // Atomic::cmpxchg(x, addr, e) cas逻辑 x:要交换的值   e:要比较的值
  //cas成功,返回期望值e,等于e,此方法返回true 
  //cas失败,返回内存中的value值,不等于e,此方法返回false
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;

核心逻辑在Atomic::cmpxchg方法中,这个根据不同 *** 作系统和不同CPU会有不同的实现。这里我们以linux_64x的为例,查看

Atomic::cmpxchg的实现
	#atomic_linux_x86.inline.hpp
	inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
	  //判断当前执行环境是否为多处理器环境
	  int mp = os::is_MP();
	  //LOCK_IF_MP(%4) 在多处理器环境下,为 cmpxchgl 指令添加 lock 前缀,以达到内存屏障的效果
	  //cmpxchgl 指令是包含在 x86 架构及 IA-64 架构中的一个原子条件指令,
	  //它会首先比较 dest 指针指向的内存值是否和 compare_value 的值相等,
	  //如果相等,则双向交换 dest 与 exchange_value,否则就单方面地将 dest 指向的内存值交给exchange_value。
	  //这条指令完成了整个 CAS  *** 作,因此它也被称为 CAS 指令。
	  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
	                    : "=a" (exchange_value)
	                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
	                    : "cc", "memory");
	  return exchange_value;

CAS实现高效率锁 自旋锁

自旋锁就是让线程不断的空循环 不让线程被休眠 线程休眠就要调用 *** 作系统的指令 进入内核态 这样效率就大大降低 cas实现自旋锁线程会一直空循环不会进入内核态 从而达到高效率

public class CASLock {

//加锁标记
private volatile int state;
private static final Unsafe UNSAFE;
private static final long OFFSET;

static {
    try {
        UNSAFE = UnsafeFactory.getUnsafe();
        OFFSET = UnsafeFactory.getFieldOffset(
                UNSAFE, CASLock.class, "state");
    } catch (Exception e) {
        throw new Error(e);
    }
}

public boolean cas() {
    return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1); // cas *** 作判断当前对象的state的值是不是0如果是0就更改成功如果不是就失败  这里在jvm源码中返回的是旧值 但是被jvm封装了  这里只要返回旧值就是false
}

public int getState() {
    return state;
}

public void setState(int state) {
    this.state = state;
}

}





public class Test {
    private volatile static int sum = 0;
    static Object object = "";
    static ReentrantLock lock = new ReentrantLock();

static CASLock casLock = new CASLock();

public static void main(String[] args) {

    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            //synchronized (object) {
            //lock.lock();
            //
            for(;;){   // 这个死循环就是所谓的自旋锁
                //state=0
                if(casLock.getState()==0&&casLock.cas()) {
                    try {
                        for (int j = 0; j < 10000; j++) {
                            sum++;
                        }
                        //
                        System.out.println(casLock.getState());
                    } finally {
                        //lock.unlock();
                        // state=0
                        casLock.setState(0);
                    }
                    break;
                }
            }

            //}
        });
        thread.start();
    }

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println(sum);

}


}

Atomic原子 *** 作类

在并发编程中很容易出现并发安全的问题,有一个很简单的例子就是多线程更新变量i=1,比如多个线程执行i++ *** 作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过Synchronized进行控制来达到线程安全的目的。但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案。实际上,在J.U.C下的atomic包提供了一系列的 *** 作简单,性能高效,并能保证线程安全的类去更新基本类型变量,数组元素,引用类型以及更新对象中的字段类型。atomic包下的这些类都是采用的是乐观锁策略去原子更新数据,在java中则是使用CAS *** 作具体实现。

原子类可分为五种

基本类型:AtomicInteger、AtomicLong、AtomicBoolean;
引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

原子更新基本类型

以AtomicInteger为例总结常用的方法

//以原子的方式将实例中的原值加1,返回的是自增前的旧值;

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
 
//getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;

public final boolean getAndSet(boolean newValue) {
    boolean prev;
    do {
        prev = get();
    } while (!compareAndSet(prev, newValue));
    return prev;
}
 
//incrementAndGet() :以原子的方式将实例中的原值进行加1 *** 作,并返回最终相加后的结果;
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
 
//addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果;
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;

}

使用

public class AtomicIntegerTest {
    static AtomicInteger sum = new AtomicInteger(0);

public static void main(String[] args) {

    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                // 原子自增  CAS
                sum.incrementAndGet();
                //TODO
            }
        });
        thread.start();
    }

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(sum.get());

}

incrementAndGet()方法通过CAS自增实现,如果CAS失败,自旋直到成功+1。

incrementAndGet()方法最终回到这里

   public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
		// compareAndSwapInt(var1, var2, var5, var5 + var4) 利用CAS不断的更新值直到成功为止 
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

原子更新数组类型
AtomicIntegerArray为例总结常用的方法

//addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加;
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}
 
//getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1;
public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}
 
//compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);

}

测试

public class AtomicIntegerArrayTest {

static int[] value = new int[]{ 1, 2, 3, 4, 5 };
static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);


public static void main(String[] args) throws InterruptedException {

    //设置索引0的元素为100
    atomicIntegerArray.set(0, 100);
    System.out.println(atomicIntegerArray.get(0));
    //以原子更新的方式将数组中索引为1的元素与输入值相加
    atomicIntegerArray.getAndAdd(1,5);

    System.out.println(atomicIntegerArray);
}

LongAdder/DoubleAdder

之前有很多空循环也大大的浪废cpu的资源LongAdder/DoubleAdder解决了这个问题
AtomicLong是利用了底层的CAS *** 作来提供并发性的,比如addAndGet方法:

性能测试

public class LongAdderTest {

    public static void main(String[] args) {
    testAtomicLongVSLongAdder(10, 10000);
    System.out.println("==================");
    testAtomicLongVSLongAdder(10, 200000);
    System.out.println("==================");
    testAtomicLongVSLongAdder(100, 200000);
}

static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
    try {
        long start = System.currentTimeMillis();
        testLongAdder(threadCount, times);
        long end = System.currentTimeMillis() - start;
        System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程 *** 作计数" + times);
        System.out.println("结果>>>>>>LongAdder方式增加计数" + (threadCount * times) + "次,共计耗时:" + end);

        long start2 = System.currentTimeMillis();
        testAtomicLong(threadCount, times);
        long end2 = System.currentTimeMillis() - start2;
        System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程 *** 作计数" + times);
        System.out.println("结果>>>>>>AtomicLong方式增加计数" + (threadCount * times) + "次,共计耗时:" + end2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    AtomicLong atomicLong = new AtomicLong();
    for (int i = 0; i < threadCount; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < times; j++) {
                    atomicLong.incrementAndGet();
                }
                countDownLatch.countDown();
            }
        }, "my-thread" + i).start();
    }
    countDownLatch.await();
}

static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    LongAdder longAdder = new LongAdder();
    for (int i = 0; i < threadCount; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < times; j++) {
                    longAdder.add(1);
                }
                countDownLatch.countDown();
            }
        }, "my-thread" + i).start();
    }

    countDownLatch.await();
}

LongAdder原理

10个线程各循环10万次对i+1 使用了自旋锁拿不到锁的线程得到了cpu时间片会空循环 LongAdder解决这个问题的原理 它会创建一个数组 获取当前线程的哈希值 然后对这个数组抹除 这样这个线程会进入数组中其中的一个 这时把我们要加的数记下来也就是1 第一次把1赋值给 为了好理解 这个创建一个数组 A[] a = new A[10] 假设来了一个线程进入了a[4] 第一次进入的时候把1赋值给A类中的属性(称为aa)上 第二次进去会对aa属性进行加1 *** 作 然后所有线程执行完之后 把a数组中的每个A对象里的aa属性进行相加 最后的结果就是正确的 把一个属性拆分成多个属性最后把拆分的属性进行相加 得到结果这样的思想 大大的提升线程的利用率了

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5637812.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存