月泉的博客

JUC-并发原子类Long系列原理剖析

月泉 并发编程JUC

JUC-并发原子类Long系列原理剖析

为什么需要并发原子类

在并发编程中时常会遇见一个共享资源的读改写,如果要保证这些共享资源的读改写的原子操作只能通过2种方式,一种是锁的方式一种是通过CAS的方式,如果利用锁那么将有可能造成大量的阻塞导致内核态用户态及线程唤醒带来的性能开销,那么如果只是针对单一资源的读改写保证其原子操作利用CAS是最佳的做法了,因为其不会导致线程阻塞挂起。

如何使用

private static AtomicLong count = new AtomicLong();
private static int EXCEPT = 0;
public static void main(String[] args) throws InterruptedException {
    Integer[] groupA = new Integer[]{0,2,1,5,1,1,0,3,1,3};
    Integer[] groupB = new Integer[]{3,5,66,122,4,11,0,112,0};

    Thread threadA = new Thread(() -> {
        for (int i = 0; i < groupA.length; i++) {
            if(groupA[i].intValue() == EXCEPT){
                count.getAndIncrement();
            }
        }
    });

    Thread threadB = new Thread(() -> {
        for (int i = 0; i < groupB.length; i++) {
            if(groupB[i].intValue() == EXCEPT){
                count.getAndIncrement();
            }
        }
    });

    threadA.start();
    threadB.start();
    threadA.join();
    threadB.join();
    System.out.println(count.get());
}

通过getAndIncrement方法来获取并对原值+1,通过get可以获取当前变量的值

其背后的机制是什么?

既然已经提过它是CAS算法来实现,但还是要看下其源码

该类定义了几个全局变量和常量

// Unsafe实例....
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
// value 字段的偏移量
private static final long VALUE = U.objectFieldOffset(AtomicLong.class, "value");
// value 值
private volatile long value;

值得注意的是value是被volatile关键字锁修饰的,使用该关键字来保证该字段的修改能够立即对其它线程可见

接着是getAndIncrement

public final long getAndIncrement() {
    return U.getAndAddLong(this, VALUE, 1L);
}

通过调用unsafe实例中的getAndAddLong对当前对象的VALUE偏移量上的值加1,那么接着继续看下getAndAddLong

public final long getAndAddLong(Object o, long offset, long delta) {
    long v;
    do {
        v = getLongVolatile(o, offset);
    } while (!weakCompareAndSetLong(o, offset, v, v + delta));
    return v;
}

根据源码可以得知此处修改值是使用的自旋CAS来对值进行修改的,首先先获取到偏移量位置的值,然后利用CAS修改如果修改失败则重复执行,否则修改成功返回

接着看下get

public final long get() {
    return value;
}

就是很简单的返回

JDK1.8增加的LongAddr

动机

AtomicLong上可以看见确实是解决了在多线程并发中对共享资源的读改写保证了其原子操作,但是如果在并发量大的情况下还是会出现性能不佳的现象,何以见得?可以从getAndIncrement调用getAndAddLong中的源码来看,采用的是自旋CAS的算法,在数据竞争大的情况下,如果频繁出现自旋势必也会影响性能 ,为了解决这个问题JDK1.8新增加了一个类LongAddr

如何使用

private static LongAdder count = new LongAdder();
private static int EXCEPT = 0;
public static void main(String[] args) throws InterruptedException {
    Integer[] groupA = new Integer[]{0,2,1,5,1,1,0,3,1,3};
    Integer[] groupB = new Integer[]{3,5,66,122,4,11,0,112,0};

    Thread threadA = new Thread(() -> {
        for (int i = 0; i < groupA.length; i++) {
            if(groupA[i].intValue() == EXCEPT){
                count.increment();
            }
        }
    });

    Thread threadB = new Thread(() -> {
        for (int i = 0; i < groupB.length; i++) {
            if(groupB[i].intValue() == EXCEPT){
                count.increment();
            }
        }
    });

    threadA.start();
    threadB.start();
    threadA.join();
    threadB.join();
    System.out.println(count.sum());
}

从代码上可以看到,通过increment方法给值加1,通过sum来获得总和

背后的机制

从用法上看和AtomicLong没有什么太大的区别无非就是API有所变动,那么为什么还要再造一个对Long的原子操作类呢?那么它又是如何解决数据竞争所带来的频繁自旋呢?首先看下这个类的结构

public class LongAdder extends Striped64 implements Serializable {
    ...
}

从类结构上来看该类是继承至Striped64

abstract class Striped64 extends Number {
    @sun.misc.Contended static final class Cell {
        ......
    }
    
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

该类在静态块将线程对象的threadLocalRandomProbeStriped64的变量的偏移量取到和赋值,还可以发现该类定义了一个Cell的内部类,那么这个类是干嘛呢的?先看下它的源码在介绍下LongAddr的运转机制

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

首先可以看到Cell类是被@Contented注解所修饰的一个类,从语义上来说是要被填充单独占一个缓存行的解决伪共享的这个问题,然后其自身记录了自身value值的偏移量和拥有一个value

接着聊一下它背后的运转机制

首先我们看AtomicLong对值的操作,是一对多

AtomicLong

再看LongAdder

LongAdder

从上述图中可以看到多个线程对应一个LongAdder实例操作的时候其背后实际上是在对其中的一个Cell进行操作

首先看increment的时候发生了什么

public void increment() {
    add(1L);
}

调用了add方法并传入了long类型的参数1

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

再看add方法,首先它会判断当前实例的cells是否为空,然后再调用了casBase传入了bb+xb的值第一次是0

transient volatile long base;
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

casBase的源码可以很 轻松的看出来判断当前对象的BASE偏移量位置的值是否是cmp如果是就更新成val,然后base就记录下来了当前加1的值,如果此时发生了数据冲突产生了竞争,cas更新失败就会返回false然后这里的if判断取反接着就会进入if的代码块

紧跟着if代码块中又有一个if,它首先判断cells是否为空,如果不为空则判断cells的长度是否小于0(此时的赋值别忘了),如果不小于0接着就会调用getProbe()&m实际上这一个地方的作用就是限定范围内的取值,然后赋值给a接着看getProbe是什么

static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

从当前线程对象拿到PROBE偏移量位置的值,实际上也就是线程对象中的实例变量threadLocalRandomProbe

接着就会调用a.cas对当前值进行更新(更新cell中的值),如果进入了if块就会调用longAccumulate

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    int h;
     // probe为0就初始化
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current() 
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    // 自旋
    for (;;) { 
        Cell[] as; Cell a; int n; long v;
        // cells 不为空(已被初始化)
        if ((as = cells) != null && (n = as.length) > 0) {
            // 范围取余处位置的数据cell是否为空
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // 尝试创建新的cell
                    Cell r = new Cell(x);   // Optimistically create
                     // 避免竞态条件带来的数据竞争使用cas的方式修改cellsBusy
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 给cells[位置]赋上新创建的值
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // CAS 更新a处的值
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                // 更新成功终止自旋
                break;
            // 判断当前cells数组的数量是否大于处理器的数量(最大就是CPU的数量)或者已不是相对新值
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            // 扩容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // 判断当前的值是否还是相对新值
                        Cell[] rs = new Cell[n << 1]; //每次扩容都会翻倍
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2]; // 第一次初始化数量为2
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 尝试利用cas更新base的值
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

初步总结一下,先去更新base如果发生了竞争就会进入longAccumulate逻辑 =》如果已经被初始化了就会判断取模(限制范围)拿到的值处是否为null如果为null就会创建一个新实例cell,否则就会利用cas去更新该处的值更新失败代表发生了竞争则继续判断是否大于当前CPU数量如果大于了继续自旋否则扩容每次扩容是当前数组的2倍,如果在第一次未初始化就会默认初始化一个长度为2的cell,前2者也就是未被初始化,然后要初始化时发生了竞争,就会尝试利用cas去更新base的值。

然后再来看下sum的源码

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

可以看到sum就是将base + cells[i]中的值累加起来,需要注意的是该方法并没有保证其原子性,在并发操作频繁的时候可能获取的数据是不准确的。

总结

Java8新增的LongAdder在性能上是要比AtomicLong好,当时比它更加耗费空间(具体依论网上有很多测试可以自行查询),当时AtomicLong也有AtomicLong的好处,它由于是针对一个变量可以很轻松的就拿到更新后的值,但LongAdder就不好拿了,所以看具体场景并不需要立即获取更新后的值可以直接使用LongAdder,如果要获取立即更新的值还是使用AtomicLong会更加方便。

月泉
伪文艺中二青年,热爱技术,热爱生活。