什么是cas

2021-01-11  本文已影响0人  yanghx

什么是cas

本篇文章是类似笔记的形式。文笔写不好,而且会大量摘抄别人的文章。

定义

CAS操作包含三个操作数————内存位置(V)、期望值(A)和新值(B)。如果内存位置的值与期望值匹配,那么处理器会自动将该位置值更新为新值。否则处理器不作任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。(CAS在一些特殊情况下仅返回CAS是否成功,而不提取当前值)CAS有效的说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置的值,只告诉我这个位置现在的值即可。

方法说明

unsafe: 不安全的。封装了很多本地方法,采用jni调用。compareAndSwapxxx系列的方法就是借助“C语言”来调用cpu底层指令实现的。以常用的Intel x86平台来说,最终映射到的cpu的指令为“cmpxchg”,这是一个原子指令,cpu执行此命令时,实现比较并替换的操作!

java中提供了对CAS操作的支持,具体在sun.misc.unsafe类中,声明如下:

  public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
  public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
  public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

  参数var1:表示要操作的对象
  参数var2:表示要操作对象中属性地址的偏移量
  参数var4:表示需要修改数据的期望的值
  参数var5:表示需要修改为的新值

QA

现代计算机动不动就上百核心,cmpxchg怎么保证多核心下的线程安全?

系统底层进行CAS操作的时候,会判断当前系统是否为多核心系统,如果是
就给“总线”加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行CAS操作,
也就是说CAS的原子性是平台级别的!

什么是ABA问题?

说明

count++

count ++ 操作实际上是由3步来完成!(jvm执行引擎)

  1. 获取count的值,记做A : A=count
  2. 将A值+1,得到B :B=A+1
  3. 将B值赋值给count

如果有A.B两个线程同时执行count++,他们通知执行到上面步骤的第一步,得到的count是一样的,3步操作结束后,count只加1,导致count结果不正确!

怎么解决结果不正确问题?

对count++操作的时候,我们让多个线程排队处理,多个线程同时到达request()方法的时候,只能允许一个线程可以进去操作,其它的线程在外面等着,等里面的处理完毕出来之后,外面等着的再进去一个,这样操作的count++就是排队进行的,结果一定是正确的。

怎么实现排队效果?
java中synchronized关键字和ReentrantLock都可以实现对资源枷锁,保证并发正确性,多线程的情况下可以保证被锁住的资源被“串行”访问。

一些代码

使用synchronized关键字仿写cas

package cn.study.cas;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;

/**
 * 仿写cas
 *
 * @author: yanghx
 * @created: 2021/01/10 23:35
 */
@Slf4j
public class CasDemo1 {

    /**
     * 同步调用的count
     */
    static int syncAddCount = 0;

    /**
     * 模仿Cas调用的count
     */
    static int casCount = 0;

    public static int getCasCount() {
        return casCount;
    }

    public static int getSyncAddCount() {
        return syncAddCount;
    }


    /**
     * 使用 synchronized 实现同步
     * 锁了。效率慢
     */
    public static synchronized void add() throws InterruptedException {
        syncAddCount++;
    }

    /**
     * 循环调用。
     *
     * @throws InterruptedException e
     */
    public static void casAdd() throws InterruptedException {
        //期望值
        int expectCount;
        while (!compareAndSwap(expectCount = getCasCount(), expectCount + 1)) {
        }
    }

    /**
     * 模拟cas
     * <p>
     * getCount 获取当前count的值, 如果和期望值相同,就进行赋值操作。否则就失败
     *
     * @param expectCount 期望值
     * @param newCount    要设置的新值
     * @return boolean
     */
    public static synchronized boolean compareAndSwap(int expectCount, int newCount) {
        if (getCasCount() == expectCount) {
            casCount = newCount;
            return true;
        }
        return false;

    }

