工作架构设计并发编程

Java并发编程

2018-06-28  本文已影响74人  cuzz_

第1章 课程准备

本章首先从课程重点、特点、适合人群及学习收获几个方面对课程进行整体的介绍,然后会从一个实际的计数场景实现开始,给大家展示多线程并发时的线程不安全问题,让大家能够初体验到并发编程,之后会讲解并发和高并发的概念,并通过对比让大家明白到底什么是并发和高并发,最后会给出课程涉及到的知识技能,为后续的学习做好准备。

1-1 课程导学

1-2 并发编程初体验

多线程计数不准。

1-3 并发与高并发基本概念

第2章 并发基础

本章主要讲解并发学习必须理解的一些基本概念,主要包括CPU多级缓存和Java内存模型(JMM)。其中CPU多级缓存里深入讲解了缓存一致性和乱序执行优化。Java内存模型(JMM)里详细讲解了JMM规定、JMM抽象结构、同步的八种操作及同步规则。这些基本概念对于后面的并发编程很重要,也属于面试常考点,需要认真体会掌握。

2-1 CPU多级缓存-缓存一致性

M: 被修改(Modified)

该缓存行只被缓存在该CPU的缓存中,并且是被修改过的(dirty),即与主存中的数据不一致,该缓存行中的内存需要在未来的某个时间点(允许其它CPU读取请主存中相应内存之前)写回(write back)主存。当被写回主存之后,该缓存行的状态会变成独享(exclusive)状态。

E: 独享的(Exclusive)

该缓存行只被缓存在该CPU的缓存中,它是未被修改过的(clean),与主存中数据一致。该状态可以在任何时刻当有其它CPU读取该内存时变成共享状态(shared)。同样地,当CPU修改该缓存行中内容时,该状态可以变成Modified状态。

S: 共享的(Shared)

该状态意味着该缓存行可能被多个CPU缓存,并且各个缓存中的数据与主存数据一致(clean),当有一个CPU修改该缓存行中,其它CPU中该缓存行可以被作废(变成无效状态(Invalid))。

I: 无效的(Invalid)
该缓存是无效的(可能有其它CPU修改了该缓存行)。

2-2 CPU多级缓存-乱序执行优化

2-3 JAVA内存模型(Java Memory Model, JMM)

Stack: 存放的数据大小与生存期是确定的,缺乏灵活性,速度比较快,数据可已共享,存放基本类型和句柄
Heap: 运行时确定大小,生存期也不必事先告诉编译器,是在运行期在动态分配内存的,垃圾收集器会清理它们,由于动态分配的内存,速度比较慢

线程A把本地内存中的共享变量副本刷新到主内存里,线程B从主内存中读取。

2-4 并发的优势与风险

10.jpg

第3章 项目准备

本章主要是为课程里代码演示做必要的准备。首先会基于SpringBoot快速搭建一个方便演示的Java项目,然后简单介绍一下码云及代码的管理。项目搭建好,我会使用简单的例子演示一下并发的模拟验证,主要包括对工具Postman、JMeter、Apache Bench(AB)的使用,以及使用并发的代码来验证并发处理的正确性。

3-1 案例环境初始化

https://blog.csdn.net/lom9357bye/article/details/69677120
log:@Slf4j
https://www.2cto.com/kf/201712/702543.html

3-2 案例准备工作

3-3 并发模拟-工具

3-4 并发模拟-代码

第4章 线程安全性

本章讲解线程安全性,主要从原子性、可见性、有序性三个方面进行讲解。原子性部分,会详细讲解atomic包下相关类、CAS原理、Unsafe类、synchronized关键字等的使用及注意事项。可见性部分,主要介绍的是volatile关键字的规则和使用,及synchronized关键字的可见性。有序性部分,则重点讲解了happens-before原则。

4-1 线程安全性-原子性-atomic-1

