并发无锁的艺术
前言
在并发编程中,多线程的共享资源的修改往往会造成严重的线程安全问题,解决这种问题简单暴力的方式就是加锁,加锁的方式使用简单易理解,但常常会因为阻塞导致性能问题
有没有可能做到无锁还保证线程安全呐?这得看具体情况。得益于CAS技术,有很多情况下我们可以做到不使用锁也能保证线程的安全
比如今天我最近遇到的场景如下(由于场景比较复杂,用一个模拟简化一下)
场景
假设有一个商店,背后有一个工厂可以生产商品,商店也可以有用户来购买商品,为了简化,假设工厂只能生产一个商品、而用户也只能买一个商品
需求如下:
- 用户来购买,如果商品已经生产好了,则直接发货,完成交易
- 用户来购买,如果商品还没生产好,让用户填写一个欠货单,待工厂生产好后,如果发现有欠货,则直接发货,完成交易
简简单单的一个需求,在多线程环境下就会出现隐患
单线程
先不考虑多线程情况,这个代码很好写,我们用一个ready
变量标识是否生产完成,用一个unSupply
变量标识是否有欠用户一个商品,代码如下
public class SerialShop {
private volatile boolean ready; // 商品生产完成
private volatile boolean unSupply; // 是否欠用户一个商品
public volatile boolean done; // 交易完成
public void send() { // 发货
System.out.println("send to user");
done = true;
}
public void buy() {
if (ready) { // 商品生产完成
send(); // 直接发货
return;
}
this.unSupply = true; // 没有准备好则填写一个欠货单
}
public void ready() {
this.ready = true; // 标识商品准备完成
if (this.unSupply) { // 如果发现有欠货单
send(); // 给用户发货
}
}
}
这时,我们简单跑一下
@Test
public void buyBeforeReady() {
buy();
ready();
}
@Test
public void buyAfterReady() {
ready();
buy();
}
结果无论先购买再生产完,还是生产完再购买,最终都会走到send
方法,完成交易
多线程
上面的代码虽然简单,但在多线程下就会出现问题,用实际的情形描述一下
- 用户来购买发现商品没生产好,则开始准备填写欠货单,由于用户文盲,填写的很慢
- 此时工厂恰好生产好了,标识已准备,但一看还没有欠货单,所以不发货
- 用户刚刚填写完欠货单,没啥事就回家了
- 最终,用户付完了钱,工厂也生产完毕,就是没有发货完成交易
画个时序图描述一下这个情景
时序图因为多线程无法保证有序性,所以这种情况出现的概率很大,而一旦出现就是严重问题
用代码模拟一下这个场景:
public class UnsafeShop {
private volatile boolean ready; // 商品生产完成
private volatile boolean unSupply; // 欠用户
public volatile boolean done; // 交易完成
public void send() {
System.out.println("send to user");
done = true;
}
public void buy() throws InterruptedException {
if (ready) { // 准备好了
send(); // 直接发货
return;
}
Thread.sleep(100); // 这里手动降低线程速度,为了重现场景
this.unSupply = true; // 没有准备好则填写一个欠货单
}
public void ready() throws InterruptedException {
this.ready = true; // 标识商品准备完成
if (this.unSupply) { // 如果发现有欠货单
send(); // 给用户发货
}
}
@Test
public void unsafe() throws InterruptedException {
// 用户购买
new Thread(() -> {
try {
buy();
} catch (InterruptedException e) {
}
}).start();
Thread.sleep(50);
// 工厂生产
new Thread(() -> {
try {
ready();
} catch (InterruptedException e) {
}
}).start();
while (true) ;
}
}
执行结果:并没有走到send方法(上面的代码通过sleep来降低线程的执行速度,是为了100%呈现错误,实际中就算不写sleep也有可能出现这种情况)
悲观锁
那么如何避免上面的问题呐,最简单暴力的方式就是加锁
上面的问题之所以出现,是因为用户查看是否商品已准备和标识欠货的两步操作没有原子性,导致中间的过程可能被工厂的线程快速完成所有动作和判断
实际情形下我们可以这么解决问题:在接纳用户的时候,如果工厂来人送货,让工厂的人在外面等着,等用户把该做的都做了,工厂的人再进来标识准备完毕并送货
用代码模拟一下这个解决方案
public class BlockShop {
private volatile boolean ready; // 商品生产完成
private volatile boolean unSupply; // 欠用户
public volatile boolean done; // 交易完成
public void send() {
System.out.println("send to user");
done = true;
}
public void buy() throws InterruptedException {
synchronized (this) { // 接纳用户时不让工厂人进入
if (ready) { // 准备好了
send(); // 直接发货
return;
}
Thread.sleep(100);
this.unSupply = true; // 没有准备好则填写一个欠货单
}
}
public void ready() throws InterruptedException {
synchronized (this) { // 接纳用户时不让工厂人进入
this.ready = true; // 标识商品准备完成
}
if (this.unSupply) { // 如果发现有欠货单
send(); // 给用户发货
}
}
@Test
public void block() throws InterruptedException {
// 用户购买
new Thread(() -> {
try {
buy();
} catch (InterruptedException e) {
}
}).start();
Thread.sleep(50);
// 工厂生产
new Thread(() -> {
try {
ready();
} catch (InterruptedException e) {
}
}).start();
while (true) ;
}
}
这时,不会在出现上述问题,彻底的解决了线程安全
而从解决问题的实际场景来看,这种解决问题的方法在现实中简直是弱智,工厂的人就在外面傻等,这就是阻塞,会降低代码的执行速度
当然以上场景的阻塞实际其实很小,但个人认为锁这东西能不用尽量不用,在场景复杂的时候阻塞的弱点会更加凸显出来
无锁
在这种场景下,能不能不使用锁来达到线程安全的效果呐?
我想了很多办法,比如buy时先标识已欠货,再去判断是否已准备,或者标识完已欠货再去看一眼是否已准备好,但都行不通,原因就是无法保证原子性,也无法保证多线程的有序性
冥思苦想后,想到一个解决方案:工厂人员上门后第一件事就是把欠货单撕了!
- 此时如果用户正在写这个欠货单,那肯定是撕不成的,出现冲突说明用户已来了,直接发货即可
- 如果用户还没写且正准备写,发现欠货单没了,出现冲突说明货来了,直接发货即可
此时欠货单有三个状态:初始状态/被撕了/填写完,我们用商品的库存标识为:0/1/-1(欠用户一台)
private volatile int stock = 0;
而stock==1
也说明货已到,所以不需要ready
变量了
最终代码如下
public class NoBlockShop {
private volatile int stock = 0; // 库存量 -1代表亏欠用户一台
public volatile boolean done; // 交易完成
final AtomicIntegerFieldUpdater<NoBlockShop> STATUS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(NoBlockShop.class, "stock");
public void send() {
done = true;
}
public void buy() throws InterruptedException {
for (;;) {
if (stock ==1) { // 有货
send(); // 直接发货
return;
}
if (STATUS_UPDATER.compareAndSet(this, 0, -1)) {// 标识欠货,如果失败说明库存有变动,再回头查看一下
return;
}
}
}
public void ready() throws InterruptedException {
if (!STATUS_UPDATER.compareAndSet(this, 0, 1)) { // 标识有库存
send(); // 如果失败代表用户来过了,直接发货
}
}
}
不仅解决了线程安全,还无锁(也可以称作乐观锁),并且代码还简洁了,CAS是真香
测试一下线程安全,代码如下
ExecutorService executorService = Executors.newFixedThreadPool(20);
List<NoBlockShop> shops = new ArrayList<>();
for (int i=0;i<100000;i++) {
NoBlockShop shop = new NoBlockShop();
shops.add(shop);
executorService.execute(()->{
try {
shop.buy();
} catch (InterruptedException e) {}
});
executorService.execute(()->{
try {
shop.ready();
} catch (InterruptedException e) {}
});
}
Thread.sleep(500);
System.out.println(shops.stream().filter(v->!v.done).count());
初始化10万个shop,然后用不同线程分别buy和ready,最终输出没交易的shop个数
- 如果使用
UnsafeShop
(初版),一般结果都不是0,且每次执行都不一样,说明有的shop对象出现线程安全问题 - 如果使用
BlockShop
(锁版),结果是0,说明线程安全 - 如果使用
NoBlockShop
(CAS版),结果是0,说明也实现了线程安全
根据这个可以继续改造一下让商店,让工厂可以不断生产商品,用户也能不断购买,依然使用stock
,为正代表有n个库存,为负代表欠用户n个商品,并且可以一次性购买/生产多个,不再是一次性买卖了,代码如下
public class NoBlockSupermarket {
private volatile int stock = 0; // 当前库存数量,为负代表欠货
public AtomicInteger deals = new AtomicInteger(0); // 交易量,测试用
final AtomicIntegerFieldUpdater<NoBlockSupermarket> STOCK_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(NoBlockSupermarket.class, "stock");
public void send() {
deals.incrementAndGet(); // 增加成交数,测试用
}
public void buy(int n) {
int e = 0; // 已买数量
while (e != n) {
int stock = this.stock;
if (!STOCK_UPDATER.compareAndSet(this, stock, stock - 1)) { // 库存-1
continue;
}
if (stock > 0) { // 有货
send();
}
e++;
}
}
public void supply(int n) {
int e = 0; // 已处理数量
while (e != n) {
int stock = this.stock;
if (!STOCK_UPDATER.compareAndSet(this, stock, stock + 1)) {// 库存+1
continue;
}
if (stock < 0) { // 欠货
send();
}
e++;
}
}
}
最后
使用CAS可以避免多线程情况下的阻塞,但也并不是所有场景都适用,在冲突严重的情况下乐观锁性能可能反而不如悲观锁
我所举例的场景其实就是一个典型的发布订阅模式的场景,冲突不高的情况下用乐观锁的方式替换悲观锁,会达到性能上质的飞跃