• 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html
  • 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html

Java并发:了解无锁CAS就从源码分析

技术杂谈 勤劳的小蚂蚁 3个月前 (01-29) 72次浏览 已收录 0个评论 扫描二维码

什么是 CAS

CAS的全称为Compare And Swap,直译就是比较交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,其实现方式是基于硬件平台的汇编指令,在intel的CPU中,使用的是 cmpxchg指令,就是说CAS是靠硬件实现的,从而在硬件层面提升效率。

CSA 原理

利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法,其它原子操作都是利用类似的特性完成的。 在 java.util.concurrent 下面的源码中, Atomic, ReentrantLock 都使用了Unsafe类中的方法来保证并发的安全性。
CAS操作是原子性的,所以多线程并发使用CAS更新数据时,可以不使用锁,JDK中大量使用了CAS来更新数据而防止加锁来保持原子更新。
CAS 操作包含三个操作数 :内存偏移量位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。

源码分析

下面来看一下
java.util.concurrent.atomic.AtomicInteger.java, getAndIncrement(), getAndDecrement()是如何利用CAS实现原子性操作的。

AtomicInteger 源码解析

  1. // 使用 unsafe 类的原子操作方式
  2. privatestaticfinalUnsafe unsafe = Unsafe.getUnsafe();
  3. privatestaticfinallong valueOffset;
  4. static {
  5.    try {
  6.        //计算变量 value 在类对象中的偏移量
  7.        valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
  8.    } catch (Exception ex) { thrownewError(ex); }
  9. }
valueOffset 字段表示 "value" 内存位置,在 compareAndSwap 方法 ,第二个参数会用到.
关于偏移量图文介绍
Unsafe 调用C 语言可以通过偏移量对变量进行操作
  1. //volatile变量value
  2. privatevolatileint value;
  3. /**
  4. * 创建具有给定初始值的新 AtomicInteger
  5. *
  6. * @param initialValue 初始值
  7. */
  8. publicAtomicInteger(int initialValue) {
  9.    value = initialValue;
  10. }
  11. //返回当前的值
  12. publicfinalint get() {
  13.    return value;
  14. }
  15. //原子更新为新值并返回旧值
  16. publicfinalint getAndSet(int newValue) {
  17.    return unsafe.getAndSetInt(this, valueOffset, newValue);
  18. }
  19. //最终会设置成新值
  20. publicfinalvoid lazySet(int newValue) {
  21.    unsafe.putOrderedInt(this, valueOffset, newValue);
  22. }
  23. //如果输入的值等于预期值,则以原子方式更新为新值
  24. publicfinalboolean compareAndSet(int expect, int update) {
  25.    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  26. }
  1. //方法相当于原子性的 ++i
  2. publicfinalint getAndIncrement() {
  3.    //三个参数,1、当前的实例 2、value实例变量的偏移量 3、递增的值。
  4.    return unsafe.getAndAddInt(this, valueOffset, 1);
  5. }
  6. //方法相当于原子性的 --i
  7. publicfinalint getAndDecrement() {
  8.    //三个参数,1、当前的实例 2、value实例变量的偏移量 3、递减的值。
  9.    return unsafe.getAndAddInt(this, valueOffset, -1);
  10. }
实现逻辑封装在 Unsafe 中 getAndAddInt 方法,继续往下看, Unsafe 源码解析

Unsafe 源码解析

在JDK8中追踪可见 sun.misc.Unsafe这个类是无法看见源码的,打开 openjdk8源码看
openjdk-8-src-b132-03_mar_2014.zip
openjdkjdksrcshareclassessunmiscUnsafe.java
通常我们最好也不要使用 Unsafe类,除非有明确的目的,并且也要对它有深入的了解才行。要想使用 Unsafe类需要用一些比较 tricky的办法。Unsafe类使用了单例模式,需要通过一个静态方法 getUnsafe()来获取。但Unsafe类做了限制,如果是普通的调用的话,它会抛出一个 SecurityException异常;只有由主类加载器加载的类才能调用这个方法。
下面是 sun.misc.Unsafe.java类源码
  1. //获取Unsafe实例静态方法
  2. @CallerSensitive
  3. publicstaticUnsafe getUnsafe() {
  4.    Class<?> caller = Reflection.getCallerClass();
  5.    if (!VM.isSystemDomainLoader(caller.getClassLoader()))
  6.        thrownewSecurityException("Unsafe");
  7.    return theUnsafe;
  8. }