image.png image.png
@Slf4j
@ThreadSafe
public class CountExample2 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add() {
        count.incrementAndGet();
    }
}

使用了AtomicInteger这个类,可以保证原子性。当我们count.incrementAndGet();的时候,底层使用了这样的代码

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

基于CAS:Compare and Swap, 翻译成比较并交换

CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

详情看如下博客

public class AtomicExample3 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static LongAdder count = new LongAdder();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        count.increment();
    }
}

4-3 线程安全性-原子性-synchronized

image.png
@Slf4j
public class SynchronizedExample1 {

    // 修饰一个代码块
    public void test1() {
      synchronized (this) {
          for (int i = 0; i < 10; i++) {
              log.info("test1 - {}", i);
          }
      }
    }

    // 修饰一个方法
    public synchronized void test2() {
        for (int i = 0; i < 10; i++) {
            log.info("test - {}", i);
        }
    }

    public static void main(String[] args) {
        // 声明一个实例
        SynchronizedExample1 example1 = new SynchronizedExample1();
        // 声明一个线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test1();
        });

        executorService.execute(()->{
            example1.test1();
        });
    }
}

先打印test1-0到9再打印test1-0到9

15:22:05.516 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 0
15:22:05.526 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 1
15:22:05.526 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 2
15:22:05.526 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 3
15:22:05.526 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 4
15:22:05.527 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 5
15:22:05.527 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 6
15:22:05.527 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 7
15:22:05.527 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 8
15:22:05.527 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 9
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 0
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 1
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 2
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 3
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 4
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 5
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 6
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 7
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 8
15:22:05.527 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 9

使用线程池,他们本来可以同时进行的,加了synchronized 就同步是使用了同一个对象。

当使用多了对象时

@Slf4j
public class SynchronizedExample1 {

    // 修饰一个代码块
    public void test1() {
      synchronized (this) {
          for (int i = 0; i < 10; i++) {
              log.info("test1 - {}", i);
          }
      }
    }

    // 修饰一个方法
    public synchronized void test2() {
        for (int i = 0; i < 10; i++) {
            log.info("test - {}", i);
        }
    }

    public static void main(String[] args) {
        // 声明一个实例
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        // 声明一个线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test1();
        });

        executorService.execute(()->{
            example2.test1();
        });
    }
}

打印结果,是交替出现的,也不一定是交替,不同对象调用时互相不影响的

15:35:12.188 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 0
15:35:12.188 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 0
15:35:12.196 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 1
15:35:12.196 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 1
15:35:12.196 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 2
15:35:12.196 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 2
15:35:12.196 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 3
15:35:12.196 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 3
15:35:12.196 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 4
15:35:12.196 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 4
15:35:12.196 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 5
15:35:12.196 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 5
15:35:12.197 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 6
15:35:12.197 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 6
15:35:12.197 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 7
15:35:12.197 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 7
15:35:12.197 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 8
15:35:12.197 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 8
15:35:12.197 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 9
15:35:12.197 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample1 - test1 - 9
public class SynchronizedExample2 {

    // 修饰一个类
    public static void test1(int j) {
      synchronized (SynchronizedExample2.class) {
          for (int i = 0; i < 10; i++) {
              log.info("test1 -{} - {}",j, i);
          }
      }
    }

    // 修饰一个静态方法
    public static synchronized void test2(int j) {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {} - {}", i);
        }
    }

    public static void main(String[] args) {
        // 声明一个实例
        SynchronizedExample2 example1 = new SynchronizedExample2();
        SynchronizedExample2 example2 = new SynchronizedExample2();
        // 声明一个线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()->{
            example1.test1(1);
        });

        executorService.execute(()->{
            example2.test1(2);
        });
    }
}

使用不同的对象,打印出来,是先打印test1 -1 线程的,再打印test1 - 2这个线程