    public static void syncAddTest() throws InterruptedException {
        int threadSize = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < threadSize; i++) {
            new Thread(() -> {
                try {
                    add();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        long endTime = System.currentTimeMillis();
        countDownLatch.await();
        log.info("同步 add调用。开始时间 {}、结束时间 {} 共用时 {} 毫秒  count = {}", startTime, endTime, endTime - startTime, getSyncAddCount());
    }

    public static void casAddTest() throws InterruptedException {
        int threadSize = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < threadSize; i++) {
            new Thread(() -> {
                try {
                    casAdd();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        long endTime = System.currentTimeMillis();
        countDownLatch.await();
        log.info("cas add调用。开始时间 {}、结束时间 {} 共用时 {} 毫秒  count = {}", startTime, endTime, endTime - startTime, getCasCount());

    }


    public static void main(String[] args) throws InterruptedException {
        syncAddTest();
        Thread.sleep(2000);
        casAddTest();
    }

}


cousole

9:55:10: Executing task 'CasDemo1.main()'...

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes

> Task :CasDemo1.main()
09:55:13.526 [main] INFO cn.study.cas.CasDemo1 - 同步 add调用。开始时间 1610330113487、结束时间 1610330113524 共用时 37 毫秒  count = 100
09:55:15.534 [main] INFO cn.study.cas.CasDemo1 - cas add调用。开始时间 1610330115528、结束时间 1610330115534 共用时 6 毫秒  count = 100

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.6.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 5s
3 actionable tasks: 2 executed, 1 up-to-date
9:55:15: Task execution finished 'CasDemo1.main()'.

ABA问题示例


package cn.study.cas;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * cas Aba问题示例
 *
 * @author: yanghx
 * @created: 2021/01/11 10:03
 */
public class CasAbaDemo {
    public static AtomicInteger a = new AtomicInteger(1);

    public static void main(String[] args) {
        Thread main = new Thread(() -> {
            System.out.println("操作线程" + Thread.currentThread().getName() + ", 初始值:" + a.get());
            try {
                int expectNum = a.get();
                int newNum = expectNum + 1;
                Thread.sleep(1000);//主线程休眠一秒钟,让出cpu

                boolean isCASSccuess = a.compareAndSet(expectNum, newNum);
                System.out.println("操作线程" + Thread.currentThread().getName() + ",CAS操作:" + isCASSccuess);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主线程");

        Thread other = new Thread(() -> {
            try {
                Thread.sleep(20);//确保Thread-main线程优先执行

                a.incrementAndGet();//a + 1,a=2
                System.out.println("操作线程" + Thread.currentThread().getName() + ",【increment】,值=" + a.get());
                a.decrementAndGet();//a - 1,a=1
                System.out.println("操作线程" + Thread.currentThread().getName() + ",【decrement】,值=" + a.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "干扰线程");

        main.start();
        other.start();

    }
}


cousole

11:22:25: Executing task 'CasAbaDemo.main()'...

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes

> Task :CasAbaDemo.main()
操作线程主线程, 初始值:1
操作线程干扰线程,【increment】,值=2
操作线程干扰线程,【decrement】,值=1
操作线程主线程,CAS操作:true

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.6.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 2s
3 actionable tasks: 2 executed, 1 up-to-date
11:22:28: Task execution finished 'CasAbaDemo.main()'.


ABA问题解决

package cn.study.cas;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * cas aba问题解决
 * 加一个字典。版本
 *
 * 重点看一下  AtomicStampedReference 类。提供基于版本号的方式解决aba
 *
 * 然后看两个线程中的sleep方法
 * 假定两个线程刚好同时进行
 *
 * 干扰线程开发执行
 * - 休眠 20毫秒  Thread.sleep(20);//确保Thread-main线程优先执行
 *
 * 主线程开始执行
 * - 计算好 expectNum、expectStamp、newNum、newStamp
 * - 休眠 (主线程休眠一秒钟,让出cpu)
 *
 * 干扰线程开始执行
 * -  执行+1 操作   //a.compareAndSet(a.getReference(), (a.getReference() + 1), a.getStamp(), (a.getStamp() + 1));
 * -  执行-1 操作   //a.compareAndSet(a.getReference(), (a.getReference() - 1), a.getStamp(), (a.getStamp() + 1));
 * -  [注意。 执行操作时,版本号一直是加的]
 *
 * 主线程开始执行
 * - 主线程拿到上次计算好的  expectNum、expectStamp、newNum、newStamp 进行操作 。// boolean isCASSccuess = a.compareAndSet(expectNum, newNum, expectStamp, newStamp);
 * - 失败
 *
 *
 * 通过console可以看到执行顺序
 *
 * 11:51:14.899 [干扰] INFO cn.study.cas.CasAbaDemo1 - 操作线程 干扰 初始值 1 初始版本
 * 11:51:14.899 [主线程] INFO cn.study.cas.CasAbaDemo1 - 操作线程 主线程 初始值 1 初始版本
 * 11:51:14.923 [干扰] INFO cn.study.cas.CasAbaDemo1 - 操作线程 干扰  num 2 stamp 1
 * 11:51:14.923 [干扰] INFO cn.study.cas.CasAbaDemo1 - 操作线程 干扰  num 1 stamp 2
 * 11:51:15.903 [主线程] INFO cn.study.cas.CasAbaDemo1 - 操作线程 主线程 期望Num 1 expectStamp 0 newNum 2 newStamp 1
 * 操作线程主线程,CAS操作:false
 *
 * @author: yanghx
 * @created: 2021/01/11 11:28
 */
@Slf4j
public class CasAbaDemo1 {
    public static void main(String[] args) {
        AtomicStampedReference<Integer> a = new AtomicStampedReference<>(1, 0);

        Thread main = new Thread(new Runnable() {
            @Override
            public void run() {
                log.info("操作线程 {} 初始值 {} 初始版本", Thread.currentThread().getName(), a.getReference(), a.getStamp());
                try {
                    Integer expectNum = a.getReference();
                    int expectStamp = a.getStamp();
                    Integer newNum = expectNum + 1;
                    Integer newStamp = expectStamp + 1;
                    Thread.sleep(1000);//主线程休眠一秒钟,让出cpu
                    log.info("操作线程 {} 期望Num {} expectStamp {} newNum {} newStamp {}", Thread.currentThread().getName(), expectNum, expectStamp, newNum, newStamp);

                    boolean isCASSccuess = a.compareAndSet(expectNum, newNum, expectStamp, newStamp);
                    System.out.println("操作线程" + Thread.currentThread().getName() + ",CAS操作:" + isCASSccuess);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "主线程");

        Thread other = new Thread(new Runnable() {
            @Override
            public void run() {

                log.info("操作线程 {} 初始值 {} 初始版本", Thread.currentThread().getName(), a.getReference(), a.getStamp());
                try {
                    Thread.sleep(20);//确保Thread-main线程优先执行
                    // ref +1 stamp+1
                    a.compareAndSet(a.getReference(), (a.getReference() + 1), a.getStamp(), (a.getStamp() + 1));
                    log.info("操作线程 {}  num {} stamp {}", Thread.currentThread().getName(), a.getReference(), a.getStamp());
                    //ref-1 stamp+1
                    a.compareAndSet(a.getReference(), (a.getReference() - 1), a.getStamp(), (a.getStamp() + 1));
                    log.info("操作线程 {}  num {} stamp {}", Thread.currentThread().getName(), a.getReference(), a.getStamp());

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "干扰");

        main.start();
        other.start();
    }


}


上一篇下一篇

猜你喜欢

热点阅读