Java高并发系列——检视阅读(六)
Java高并发系列——CAS
Java中的CAS
需求:我们开发了一个网站,需要对访问量进行统计,用户每次发一次请求,访问量+1,如何实现呢?
我们在看一下count++操作,count++操作实际上是被拆分为3步骤执行:
1. 获取count的值,记做A:A=count
2. 将A的值+1,得到B:B = A+1
3. 让B赋值给count:count = B
方式2中我们通过加锁的方式让上面3步骤同时只能被一个线程操作,从而保证结果的正确性。
我们是否可以只在第3步加锁,减少加锁的范围,对第3步做以下处理:
获取锁
第三步获取一下count最新的值,记做LV
判断LV是否等于A,如果相等,则将B的值赋给count,并返回true,否者返回false
释放锁
如果我们发现第3步返回的是false,我们就再次去获取count,将count赋值给A,对A+1赋值给B,然后再将A、B的值带入到上面的过程中执行,直到上面的结果返回true为止。
示例:(自己实现一个CAS)
public class CASTest {
private static volatile int count = 0;
private static void request() throws InterruptedException {
//模拟耗时5毫秒
TimeUnit.MILLISECONDS.sleep(5);
int execeptVal;
do {
execeptVal = getCount();
} while (!compareAndSet(execeptVal, execeptVal + 1));
}
private static synchronized boolean compareAndSet(int execeptVal, int newVal) {
if (getCount() == execeptVal) {
count = newVal;
return true;
}
return false;
}
public static int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
long starTime = System.currentTimeMillis();
ExecutorService threadPool = Executors.newCachedThreadPool();
int userCount = 100;
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < userCount; i++) {
threadPool.execute(() -> {
try {
for (int j = 0; j < 10; j++) {
CASTest.request();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ",耗时:" + (endTime - starTime) + ",count=" + count);
threadPool.shutdown();
}
}
输出:
main,耗时:133,count=1000
代码中用了volatile
关键字修饰了count,可以保证count在多线程情况下的可见性。
咱们再看一下代码,compareAndSwap
方法,我们给起个简称吧叫CAS.
这个方法使用synchronized
修饰了,能保证此方法是线程安全的,多线程情况下此方法是串行执行的。方法由两个参数,expectCount:表示期望的值,newCount:表示要给count设置的新值。方法内部通过getCount()
获取count当前的值,然后与期望的值expectCount比较,如果期望的值和count当前的值一致,则将新值newCount赋值给count。
再看一下request()方法,方法中有个do-while循环,循环内部获取count当前值赋值给了expectCount,循环结束的条件是compareAndSwap
返回true,也就是说如果compareAndSwap如果不成功,循环再次获取count的最新值,然后+1,再次调用compareAndSwap方法,直到compareAndSwap
返回成功为止。
代码中相当于将count++拆分开了,只对最后一步加锁了,减少了锁的范围,此代码的性能是不是比方式2快不少,还能保证结果的正确性。大家是不是感觉这个compareAndSwap
方法挺好的,这东西确实很好,java中已经给我们提供了CAS的操作,功能非常强大,我们继续向下看。
CAS
CAS,compare and swap的缩写,中文翻译成比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该 位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前 值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”
通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新 值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。
很多地方说CAS操作是非阻塞的,其实系统底层进行CAS操作的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,所以同一芯片上的其他处理器就暂时不能通过总线访问内存,保证了该指令在多处理器环境下的原子性。总线上锁的,其他线程执行CAS还是会被阻塞一下,只是时间可能会非常短暂,所以说CAS是非阻塞的并不正确,只能说阻塞的时间是非常短的。
java中提供了对CAS操作的支持,具体在sun.misc.Unsafe
类中,声明如下:
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
上面三个方法都是类似的,主要对4个参数做一下说明。
var1:表示要操作的对象
var2:表示要操作对象中属性地址的偏移量
var4:表示需要修改数据的期望的值
var5:表示需要修改为的新值
悲观锁 (ReentrantLock
)VS 乐观锁 (CAS)
synchronized
、ReentrantLock
这种独占锁属于悲观锁,它是在假设需要操作的代码一定会发生冲突的,执行代码的时候先对代码加锁,让其他线程在外面等候排队获取锁。悲观锁如果锁的时间比较长,会导致其他线程一直处于等待状态,像我们部署的web应用,一般部署在tomcat中,内部通过线程池来处理用户的请求,如果很多请求都处于等待获取锁的状态,可能会耗尽tomcat线程池,从而导致系统无法处理后面的请求,导致服务器处于不可用状态。
除此之外,还有乐观锁,乐观锁的含义就是假设系统没有发生并发冲突,先按无锁方式执行业务,到最后了检查执行业务期间是否有并发导致数据被修改了,如果有并发导致数据被修改了 ,就快速返回失败,这样的操作使系统并发性能更高一些。cas中就使用了这样的操作。
关于乐观锁这块,想必大家在数据库中也有用到过,给大家举个例子,可能以后会用到。
如果你们的网站中有调用支付宝充值接口的,支付宝那边充值成功了会回调商户系统,商户系统接收到请求之后怎么处理呢?假设用户通过支付宝在商户系统中充值100,支付宝那边会从用户账户中扣除100,商户系统接收到支付宝请求之后应该在商户系统中给用户账户增加100,并且把订单状态置为成功。
那我们可以用乐观锁来实现,给订单表加个版本号version,要求每次更新订单数据,将版本号+1,那么上面的过程可以改为:
获取订单信息,将version的值赋值给V_A
if(订单状态==待处理){
开启事务
给用户账户增加100
update影响行数 = update 订单表 set version = version + 1 where id = 订单号 and version = V_A;
if(update影响行数==1){
提交事务
}else{
回滚事务
}
}
返回订单处理成功
上面的update语句相当于我们说的CAS操作,执行这个update语句的时候,多线程情况下,数据库会对当前订单记录加锁,保证只有一条执行成功,执行成功的,影响行数为1,执行失败的影响行数为0,根据影响行数来决定提交还是回滚事务。上面操作还有一点是将事务范围缩小了,也提升了系统并发处理的性能。
CAS 的问题
cas这么好用,那么有没有什么问题呢?
ABA问题
CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。这就是CAS的ABA问题。常见的解决思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A
就会变成1A-2B-3A
。目前在JDK的atomic包里提供了一个类AtomicStampedReference
来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
循环时间长开销大
上面我们说过如果CAS不成功,则会原地循环(自旋操作),如果长时间自旋会给CPU带来非常大的执行开销。并发量比较大的情况下,CAS成功概率可能比较低,可能会重试很多次才会成功。
并发量大的情况下应该改为悲观锁避免自旋带来的CPU的大量开销。
使用JUC中的类实现计数器
juc框架中提供了一些原子操作,底层是通过Unsafe类中的cas操作实现的。通过原子操作可以保证数据在并发情况下的正确性。
示例:
public class CASTest1 {
private static AtomicInteger count = new AtomicInteger(0);
private static void request() throws InterruptedException {
//模拟耗时5毫秒
TimeUnit.MILLISECONDS.sleep(5);
count.getAndIncrement();
}
public static void main(String[] args) throws InterruptedException {
long starTime = System.currentTimeMillis();
ExecutorService threadPool = Executors.newCachedThreadPool();
int userCount = 100;
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < userCount; i++) {
threadPool.execute(() -> {
try {
for (int j = 0; j < 10; j++) {
CASTest1.request();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ",耗时:" + (endTime - starTime) + ",count=" + count);
threadPool.shutdown();
}
}
JUC底层工具类Unsafe
juc中大部分类都是依赖于Unsafe来实现的,主要用到了Unsafe中的CAS、线程挂起、线程恢复等相关功能。所以如果打算深入了解JUC原理的,必须先了解一下Unsafe类。
Unsafe类的功能图:
image.pngUnsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。
从Unsafe功能图上看出,Unsafe提供的API大致可分为内存操作、CAS、Class相关、对象操作、线程调度、系统信息获取、内存屏障、数组操作等几类,本文主要介绍3个常用的操作:CAS、线程调度、对象操作。
看一下UnSafe的源码部分:
public final class Unsafe {
// 单例对象
private static final Unsafe theUnsafe;
private Unsafe() {
}
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
// 仅在引导类加载器`BootstrapClassLoader`加载时才合法
if(!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
}
从代码中可以看出,Unsafe类为单例实现,提供静态方法getUnsafe获取Unsafe实例,内部会判断当前调用者是否是由系统类加载器加载的,如果不是系统类加载器加载的,会抛出SecurityException
异常。
获取Unsafe的两种方式:
- 可以把我们的类放在jdk的lib目录下,那么启动的时候会自动加载,这种方式不是很好。
-
通过反射可以获取到
Unsafe
中的theUnsafe
字段的值,这样可以获取到Unsafe对象的实例。
通过反射获取Unsafe实例
public class UnsafeTest {
//通过反射获取Unsafe实例
private static Unsafe unsafe;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
//基本上你通过反射得到的字段就像任何其他字段一样,但是当你调用get方法时,你传递的是null,因为没有实例可以作用。
//field.get(null)方法参数传递的是实例,而静态域是没有实例的,获取静态变量直接用field.get(null)。
unsafe = (Unsafe) field.get(null);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.println(unsafe);
}
}
Unsafe中的CAS操作
看一下Unsafe中CAS相关方法定义:
/**
* CAS 操作
*
* @param o 包含要修改field的对象
* @param offset 对象中某field的偏移量
* @param expected 期望值
* @param update 更新值
* @return true | false
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);
public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update);
什么是CAS? 即比较并替换,实现并发算法时常用到的一种技术。CAS操作包含三个操作数——内存位置、预期原值及新值。执行CAS操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值,否则,处理器不做任何操作,多个线程同时执行cas操作,只有一个会成功。我们都知道,CAS是一条CPU的原子指令(cmpxchg指令:读作compare and change),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现的, 其实在这一点上还是有排他锁的,只是比起用synchronized, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好。
说一下offset,offeset为字段的偏移量,每个对象有个地址,offset是字段相对于对象地址的偏移量,对象地址记为baseAddress,字段偏移量记为offeset,那么字段对应的实际地址就是baseAddress+offeset,所以cas通过对象、偏移量就可以去操作字段对应的值了。
CAS在AtomicInteger上的应用
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
//初始化时获取到AtomicInteger的字段value的字段的偏移量offeset
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
//原子加1,并返回加之前的值
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//原子加delta,并返回加之前的值
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
}
Unsafe中原子操作相关方法介绍
5个方法,内部通过自旋的CAS操作实现的,这些方法都可以保证操作的数据在多线程环境中的原子性,正确性。 看一下实现:
/**
* int类型值原子操作,对var2地址对应的值做原子增加操作(增加var4)
*
* @param var1 操作的对象
* @param var2 var2字段内存地址偏移量
* @param var4 需要加的值
* @return
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
/**
* long类型值原子操作,对var2地址对应的值做原子增加操作(增加var4)
*
* @param var1 操作的对象
* @param var2 var2字段内存地址偏移量
* @param var4 需要加的值
* @return 返回旧值
*/
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while (!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
/**
* int类型值原子操作方法,将var2地址对应的值置为var4
*
* @param var1 操作的对象
* @param var2 var2字段内存地址偏移量
* @param var4 新值
* @return 返回旧值
*/
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while (!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}
/**
* long类型值原子操作方法,将var2地址对应的值置为var4
*
* @param var1 操作的对象
* @param var2 var2字段内存地址偏移量
* @param var4 新值
* @return 返回旧值
*/
public final long getAndSetLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while (!this.compareAndSwapLong(var1, var2, var6, var4));
return var6;
}
/**
* Object类型值原子操作方法,将var2地址对应的值置为var4
*
* @param var1 操作的对象
* @param var2 var2字段内存地址偏移量
* @param var4 新值
* @return 返回旧值
*/
public final Object getAndSetObject(Object var1, long var2, Object var4) {
Object var5;
do {
var5 = this.getObjectVolatile(var1, var2);
} while (!this.compareAndSwapObject(var1, var2, var5, var4));
return var5;
}
使用Unsafe实现一个网站计数功能:
public class UnsafeCountTest {
//通过反射获取Unsafe实例
private static Unsafe unsafe;
//count在Demo.class对象中的地址偏移量
private static long valueOffset;
//用来记录网站访问量,每次访问+1
private static int count;
//private volatile static int count;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
//基本上你通过反射得到的字段就像任何其他字段一样,但是当你调用get方法时,你传递的是null,因为没有实例可以作用。
//field.get(null)方法参数传递的是实例,而静态域是没有实例的,获取静态变量直接用field.get(null)。
unsafe = (Unsafe) field.get(null);
Field fieldC = UnsafeCountTest.class.getDeclaredField("count");
valueOffset = unsafe.staticFieldOffset(fieldC);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
private static void request() throws InterruptedException {
//模拟耗时5毫秒
TimeUnit.MILLISECONDS.sleep(5);
//对count原子加1
unsafe.getAndAddInt(UnsafeCountTest.class,valueOffset,1);
}
public static void main(String[] args) throws InterruptedException {
long starTime = System.currentTimeMillis();
ExecutorService threadPool = Executors.newCachedThreadPool();
int userCount = 100;
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < userCount; i++) {
threadPool.execute(() -> {
try {
for (int j = 0; j < 10; j++) {
UnsafeCountTest.request();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ",耗时:" + (endTime - starTime) + ",count=" + count);
threadPool.shutdown();
}
}
输出:
main,耗时:157,count=1000
Unsafe中线程调度相关方法
这部分,包括线程挂起、恢复、锁机制等方法。
//取消阻塞线程
public native void unpark(Object thread);
//阻塞线程,isAbsolute:是否是绝对时间,如果为true,time是一个绝对时间,如果为false,time是一个相对时间,time表示纳秒
public native void park(boolean isAbsolute, long time);
//获得对象锁(可重入锁)
@Deprecated
public native void monitorEnter(Object o);
//释放对象锁
@Deprecated
public native void monitorExit(Object o);
//尝试获取对象锁
@Deprecated
public native boolean tryMonitorEnter(Object o);
调用park
后,线程将被阻塞,直到unpark
调用或者超时,如果之前调用过unpark
,不会进行阻塞,即park
和unpark
不区分先后顺序。monitorEnter、monitorExit、tryMonitorEnter 3个方法已过期,不建议使用了。
线程中相当于有个许可,许可默认是0,调用park的时候,发现是0会阻塞当前线程,调用unpark之后,许可会被置为1,并会唤醒当前线程。如果在park之前先调用了unpark方法,执行park方法的时候,不会阻塞。park方法被唤醒之后,许可又会被置为0。多次调用unpark的效果是一样的,许可还是1。
juc中的LockSupport
类是通过unpark和park方法实现的。
实例:
//表示一直阻塞等待
unsafe.park(false, 0);
//取消阻塞线程
unsafe.unpark(thread);
//线程挂起3秒,超时等待
unsafe.park(false, TimeUnit.SECONDS.toNanos(3));
Unsafe锁示例——已废弃
//模拟访问一次
public static void request() {
unsafe.monitorEnter(Demo4.class);
try {
count++;
} finally {
unsafe.monitorExit(Demo4.class);
}
}
注意:
- monitorEnter、monitorExit、tryMonitorEnter 3个方法已过期,不建议使用了
- monitorEnter、monitorExit必须成对出现,出现的次数必须一致,也就是说锁了n次,也必须释放n次,否则会造成死锁
Unsafe中保证变量的可见性的方法——相当于对要读取和修改的变量加volatile
关于变量可见性需要先了解java内存模型JMM。
java中操作内存分为主内存和工作内存,共享数据在主内存中,线程如果需要操作主内存的数据,需要先将主内存的数据复制到线程独有的工作内存中,操作完成之后再将其刷新到主内存中。如线程A要想看到线程B修改后的数据,需要满足:线程B修改数据之后,需要将数据从自己的工作内存中刷新到主内存中,并且A需要去主内存中读取数据。
被关键字volatile修饰的数据,有2点语义:
- 如果一个变量被volatile修饰,读取这个变量时候,会强制从主内存中读取,然后将其复制到当前线程的工作内存中使用
- 给volatile修饰的变量赋值的时候,会强制将赋值的结果从工作内存刷新到主内存
上面2点语义保证了被volatile修饰的数据在多线程中的可见性。
Unsafe中提供了和volatile语义一样的功能的方法,如下:
//设置给定对象的int值,使用volatile语义,即设置后立马更新到内存对其他线程可见
public native void putIntVolatile(Object o, long offset, int x);
//获得给定对象的指定偏移量offset的int值,使用volatile语义,总能获取到最新的int值。
public native int getIntVolatile(Object o, long offset);
putIntVolatile方法,2个参数:
o:表示需要操作的对象
offset:表示操作对象中的某个字段地址偏移量
x:将offset对应的字段的值修改为x,并且立即刷新到主存中
调用这个方法,会强制将工作内存中修改的数据刷新到主内存中。
getIntVolatile方法,2个参数
o:表示需要操作的对象
offset:表示操作对象中的某个字段地址偏移量
每次调用这个方法都会强制从主内存读取值,将其复制到工作内存中使用。
其他的还有几个putXXXVolatile、getXXXVolatile方法和上面2个类似。
JUC中原子类
JUC中原子类介绍
什么是原子操作?
atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰,所以,所谓原子类说简单点就是具有原子操作特征的类,原子操作类提供了一些修改数据的方法,这些方法都是原子操作的,在多线程情况下可以确保被修改数据的正确性。
JUC中对原子操作提供了强大的支持,这些类位于java.util.concurrent.atomic包中.
JUC中原子类思维导图
image.png基本类型原子类
使用原子的方式更新基本类型
- AtomicInteger:int类型原子类
- AtomicLong:long类型原子类
- AtomicBoolean :boolean类型原子类
上面三个类提供的方法几乎相同,这里以 AtomicInteger 为例子来介绍。
AtomicInteger 类常用方法
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
部分源码
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
2个关键字段说明:
value:使用volatile修饰,可以确保value在多线程中的可见性。
valueOffset:value属性在AtomicInteger中的偏移量,通过这个偏移量可以快速定位到value字段,这个是实现AtomicInteger的关键。
getAndIncrement源码:
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
内部调用的是Unsafe类中的getAndAddInt方法,我们看一下getAndAddInt源码:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
说明:
this.getIntVolatile:可以确保从主内存中获取变量最新的值。compareAndSwapInt:CAS操作,CAS的原理是拿期望的值和原本的值作比较,如果相同则更新成新的值,可以确保在多线程情况下只有一个线程会操作成功,不成功的返回false。
上面有个do-while循环,compareAndSwapInt返回false之后,会再次从主内存中获取变量的值,继续做CAS操作,直到成功为止。
getAndAddInt操作相当于线程安全的count++操作,如同:
synchronize(lock){
count++;
}
count++操作实际上是被拆分为3步骤执行:
- 获取count的值,记做A:A=count
- 将A的值+1,得到B:B = A+1
- 让B赋值给count:count = B
多线程情况下会出现线程安全的问题,导致数据不准确。synchronize的方式会导致占时无法获取锁的线程处于阻塞状态,性能比较低。CAS的性能比synchronize要快很多。
数组类型原子类介绍
使用原子的方式更新数组里的某个元素,可以确保修改数组中数据的线程安全性。
- AtomicIntegerArray:整形数组原子操作类
- AtomicLongArray:长整形数组原子操作类
- AtomicReferenceArray :引用类型数组原子操作类
上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍。
AtomicIntegerArray 类常用方法
public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
示例
统计网站页面访问量,假设网站有10个页面,现在模拟100个人并行访问每个页面10次,然后将每个页面访问量输出,应该每个页面都是1000次,代码如下:
public class AtomicIntegerArrayTest {
private static AtomicIntegerArray array = new AtomicIntegerArray(10);
private static void request(int page) throws InterruptedException {
//模拟耗时5毫秒
TimeUnit.MILLISECONDS.sleep(5);
array.getAndIncrement(page-1);
}
public static void main(String[] args) throws InterruptedException {
long starTime = System.currentTimeMillis();
ExecutorService threadPool = Executors.newCachedThreadPool();
int userCount = 100;
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < userCount; i++) {
threadPool.execute(() -> {
try {
for (int j = 0; j < 10; j++) {
for (int k = 1; k <= 10; k++) {
AtomicIntegerArrayTest.request(k);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ",耗时:" + (endTime - starTime) + ",array=" + array.toString());
threadPool.shutdown();
}
}
输出:
main,耗时:672,array=[1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000]
引用类型原子类介绍
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用 引用类型原子类。
- AtomicReference:引用类型原子类
- AtomicStampedRerence:原子更新引用类型里的字段原子类
- AtomicMarkableReference :原子更新带有标记位的引用类型
AtomicReference 和 AtomicInteger 非常类似,不同之处在于 AtomicInteger是对整数的封装,而AtomicReference则是对应普通的对象引用,它可以确保你在修改对象引用时的线程安全性。在介绍AtomicReference的同时,我们先来了解一个有关原子操作逻辑上的不足。
ABA问题
之前我们说过,线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致。这个逻辑从一般意义上来说是正确的,但是可能出现一个小小的例外,就是当你获得当前数据后,在准备修改为新值前,对象的值被其他线程连续修改了两次,而经过这2次修改后,对象的值又恢复为旧值,这样,当前线程就无法正确判断这个对象究竟是否被修改过,这就是所谓的ABA问题,可能会引发一些问题。
举个例子
有一家蛋糕店,为了挽留客户,决定为贵宾卡客户一次性赠送20元,刺激客户充值和消费,但条件是,每一位客户只能被赠送一次,现在我们用AtomicReference
来实现这个功能,代码如下:
public class AtomicReferenceTest1 {
//账户原始余额
static int accountMoney = 19;
//用于对账户余额做原子操作
static AtomicReference<Integer> money = new AtomicReference<>(accountMoney);
/**
* 模拟2个线程同时更新后台数据库,为用户充值
*/
static void recharge() {
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
Integer m = money.get();
if (m == accountMoney) {
if (money.compareAndSet(m, m + 20)) {
System.out.println("当前余额:" + m + ",充值20元成功,余额:" + money.get() + "元");
}
}
//休眠100ms
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
/**
* 模拟用户消费
*/
static void consume() throws InterruptedException {
for (int i = 0; i < 5; i++) {
Integer m = money.get();
if (m > 20) {
if (money.compareAndSet(m, m - 20)) {
System.out.println("当前余额:" + m + ",成功消费10元,余额:" + money.get() + "元");
}
}
//休眠50ms
TimeUnit.MILLISECONDS.sleep(50);
}
}
public static void main(String[] args) throws InterruptedException {
recharge();
consume();
}
}
输出:
当前余额:19,充值20元成功,余额:39元
当前余额:39,成功消费10元,余额:19元
当前余额:39,成功消费10元,余额:19元
当前余额:19,充值20元成功,余额:19元
当前余额:19,充值20元成功,余额:39元
当前余额:39,成功消费10元,余额:19元
当前余额:19,充值20元成功,余额:39元
从输出中可以看到,这个账户被先后反复多次充值。其原因是账户余额被反复修改,修改后的值和原有的数值19一样,使得CAS操作无法正确判断当前数据是否被修改过(是否被加过20)。虽然这种情况出现的概率不大,但是依然是有可能出现的,因此,当业务上确实可能出现这种情况时,我们必须多加防范。JDK也为我们考虑到了这种情况,使用AtomicStampedReference
可以很好地解决这个问题。
AtomicStampedReference
内部不仅维护了对象的值,还维护了一个版本号(我们这里把他称为时间戳,实际上它可以使用任何一个整形来表示状态值),当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新版本号。当AtomicStampedReference设置对象值时,对象值及版本号都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要版本号发生变化,就能防止不恰当的写入。
AtomicStampedReference
的几个Api在AtomicStampedReference
的基础上新增了有关版本号的信息。
//比较设置,参数依次为:期望值、写入新值、期望版本号、新版本号
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp);
//获得当前对象引用
public V getReference();
//获得当前版本号
public int getStamp();
//设置当前对象引用和版本号
public void set(V newReference, int newStamp);
AtomicStampedReference
内部维护了一个Pair对象存放值,绑定了当前值和版本号。
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
现在我们使用AtomicStampedRerence
来修改一下上面充值的问题,代码如下:
public class AtomicStampedReferenceTest1 {
//账户原始余额
static int accountMoney = 19;
//用于对账户余额做原子操作
static AtomicStampedReference<Integer> money = new AtomicStampedReference<>(accountMoney, 0);
/**
* 模拟2个线程同时更新后台数据库,为用户充值
*/
static void recharge() {
for (int i = 0; i < 2; i++) {
int stamp = money.getStamp();
new Thread(() -> {
for (int j = 0; j < 50; j++) {
Integer m = money.getReference();
if (m == accountMoney) {
if (money.compareAndSet(m, m + 20, stamp, stamp + 1)) {
System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",充值20元成功,余额:" + money.getReference() + "元");
}
}
//休眠100ms
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
/**
* 模拟用户消费
*/
static void consume() throws InterruptedException {
for (int i = 0; i < 50; i++) {
Integer m = money.getReference();
int stamp = money.getStamp();
if (m > 20) {
if (money.compareAndSet(m, m - 20, stamp, stamp + 1)) {
System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",成功消费20元,余额:" + money.getReference() + "元");
}
}
//休眠50ms
TimeUnit.MILLISECONDS.sleep(50);
}
}
public static void main(String[] args) throws InterruptedException {
recharge();
consume();
}
}
输出:
当前时间戳:1,当前余额:19,充值20元成功,余额:39元
当前时间戳:2,当前余额:39,成功消费20元,余额:19元
关于这个时间戳的,在数据库修改数据中也有类似的用法,比如2个编辑同时编辑一篇文章,同时提交,只允许一个用户提交成功,提示另外一个用户:博客已被其他人修改,如何实现呢?
数据库对于乐观锁的ABA问题也是同样的道理,加个版本号或者时间戳解决ABA问题。
博客表:t_blog(id,content,stamp),stamp默认值为0,每次更新+1
A、B 二个编辑同时对一篇文章进行编辑,stamp都为0,当点击提交的时候,将stamp和id作为条件更新博客内容,执行的sql如下:
update t_blog set content = 更新的内容,stamp = stamp+1 where id = 博客id and stamp = 0;
这条update会返回影响的行数,只有一个会返回1,表示更新成功,另外一个提交者返回0,表示需要修改的数据已经不满足条件了,被其他用户给修改了。这种修改数据的方式也叫乐观锁。
对象的属性修改原子类介绍
如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改原子类。
- AtomicIntegerFieldUpdater:原子更新整形字段的值
- AtomicLongFieldUpdater:原子更新长整形字段的值
- AtomicReferenceFieldUpdater :原子更新应用类型字段的值
要想原子地更新对象的属性需要两步:
- 第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
- 第二步,更新的对象属性必须使用 public volatile 修饰符。
上面三个类提供的方法几乎相同,所以我们这里以AtomicReferenceFieldUpdater
为例子来介绍。
调用AtomicReferenceFieldUpdater
静态方法newUpdater
创建AtomicReferenceFieldUpdater
对象
public static <U, W> AtomicReferenceFieldUpdater<U, W> newUpdater(Class<U> tclass, Class<W> vclass, String fieldName)
说明:
三个参数
tclass:需要操作的字段所在的类
vclass:操作字段的类型
fieldName:字段名称
示例
多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
public class AtomicReferenceFieldUpdaterTest {
static AtomicReferenceFieldUpdaterTest updaterTest = new AtomicReferenceFieldUpdaterTest();
//不能操作static修饰的字段,会报Caused by: java.lang.IllegalArgumentException错误。compareAndSet操作的是对象实例的偏移值字段,static修饰的字段不属于对象实例
//必须被volatile修饰
private volatile Boolean isInit = Boolean.FALSE;
private static AtomicReferenceFieldUpdater referenceFieldUpdater = AtomicReferenceFieldUpdater.
newUpdater(AtomicReferenceFieldUpdaterTest.class, Boolean.class, "isInit");
public static void init() {
if (referenceFieldUpdater.compareAndSet(updaterTest, false, true)) {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",开始初始化!");
//模拟休眠3秒
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",初始化完毕!");
} else {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",有其他线程已经执行了初始化!");
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(()->{
AtomicReferenceFieldUpdaterTest.init();
}).start();
}
}
}
输出:
1599030805588,Thread-0,开始初始化!
1599030805588,Thread-1,有其他线程已经执行了初始化!
1599030805588,Thread-2,有其他线程已经执行了初始化!
1599030805589,Thread-3,有其他线程已经执行了初始化!
1599030805589,Thread-4,有其他线程已经执行了初始化!
1599030808590,Thread-0,初始化完毕!
说明:
- isInit属性必须要volatille修饰,可以确保变量的可见性
- 可以看出多线程同时执行
init()
方法,只有一个线程执行了初始化的操作,其他线程跳过了。多个线程同时到达updater.compareAndSet
,只有一个会成功。
ThreadLocal、InheritableThreadLocal
使用技巧,可以用static方法包装ThreadLocal的get/set方法,这样就可以直接调用了。也可以在抽象类中定义ThreadLocal,这样所有的继承类也能调用到。
示例:这个是在static中便对每个线程的ThreadLocal都赋初值了。
private static ThreadLocal<DateFormat> sdfThreadLocal = new ThreadLocal<DateFormat>() {
@Override
public SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyyMMdd");
}
};
private static ThreadLocal<DateFormat> dateTimeSdfThreadLocal = new ThreadLocal<DateFormat>() {
@Override
public SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
ThreadLocal
线程就相当于一个人一样,每个请求相当于一个任务,任务来了,人来处理,处理完毕之后,再处理下一个请求任务。人身上是不是有很多口袋,人刚开始准备处理任务的时候,我们把任务的编号放在处理者的口袋中,然后处理中一路携带者,处理过程中如果需要用到这个编号,直接从口袋中获取就可以了。那么刚好java中线程设计的时候也考虑到了这些问题,Thread对象中就有很多口袋,用来放东西。Thread类中有这么一个变量:
ThreadLocal.ThreadLocalMap threadLocals = null;
这个就是用来操作Thread中所有口袋的东西,ThreadLocalMap
源码中有一个数组(有兴趣的可以去看一下源码),对应处理者身上很多口袋一样,数组中的每个元素对应一个口袋。
如何来操作Thread中的这些口袋呢,java为我们提供了一个类ThreadLocal
,ThreadLocal对象用来操作Thread中的某一个口袋,可以向这个口袋中放东西、获取里面的东西、清除里面的东西,这个口袋一次性只能放一个东西,重复放东西会将里面已经存在的东西覆盖掉。
常用的3个方法:
//向Thread中某个口袋中放东西
public void set(T value);
//获取这个口袋中目前放的东西
public T get();
//清空这个口袋中放的东西
public void remove()
ThreadLocal的官方API解释为:
“该类提供了线程局部 (thread-local) 变量。这些变量不同于它们的普通对应物,因为访问某个变量(通过其 get 或 set 方法)的每个线程都有自己的局部变量,它独立于变量的初始化副本。ThreadLocal 实例通常是类中的 private static 字段,它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联。”
InheritableThreadLocal
如果一个线程还做并发处理开启多个线程时,这时候子线程如果也想要父线程保留在口袋里的东西,就要使用InheritableThreadLocal来代替ThreadLocal。
父线程相当于主管,子线程相当于干活的小弟,主管让小弟们干活的时候,将自己兜里面的东西复制一份给小弟们使用,主管兜里面可能有很多牛逼的工具,为了提升小弟们的工作效率,给小弟们都复制一个,丢到小弟们的兜里,然后小弟就可以从自己的兜里拿去这些东西使用了,也可以清空自己兜里面的东西。
Thread
对象中有个inheritableThreadLocals
变量,代码如下:
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
inheritableThreadLocals相当于线程中另外一种兜,这种兜有什么特征呢,当创建子线程的时候,子线程会将父线程这种类型兜的东西全部复制一份放到自己的inheritableThreadLocals
兜中,使用InheritableThreadLocal
对象可以操作线程中的inheritableThreadLocals
兜。
InheritableThreadLocal
常用的方法也有3个:
//向Thread中某个口袋中放东西
public void set(T value);
//获取这个口袋中目前放的东西
public T get();
//清空这个口袋中放的东西
public void remove()
实例:
@Slf4j
public class ThreadLocalTest {
//private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
//
private static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
//自定义包含策略
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
//需要插入的数据
List<String> dataList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
dataList.add("数据" + i);
}
for (int i = 0; i < 5; i++) {
String traceId = String.valueOf(i);
executor.execute(() -> {
threadLocal.set(traceId);
try {
ThreadLocalTest.controller(dataList);
} finally {
threadLocal.remove();
}
});
}
}
//模拟controller
public static void controller(List<String> dataList) {
log.error("接受请求: " + "traceId:" + threadLocal.get());
service(dataList);
}
//模拟service
public static void service(List<String> dataList) {
log.error("执行业务:" + "traceId:" + threadLocal.get());
//dao(dataList);
daoMuti(dataList);
}
//模拟dao
public static void dao(List<String> dataList) {
log.error("执行数据库操作" + "traceId:" + threadLocal.get());
//模拟插入数据
for (String s : dataList) {
log.error("插入数据" + s + "成功" + "traceId:" + threadLocal.get());
}
}
//模拟dao--多线程
public static void daoMuti(List<String> dataList) {
CountDownLatch countDownLatch = new CountDownLatch(dataList.size());
log.error("执行数据库操作" + "traceId:" + threadLocal.get());
String threadName = Thread.currentThread().getName();
//模拟插入数据
for (String s : dataList) {
new Thread(() -> {
try {
//模拟数据库操作耗时100毫秒
TimeUnit.MILLISECONDS.sleep(100);
log.error("插入数据" + s + "成功" + threadName + ",traceId:" + threadLocal.get());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
}
//等待上面的dataList处理完毕
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出:
17:35:30.465 [From DemoThreadFactory's 订单创建组-Worker-2] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:1
17:35:30.465 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:0
17:35:30.465 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:2
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:2
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:0
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-2] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:1
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:2
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:0
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-2] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:1
17:35:30.574 [Thread-3] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:2
17:35:30.574 [Thread-4] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-2,traceId:1
17:35:30.574 [Thread-1] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:2
17:35:30.574 [Thread-2] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:2
17:35:30.574 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:3
17:35:30.574 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:3
17:35:30.574 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:3
17:35:30.575 [Thread-9] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:0
17:35:30.575 [Thread-8] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:0
17:35:30.575 [Thread-7] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:0
17:35:30.575 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:4
17:35:30.575 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:4
17:35:30.575 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:4
17:35:30.575 [Thread-6] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-2,traceId:1
17:35:30.575 [Thread-5] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-2,traceId:1
17:35:30.682 [Thread-10] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:3
17:35:30.682 [Thread-13] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:4
17:35:30.682 [Thread-14] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:4
17:35:30.682 [Thread-12] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:3
17:35:30.683 [Thread-15] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:4
17:35:30.683 [Thread-11] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:3
JUC中的阻塞队列
Queue接口
队列是一种先进先出(FIFO)的数据结构,java中用Queue
接口来表示队列。
Queue
接口中定义了6个方法:
public interface Queue<E> extends Collection<E> {
boolean add(e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
每个Queue
方法都有两种形式:
(1)如果操作失败则抛出异常。
(2)如果操作失败,则返回特殊值(null
或false
,具体取决于操作),接口的常规结构如下表所示。
操作类型 | 抛出异常 | 返回特殊值 |
---|---|---|
插入 | add(e) |
offer(e) |
移除 | remove() |
poll() |
检查 | element() |
peek() |
Queue
从Collection
继承的add
方法插入一个元素,除非它违反了队列的容量限制,在这种情况下它会抛出IllegalStateException
;offer
方法与add
不同之处仅在于它通过返回false
来表示插入元素失败。
remove
和poll
方法都移除并返回队列的头部,确切地移除哪个元素是由具体的实现来决定的,仅当队列为空时,remove
和poll
方法的行为才有所不同,在这些情况下,remove
抛出NoSuchElementException
,而poll
返回null
。
element
和peek
方法返回队列头部的元素,但不移除,它们之间的差异与remove
和poll
的方式完全相同,如果队列为空,则element
抛出NoSuchElementException
,而peek
返回null
。
队列一般不要插入空元素。
BlockingQueue接口
BlockingQueue
位于juc中,熟称阻塞队列, 阻塞队列首先它是一个队列,继承Queue
接口,是队列就会遵循先进先出(FIFO)的原则,又因为它是阻塞的,故与普通的队列有两点区别:
- 当一个线程向队列里面添加数据时,如果队列是满的,那么将阻塞该线程,暂停添加数据
- 当一个线程从队列里面取出数据时,如果队列是空的,那么将阻塞该线程,暂停取出数据
BlockingQueue
相关方法:
操作类型 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) |
offer(e) |
put(e) | offer(e,timeuout,unit) |
移除 | remove() |
poll() |
take() | poll(timeout,unit) |
检查 | element() |
peek() |
不支持 | 不支持 |
重点,再来解释一下,加深印象:
- 3个可能会有异常的方法,add、remove、element;这3个方法不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,add抛出异常;队列为空情况下,remove、element抛出异常
- offer、poll、peek 也不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,offer返回false;队列为空的情况下,pool、peek返回null
- 队列满的情况下,调用put方法会导致当前线程阻塞
- 队列为空的情况下,调用take方法会导致当前线程阻塞
-
offer(e,timeuout,unit)
,超时之前,插入成功返回true,否者返回false -
poll(timeout,unit)
,超时之前,获取到头部元素并将其移除,返回元素,否者返回null
BlockingQueue常见的实现类
ArrayBlockingQueue
基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过ReentrantLock来完成的,数据的插入与取出共用同一个锁,因此ArrayBlockingQueue并不能实现生产、消费同时进行。而且在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
LinkedBlockingQueue
基于单向链表的阻塞队列实现,在初始化LinkedBlockingQueue的时候可以指定大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外LinkedBlockingQueue中用于阻塞生产者、消费者的锁是两个(锁分离:注意这里是JDK8之前的设计,JDK8之后是用一个锁实现),因此生产与消费是可以同时进行的。
PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列,进入队列的元素会按照优先级进行排序
SynchronousQueue
同步阻塞队列,SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素
LinkedTransferQueue
LinkedTransferQueue是基于链表的FIFO无界阻塞队列,它出现在JDK7中,Doug Lea 大神说LinkedTransferQueue是一个聪明的队列,它是ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、无界的LinkedBlockingQueues等的超集,
LinkedTransferQueue
包含了ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueues
三种队列的功能
ArrayBlockingQueue
有界阻塞队列,内部使用数组存储元素,有2个常用构造方法:
//capacity表示容量大小,默认内部采用非公平锁
public ArrayBlockingQueue(int capacity)
//capacity:容量大小,fair:内部是否是使用公平锁
public ArrayBlockingQueue(int capacity, boolean fair)
注意:ArrayBlockingQueue
如果队列容量设置的太小,消费者发送的太快,消费者消费的太慢的情况下,会导致队列空间满,调用put方法会导致发送者线程阻塞,所以注意设置合理的大小,协调好消费者的速度。
LinkedBlockingQueue
内部使用单向链表实现的阻塞队列,3个构造方法:
//默认构造方法,容量大小为Integer.MAX_VALUE
public LinkedBlockingQueue();
//创建指定容量大小的LinkedBlockingQueue
public LinkedBlockingQueue(int capacity);
//容量为Integer.MAX_VALUE,并将传入的集合丢入队列中
public LinkedBlockingQueue(Collection<? extends E> c);
LinkedBlockingQueue
的用法和ArrayBlockingQueue
类似,建议使用的时候指定容量,如果不指定容量,插入的太快,移除的太慢,可能会产生OOM。
PriorityBlockingQueue
无界的优先级阻塞队列,内部使用数组存储数据,达到容量时,会自动进行扩容,放入的元素会按照优先级进行排序,4个构造方法:
//默认构造方法,默认初始化容量是11
public PriorityBlockingQueue();
//指定队列的初始化容量
public PriorityBlockingQueue(int initialCapacity);
//指定队列的初始化容量和放入元素的比较器
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator);
//传入集合放入来初始化队列,传入的集合可以实现SortedSet接口或者PriorityQueue接口进行排序,如果没有实现这2个接口,按正常顺序放入队列
public PriorityBlockingQueue(Collection<? extends E> c);
优先级队列放入元素的时候,会进行排序,所以我们需要指定排序规则,有2种方式:
- 创建
PriorityBlockingQueue
指定比较器Comparator
- 放入的元素需要实现
Comparable
接口
上面2种方式必须选一个,如果2个都有,则走第一个规则排序。
示例:
public class PriorityBlockingQueueTest {
private static PriorityBlockingQueue<Msg> priorityBlockingQueue = new PriorityBlockingQueue<>();
private static class Msg implements Comparable<Msg>{
private String msg;
private int priority;
public Msg(String msg, int priority) {
this.msg = msg;
this.priority = priority;
}
@Override
public String toString() {
return "Msg{" +
"priority=" + priority +
", msg='" + msg + '\'' +
'}';
}
@Override
public int compareTo(Msg o) {
//return this.priority-o.priority;
return o.priority-this.priority;
//return Integer.compare(this.priority,o.priority);
}
}
public static void putMsg(Msg msg) throws InterruptedException {
priorityBlockingQueue.put(msg);
System.out.println("已推送"+msg);
}
static {
new Thread(() -> {
while (true) {
Msg msg;
try {
long starTime = System.currentTimeMillis();
//获取一条推送消息,此方法会进行阻塞,直到返回结果
msg = priorityBlockingQueue.take();
long endTime = System.currentTimeMillis();
//模拟推送耗时
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(String.format("[%s,%s,take耗时:%s],%s,发送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
PriorityBlockingQueueTest.putMsg(new Msg("消息" + i,i));
}
}
}
输出:
已推送Msg{priority=0, msg='消息0'}
已推送Msg{priority=1, msg='消息1'}
已推送Msg{priority=2, msg='消息2'}
已推送Msg{priority=3, msg='消息3'}
已推送Msg{priority=4, msg='消息4'}
[1599459824306,1599459824307,take耗时:1],Thread-0,发送消息:Msg{priority=0, msg='消息0'}
[1599459824829,1599459824829,take耗时:0],Thread-0,发送消息:Msg{priority=4, msg='消息4'}
[1599459825330,1599459825330,take耗时:0],Thread-0,发送消息:Msg{priority=3, msg='消息3'}
[1599459825831,1599459825831,take耗时:0],Thread-0,发送消息:Msg{priority=2, msg='消息2'}
[1599459826331,1599459826331,take耗时:0],Thread-0,发送消息:Msg{priority=1, msg='消息1'}
SynchronousQueue
同步阻塞队列,SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。SynchronousQueue 在现实中用的不多,线程池中有用到过,
Executors.newCachedThreadPool()
实现中用到了这个队列,当有任务丢入线程池的时候,如果已创建的工作线程都在忙于处理任务,则会新建一个线程来处理丢入队列的任务。
调用queue.put
方法向队列中丢入一条数据,调用的时候产生了阻塞,从输出结果中可以看出,直到take方法被调用时,put方法才从阻塞状态恢复正常。
DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素。
- DelayQueue是一个内部依靠AQS队列同步器所实现的无界延迟阻塞队列。
- 延迟对象需要覆盖 getDelay()与compareTo()方法,并且要注意 getDelay()的时间单位的统一,compareTo()根据业务逻辑进行合理的比较逻辑重写。
- DelayQueue中内聚的重入锁是非公平的。
- DelayQueue是实现定时任务的关键,ScheduledThreadPoolExecutor中就用到了DelayQueue。
示例:
public class DelayQueueTest {
//推送信息封装
static class Msg implements Delayed {
//优先级,越小优先级越高
private int priority;
//推送的信息
private String msg;
//定时发送时间,毫秒格式
private long sendTimeMs;
public Msg(int priority, String msg, long sendTimeMs) {
this.priority = priority;
this.msg = msg;
this.sendTimeMs = sendTimeMs;
}
@Override
public String toString() {
return "Msg{" +
"priority=" + priority +
", msg='" + msg + '\'' +
", sendTimeMs=" + sendTimeMs +
'}';
}
//@Override
//public long getDelay(TimeUnit unit) {
// return unit.convert(this.sendTimeMs - Calendar.getInstance().getTimeInMillis(), TimeUnit.MILLISECONDS);
//}
/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.sendTimeMs - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
}
}
//推送队列
static DelayQueue<Msg> pushQueue = new DelayQueue<Msg>();
static {
//启动一个线程做真实推送
new Thread(() -> {
while (true) {
Msg msg;
try {
//获取一条推送消息,此方法会进行阻塞,直到返回结果
msg = pushQueue.take();
//此处可以做真实推送
long endTime = System.currentTimeMillis();
System.out.println(String.format("定时发送时间:%s,实际发送时间:%s,发送消息:%s", msg.sendTimeMs, endTime, msg));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列
public static void pushMsg(int priority, String msg, long sendTimeMs) throws InterruptedException {
pushQueue.put(new Msg(priority, msg, sendTimeMs));
}
public static void main(String[] args) throws InterruptedException {
for (int i = 5; i >= 1; i--) {
String msg = "一起来学java高并发,第" + i + "天";
DelayQueueTest.pushMsg(i, msg, Calendar.getInstance().getTimeInMillis() + i * 2000);
}
}
}
输出:
定时发送时间:1599462287589,实际发送时间:1599462287590,发送消息:Msg{priority=1, msg='一起来学java高并发,第1天', sendTimeMs=1599462287589}
定时发送时间:1599462289589,实际发送时间:1599462289589,发送消息:Msg{priority=2, msg='一起来学java高并发,第2天', sendTimeMs=1599462289589}
定时发送时间:1599462291589,实际发送时间:1599462291590,发送消息:Msg{priority=3, msg='一起来学java高并发,第3天', sendTimeMs=1599462291589}
定时发送时间:1599462293588,实际发送时间:1599462293589,发送消息:Msg{priority=4, msg='一起来学java高并发,第4天', sendTimeMs=1599462293588}
定时发送时间:1599462295571,实际发送时间:1599462295571,发送消息:Msg{priority=5, msg='一起来学java高并发,第5天', sendTimeMs=1599462295571}
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
LinkedTransferQueue类继承自AbstractQueue抽象类,并且实现了TransferQueue接口:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。
boolean tryTransfer(E e);
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则等待直到元素被消费者接收。
void transfer(E e) throws InterruptedException;
// 在上述方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 如果至少有一位消费者在等待,则返回true
boolean hasWaitingConsumer();
// 获取所有等待获取元素的消费线程数量
int getWaitingConsumerCount();
}
再看一下上面的这些方法,transfer(E e)
方法和SynchronousQueue的put方法
类似,都需要等待消费者取走元素,否者一直等待。其他方法和ArrayBlockingQueue、LinkedBlockingQueue
中的方法类似。
总结
- 重点需要了解
BlockingQueue
中的所有方法,以及他们的区别 - 重点掌握
ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
、DelayQueue
的使用场景 - 需要处理的任务有优先级的,使用
PriorityBlockingQueue
- 处理的任务需要延时处理的,使用
DelayQueue
疑问:
Q:有界阻塞队列和无界阻塞队列的区别?是不是以是否定义队列大小来作为区分,没有定义的就是无界的,有定义的就算是容量(Integer.MAX_VALUE)也是有界的?
有界阻塞队列就是要求创建队列时要指定队列容量,无界阻塞队列是指不需要我们指定队列容量,默认是Integer.MAX_VALUE。