15:46:20.320 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 0
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 1
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 2
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 3
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 4
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 5
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 6
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 7
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 8
15:46:20.330 [pool-1-thread-1] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -1 - 9
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 0
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 1
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 2
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 3
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 4
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 5
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 6
15:46:20.330 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 7
15:46:20.331 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 8
15:46:20.331 [pool-1-thread-2] INFO com.cuzz.concurrency.example.sync.SynchronizedExample2 - test1 -2 - 9

@Slf4j
@ThreadSafe
public class CountExample3 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private synchronized static void add() {
        count++;
    }
}

输出结果

15:52:21.184 [main] INFO com.cuzz.concurrency.example.count.CountExample3 - count:500

4-4 线程安全性-可见性

当我count++时
分为3步 1. 从主存中读取count 2. +1 3. 把count写回主存
当两个两个线程同时获取时,虽然它们都是读的最新的值,但是有时候会少计数

@Slf4j
@NotThreadSafe
public class CountExample4 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static volatile int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        count++;
    }
}

4-5 线程安全性-有序性

4-6 线程安全性总结

image.png

第5章 安全发布对象

本章主要讲解安全发布对象的一些核心方法,主要通过单例类的多种实现方式,让大家在实现过程中去体会这些方法的具体含义。这一章也是对线程安全性的巩固,也是把线程安全性涉及的一些关键字和类再一次放到实际场景中使用,加深大家对他们的印象和认识。

5-1 安全发布对象-发布与逸出

image.png
@Slf4j
public class UnsafePublic {
    private String[] states = {"a", "b", "c"};

    public String[] getStrates() {
        return states;
    }

    public static void main(String[] args) {
        UnsafePublic unsafePublic = new UnsafePublic();
        log.info("{}", Arrays.toString(unsafePublic.getStrates()));

        unsafePublic.getStrates()[0] = "d";
        log.info("{}", Arrays.toString(unsafePublic.getStrates()));
    }
}
@Slf4j
public class Escape {

    private int thisCanBeEscape = 0;

    public Escape() {
        new InnerClass();
    }

    private class InnerClass{

        public InnerClass() {
            log.info("{}", Escape.this.thisCanBeEscape);
        }
    }

    public static void main(String[] args) {
        new Escape();
    }
}

5-2 安全发布对象-四种方法

如果一个对象时可变对象,就要安全的发布。


image.png
/**
 * 懒汉模式
 * 单例实例在第一次使用的时候创建出来
 */
@NotThreadSafe
public class SingletonExample1 {

    // 私有的构造函数
    private SingletonExample1() {

    }

    // 单例对象
    private static SingletonExample1 instance = null;

    // 静态的工厂方法
    public static SingletonExample1 getInstance() {
        if (instance == null) {
            instance = new SingletonExample1();
        }
        return instance;
    }
}
/**
 * 饿汉模式
 * 单例实例在类装载使用时候创建出来
 */
@ThreadSafe
public class SingletonExample2 {

    // 私有的构造函数
    private SingletonExample2() {

    }

    // 单例对象
    private static SingletonExample2 instance = new SingletonExample2();

    // 静态的工厂方法
    public static SingletonExample2 getInstance() {
        return instance;
    }
}
/**
 * 懒汉模式
 * 单例实例在第一次使用的时候创建出来
 */
@ThreadSafe
@NotRecommend
public class SingletonExample3 {

    // 私有的构造函数
    private SingletonExample3() {

    }

    // 单例对象
    private static SingletonExample3 instance = null;

    // 静态的工厂方法
    public static synchronized  SingletonExample3 getInstance() {
        if (instance == null) {
            instance = new SingletonExample3();
        }
        return instance;
    }
}
/**
 * 懒汉模式 双重锁同步单例模式
 * 单例实例在第一次使用的时候创建出来
 */
@NotThreadSafe
public class SingletonExample4 {

    // 私有的构造函数
    private SingletonExample4() {

    }
  // 单例对象
    private static SingletonExample4 instance = null;

