073-JAVA线程安全的原子变量方案【非阻塞】

2022-06-17  本文已影响0人  XAbo

要想并发程序正确地执行,必须要保证原子性、可见性以及有序性。

可见性

可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。由于java的内存模型+JIT的原因,会导致线程对变量的不可见性。

主内存,存放共享数据。工作内存,存放线程自有数据。 JIT会将变量run放入本线程缓存中,导致线程t没有及时感知到run的变更

解决可见性问题

volatile示例:优化两阶段终止优化前

public class Demo {
    public static void main(String[] args) throws InterruptedException {
      TwoPhsaeTermination  twoPhsaeTermination = new TwoPhsaeTermination();
      twoPhsaeTermination.start();
      Thread.sleep(2000);
      twoPhsaeTermination.stop();
    }
}
class  TwoPhsaeTermination{
    private Thread   monitor;
    public  void start(){
        monitor = new Thread(()->{
            while (true){
                Thread thread = Thread.currentThread();
                if(thread.isInterrupted()){
                    System.out.println("料理后事");
                    break;
                }
                try {
                    Thread.sleep(1000);
                    System.out.println("执行监控");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // 如果是在sleep过程中被打断,打断标记会被清除,需要重新进行打断标记。
                  // 这里较繁琐,需要特别处理
                    thread.interrupt();
                }
            }
        });
        monitor.start();
    }
    public  void  stop(){
        monitor.interrupt();
    }
}

两阶段终止,优化后

public class Demo {
    public static void main(String[] args) throws InterruptedException {
      TwoPhsaeTermination  twoPhsaeTermination = new TwoPhsaeTermination();
      twoPhsaeTermination.start();
      Thread.sleep(2000);
      twoPhsaeTermination.stop();
    }
}
class  TwoPhsaeTermination{
    private Thread   monitor;
    private volatile boolean stop = false;
    public  void start(){
        monitor = new Thread(()->{
            while (true){
                Thread thread = Thread.currentThread();
                if(stop){
                    System.out.println("料理后事");
                    break;
                }
                try {
                    Thread.sleep(1000);
                    System.out.println("执行监控");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        monitor.start();
    }
    public  void  stop(){
        stop = true;
        monitor.interrupt();
    }
}
volatile保证可见性

原子性

在Java中,对基本数据类型的变量的读取和赋值操作是原子性操作,即这些操作是不可被中断的,要么执行,要么不执行。

x = 10;            //语句1
y = x;      //语句2
x++;        //语句3
x = x + 1;  //语句4

注意:其实只有语句1是原子性操作,其他三个语句都不是原子性操作。

解决原子性问题

有序性

有序性:即程序执行的顺序按照代码的先后顺序执行。

int i = 0;
boolean flag = false;
i = 1; //语句1
flag = true; //语句2

上面代码定义了一个int型变量,定义了一个boolean类型变量,然后分别对两个变量进行赋值操作。从代码顺序上看,语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗?不一定,为什么呢?这里可能会发生指令重排序Instruction Reorder

指令重排序:一般来说,处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。

比如上面的代码中,语句1和语句2谁先执行对最终的程序结果并没有影响,那么就有可能在执行过程中,语句2先执行而语句1后执行。但是要注意,虽然处理器会对指令进行重排序,但是它会保证程序最终结果会和代码顺序执行结果相同,那么它靠什么保证的呢?再看下面一个例子:

int a = 10; //语句1
int r = 2; //语句2
a = a + 3; //语句3
r = a * a; //语句4

这段代码有4个语句,那么可能的一个执行顺序是:那么可不可能是这个执行顺序呢: 语句2 >语句1> 语句4 > 语句3。
不可能,因为处理器在进行重排序时是会考虑指令之间的数据依赖性,如果一个指令Instruction 2必须用到Instruction 1的结果,那么处理器会保证Instruction 1会在Instruction 2之前执行。

虽然重排序不会影响单个线程内程序执行的结果,但是多线程呢?下面看一个例子:

//线程1:
context = loadContext(); //语句1
inited = true; //语句2
//线程2:
while(!inited ){
sleep()
}
doSomethingwithconfig(context);

上面代码中,由于语句1和语句2没有数据依赖性,因此可能会被重排序。假如发生了重排序,在线程1执行过程中先执行语句2,而此是线程2会以为初始化工作已经完成,那么就会跳出while循环,去执行doSomethingwithconfig(context)方法,而此时context并没有被初始化,就会导致程序出错。从上面可以看出,指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性。

解决有序性问题

volatile保证有序性

Happens-Before规则

规定了对共享变量的写操作对其他线程的读操作可见,它是可见性和有序性的一套规则总结。

一、CAS保证线程安全

传统方式:

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Account account = new AccountSafe(10000);
        Account.test(account);
    }
}
class  AccountSafe implements  Account{
    private  Integer balance;
    public  AccountSafe(Integer balance){
        this.balance = balance;
    }
    @Override
    public Integer getBalance() {
        synchronized (this) {
            return this.balance;
        }
    }
    @Override
    public void withdraw(Integer amount) {
        synchronized (this){
            this.balance -=  amount;
        }
    }
}

interface  Account{
    Integer getBalance();
    void withdraw(Integer amount);
    static  void test(Account account){
        List<Thread>  ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(()->{
                account.withdraw(10);
            }));
        }
        long start = System.nanoTime();
        ts.forEach(Thread::start);
        ts.forEach( t ->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()+"cost = " + (end-start)/1000_0000+"ms");
    }
}