网上也有一些办法来用主类加载器加载用户代码,最简单方法是利用Java反射,方法如下:
  1. privatestaticUnsafe unsafe;
  2. static {
  3.    try {
  4.        //通过反射获取rt.jar下的Unsafe类
  5.        Field field = Unsafe.class.getDeclaredField("theUnsafe");
  6.        field.setAccessible(true);
  7.        unsafe = (Unsafe) field.get(null);
  8.    } catch (Exception e) {
  9.        System.out.println("Get Unsafe instance occur error" + e);
  10.    }
  11. }
获取到Unsafe实例之后,我们就可以为所欲为了。Unsafe类提供了以下这些功能:
  1.    //native硬件级别的原子操作
  2.    //类似的有compareAndSwapInt,compareAndSwapLong,compareAndSwapBoolean,compareAndSwapChar等等。
  3.    publicfinalnativeboolean compareAndSwapInt(Object o, long offset,int expected,int x);
  4.    //内部使用自旋的方式进行CAS更新(while循环进行CAS更新,如果更新失败,则循环再次重试)
  5.    publicfinalint getAndAddInt(Object o, long offset, int delta) {
  6.        int v;
  7.        do {
  8.            //获取对象内存地址偏移量上的数值v
  9.            v = getIntVolatile(o, offset);
  10.            //如果现在还是v,设置为 v + delta,否则返回false,继续循环再次重试.
  11.        } while (!compareAndSwapInt(o, offset, v, v + delta));
  12.        return v;
  13.    }
利用 Unsafe 类的 JNI compareAndSwapInt 方法实现,使用CAS实现一个原子操作更新,
compareAndSwapInt 四个参数
1、当前的实例 2、实例变量的内存地址偏移量 3、预期的旧值 4、要更新的值

unsafe.cpp 深层次解析

  1. // unsafe.cpp
  2. /*
  3. * 这个看起来好像不像一个函数,不过不用担心,不是重点。UNSAFE_ENTRY 和 UNSAFE_END 都是宏,
  4. * 在预编译期间会被替换成真正的代码。下面的 jboolean、jlong 和 jint 等是一些类型定义(typedef):
  5. *
  6. * 省略部分内容
  7. */
  8. UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  9.  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  10.  oop p = JNIHandles::resolve(obj);
  11.  // 根据偏移量,计算 value 的地址。这里的 offset 就是 AtomaicInteger 中的 valueOffset
  12.  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  13.  // 调用 Atomic 中的函数 cmpxchg,该函数声明于 Atomic.hpp 中
  14.  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
  15. UNSAFE_END
  16. // atomic.cpp
  17. unsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) {
  18.  assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
  19.  /*
  20.   * 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载
  21.   * 函数。相关的预编译逻辑如下:
  22.   *
  23.   * atomic.inline.hpp:
  24.   *    #include "runtime/atomic.hpp"
  25.   *  
  26.   *    // Linux
  27.   *    #ifdef TARGET_OS_ARCH_linux_x86
  28.   *    # include "atomic_linux_x86.inline.hpp"
  29.   *    #endif
  30.   *
  31.   *    // 省略部分代码
  32.   *  
  33.   *    // Windows
  34.   *    #ifdef TARGET_OS_ARCH_windows_x86
  35.   *    # include "atomic_windows_x86.inline.hpp"
  36.   *    #endif
  37.   *  
  38.   *    // BSD
  39.   *    #ifdef TARGET_OS_ARCH_bsd_x86
  40.   *    # include "atomic_bsd_x86.inline.hpp"
  41.   *    #endif
  42.   *
  43.   * 接下来分析 atomic_windows_x86.inline.hpp 中的 cmpxchg 函数实现
  44.   */
  45.  return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
  46.                                       (jint)compare_value);
  47. }

上面的分析看起来比较多,不过主流程并不复杂。如果不纠结于代码细节,还是比较容易看懂的。接下来,我会分析 Windows 平台下的 Atomic::cmpxchg 函数。继续往下看吧。

  1. // atomic_windows_x86.inline.hpp
  2. #define LOCK_IF_MP(mp) __asm cmp mp, 0  
  3.                       __asm je L0      
  4.                       __asm _emit 0xF0
  5.                       __asm L0:
  6. inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  7.  // alternative forInterlockedCompareExchange
  8.  int mp = os::is_MP();
  9.  __asm {
  10.    mov edx, dest
  11.    mov ecx, exchange_value
  12.    mov eax, compare_value
  13.    LOCK_IF_MP(mp)
  14.    cmpxchg dword ptr [edx], ecx
  15.  }
  16. }