    // 静态的工厂方法
    public static SingletonExample4 getInstance() {
        if (instance == null) { // 双重检测机制        // B
            synchronized (SingletonExample4.class) { // 同步锁  // A - 3
               if (instance == null) {
                   instance = new SingletonExample4();
               }
            }
        }
        return instance;
    }
}
/**
 * 懒汉模式 双重锁同步单例模式
 * 单例实例在第一次使用的时候创建出来
 */
@ThreadSafe
public class SingletonExample5 {

    // 私有的构造函数
    private SingletonExample5() {

    }
    // 1. memory = allocate() 分配对象空间
    // 2. ctorInstance() 初始化对象
    // 3. instance = memory 设置instance指向刚分配的内存

    // JVM和cpu优化,发生了指令重排

    // 1. memory = allocate() 分配对象空间
    // 3. instance = memory 设置instance指向刚分配的内存
    // 2. ctorInstance() 初始化对象

    // 单例对象 // 加上volatile防止指令重排
    private volatile static  SingletonExample5 instance = null;

    // 静态的工厂方法
    public static SingletonExample5 getInstance() {
        if (instance == null) { // 双重检测机制
            synchronized (SingletonExample5.class) { // 同步锁
               if (instance == null) {
                   instance = new SingletonExample5();
               }
            }
        }
        return instance;
    }
}
/**
 * 饿汉模式
 * 单例实例在类装载使用时候创建出来
 */
@ThreadSafe
public class SingletonExample6 {

    // 私有的构造函数
    private SingletonExample6() {

    }
    
    // 单例对象
    // 静态域一定要再静态代码块之前,因为他们是按照循序执行的
    private static SingletonExample6 instance = null;
    
    static {
        instance = new SingletonExample6();
    }


    // 静态的工厂方法
    public static SingletonExample6 getInstance() {
        return instance;
    }
}
/**
 * 枚举模式:最安全的
 */
@ThreadSafe
@Recommend
public class SingletonExample7 {
    // 私有构造函数
    private SingletonExample7() {

    }

    public static SingletonExample7 getInstance() {
        return Singleton.INSTANCE.getInstance();

    }

    private enum Singleton {
        INSTANCE;

        private SingletonExample7 singleton;

        // JVM保证这个方法绝对只调用一次
        Singleton() {
            singleton = new SingletonExample7();
        }

        public SingletonExample7 getInstance() {
            return singleton;
        }
    }
}

第6章 线程安全策略

本章主要讲解线程安全策略,包括定义不可变对象、线程封闭、同步容器、并发容器等,引出并发里的关键知识J.U.C。同时还额外介绍了开发中常见的一些线程不安全类和写法,并给出他们各自对应的替代方案。这一章涉及的内容在日常开发和面试中都会涉及很多。

6-1 不可变对象-1

public class ImmutableExample1 {

    private final static Integer a = 1;
    private final static String b = "2";
    private final static Map<Integer, Integer> map = new HashMap<>();

    static {
        map.put(1, 2);
        map.put(3, 4);
        map.put(5, 6);
    }
    
    // 可以保证变量不发生变化
    private void test(final int a) {
        // a = 1;  // 编译出错
    }

    public static void main(String[] args) {
        // a = 2;   // 编译出错
        // b = "3"; // 编译出错
        // map = new HashMap<>(); // 编译出错
        map.put(1, 3);   // 引用对象可以修改里面的值
    }
}

6-2 不可变对象-2

image.png
@ThreadSafe
public class ImmutableExample2 {
    private static Map<Integer, Integer> map = new HashMap<>();

    static {
        map.put(1, 2);
        map.put(3, 4);
        map.put(5, 6);
        map = Collections.unmodifiableMap(map);
    }

    public static void main(String[] args) {
        map.put(1, 3); // 运行时报错
    }
}

6-3 线程封闭-1

image.png

还不是很清楚,可以看看以下博客
ThreadLocal

