4、Java并发编程入门与高并发面试-线程安全性

2020-05-23  本文已影响0人  安安汐而

慕课网 Jimin老师 Java并发编程入门与高并发面试 学习笔记
Java并发编程入门与高并发面试

线程安全性定义:
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的

原子性:提供了互斥访问,同一时刻只能有一一个线程来对它进行操作
可见性: 一个线程对主内存的修改可以及时的被其他线程观察到
有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果- -般杂乱无序

原子性-Atomic包
◆AtomicXXX : CAS. Unsafe.compareAndSwapInt
package com.huhao.concurrency.example.count;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 代码模拟并发测试
 * Atomic包演示
 */
@Slf4j
@ ThreadSafe
public class CountExample2 {

    //请求总数
    public static int clientTotal = 5000;

    //同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);


    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                    semaphore.acquire();
                    add();
                    //add执行完后,释放当前线程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保证线程减到0
        countDownLatch.await();
        //关闭线程池
        exec.shutdown();
        log.error("count:{}", count);//count:4952
    }

    private static void add() {
        count.incrementAndGet();
    }
}

原因详解:

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

/**
 * eg:2+1=3
 * @param var1 :当前count对象
 * @param var2:当前的值  2
 * @param var4 :加的值   1
 * 
 * var5是底层方法获取到的值。如果没有别的线程处理更改的时候,正常应该var5=2,
 * compareAndSwapInt 来判断,直到var2与var5相同时,才会var5+var4=2+1,返回3
 * @return the updated value
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
AtomicLong、LongAdder

由于getAndAddInt方法,一直循环比较,在线程数少的时候,比较的成功率比较高,在多的时候,失败的次数会增多,导致一直循环,效率较低
并发量大的时候,使用LongAdder效率会高一点。如果小的话,也可以用AtomicInteger

AtomicReference 、AtomicReferenceFieldUpdater
package com.huhao.concurrency.example.atomic;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

/**
 * 代码模拟并发测试
 * AtomicReference包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicReference {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2);//count=2
        count.compareAndSet(0, 1);//count=2,不是0,则不执行
        count.compareAndSet(1, 3);//count=2,不是0,则不执行
        count.compareAndSet(2, 4);//count=4
        count.compareAndSet(3, 5);//count=4,不是0,则不执行
        log.info("count:{}", count.get());//count:4
    }
}
package com.huhao.concurrency.example.atomic;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 代码模拟并发测试
 * AtomicReferenceFieldUpdater包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicExampleAtomicReference {

    //AtomicExampleAtomicExampleAtomicReference的count字段,必须用volatile修饰
    private static AtomicIntegerFieldUpdater<AtomicExampleAtomicExampleAtomicReference>
            updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExampleAtomicExampleAtomicReference.class, "count");

    @Getter
    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicExampleAtomicExampleAtomicReference example5 = new AtomicExampleAtomicExampleAtomicReference();

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1,{}", example5.getCount());   // update success 1,120
            if (updater.compareAndSet(example5, 100, 120)) {
                log.info("update success 2,{}", example5.getCount());
            } else {
                log.info("update failed,{}", example5.getCount());  // update failed,120
            }
        }
    }
}
AtomicStampReference : CAS的ABA问题

aba问题:cas操作时候,其他线程将变量值A,从A改到B,又改回A。本线程在比较的时候,发现A变量没有变,于是就讲A值进行了交换操作,其实该值以及被其他线程更新过,与最初设计初衷不符。
解决:在变量更新时候,将变量版本号加1

AtomicBoolean

只执行一次


import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 代码模拟并发测试
 * AtomicBoolean包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicBoolean {

    //请求总数
    public static int clientTotal = 5000;

    //同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicBoolean isHappened = new AtomicBoolean();


    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                    semaphore.acquire();
                    test();
                    //add执行完后,释放当前线程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保证线程减到0
        countDownLatch.await();
        //关闭线程池
        exec.shutdown();
        log.error("count:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            //5000次线程,只会执行一次
            log.info("exec");
        }
    }
}

原子性-synchronized

◆修饰代码块:大括号括起来的代码,作用于调用的对象
◆修饰方法:整个方法,作用于调用的对象
◆修饰静态方法:整个静态方法,作用于所有对象
◆修饰类:括号括起来的部分,作用于所有对象

package com.huhao.concurrency.example.sync;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 *  Synchronized测试修饰代码块和方法
 */