上面的代码由 LOCKIFMP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCKIFMP 替换为实际内容。如下:
  1. inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  2.  // 判断是否是多核 CPU
  3.  int mp = os::is_MP();
  4.  __asm {
  5.    // 将参数值放入寄存器中
  6.    mov edx, dest    // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中
  7.    mov ecx, exchange_value
  8.    mov eax, compare_value
  9.    // LOCK_IF_MP
  10.    cmp mp, 0
  11.    /*
  12.     * 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处,
  13.     * 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令
  14.     * 前加 lock 前缀。
  15.     */
  16.    je L0
  17.    /*
  18.     * 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的
  19.     * 原因可以参考知乎的一个回答:
  20.     *     https://www.zhihu.com/question/50878124/answer/123099923
  21.     */
  22.    _emit 0xF0
  23. L0:
  24.    /*
  25.     * 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释:
  26.     *   cmpxchg: 即“比较并交换”指令
  27.     *   dword: 全称是 double word,在 x86/x64 体系中,一个
  28.     *          word = 2 byte,dword = 4 byte = 32 bit
  29.     *   ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元
  30.     *   [edx]: [...] 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。
  31.     *          那么 [edx] 表示内存地址为 dest 的内存单元
  32.     *        
  33.     * 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值
  34.     * 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中。
  35.     */
  36.    cmpxchg dword ptr [edx], ecx
  37.  }
  38. }
到这里 CAS 的实现过程就讲了,CAS 的实现离不开处理器的支持。以上这么多代码,其实核心代码就是一条带 lock 前缀的 cmpxchg 指令,即 lockcmpxchg dword ptr[edx],ecx
通过上述的分析,可以发现 AtomicInteger原子类的内部几乎是基于前面分析过 Unsafe类中的 CAS相关操作的方法实现的,这也同时证明 AtomicIntegergetAndIncrement自增操作实现过程,是基于无锁实现的。

CAS的ABA问题及其解决方案

假设这样一种场景,当第一个线程执行CAS(V,E,U)操作。在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,我们就无法正确判断这个变量是否已被修改过,如下图:
这就是典型的CAS的ABA问题,一般情况这种情况发现的概率比较小,可能发生了也不会造成什么问题,比如说我们对某个做加减法,不关心数字的过程,那么发生ABA问题也没啥关系。但是在某些情况下还是需要防止的,那么该如何解决呢?在Java中解决ABA问题,我们可以使用以下原子类
AtomicStampedReference类
AtomicStampedReference原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境
底层实现为: 通过Pair私有内部类存储数据和时间戳, 并构造volatile修饰的私有实例
接着看 java.util.concurrent.atomic.AtomicStampedReference类的compareAndSet()方法的实现:
  1. private static class Pair<T> {
  2.    final T reference;
  3.    final int stamp;
  4.    //最好不要重复的一个数据,决定数据是否能设置成功,时间戳会重复
  5.    private Pair(T reference, int stamp) {
  6.        this.reference = reference;
  7.        this.stamp = stamp;
  8.    }
  9.    static <T> Pair<T> of(T reference, int stamp) {
  10.        return new Pair<T>(reference, stamp);
  11.    }
  12. }