6-4 线程封闭-2

6-5 线程不安全类与写法-1

6-6 线程不安全类与写法-2

6-7 同步容器-1

6-8 同步容器-2

在使用foreach和iterator的时候不要删除元素。
同步容器使用synchronized性能不好,同步容器也不是完全线程安全的。

6-9 并发容器及安全共享策略总结

第7章 J.U.C之AQS

AQS(AbstractQueuedSynchronizer)是J.U.C的重要组件,也是面试的重要考点。这一章里将重点讲解AQS模型设计及相关同步组件的原理和使用,都非常实用,具体包括:CountDownLatch、Semaphore、CyclicBarrier、ReentrantLock与锁、Condition等。这些组件需要大家能熟练明白他们的用途及差异,不但会使用,而且还要明确知道不同方法调用后的不同效果。

7-1 J.U.C之AQS-介绍

image.png
image.png image.png
image.png

大致思路:首先AQS内部维护了一个CHL队列来管理锁,线程会首选尝试获取锁,如果获取失败,就把当前线程信息包装成一个节点锁,加入到队列中。然后循环尝试获取锁,当持有锁释放锁时,会唤醒后继锁。

7-2 J.U.C之AQS-CountDownLatch

image.png
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService  exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() ->{
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finished.");
        exec.shutdown();

    }

    private static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}

其中的countDownLathc.await(), 还可以接受2个参数:时间和单位。其含义是超过这个时间就不管了。继续执行下面的代码。

7-3 J.U.C之AQS-Semaphore

计数信号量
有限访问的资源
对并发访问的控制

public class SemaphoreExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService  exec = Executors.newCachedThreadPool();

        // 并发量为20
        final Semaphore semaphore = new Semaphore(20);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() ->{
                try {
                    semaphore.acquire(); // 获取一个许可
                    test(threadNum);
                    semaphore.release(); // 释放许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();

    }

    private static void test(int threadNum) throws InterruptedException {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
public class SemaphoreExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService  exec = Executors.newCachedThreadPool();

        // 并发量为20
        final Semaphore semaphore = new Semaphore(20);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() ->{
                try {
                    if (semaphore.tryAcquire()) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();

    }

    private static void test(int threadNum) throws InterruptedException {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

7-4 J.U.C之AQS-CyclicBarrier

image.png

告诉我们有多少个值同步等待。

public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);


    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor= Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }

            });
        }
    }

    private static void race(int threadNum) throws Exception{
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

7-5 J.U.C之AQS-ReentrantLock与锁-1

public class LockExample1 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}
public class LockExample4 {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        new Thread(() -> {
            try {
                reentrantLock.lock();
                log.info("wait signal"); // 1
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal"); // 4
            reentrantLock.unlock();
        }).start();

        new Thread(() -> {
            reentrantLock.lock();
            log.info("get lock"); // 2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal ~ "); // 3
            reentrantLock.unlock();
        }).start();
    }
}

7-6 J.U.C之AQS-ReentrantLock与锁-2

public class LockExample2 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    class Data {

    }
}
public class LockExample3 {

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看乐观读锁案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
            double currentX = x, currentY = y;  //将两个字段读入本地局部变量
            if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                try {
                    currentX = x; // 将两个字段读入本地局部变量
                    currentY = y; // 将两个字段读入本地局部变量
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲观读锁案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                    if (ws != 0L) { //这是确认转为写锁是否成功
                        stamp = ws; //如果成功 替换票据
                        x = newX; //进行状态改变
                        y = newY;  //进行状态改变
                        break;
                    } else { //如果不能成功转换为写锁
                        sl.unlockRead(stamp);  //我们显式释放读锁
                        stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                    }
                }
            } finally {
                sl.unlock(stamp); //释放读锁或写锁
            }
        }
    }
}

第8章 J.U.C组件拓展