@Slf4j
@ThreadSafe
public class SynchronizedExample {

    /**
     * 一般的循环,多线程测下看看对不对
     */
    private void notSyncBlock() {
        for (int i = 0; i < 10; i++) {
            log.info("notSyncBlock - {}", i);
        }
    }

    /**
     * 修饰代码块
     * 大括号括起来的代码,作用于调用的对象
     */
    private void synchronizedBlock() {
        synchronized (this) {
            for (int i = 0; i < 4; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修饰代码块
     * 大括号括起来的代码,作用于调用的对象
     */
    private void synchronizedBlock(int j) {
        synchronized (this) {
            for (int i = 0; i < 4; i++) {
                log.info("synchronizedBlock {} - {}", j, i);
            }
        }
    }

    /**
     * 修饰方法
     * 整个方法,作用于调用的对象
     */
    private synchronized void synchronizedMethod() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    //请求总数
    public static int clientTotal = 4;

    //同时并发执行的线程数
    public static int threadTotal = 3;

    public static void main(String[] args) throws InterruptedException {
        SynchronizedExample synchronizedExample = new SynchronizedExample();
        SynchronizedExample synchronizedExample2 = new SynchronizedExample();

        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        /**
         * 线程同步执行测试是不是同步一个个的执行
         */
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                    semaphore.acquire();
                    /**
                     *  没有加sync的,
                     *  notSyncBlock - 0
                     *  notSyncBlock - 0
                     *  notSyncBlock - 0
                     *  notSyncBlock - 1
                     *  notSyncBlock - 1
                     *  notSyncBlock - 1
                     *  notSyncBlock - 2
                     *  notSyncBlock - 2
                     *  notSyncBlock - 2
                     *  notSyncBlock - 3
                     */
//                    synchronizedExample.notSyncBlock();


                    /**
                     * 加了sync的,同步执行
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     */
//                    synchronizedExample.synchronizedBlock();

                    /**
                     * example和example2会同时调用,但是本身是顺序执行的
                     * 同一个对象,是同步执行
                     * 不同对象间,不干扰
                     */
//                    synchronizedExample.synchronizedBlock(0);
//                    synchronizedExample2.synchronizedBlock(1);
//                    synchronizedExample2.synchronizedBlock(2);

                    //add执行完后,释放当前线程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保证线程减到0
        countDownLatch.await();
        //关闭线程池
        exec.shutdown();
//        synchronizedExample.synchronizedMethod();
    }
}
package com.huhao.concurrency.example.sync;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

/**
 * Synchronized 测试修饰静态方法和类
 */
@Slf4j
@ThreadSafe
public class SynchronizedExample2 {

    /**
     * 一般的循环,多线程测下看看对不对
     */
    public void notSyncBlock() {
        for (int i = 0; i < 10; i++) {
            log.info("notSyncBlock - {}", i);
        }
    }

    /**
     * 修饰类
     */
    public static void synchronizedBlock() {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 4; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修饰静态方法
     */
    public static synchronized void synchronizedMethod() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    //请求总数
    public static int clientTotal = 4;

    //同时并发执行的线程数
    public static int threadTotal = 3;

    public static void main(String[] args) throws InterruptedException {
    }
}

对比:

◆synchronized :不可中断锁,适合竞争不激烈,可读性好
◆Lock :可中断锁,多样化同步,竞争激烈时能维持常态
◆Atomic :竞争激烈时能维持常态,比Lock性能好;只能同步一个值

可见性

导致共享变量在线程间不可见的原因
◆线程交叉执行
◆重排序结合线程交叉执行
◆共享变量更新后的值没有在工作内存与主存间及时更新

可见性- synchronized

JMM关于synchronized的两条规定:
◆线程解锁前 ,必须把共享变量的最新值刷新到主内存
◆线程加锁时 ,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,
加锁与解锁是同一把锁)

可见性- volatile

通过加入内存屏障和禁止重排序优化来实现
◆对volatile变量写操作时 ,会在写操作后加入-条store屏障指令,将本地内存中的共享变量值刷新到主内存
◆对volatile变量读操作时 ,会在读操作前加入-条load屏障指令,从主内存中读取共享变量

上一篇下一篇

猜你喜欢

热点阅读