第4章 Java并发包中原子操作类原理剖析

    上述代码中,valueOffset为AtomicLong在static语句块中进行初始化时通过Unsafe类获得的本类中value属性的内存偏移值。

    可以看到,上述四个方法都是基于Unsafe类中的getAndAddLong方法实现的。

    getAndAddLong源码如下

    1. long var6;
    2. //CAS操作设置var1对象偏移为var2处的值增加var4
    3. do {
    4. var6 = this.getLongVolatile(var1, var2);
    5. } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
    6. return var6;
    7. }
    1. public final boolean compareAndSet(long expect, long update) {
    2. return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    3. }

    可见,内部还是调用了Unsafe类中的CAS方法。

    1. public class AtomicLongDemo {
    2. private static AtomicLong al = new AtomicLong(0);
    3. public static long addNext() {
    4. return al.getAndIncrement();
    5. }
    6. public static void main(String[] args) {
    7. for (int i = 0; i < 100; i++) {
    8. new Thread() {
    9. @Override
    10. public void run() {
    11. AtomicLongDemo.addNext();
    12. }
    13. }.start();
    14. }
    15. // 等待线程运行完
    16. try {
    17. TimeUnit.SECONDS.sleep(1);
    18. } catch (InterruptedException e) {
    19. e.printStackTrace();
    20. }
    21. System.out.println("final result is " + AtomicLongDemo.addNext());
    22. }
    23. }

    AtomicLong使用CAS非阻塞算法,性能比使用synchronized等的阻塞算法实现同步好很多。但在高并发下,大量线程会同时去竞争更新同一个原子变量,由于同时只有一个线程的CAS会成功,会造成大量的自旋尝试,十分浪费CPU资源。因此,JDK8中新增了原子操作类LongAdder。

    由上可知,AtomicLong的性能瓶颈是多个线程同时去竞争一个变量的更新权导致的。而LongAdder通过将一个变量分解成多个变量,让同样多的线程去竞争多个资源解决了此问题。

    如图,LongAdder内部维护了多个Cell,每个Cell内部有一个初始值为0的long类型变量,这样,在同等并发下,对单个变量的争夺会变少。此外,多个线程争夺同一个变量失败时,会到另一个Cell上去尝试,增加了重试成功的可能性。当LongAdder要获取当前值时,将所有Cell的值于base相加返回即可。

    LongAdder维护了一个初始值为null的Cell数组和一个基值变量base。当一开始Cell数组为空且并发线程较少时,仅使用base进行累加。当并发增大时,会动态地增加Cell数组的容量。

    先看LongAdder的定义

    Striped64类中有如下三个变量:

    1. transient volatile Cell[] cells;
    2. transient volatile long base;
    3. transient volatile int cellsBusy;

    cellsBusy用于实现自旋锁,状态值只有0和1,当创建Cell元素、扩容Cell数组或初始化Cell数组时,使用CAS操作该变量来保证同时只有一个变量可以进行其中之一的操作。

    下面看Cell的定义:

    1. @sun.misc.Contended static final class Cell {
    2. volatile long value;
    3. Cell(long x) { value = x; }
    4. final boolean cas(long cmp, long val) {
    5. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    6. }
    7. // Unsafe mechanics
    8. private static final sun.misc.Unsafe UNSAFE;
    9. private static final long valueOffset;
    10. static {
    11. try {
    12. UNSAFE = sun.misc.Unsafe.getUnsafe();
    13. valueOffset = UNSAFE.objectFieldOffset
    14. (ak.getDeclaredField("value"));
    15. } catch (Exception e) {
    16. throw new Error(e);
    17. }
    18. }

    将value声明伪volatile确保了内存可见性,CAS操作保证了value值的原子性,@sun.misc.Contented注解的使用解决了伪共享问题。

    下面来看LongAdder中的几个方法:

    • long Sum()
    1. public long sum() {
    2. Cell[] as = cells; Cell a;
    3. long sum = base;
    4. if (as != null) {
    5. for (int i = 0; i < as.length; ++i) {
    6. if ((a = as[i]) != null)
    7. sum += a.value;
    8. }
    9. }
    10. return sum;
    11. }

    sum的结果并非一个精确值,因为计算总和时并没有对Cell数组加锁,累加过程中Cell的值可能被更改。

    • void reset()

    reset非常简单,将base和Cell数组中非空元素的值置为0.

    • long sumThenRest()
    1. public long sumThenReset() {
    2. Cell[] as = cells; Cell a;
    3. long sum = base;
    4. base = 0L;
    5. if (as != null) {
    6. for (int i = 0; i < as.length; ++i) {
    7. if ((a = as[i]) != null) {
    8. sum += a.value;
    9. a.value = 0L;
    10. }
    11. }
    12. }
    13. return sum;
    14. }

    sumThenReset同样非常简单,将某个Cell的值加到sum中后随即重置。

    • void add(long x)
    1. public void add(long x) {
    2. Cell[] as; long b, v; int m; Cell a;
    3. // 判断cells是否为空,如果不为空则直接进入内层判断,
    4. // 否则尝试通过CAS在base上进行add操作,若CAS成功则结束,否则进入内层
    5. if ((as = cells) != null || !casBase(b = base, b + x)) {
    6. // 记录cell上的CAS操作是否失败
    7. boolean uncontended = true;
    8. if (as == null || (m = as.length - 1) < 0 ||
    9. // 计算当前线程应该访问cells数组的哪个元素
    10. (a = as[getProbe() & m]) == null ||
    11. // 尝试通过CAS操作在对应cell上add
    12. !(uncontended = a.cas(v = a.value, v + x)))
    13. longAccumulate(x, null, uncontended);
    14. }
    15. }

    下面重点来看longAccumelate方法:

    longAccumulate时Striped64类中定义的,其中包含了初始化cells数组,改变cells数组长度,新建cell等逻辑。

    1. final void longAccumulate(long x, LongBinaryOperator fn,
    2. boolean wasUncontended) {
    3. int h;
    4. if ((h = getProbe()) == 0) {
    5. ThreadLocalRandom.current(); // 初始化当前线程的probe,以便于找到线程对应的cell
    6. h = getProbe();
    7. wasUncontended = true; // 标记执行longAccumulate前对相应cell的CAS操作是否失败,失败为false
    8. }
    9. boolean collide = false; // 是否冲突,如果当前线程尝试访问的cell元素与其他线程冲突,则为true
    10. for (;;) {
    11. Cell[] as; Cell a; int n; long v;
    12. // 当前cells不为空且元素个数大于0则进入内层,否则尝试初始化
    13. if ((as = cells) != null && (n = as.length) > 0) {
    14. if ((a = as[(n - 1) & h]) == null) {
    15. if (cellsBusy == 0) { // 尝试添加新的cell
    16. Cell r = new Cell(x);
    17. boolean created = false;
    18. try { // Recheck under lock
    19. Cell[] rs; int m, j;
    20. if ((rs = cells) != null &&
    21. (m = rs.length) > 0 &&
    22. rs[j] = r;
    23. created = true;
    24. }
    25. } finally {
    26. cellsBusy = 0;
    27. }
    28. if (created)
    29. break;
    30. continue;
    31. }
    32. }
    33. collide = false;
    34. }
    35. else if (!wasUncontended) // 如果已经进行了失败的CAS操作
    36. wasUncontended = true; // 则不调用下面的a.cas()函数(反正肯定是失败的),而是重新计算probe值来尝试
    37. else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
    38. break;
    39. else if (n >= NCPU || cells != as)
    40. collide = false; // 如果当前cells长度大于CPU个数则不进行扩容,因为每个cell都使用一个CPU处理时性能才是最高的
    41. // 如果当前cells已经过时(其他线程对cells执行了扩容操作,改变了cells指向),也不会扩容
    42. else if (!collide)
    43. collide = true; // 执行到此处说明a.cas()执行失败,即有冲突,将collide置为true,
    44. // 跳过扩容阶段,重新获取probe,到cells不同位置尝试cas,再次失败则扩容
    45. // 扩容
    46. else if (cellsBusy == 0 && casCellsBusy()) {
    47. try {
    48. if (cells == as) {
    49. Cell[] rs = new Cell[n << 1];
    50. for (int i = 0; i < n; ++i)
    51. rs[i] = as[i];
    52. cells = rs;
    53. }
    54. } finally {
    55. cellsBusy = 0;
    56. }
    57. collide = false;
    58. continue; // 扩容后再次尝试(扩容后cells长度改变,
    59. // 根据(n - 1) & h计算当前线程在cells中对应元素下标会变化,减少再次冲突的可能性)
    60. }
    61. h = advanceProbe(h); // 重新计算线程probe,减小下次访问cells元素时的冲突机会
    62. }
    63. // 初始化cells数组
    64. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    65. boolean init = false;
    66. try {
    67. if (cells == as) {
    68. Cell[] rs = new Cell[2];
    69. rs[h & 1] = new Cell(x);
    70. cells = rs;
    71. init = true;
    72. }
    73. } finally {
    74. cellsBusy = 0;
    75. }
    76. if (init)
    77. break;
    78. }
    79. // 尝试通过base的CAS操作进行add,成功则结束当前函数,否则再次循环
    80. else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
    81. break;
    82. }

    代码比较复杂,细节的解释都写在注释中了。大体逻辑就是判断cells是否为空或者长度为0:如果空或者长度为0则尝试进行cells数组初始化,初始化失败的话则尝试通过CAS操作在base上进行add,仍然失败则重走一次流程;如果cells不为空且长度大于0,则获取当前线程对应于cells中的元素,如果该元素为null则尝试创建,否则尝试通过CAS操作在上面进行add,仍失败则扩容。

    LongAdder是LongAccumulator的特例,两者都继承自Striped64。

    看如下代码:

    LongAccumulator构造器允许传入一个双目运算符接口用于自定义加法规则,还允许传入一个初始值。

    自定义的加法函数是如何被应用的呢?以上提到的longAccumulate()方法中有如下代码:

    1. a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x)))

    LongAdder的add()方法中调用longAccumulate()方法时传入的是null,而LongAccumulator的accumulate()方法传入的是this.function,即自定义的加法函数。

    相关笔记: