目录
一、什么是CAS
二、从一个案例引出CAS
三、Java中的Atomic 原子操作包
1. 基本原子类
2. 数组原子类
3. 引用原子类
4. 字段更新原子类
五、类 AtomicInteger
1、常用的方法:
AtomicInteger 案例:
2、AtomicInteger 源码解析:
六、Unsafe类
1、Unsafe 提供的 CAS 方法
2、获取属性偏移量
3、根据属性的偏移量获取属性的最新值:
七、CAS的缺点
八、以空间换时间:LongAdder
1、LongAdder 的原理
2、longAddr内部结构
4、LongAdder 类的 add 方法
5、LongAdder 类的 casCellsBusy 方法
九、使用 AtomicStampedReference 解决 ABA 问题
1、AtomicStampReference 的构造器:
2、AtomicStampReference 的常用的几个方法如下:
3、案例
一、什么是CAS
CAS,compare and swap的缩写,中文翻译成比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。
二、从一个案例引出CAS
案例:
public class Test {
public static int count = 0;
private final static int MAX_TREAD=10;
public static AtomicInteger atomicInteger = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
/*CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。
使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。
当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。*/
CountDownLatch latch = new CountDownLatch(MAX_TREAD);
//匿名内部类
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
count++;
atomicInteger.getAndIncrement();
}
latch.countDown(); // 当前线程调用此方法,则计数减一
}
};
//同时启动多个线程
for (int i = 0; i < MAX_TREAD; i++) {
new Thread(runnable).start();
}
latch.await(); // 阻塞当前线程,直到计数器的值为0
System.out.println("理论结果:" + 1000 * MAX_TREAD);
System.out.println("static count: " + count);
System.out.println("AtomicInteger: " + atomicInteger.intValue());
}
}
输出:
理论结果:10000
static count: 9994
AtomicInteger: 10000
我们发现每次运行,atomicInteger 的结果值都是正确的,count++的结果却不对,下面我们就开始探究原因。
三、Java中的Atomic 原子操作包
并发包中原子类 , 都存放在 java.util.concurrent.atomic 类路径下:
- 基本原子类
- 数组原子类
- 原子引用类型
- 字段更新原子类
1. 基本原子类
- AtomicInteger:整型原子类。
- AtomicLong:长整型原子类。
- AtomicBoolean :布尔型原子类。
2. 数组原子类
- AtomicIntegerArray:整型数组原子类。
- AtomicLongArray:长整型数组原子类。
- AtomicReferenceArray :引用类型数组原子类。
3. 引用原子类
- AtomicReference:引用类型原子类。
- AtomicMarkableReference :带有更新标记位的原子引用类型。
- AtomicStampedReference :带有更新版本号的原子引用类型。
4. 字段更新原子类
- AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
- AtomicLongFieldUpdater:原子更新长整型字段的更新器。
- AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
五、类 AtomicInteger
1、常用的方法:
方法 | 介绍 |
---|---|
public final int get() | 获取当前的值 |
public final int getAndSet(int newValue) | 获取当前的值,然后设置新的值 |
public final int getAndIncrement() | 获取当前的值,然后自增 |
public final int getAndDecrement() | 获取当前的值,然后自减 |
public final int getAndAdd(int delta) | 获取当前的值,并加上预期的值 |
boolean compareAndSet(int expect, int update) | 通过 CAS 方式设置整数值 |
AtomicInteger 案例:
private static void out(int oldValue,int newValue){
System.out.println("旧值:"+oldValue+",新值:"+newValue);
}
public static void main(String[] args) {
int value = 0;
AtomicInteger atomicInteger= new AtomicInteger(0);
//取值,然后设置一个新值
value = atomicInteger.getAndSet(3);
//旧值:0,新值:3
out(value,atomicInteger.get());
//取值,然后自增
value = atomicInteger.getAndIncrement();
//旧值:3,新值:4
out(value,atomicInteger.get());
//取值,然后增加 5
value = atomicInteger.getAndAdd(5);
//旧值:4,新值:9
out(value,atomicInteger.get());
//CAS 交换
boolean flag = atomicInteger.compareAndSet(9, 100);
//旧值:4,新值:100
out(value,atomicInteger.get());
}
2、AtomicInteger 源码解析:
public class AtomicInteger extends Number implements java.io.Serializable {
// 设置使用Unsafe.compareAndSwapInt进行更新
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
...省略
private volatile int value;
//自动设置为给定值并返回旧值。
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
//以原子方式将当前值加1并返回旧值。
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//以原子方式将当前值减1并返回旧值。
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
//原子地将给定值添加到当前值并返回旧值。
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
...省略
}
通过源码我们发现AtomicInteger的增减操作都调用了Unsafe 实例的方法,下面我们对Unsafe类做介绍:
六、Unsafe类
是位于
sun.misc 包下的一个类,Unsafe 提供了CAS 方法,直接通过native 方式(封装 C++代码)调用了底层的 CPU 指令 cmpxchg。
sun.misc.Unsafe
,从名字中我们可以看出来这个类对普通程序员来说是“危险”的,一般应用开发者不会用到这个类。
1、Unsafe 提供的 CAS 方法
主要如下: 定义在 Unsafe 类中的三个 “比较并交换”原子方法
/*
@param o 包含要修改的字段的对象
@param offset 字段在对象内的偏移量
@param expected 期望值(旧的值)
@param update 更新值(新的值)
@return true 更新成功 | false 更新失败
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);
public final native boolean compareAndSwapInt( Object o, long offset, int expected,int update);
public final native boolean compareAndSwapLong( Object o, long offset, long expected, long update);
提供的
CAS
方法包含四个入参:
包含要修改的字段对象、字段内存位置、预期原值及
Unsafe
的
CAS
方法的时候,这些方法首先将内存位置的值与预期值(旧的值)比
true
;如果不相匹配,
false
。
2、获取属性偏移量
Unsafe 提供的获取字段(属性)偏移量的相关操作,主要如下:
/**
* @param o 需要操作属性的反射
* @return 属性的偏移量
*/
public native long staticFieldOffset(Field field);
public native long objectFieldOffset(Field field);
方法用于获取静态属性
Field
在
Class
对象中的偏移量,在
CAS
操作静态属性时,会用到这个偏移量。
方法用于获取非静态
Field
(非静态属性)在
Object 实例中的偏移量,在 CAS
操作对象的非静态属性时,会用到这个偏移量。
3、根据属性的偏移量获取属性的最新值:
/**
* @param o 字段所属于的对象实例
* @param fieldOffset 字段的偏移量
* @return 字段的最新值
*/
public native int getIntVolatile(Object o, long fieldOffset);
七、CAS的缺点
1. ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。
JDK 提供了两个类 AtomicStampedReference、AtomicMarkableReference 来解决 ABA 问题。
2. 只能保证一个共享变量的原子操作。一个比较简单的规避方法为:把多个共享变量合并成一个共享变量来操作。 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,可以把多个变量放在一个 AtomicReference 实例后再进行 CAS 操作。比如有两个共享变量 i=1、j=2,可以将二者合并成一个对象,然后用 CAS 来操作该合并对象的 AtomicReference 引用。
3. 循环时间长开销大。高并发下N多线程同时去操作一个变量,会造成大量线程CAS失败,然后处于自旋状态,导致严重浪费CPU资源,降低了并发性。
CAS
恶性空自旋的较为常见的方案为:
- 分散操作热点,使用 LongAdder 替代基础原子类 AtomicLong。
- 使用队列削峰,将发生 CAS 争用的线程加入一个队列中排队,降低 CAS 争用的激烈程度。JUC 中非常重要的基础类 AQS(抽象队列同步器)就是这么做的。
八、以空间换时间:LongAdder
1、LongAdder 的原理
如果有竞争的话,内部维护了多个Cell变量,每个Cell里面有一个初始值为0的long型变量,
不同线程会命中到数组的不同Cell
(槽
)中,各个线程只对自己Cell(槽)
中的那个值进行 CAS 操作。这样热点就被分散了,冲突的概率就小很多。
LongAdder 存储的值,只要将各个槽中的变量值累加,后的值即可。
2、longAddr内部结构
Striped64类的重要成员属性:
/**
* cell表,当非空时,大小是2的幂。
*/
transient volatile Cell[] cells;
/**
* 基础值,主要在没有争用时使用
* 在没有争用时使用CAS更新这个值
*/
transient volatile long base;
/**
* 自旋锁(通过CAS锁定) 在调整大小和/或创建cell时使用,
* 为 0 表示 cells 数组没有处于创建、扩容阶段,反之为1
*/
transient volatile int cellsBusy;
内部包含一个
base
和一个
Cell[]
类型的
cells
数组
。 在没有竞争的情况下,要累加的数通过 CAS
累加到
base
上;如果有竞争的话,会将要累加的数累加到 Cells
数组中的某个
cell
元素里面。所以
Striped64
的整体值
value
为
base+
∑
[0~n]cells
。
的获取源码如下:
public long longValue() {
return sum();
}
public long sum() {
Striped64.Cell[] as = cells; Striped64.Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
的设计核心思路就是通过内部的分散计算来避免竞争,以空间换时间。
LongAdder
base
类似于
AtomicInteger
里面的
value
,在没有竞争的情况,cells 数组为
null
,这时只使用
base 做累加;而一旦发生竞争,cells
数组就上场了。
2
,以后每次扩容都是变为原来的两倍,一直到
cells
数组的长
CPU
的核数。为什么呢?同一时刻,能持有
CPU
时间片而去并发操作同
CPU
的核数。
cells[threadLocalRandomProbe & cells.length]
位置的 Cell
元素,该线程对
value
所做的累加操作,就执行在对应的
Cell
元素的值上,最终相当于将线程绑定到了 cells
中的某个
cell
对象上;
4、LongAdder 类的 add 方法
自增
public void increment() {
add(1L);
}
自减
public void decrement() {
add(-1L);
}
add方法
public void add(long x) {
//as: 表示cells引用
//b: base值
//v: 表示当前线程命中的cell的期望值
//m: 表示cells数组长度
//a: 表示当前线程命中的cell
Striped64.Cell[] as; long b, v; int m; Striped64.Cell a;
/*
stop 1:true -> 说明存在竞争,并且cells数组已经初始化了,当前线程需要将数据写入到对应的cell中
false -> 表示cells未初始化,当前所有线程应该将数据写到base中
stop 2:true -> 表示发生竞争了,可能需要重试或者扩容
false -> 表示当前线程cas替换数据成功
*/
if (
(as = cells) != null //stop 1
||
!casBase(b = base, b + x) //stop 2
) {
/*
进入的条件:
1.cells数组已经初始化了,当前线程需要将数据写入到对应的cell中
2.表示发生竞争了,可能需要重试或者扩容
*/
/*
是否有竞争:true -> 没有竞争
false -> 有竞争*/
boolean uncontended = true;
/*
stop 3:as == null || (m = as.length - 1)<0 代表 cells 没有初始化
stop 4:表示当前线程命中的cell为空,意思是还没有其他线程在同一个位置做过累加操作。
stop 5:表示当前线程命中的cell不为空, 然后在该Cell对象上进行CAS设置其值为v+x(x为该 Cell 需要累加的值),如果CAS操作失败,表示存在争用。
*/
if (as == null || (m = as.length - 1) < 0 || //stop 3
(a = as[getProbe() & m]) == null || //stop 4
!(uncontended = a.cas(v = a.value, v + x))) //stop 5
/*
进入的条件:
1.cells 未初始化
2.当前线程对应下标的cell为空
3.当前线程对应的cell有竞争并且cas失败
*/
longAccumulate(x, null, uncontended);
}
}
longAccumulate方法
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//条件成立: 说明当前线程还未分配hash值
if ((h = getProbe()) == 0) {
//1.给当前线程分配hash值
ThreadLocalRandom.current(); // force initialization
//2.提取当前线程的hash值
h = getProbe();
//3.因为上一步提取了重新分配的新的hash值,所以会重新分配cells数组的位置给当前线程写入,先假设它能找到一个元素不冲突的数组下标。
wasUncontended = true;
}
//扩容意向,collide=true 可以扩容,collide=false 不可扩容
boolean collide = false; // True if last slot nonempty
//自旋,一直到操作成功
for (;;) {
//as: 表示cells引用
//a: 当前线程命中的cell
//n: cells数组长度
//a: 表示当前线程命中的cell的期望值
Striped64.Cell[] as; Striped64.Cell a; int n; long v;
//CASE1: cells数组已经初始化了,当前线程将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//CASE1.1: true 表示下标位置的 cell 为 null,需要创建 new Cell
if ((a = as[(n - 1) & h]) == null) {
// cells 数组没有处于创建、扩容阶段
if (cellsBusy == 0) { // Try to attach new Cell
Striped64.Cell r = new Striped64.Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Striped64.Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)//创建、扩容成功,退出自旋
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// CASE1.2:当前线程竞争修改cell失败
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// CASE 1.3:当前线程 rehash 过 hash 值,CAS 更新 Cell
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// CASE1.4:判断是否可以扩容
// CASE1.4.1:n >= NCPU
// true -> cells数组长度已经 >= cpu核数,不可进行扩容,把扩容意向改为false
// false -> 可扩容
// CASE1.4.2:cells != as
// true -> 其它线程已经扩容过了,当前线程rehash之后重试即可
// false -> 未有线程对cells进行修改
else if (n >= NCPU || cells != as)
collide = false; // 把扩容意向改为false
// CASE 1.5:设置扩容意向为 true,但是不一定真的发生扩容
else if (!collide)
collide = true;
//CASE1.6:真正扩容的逻辑
// CASE1.6.1:cellsBusy == 0
// true -> 表示cells没有被其它线程占用,当前线程可以去竞争锁
// false -> 表示有其它线程正在操作cells
// CASE1.6.2:casCellsBusy()
// true -> 表示当前线程获取锁成功,可以进行扩容操作
// false -> 表示当前线程获取锁失败,当前时刻有其它线程在做扩容相关的操作
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//重复判断一下当前线程的临时cells数组是否与原cells数组一致(防止有其它线程提前修改了cells数组,因为cells是volatile的全局变量)
if (cells == as) { // Expand table unless stale
//n << 1 表示数组长度翻一倍
Striped64.Cell[] rs = new Striped64.Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
//扩容后,将扩容意向置为false
collide = false;
continue; // Retry with expanded table
}
//重置当前线程hash值
h = advanceProbe(h);
}
//CASE2:cells 还未初始化(as 为 null),并且 cellsBusy 加锁成功
// CASE2.1:判断锁是否被占用
// true -> 表示当前未加锁
// false -> 表示当前已加锁
// CASE2.2:因为其它线程可能会在当前线程给as赋值之后修改了cells
// true -> cells没有被其它线程修改
// false -> cells已经被其它线程修改
// CASE2.3:获取锁
// true -> 获取锁成功 会把cellsBusy = 1
// false -> 表示其它线程正在持有这把锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
//双重检查,防止其它线程已经初始化,当前线程再次初始化,会导致数据丢失
// Initialize table
if (cells == as) {
Striped64.Cell[] rs = new Striped64.Cell[2];
rs[h & 1] = new Striped64.Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:当前线程 cellsBusy 加锁失败,表示其他线程正在初始化 cells
//所以当前线程将值累加到 base,注意 add(…)方法调用此方法时 fn 为 null
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
5、LongAdder 类的 casCellsBusy 方法
方法的代码很简单,就是将
cellsBusy
成员的值改为
1
,表示目前的
cells
数组在
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
九、使用 AtomicStampedReference 解决 ABA 问题
JDK 的提供了一个类似 AtomicStampedReference 类来解决 ABA 问题。
在
CAS
的基础上增加了一个
Stamp 整型
印戳(或标记),使用这个印戳可以来觉察数据是否发生变化,给数据带上了一种实效性的检验。
的
compareAndSet
方法首先检查当前的对象引用值是否等于预期引用,
Stamp
)标志是否等于预期标志,如果全部相等,则以原子方式将引用值和印戳
Stamp
)标志的值更新为给定的更新值。
1、AtomicStampReference 的构造器:
/**
* @param initialRef初始引用
* @param initialStamp初始戳记
*\
AtomicStampedReference(V initialRef, int initialStamp)
2、AtomicStampReference 的常用的几个方法如下:
方法 | 介绍 |
public V getRerference()
|
引用的当前值 |
public int getStamp()
|
返回当前的"戳记" |
public boolean weakCompareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) |
expectedReference 引用的旧值 newReference 引用的新值 expectedStamp 旧的戳记 newStamp 新的戳记 |
3、案例
public static void main(String[] args) {
boolean success = false;
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(1, 0);
int stamp = atomicStampedReference.getStamp();
success = atomicStampedReference.compareAndSet(1, 0, stamp, stamp + 1);
System.out.println("success:" + success + ";reference:" + "" + atomicStampedReference.getReference() + ";stamp:" + atomicStampedReference.getStamp());
//修改印戳,更新失败
stamp = 0;
success = atomicStampedReference.compareAndSet(0, 1, stamp, stamp + 1);
System.out.println("success:" + success + ";reference:" + "" + atomicStampedReference.getReference() + ";stamp:" + atomicStampedReference.getStamp());
}
输出:
success:true;reference:0;stamp:1
success:false;reference:0;stamp:1