同时对当前数据和当前时间进行比较,只有两者都相等是才会执行casPair()方法,
单从该方法的名称就可知是一个CAS方法,最终调用的还是 Unsafe类中的 compareAndSwapObject方法
到这我们就很清晰 AtomicStampedReference的内部实现思想了,
通过一个键值对 Pair存储数据和时间戳,在更新时对数据和时间戳进行比较,
只有两者都符合预期才会调用 UnsafecompareAndSwapObject方法执行数值和时间戳替换,也就避免了ABA的问题。
  1. /**
  2. * 原子更新带有版本号的引用类型。
  3. * 该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号。
  4. * 可以解决使用CAS进行原子更新时,可能出现的ABA问题。
  5. */
  6. publicclassAtomicStampedReference<V> {
  7.    //静态内部类Pair将对应的引用类型和版本号stamp作为它的成员
  8.    privatestaticclassPair<T> {
  9.        //最好不要重复的一个数据,决定数据是否能设置成功,建议时间戳
  10.        final T reference;
  11.        finalint stamp;
  12.        privatePair(T reference, int stamp) {
  13.            this.reference = reference;
  14.            this.stamp = stamp;
  15.        }
  16.        //根据reference和stamp来生成一个Pair的实例
  17.        static <T> Pair<T> of(T reference, int stamp) {
  18.            returnnewPair<T>(reference, stamp);
  19.        }
  20.    }
  21.    //作为一个整体的pair变量被volatile修饰
  22.    privatevolatilePair<V> pair;
  23.    //构造方法,参数分别是初始引用变量的值和初始版本号
  24.    publicAtomicStampedReference(V initialRef, int initialStamp) {
  25.        pair = Pair.of(initialRef, initialStamp);
  26.    }
  27.    ....
  28.    privatestaticfinal sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
  29.    privatestaticfinallong pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
  30.    //获取pair成员的偏移地址
  31.    staticlong objectFieldOffset(sun.misc.Unsafe UNSAFE,
  32.                                  String field, Class<?> klazz) {
  33.        try {
  34.            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
  35.        } catch (NoSuchFieldException e) {
  36.            NoSuchFieldError error = newNoSuchFieldError(field);
  37.            error.initCause(e);
  38.            throw error;
  39.        }
  40.    }
  41. }
  1. /**
  2. * @param 期望(老的)引用
  3. * @param       (新的)引用数据
  4. * @param 期望(老的)标志stamp(时间戳)值
  5. * @param       (新的)标志stamp(时间戳)值
  6. * @return 是否成功
  7. */
  8. publicboolean compareAndSet(V expectedReference,V   newReference,int expectedStamp,int newStamp) {
  9.    Pair<V> current = pair;
  10.    return
  11.        // 期望(老的)引用 == 当前引用
  12.        expectedReference == current.reference &&
  13.        // 期望(老的)标志stamp(时间戳)值 == 当前标志stamp(时间戳)值
  14.        expectedStamp == current.stamp &&
  15.        // (新的)引用数据 == 当前引用数据 并且 (新的)标志stamp(时间戳)值 ==当前标志stamp(时间戳)值
  16.        ((newReference == current.reference && newStamp == current.stamp) ||
  17.          #原子更新值
  18.         casPair(current, Pair.of(newReference, newStamp)));
  19. }
  20. //当引用类型的值与期望的一致的时候,原子的更改版本号为新的值。该方法只修改版本号,不修改引用变量的值,成功返回true
  21. publicboolean attemptStamp(V expectedReference, int newStamp) {
  22.    Pair<V> current = pair;
  23.    return
  24.        expectedReference == current.reference &&
  25.        (newStamp == current.stamp ||
  26.         casPair(current, Pair.of(expectedReference, newStamp)));
  27. }
  28. /**
  29. * CAS真正实现方法
  30. */
  31. privateboolean casPair(Pair<V> cmp, Pair<V> val) {
  32.        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
  33. }
期望 Paircmp(A) == 当前内存存偏移量位置 Pair(V),就更新值 Pairval(B)成功返回true 否则 false
  1. publicstaticvoid main(String[] args) {
  2.    AtomicStampedReference<Integer> num = newAtomicStampedReference<Integer>(1, 0);
  3.    Integer i = num.getReference();
  4.    int stamped = num.getStamp();
  5.    if (num.compareAndSet(i, i + 1, stamped, stamped + 1)) {
  6.        System.out.println("测试成功");
  7.    }
  8. }
通过以上原子更新方法,可见 AtomicStampedReference就是利用了Unsafe的CAS方法+Volatile关键字对存储实际的引用变量和int的版本号的Pair实例进行更新,实现解决CAS使用中的ABA问题。
参考:

丨极客文库, 版权所有丨如未注明 , 均为原创丨
本网站采用知识共享署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议进行授权
转载请注明原文链接:Java并发:了解无锁CAS就从源码分析
喜欢 (0)
[247507792@qq.com]
分享 (0)
勤劳的小蚂蚁
关于作者:
温馨提示:本文来源于网络,转载文章皆标明了出处,如果您发现侵权文章,请及时向站长反馈删除。

您必须 登录 才能发表评论!

  • 精品技术教程
  • 编程资源分享
  • 问答交流社区
  • 极客文库知识库

客服QQ


QQ:2248886839


工作时间:09:00-23:00