这一章继续讲解J.U.C相关组件,主要包括FutureTask、Fork/Join框架、BlockingQueue,其中FutureTask讲解时会对比着Callable、Runnable、Future来讲。这些组件使用场景相对AQS会少一些,但也是J.U.C的重要组成部分,也是需要掌握的。


image.png

8-1 J.U.C-FutureTask-1

@Slf4j
public class FutureExample {
    static class MyCallable implements Callable<String> {


        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            sleep(5000);
            return "Done";
        }

    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<String> future = executor.submit(new MyCallable());
        log.info("do something in main");
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String res = future.get();
        log.info("res: {}", res);

    }
}

8-2 J.U.C-FutureTask-2

@Slf4j
public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                sleep(5000);
                return "Done";
            }
        });

        new Thread(futureTask).start();
        log.info("do something in main");
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String res = futureTask.get();
        log.info("res: {}", res);

    }
}

8-3 J.U.C-ForkJoin


用于并行计算

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

8-4 J.U.C-BlockingQueue

image.png
image.png
image.png
image.png

第9章 线程调度-线程池

本章讲解J.U.C里最后一部分:线程池。面试大概率会问到线程池相关的知识点。这一章将主要从new Thread弊端、线程池的好处、ThreadPoolExecutor详细介绍(参数、状态、方法)、线程池类图、Executor框架接口等进行讲解,需要大家能了解线程池的许多细节及配置,并能在实际项目中正确使用。...

9-1 线程池-1

9-2 线程池-2

@Slf4j
public class ThreadPoolExample1 {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}", index);
                }
            });
        }
        executor.shutdown();
    }
}
@Slf4j
public class ThreadPoolExample2 {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}", index);
                }
            });
        }
        executor.shutdown();
    }
}
@Slf4j
public class ThreadPoolExample3 {

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);

//        executor.schedule(new Runnable() {
//            @Override
//            public void run() {
//                log.warn("schedule run");
//            }
//        }, 3, TimeUnit.SECONDS);


        // 启动是延迟1秒然后每隔3秒执行任务
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                log.warn("schedule run");
            }
        }, 1, 3, TimeUnit.SECONDS);
        //executor.shutdown();
    }
}

9-3 线程池-3

第10章 多线程并发拓展

本章会对并发编程做些补充,但都贴近当前的面试,主要讲解死锁产生的条件及预防、多线程并发编程的最佳实践、Spring与线程安全、以及面试都特别喜欢问的HashMap和ConcurrentMap源码细节。当然,面试喜欢问的问题,对实际项目开发也是特别重要的。

10-1 死锁

@Slf4j
public class DeadLock implements Runnable{

    public int flag = 1;
    private String str1;
    private String str2;

    public DeadLock(String str1, String str2) {
       this.str1 = str1;
       this.str2 = str2;
    }

    @Override
    public void run() {
        if (flag == 1) {
            synchronized (str1) {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                synchronized (str2) {
                    log.info("1");
                }
            }
        }

        if (flag == 0) {
            synchronized (str2) {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                synchronized (str1) {
                    log.info("0");
                }
            }
        }
    }

    public static void main(String[] args) {
        String str1 = new String("a");
        String str2 = new String("b");

        DeadLock td1 = new DeadLock(str1, str2);
        DeadLock td2 = new DeadLock(str1, str2);

        td1.flag = 1;
        td2.flag = 0;
        new Thread(td1).start();
        new Thread(td2).start();
    }
}

10-2 并发最佳实践

image.png
image.png
image.png

10-3 Spring与线程安全

image.png

10-4 HashMap与ConcurrentHashMap解析

java8


image.png

10-5 多线程并发与线程安全总结

2.jpg

第11章 高并发之扩容思路

本章重点是让大家学会解决高并发问题的思路和手段,及重点类的使用。在扩容讲解时,首先通过例子介绍垂直扩容和水平扩容的区别,之后详细介绍数据库的读操作扩展和写操作扩展。扩容这个最基本的手段,相信大家都不会有什么问题,关键是根据实际场景分析做什么样的扩容。