CAS方式:

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Account account = new AccountSafe(10000);
        Account.test(account);
    }
}

class  AccountSafe implements  Account{
    private AtomicInteger balance;
    public  AccountSafe(int account){
        this.balance = new AtomicInteger(account);
    }
    @Override
    public Integer getBalance() {
        return  balance.get();
    }
    @Override
    public void withdraw(Integer amount) {
        while(true){
            int pre = balance.get();
            int next = pre - amount;
            if(balance.compareAndSet(pre,next)){
                break;
            }
        }
    }
}
interface   Account{
    Integer getBalance();
    void withdraw(Integer amount);
    static  void test(Account account){
        List<Thread>  ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(()->{
                account.withdraw(10);
            }));
        }
        long start = System.nanoTime();
        ts.forEach(Thread::start);
        ts.forEach( t ->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()+"cost = " + (end-start)/1000_0000+"ms");
    }
}

CAS与volatile关系;以AtomicInteger为例:

CAS适用场景:多核CPU且线程数不超过CPU核心数。

CAS与synchronized区别:

二、享元模式

除了给共享变量加锁、使用原子性的类保证共享变量的安全外,也可使用不可变类,如JDK自带的 DateTimeFormatterString,其主要思想是保护性拷贝机制,以通过创建新的对象来保证变量的不共享,从而保证变量的安全性。但这样的机制弊端就是重复创建对象。

享元模式就是解决重复创建对象的设计模式,比如经常使用的Long对象的内部类,在初始化时就创建 -128 ~ 127的Long数组;valueOf方法在获取Long时,会从数组中获取现有的数,从而避免重复创建Long对象。

 private static class LongCache {
        private LongCache(){}
        static final Long cache[] = new Long[-(-128) + 127 + 1];

        static {
            for(int i = 0; i < cache.length; I++)
                cache[i] = new Long(i - 128);
        }
    }

   public static Long valueOf(long l) {
        final int offset = 128;
        if (l >= -128 && l <= 127) { // will cache
            return LongCache.cache[(int)l + offset];
        }
        return new Long(l);
    }

享元模式示例:自定义数据库连接池

public class PoolDemo {
    // 1.连接数大小
    private  final int poolsize;
    // 2. 连接对象数组
    private Connection[]  connects;
    // 3.连接状态数组 0 空闲  1 繁忙
    private AtomicIntegerArray state;
    // 4.初始化
    public PoolDemo(int poolsize) {
        this.poolsize = poolsize;
        this.connects = new Connection[poolsize];
        this.state = new AtomicIntegerArray(new int[poolsize]);
        for (int i = 0; i < poolsize ; i++) {
            connects[i] = new MockConnect();
        }
    }
    // 5. 获取连接
    public  Connection borrow(){
     while (true){
         for (int i = 0; i <  poolsize; i++) {
             // 获取空闲连接
              if(state.get(i) == 0){
                 if(state.compareAndSet(i,0,1)){
                     System.out.println(Thread.currentThread().getName()+"创建连接" +connects[i]);
                     return  connects[i];
                 }
              }
         }
         // 无空闲连接,当前线程则等待
         synchronized (this){
             try {
                 System.out.println(Thread.currentThread().getName()+"等待" );
                 this.wait();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
     }
    }
    // 6. 归还连接
    public  void  free(Connection conn){
        for (int i = 0; i < poolsize ; i++) {
            if(connects[i] == conn){
                state.set(i,0);
                synchronized (this){
                    System.out.println(Thread.currentThread().getName()+"归还 = " + conn);
                    this.notifyAll();
                }
                break;
            }
        }
    }
}

class MockConnect implements  Connection{...}

比较其他方式的连接池方式:

public class ConnectionUtils {
   //解决并发问题,每个线程只取各自的Connect
    private ThreadLocal <Connection> tl = new ThreadLocal<Connection>();
    private DataSource  dataSource;
    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }
    public  Connection getThreadConnection(){
        try {
        //获取当前线程绑定的局部变量
        Connection con = tl.get();
        if(con == null){
           con = dataSource.getConnection();
        //设置当前线程绑定的局部变量
           tl.set(con);
          }
        return  con;
        }catch (Exception e) {
            throw  new RuntimeException(e);
        }
    }

    public void removeConnection(){
      //移除当前线程绑定的局部变量
        tl.remove();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读