073-JAVA线程安全的原子变量方案【非阻塞】
要想并发程序正确地执行,必须要保证原子性、可见性以及有序性。
可见性
可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。由于java的内存模型+JIT的原因,会导致线程对变量的不可见性。
主内存,存放共享数据。工作内存,存放线程自有数据。 JIT会将变量run放入本线程缓存中,导致线程t没有及时感知到run的变更解决可见性问题:
-
synchronized
规定,线程在加锁时,先清空工作内存 → 在主内存中拷贝最新变量的副本到工作内存 → 执行完代码 → 将更改后的共享变量的值刷新到主内存中 → 释放互斥锁。 - 给变量增加
volatile
关键字,这种方式可以保证每次取数直接从主存取。 -
AtomicXXX
jdk 提供了很多原子类型,这种类型的基本原理总结起来,volatile + unsafe 的 Compare and Swap,这种 Unsafe 操作并不推荐在自己的代码中使用,因为各 JDK 版本在这里变化较大,有可能升级 JDK 时造成各种问题。而且也要保证自己能够用好。
volatile示例:优化两阶段终止优化前
public class Demo {
public static void main(String[] args) throws InterruptedException {
TwoPhsaeTermination twoPhsaeTermination = new TwoPhsaeTermination();
twoPhsaeTermination.start();
Thread.sleep(2000);
twoPhsaeTermination.stop();
}
}
class TwoPhsaeTermination{
private Thread monitor;
public void start(){
monitor = new Thread(()->{
while (true){
Thread thread = Thread.currentThread();
if(thread.isInterrupted()){
System.out.println("料理后事");
break;
}
try {
Thread.sleep(1000);
System.out.println("执行监控");
} catch (InterruptedException e) {
e.printStackTrace();
// 如果是在sleep过程中被打断,打断标记会被清除,需要重新进行打断标记。
// 这里较繁琐,需要特别处理
thread.interrupt();
}
}
});
monitor.start();
}
public void stop(){
monitor.interrupt();
}
}
两阶段终止,优化后
public class Demo {
public static void main(String[] args) throws InterruptedException {
TwoPhsaeTermination twoPhsaeTermination = new TwoPhsaeTermination();
twoPhsaeTermination.start();
Thread.sleep(2000);
twoPhsaeTermination.stop();
}
}
class TwoPhsaeTermination{
private Thread monitor;
private volatile boolean stop = false;
public void start(){
monitor = new Thread(()->{
while (true){
Thread thread = Thread.currentThread();
if(stop){
System.out.println("料理后事");
break;
}
try {
Thread.sleep(1000);
System.out.println("执行监控");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
monitor.start();
}
public void stop(){
stop = true;
monitor.interrupt();
}
}
volatile保证可见性
原子性
在Java中,对基本数据类型
的变量的读取和赋值操作是原子性操作,即这些操作是不可被中断的,要么执行,要么不执行。
x = 10; //语句1
y = x; //语句2
x++; //语句3
x = x + 1; //语句4
注意:其实只有语句1是原子性操作,其他三个语句都不是原子性操作。
- 语句1是直接将数值10赋值给x,也就是说线程执行这个语句的会直接将数值10写入到工作内存中。
- 语句2实际上包含2个操作,它先要去读取x的值,再将x的值写入工作内存,虽然读取x的值以及将x的值写入工作内存,这2个操作都是原子性操作,但是合起来就不是原子性操作了。
- 同样的,x++和 x = x+1包括3个操作:读取x的值,进行加1操作,写入新的值也就是说,只有简单的读取、赋值(而且必须是将数字赋值给某个变量,变量之间的相互赋值不是原子操作)才是原子操作。
解决原子性问题:
-
synchronized
和Lock
来实现。
有序性
有序性:即程序执行的顺序按照代码的先后顺序执行。
int i = 0;
boolean flag = false;
i = 1; //语句1
flag = true; //语句2
上面代码定义了一个int型变量,定义了一个boolean类型变量,然后分别对两个变量进行赋值操作。从代码顺序上看,语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗?不一定,为什么呢?这里可能会发生指令重排序Instruction Reorder
。
指令重排序:一般来说,处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。
比如上面的代码中,语句1和语句2谁先执行对最终的程序结果并没有影响,那么就有可能在执行过程中,语句2先执行而语句1后执行。但是要注意,虽然处理器会对指令进行重排序,但是它会保证程序最终结果会和代码顺序执行结果相同,那么它靠什么保证的呢?再看下面一个例子:
int a = 10; //语句1
int r = 2; //语句2
a = a + 3; //语句3
r = a * a; //语句4
这段代码有4个语句,那么可能的一个执行顺序是:那么可不可能是这个执行顺序呢: 语句2 >语句1> 语句4 > 语句3。
不可能,因为处理器在进行重排序时是会考虑指令之间的数据依赖性,如果一个指令Instruction 2必须用到Instruction 1的结果,那么处理器会保证Instruction 1会在Instruction 2之前执行。
虽然重排序不会影响单个线程内程序执行的结果,但是多线程呢?下面看一个例子:
//线程1:
context = loadContext(); //语句1
inited = true; //语句2
//线程2:
while(!inited ){
sleep()
}
doSomethingwithconfig(context);
上面代码中,由于语句1和语句2没有数据依赖性,因此可能会被重排序。假如发生了重排序,在线程1执行过程中先执行语句2,而此是线程2会以为初始化工作已经完成,那么就会跳出while循环,去执行doSomethingwithconfig(context)方法,而此时context并没有被初始化,就会导致程序出错。从上面可以看出,指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性。
解决有序性问题:
- 使用
volatile
关键字修饰变量就可以禁止重排序。 - 使用
synchronized
防止“指令重排”,其本质是让多个线程在调用synchronized修饰的方法时,由并行(并发)变成串行调用,谁获得锁谁执行;非真正的禁止指令重排。由多线程变为单线程。
Happens-Before规则
规定了对共享变量的写操作对其他线程的读操作可见,它是可见性和有序性的一套规则总结。
- 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;
- 管程锁定规则:一个unLock操作先行发生于后面对同一个锁的lock操作;(此处后面指时间的先后)
- volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;(此处后面指时间的先后)
- 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作;
- 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行;
- 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
- 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始;
- 传递性:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;
一、CAS保证线程安全
传统方式:
public class Demo {
public static void main(String[] args) throws InterruptedException {
Account account = new AccountSafe(10000);
Account.test(account);
}
}
class AccountSafe implements Account{
private Integer balance;
public AccountSafe(Integer balance){
this.balance = balance;
}
@Override
public Integer getBalance() {
synchronized (this) {
return this.balance;
}
}
@Override
public void withdraw(Integer amount) {
synchronized (this){
this.balance -= amount;
}
}
}
interface Account{
Integer getBalance();
void withdraw(Integer amount);
static void test(Account account){
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(()->{
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach( t ->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()+"cost = " + (end-start)/1000_0000+"ms");
}
}
CAS方式:
public class Demo {
public static void main(String[] args) throws InterruptedException {
Account account = new AccountSafe(10000);
Account.test(account);
}
}
class AccountSafe implements Account{
private AtomicInteger balance;
public AccountSafe(int account){
this.balance = new AtomicInteger(account);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while(true){
int pre = balance.get();
int next = pre - amount;
if(balance.compareAndSet(pre,next)){
break;
}
}
}
}
interface Account{
Integer getBalance();
void withdraw(Integer amount);
static void test(Account account){
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(()->{
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach( t ->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()+"cost = " + (end-start)/1000_0000+"ms");
}
}
CAS与volatile关系;以AtomicInteger
为例:
- 数据存在
volatile
修饰的变量中,能够保证线程的可见和有序性。 -
compareAndSet()
能保证线程的原子性。 - 如果共享变量是引用类型,避免出现ABA问题,使用可以
AtomicStampedReferencre
或AtomicMarkableReferencre
类。
原子操作类
CAS适用场景:多核CPU且线程数不超过CPU核心数。
CAS与synchronized区别:
- CAS基于乐观锁:不担心别人修改共享变量。无锁、不阻塞。
- synchronized基于悲观锁:总担心别人修改共享变量。有锁,阻塞。
二、享元模式
除了给共享变量加锁、使用原子性的类保证共享变量的安全外,也可使用不可变类,如JDK自带的 DateTimeFormatter
和 String
,其主要思想是保护性拷贝机制,以通过创建新的对象来保证变量的不共享,从而保证变量的安全性。但这样的机制弊端就是重复创建对象。
享元模式就是解决重复创建对象的设计模式,比如经常使用的Long对象的内部类,在初始化时就创建 -128 ~ 127的Long数组;valueOf方法在获取Long时,会从数组中获取现有的数,从而避免重复创建Long对象。
private static class LongCache {
private LongCache(){}
static final Long cache[] = new Long[-(-128) + 127 + 1];
static {
for(int i = 0; i < cache.length; I++)
cache[i] = new Long(i - 128);
}
}
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
享元模式示例:自定义数据库连接池
public class PoolDemo {
// 1.连接数大小
private final int poolsize;
// 2. 连接对象数组
private Connection[] connects;
// 3.连接状态数组 0 空闲 1 繁忙
private AtomicIntegerArray state;
// 4.初始化
public PoolDemo(int poolsize) {
this.poolsize = poolsize;
this.connects = new Connection[poolsize];
this.state = new AtomicIntegerArray(new int[poolsize]);
for (int i = 0; i < poolsize ; i++) {
connects[i] = new MockConnect();
}
}
// 5. 获取连接
public Connection borrow(){
while (true){
for (int i = 0; i < poolsize; i++) {
// 获取空闲连接
if(state.get(i) == 0){
if(state.compareAndSet(i,0,1)){
System.out.println(Thread.currentThread().getName()+"创建连接" +connects[i]);
return connects[i];
}
}
}
// 无空闲连接,当前线程则等待
synchronized (this){
try {
System.out.println(Thread.currentThread().getName()+"等待" );
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 6. 归还连接
public void free(Connection conn){
for (int i = 0; i < poolsize ; i++) {
if(connects[i] == conn){
state.set(i,0);
synchronized (this){
System.out.println(Thread.currentThread().getName()+"归还 = " + conn);
this.notifyAll();
}
break;
}
}
}
}
class MockConnect implements Connection{...}
比较其他方式的连接池方式:
public class ConnectionUtils {
//解决并发问题,每个线程只取各自的Connect
private ThreadLocal <Connection> tl = new ThreadLocal<Connection>();
private DataSource dataSource;
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
public Connection getThreadConnection(){
try {
//获取当前线程绑定的局部变量
Connection con = tl.get();
if(con == null){
con = dataSource.getConnection();
//设置当前线程绑定的局部变量
tl.set(con);
}
return con;
}catch (Exception e) {
throw new RuntimeException(e);
}
}
public void removeConnection(){
//移除当前线程绑定的局部变量
tl.remove();
}
}