11-1 高并发之扩容思路

第12章 高并发之缓存思路

本章讲解高并发中缓存方案。包含对缓存特征(命中率、最大元素、清空策略)、影响缓存命中率因素、缓存分类和应用场景(本地缓存、分布式缓存)、高并发场景下缓存常见问题(缓存一致性、缓存并发、缓存穿透、雪崩)等的具体介绍。此外,针对大家常用的缓存组件Guava Cache、Memcache、Redis也做了原理性的分析,并且演示。

12-1 高并发之缓存-特征、场景及组件介绍-1

19.jpg

12-2 高并发之缓存-特征、场景及组件介绍-2

12-3 高并发之缓存-redis的使用

12-4 高并发之缓存-高并发场景问题及实战讲解

image.png

第13章 高并发之消息队列思路

本章重点介绍了消息队列的特性(业务无关、FIFO、容灾、性能)、为什么需要消息队列以及消息队列的好处(业务解耦、最终一致性、广播、错峰与流控),并在最后对当前比较流行的消息队列组件kafka和rabbitmq做了架构分析和特性介绍,让大家对消息队列能有明确的认识。

13-1 高并发之消息队列-1

image.png

13-2 高并发之消息队列-2

13-3 高并发之消息队列-3

第14章 高并发之应用拆分思路

本章直接从实际项目拆分步骤讲起,让大家可以实际感受到应用拆分的好处和解决的问题,之后引出对应用拆分原则(业务优先、循序渐进、兼顾技术、可靠测试)和应用拆分时思考的内容(应用之间通信、应用之间数据库设计、避免事务跨应用),并引出对服务化Dubbo和微服务Spring Cloud的框架介绍。

14-1 高并发之应用拆分-1

image.png

14-2 高并发之应用拆分-2

第15章 高并发之应用限流思路

本章从实际项目保存百万数据的限流场景开始讲起,让大家感受一下某些高并发场景下使用限流和不使用限流的区别,明确限流的重要作用。之后详细介绍了限流常用的四种算法:计数法、滑动窗口、漏桶算法和令牌桶算法,并对他们做了简单的对比。

15-1 高并发之应用限流-1

31.png

15-2 高并发之应用限流-2

第16章 高并发之服务降级与服务熔断思路

本章首先通过举例让大家明白什么是服务降级和服务熔断,之后介绍了服务降级的分类:自动降级(超时、失败次数、故障、限流)和人工降级(开关),总结了服务降级和服务熔断的共性(目的、最终表现、粒度、自治)和区别(出发原因、管理目标层次、实现方式)以及服务降级要考虑的问题。最后介绍了Hystrix在服务降级和服务熔。

16-1 高并发之服务降级与服务熔断思路

第17章 高并发之数据库切库分库分表思路

本章从数据库瓶颈开始讲起,引出对数据库切库分库分表的介绍。数据库切库里重点介绍了读写分离的设计,对比支持多数据源和分库的区别;最后介绍了什么时候该考虑分表、横向分表与纵向分表,以及通过mybatis的分页插件shardbatis2.0实现数据库分表。

17-1 高并发之数据库切库分库分表

第18章 高并发之高可用手段介绍

本章主要介绍了高可用的三个常用手段:任务调度系统分布式、主备切换设计和引入监控报警机制。任务调度系统分布式部分对 elastic-job 的优点、思路、特性等做了介绍,主备切换设计部分则是对zookeeper的分布式锁这个典型应用进行介绍。

18-1 高并发之高可用一些手段

image.png

第19章 课程总结

本章首先对本课程的知识进行总结回顾,然后针对面试中的并发问题与高并发问题进行提问,希望大家都能有所收获,并期待与大家共同探讨并发与高并发的话题。

19-1 课程总结

上一篇下一篇

猜你喜欢

热点阅读