线程安全性

2018-11-02  本文已影响0人  磊_5d71

原子性

图片.png

AtomicInteger

AtomicInteger类中的incrementAndGet方法,调用CompareAndSetInt 简称CAS
有native修饰说明是底层的类,不是用java实现的

package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@ThreadSafe
public class AtomicExample1 {


    //请求数1000
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    //count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
    public static AtomicInteger count = new AtomicInteger(0);


    private static void add(){
        //用到unsafe类
        count.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {

        //定义线程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,传入并发线程数 final修饰不允许重新赋值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁。传入请求总数
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通过匿名内部类方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并发数量
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次执行计数器减掉一个
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count.get());
    }
}

LongAdder

package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

@Slf4j
@ThreadSafe
public class AtomicExample3 {


    //请求数1000
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    //count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
    public static LongAdder count = new LongAdder();


    private static void add(){
        //用到unsafe类
        count.increment();
    }

    public static void main(String[] args) throws InterruptedException {

        //定义线程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,传入并发线程数 final修饰不允许重新赋值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁。传入请求总数
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通过匿名内部类方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并发数量
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次执行计数器减掉一个
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);
    }
}

AtomicReference

package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0,2);
        count.compareAndSet(0,1);
        count.compareAndSet(1,3);
        count.compareAndSet(2,4);
        count.compareAndSet(3,5);
        log.info("count:{}",count.get());
    }

}

AtomicIntegerFieldUpdater

package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

@Slf4j
@ThreadSafe
public class AtomicExample5 {


    //这里必须要使用volatile修饰符,并且非static才可以。通过lombok添加注解
    @Getter
    public volatile int count = 100;

    private static AtomicIntegerFieldUpdater<AtomicExample5> update = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");

    public static void main(String[] args) {

        AtomicExample5 example5 = new AtomicExample5();

        if(update.compareAndSet(example5,100,120)){
            log.info("update success,{}",example5.getCount());
        }

        if(update.compareAndSet(example5,100,120)){
            log.info("update success,{}",example5.getCount());
        }else {
            log.info("update failed,{}",example5.getCount());
        }
    }
}

AtomicStampedReference

-ABA问题 线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题, AtomicStampedReference可以解决此问题

AtomicBoolean

package com.alan.concurrency.example.atomic;


import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    //请求数5000
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    public static void main(String[] args) throws InterruptedException {     //定义线程池ExecutorService接口
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,传入并发线程数 final修饰不允许重新赋值
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁。传入请求总数
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            //通过匿名内部类方式
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //semaphore控制并发数量
                        semaphore.acquire();
                        test();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception",e);
                    }
                    //每次执行计数器减掉一个
                    countDownLatch.countDown();
                }

            });

        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}",isHappened.get());
    }

    //让某一段代码只执行一次,绝对不会重复
    private static void test(){
        if (isHappened.compareAndSet(false, true)) {
            log.info("executed");
        }
        }
}

图片.png

synchronized

package com.alan.concurrency.example.sync;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample1 {


    //修饰一个代码块。作用范围是大括号括起的代码
    public void test1(int   j){
        synchronized (this){
            for (int i = 0; i < 10; i++) {
                log.info("test1 {}- {}",j,i);
            }
        }
    }

    //修饰一个方法。作用范围是整个方法
    public synchronized void test2(int j){
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}",j,i);
        }

    }

    public static void main(String[] args) {

        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test2(1);
        });
        executorService.execute(()->{
            example2.test2( 2);
        });
    }
}
package com.alan.concurrency.example.sync;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class SynchronizedExample2 {


    //修饰一个类。作用范围是所有对象
    public void test1(int  j){
        synchronized (SynchronizedExample2.class){
            for (int i = 0; i < 10; i++) {
                log.info("test1 {}- {}",j,i);
            }
        }
    }

    //修饰一个静态方法。作用范围是所有对象
    public static synchronized void test2(int j){
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}",j,i);
        }

    }

    public static void main(String[] args) {

        SynchronizedExample2 example1 = new SynchronizedExample2();
        SynchronizedExample2 example2 = new SynchronizedExample2();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test1(1);
        });
        executorService.execute(()->{
            example2.test1( 2);
        });
    }

}

对比

图片.png

可见性

可见性-synchronized方式

图片.png

可见性-volatile方式 (volatile不具备原子性)

通过加入内存屏障和禁止重排序优化来实现

volatile 也可以用于double check

有序性

有序性原则

图片.png
图片.png
图片.png
图片.png
上一篇下一篇

猜你喜欢

热点阅读