多线程

JAVA多线程高并发使用示例,包括CountDownLatch/

2022-03-25  本文已影响0人  运气爆棚lsw

知识点

总结了并发多线程相关的线程安全,线程封闭,线程调度,同步容器,并发容器,AQS,JUC等等
包括CountDownLatch / CyclicBarrier / ReentrantLock / ReentrantReadWriteLock / Semaphore等功能使用示例代码,包括常见问题简单总结,比如缓存双写一致性,数据库性能瓶颈,常规高并发解决思路手段

高并发解决思路与手段

1.扩容:水平扩容、垂直扩容
2.缓存:Redis、Memcache、GuavaCache等
3.队列:Kafka、RabitMQ、RocketMQ等

应用拆分:服务化Dubbo与微服务Spring Cloud
限流:Guava RateLimiter使用、常用限流算法、自己实现分布式限流等
服务降级与服务熔断:服务降级的多重选择、Hystrix
数据库切库,分库分表:切库、分表、多数据源
高可用的一些手段:任务调度分布式elastic-job主备curator的实现、监控报警机制
xxl-job等的使用

基础知识与核心知识准备

并发高并发相关概念
cpu多级缓存:缓存一致,乱序执行优化
java内存模型:JMM规定,抽象结构,同步操作与规则
并发优势与风险
并发模拟:Postman,Jmeter,Apache Bench代码

并发基本概念

同时拥有两个或多个线程,如果程序在单核处理器上运行,多个线程将交替的换入或者换出内存,这些线程是同时“存在”的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上,此时,程序中的每个线程都将分配到一个处理器核上,因此可以同时运行。

高并发基本概念

高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计保证系统能够同时并行处理很多请求。

并发:多个线程操作相同的资源,保证线程安全,合理使用资源
高并发:服务能同时处理很多请求,提高程序性能(更多的考虑技术手段)

知识技能

总体架构:Spring Boot、Maven、JDK8、MySQL
基础组件:Mybatis、Guava、Lombok、Redis、Kafka
高级组件:Joda-Time、Atomic包、JUC、AQS、ThreadLocal、RateLimiter、Hystrix、ThreadPool、Shardbatis分表插件、Curator、elastic-job、Xxl-Job等

基础知识

cpu多级缓存

主存和cpu通过主线连接,CPU缓存在主存和CPU之间,缓存的出现可以减少CPU读取共享主存的次数

为什么需要CPU cache:CPU的频率太快了,快到主存跟不上,这样在处理器时钟周期内,CPU常常需要等待主存,浪费资源。所以cache的出现,是为了缓解CPU和内存之间速度不匹配问题(结构:cpu -> cache -> memery).

CPU cache有什么意义:

1)时间局部性:如果某个数据被访问,name在不久的将来它很可能被再次访问。
2)空间局部性:如果某个数据被访问,那么与它相邻的数据很快也可能被访问

CPU多级缓存-缓存一致性(MESI)内存屏障保障数据一致性

MESI分别代表cache数据的四种状态,这四种状态可以相互转换
缓存四种操作:local read、local write、remote read、remote write

CPU多级缓存-乱序执行优化

在多核处理器上回出现问题

java内存模型(Java Memory Model/JMM)

java内存模型-同步八种操作

lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态
unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中
use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎
assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量
store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以遍随后的write的操作
write(写入):作用于主内存的变量,它把store操作从工作内存中的一个变量的值传送到主内存的变量中

java内存模型-同步规则

  1. 如果要把一个变量从主内存中复制到工作内存,就需要按顺序的执行read和load操作,如果把变量从工作内存中同步回主内存,就需要按顺序的执行store和write操作。但java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行
  2. 不允许read和load、store和write操作之一单独出现
  3. 不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中
  4. 不允许一个线程无原因的(没发生过任何assign操作)把数据从工作内存同步回主内存中
  5. 一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作
  6. 一个变量在同一时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现
  7. 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量之前需要重新执行load或assign操作初始化变量的值
  8. 如果一个变量实现没有被lock操作锁定,怎不允许对它执行unlock操作,也不允许去unlock一个被其他线程锁定的变量
  9. 对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)

并发的优势与风险

优势

速度:同时处理多个请求,响应更快;复杂的操作可以分成多个进程同时进行
设计:程序设计在某些情况下更简单,也可以更多的选择
资源利用:CPU能够在等待IO的时候做一些其他的事情

风险

安全性:多个线程共享数据时可能会产生于期望不相符的结果
活跃性:某个操作无法继续进行下去时,就会发生活跃性问题。比如死锁、饥饿等问题
性能:线程过多时会使得CPU频繁切换,调度时间增多;同步机制;消耗过多内存

线程安全性

定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类时线程安全的。

线程安全体现在以下三个方面
1.原子性:提供了互斥访问,同一时刻只能有一个线程来对他进行操作
2.可见性:一个线程对主内存的修改可以及时的被其他线程观察到
3.有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

原子性——Atomic包

AtomicXxxx:CAS、Unsafe.compareAndSwapInt
AtomicXxxx类中方法incrementAndGet(),incrementAndGet方法中调用unsafe.getAndAddInt(),getAndAddInt方法中主题是do-while语句,while语句中调用compareAndSwapInt(var1, var2, var5, var5 + var4)

compareAndSwapInt方法就是CAS核心:

在死循环内,不断尝试修改目标值,直到修改成功,如果竞争不激烈,
修改成功率很高,否则失败概率很高,性能会受到影响
jdk8中新增LongAdder,它和AtomicLong比较
优点:性能好,在处理高并发情况下统计优先使用LongAdder
AtomicReference、AtomicReferenceFieldUpdater原子性更新字段(字段要求volatile修饰,并且是非static)
AtomicStampReference:CAS的ABA问题
ABA问题:变量已经被修改了,但是最终的值和原来的一样,那么如何区分是否被修改过呢,用版本号解决
AtomicBoolean可以让某些代码只执行一次

原子性——锁

synchronized:依赖jvm,作用对象的作用范围内
修饰代码块:同步代码块,大括号括起来的代码,作用于调用的对象
修饰方法:同步方法,整个方法,作用于调用的对象
修饰静态方法:整个静态方法,作用于所有对象
修饰类:括号括起来的部分,作用于所有对象

Lock:依赖特殊CPU指令,代码实现,ReentrantLock

原子性——对比

synchronized:不可中断锁,适合竞争不激烈,可读性好
Lock:可中断锁,多样化同步,竞争激烈时能维持常态
Atomic:竞争激烈时能维持常态,比Lock性能好,只能同步一个值

可见性

导致共享变量在线程间不可见的原因:
1 线程交叉执行
2 重排序结合线程交叉执行
3 共享变量更新后的值没有在工作内存与主内存间及时更新

可见性——synchronized

JMM关于synchronized的两条规定:
线程解锁前,必须把共享变量的最新值刷新到主内存
线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,加锁和解锁是同一把锁)

可见性——volatile

通过加入内存屏障和禁止重排序优化来实现
1 对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存
2 随volatile变量度操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量
使用volatile修饰变量,无法保证线程安全

volatile适合修饰状态标识量

有序性

java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性

有序性——happens-before原则

1 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作
注:在单线程中,看起来是这样的,虚拟机可能会对代码进行指令重排序,虽然重排序了,但是运行结果在单线程中和指令书写顺序是一致的,事实上,这条规则是用来保证程序单在单线程中执行结果的正确性,无法保证程序在多线程中的正确性

2 锁定规则:一个unlock操作先行发生于后面对同一个锁的lock操作
3 volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作
4 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C则可以得出操作A先行发生于操作C

前四条规则比较重要
5 线程启动规则:Thread对象的start()方法先行发生于次线程的每一个动作
6 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码监测到中断事件的发生
7 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行

8 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始

线程安全性——总结

原子性:Atomic包、CAS算法、synchronized、Lock
可见性:synchronized、volatile
有序性:happens-before规则
一个线程观察其他线程指令执行顺序,由于重排序的存在,观察结果一般是无序的,如果两个操作执行顺序无法从happens-before原则推导出来,name他们就不能保证有序性,虚拟机可以随意的对他们重排序

发布对象

发布对象:使一个对象能够被当前范围之外的代码所使用
对象逸出:一种错误的发布。当一个对象还没有构造完成时,就使它被其他线程所见

安全发布对象四种方法

1 在静态初始化函数中初始化一个对象引用
2 将对象的引用保存到volatile类型域或者AtomicReference对象中
3 将对象的引用保存到某个正确构造对象的final类型域中
4 将对象的引用保存到一个由锁保护的域中
私有构造函数,单例对象,静态工厂方法获取对象

以单例模式为例

懒汉模式:单例实例在第一次使用时进行创建(线程不安全)
懒汉模式也可以实现线程安全,给getInstance方法添加synchronized关键字(不推荐,因为性能不好)
双重同步锁单例模式:双重监测机制,在方法内部加synchronized关键字(不是线程安全的)
原因是,创建对象是,分为以下三个步骤:
1)memory = allocate() 分配对象的内存空间
2)ctorInstance() 初始化对象
3)instance = memory() 设置instance指向刚分配的内存

由于JVM和cpu优化,可能会发生指令重排:
1) memory = allocate() 分配对象的内存空间
3) instance = memory() 设置instance指向刚分配的内存
2) ctorInstance() 初始化对象
当以上面这种指令执行时,线程A执行到3 instance = memory() 设置instance指向刚分配的内存 这一步时,线程B执行if(instance == null)这段代码,此时instance != null,线程B直接return instance,导致对象没有初始化完毕就返回
解决办法就是限制对象创建时进行指令重排,volatile+双重监测机制->禁止指令重排引起非线程安全
饿汉模式:单例实例在类装载时进行创建(线程安全)
枚举模式:线程安全

不可变对象

不可变对象需要满足的条件:
对象创建以后其状态就不能修改
对象所有域都是final类型
对象是正确创建的(在对象创建期间,this引用没有逸出)
参考String类型

final关键字定义不可变对象

修饰类、方法、变量
修饰类:不能被继承
修饰方法:1.锁定方法不被继承类修改 2.效率
修饰变量:基本数据类型,数值不可变;引用类型变量,不能再指向另外一个对象,因此容易引起线程安全问题

其他实现不可变对象

Collections.unmodifiableXXX:Collection、List、Set、Map(线程安全)
Guava:ImmutableXXX:Collection、List、Set、Map

线程不安全类与写法

字符串

StringBuilder:线程不安全
StringBuffer:线程安全

时间转换

SimpleDateFormat:线程不安全
JodaTime:线程安全

集合

ArrayList:线程不安全
HashSet:线程不安全
HashMap:线程不安全
编程注意:
if(condition(a)){handle(a)}; 不是线程安全的,因为这条判断语句不是原子性的,如果有线程共享这条代码,则会出现并发问题,解决方案是想办法这这段代码是原子性的(加锁)

线程安全——同步容器(在多线程环境下不推荐使用)

ArrayList -> Vector, Stack
Vector中的方法使用synchronized修饰过
Stack继承Vector
HashMap -> HashTable(key、value不能为null)
HashTable使用synchronized修饰方法
Collections.synchronizedXXX(List、Set、Map)

同步容器不完全是线程安全的
编程注意:如果使用foreach或者iterator遍历集合时,尽量不要对集合进行修改操作

线程安全——并发容器J.U.C(java.util.concurrent)(在多线程环境下推荐使用)

ArrayList -> CopyOnWriteArrayList:相比ArrayList,CopyOnWriteArrayList是线程安全的,写操作时复制,即当有新元素添加到CopyOnWriteArrayList时,先从原有的数组里拷贝一份出来,然后在新的数组上写操作,写完之后再将原来的数组指向新的数组,CopyOnWriteArrayList整个操作都是在锁(ReentrantLock锁)的保护下进行的,这么做主要是避免在多线程并发做add操作时复制出多个副本出来,把数据搞乱了。第一个缺点是做写操作时,需要拷贝数组,就会消耗内存,如果元素内容比较多会导致youngGC或者是fullGc;第二个缺点是不能用于实时读的场景,比如拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到的数据可能还是旧的,虽然CopyOnWriteArrayList能够做到最终的一致性,但是没法满足实时性要求,因此CopyOnWriteArrayList更适合读多写少的场景

CopyOnWriteArrayList设计思想:
1.读写分离 2.最终一致性 3.使用时另外开辟空间解决并发冲突

HashSet -> CopyOnWriteArraySet
TreeSet -> ConcurrentSkipListSet
CopyOnWriteArraySet:底层实现是CopyOnWriteArrayList
ConcurrentSkipListSet:和TreeSet 一样支持自然排序,基于map集合,但是批量操作不是线程安全的
HashMap -> ConcurrentHashMap :不允许空值,针对读操作做了大量的优化,具有特别高的并发性
TreeMap -> **ConcurrentSkipListMap **:内部使用SkipList跳表结构实现的,key是有序的,支持更高的并发

安全共享对象策略——总结

  1. 线程限制:一个呗线程限制的对象,由线程独占,并且只能被占有它的线程修改
  2. 共享只读:一个共享只读的对象,在没有额外的同步情况下,可以被多个线程并发访问,但是任何线程都不能修改它
    3 .线程安全对象:一个线程安全的对象或容器,在内部通过同步机制来保证线程安全,所以其他线程无序额外的同步就可以通过公共接口随意访问它
  3. 被守护对象:被守护对象只能通过获取特定的锁来访问不可变对象、线程封闭、同步容器、并发容器

JUC之AQS

AQS:AbstractQueuedSynchronizer

  1. 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
  2. 利用了int类型表示状态
  3. 使用方法是继承
  4. 子类通过继承并通过实现它的方法管理器状态{acquire和release}的方法操纵状态
  5. 可以同时实现排它锁和共享锁模式(独占、共享)

AQS同步组件

  1. CountDownLatch:闭锁,通过计数来保证线程是否需要一直阻塞
  2. Semaphore:控制同一时间并发线程的数目
  3. CyclicBarrier:和CountDownLatch相似,都能阻阻塞线程
  4. ReentrantLock
  5. Condition
  6. FutureTask

CountDownLatch

package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author lisanwei24282
 */
@Slf4j
public class CountDownLatchExample1 {
    /**
     * 指定一个初始变量
     */
    private final static Integer THREAT_COUNT = 200;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        /* 声明总计数器   */
        final CountDownLatch countDownLatch = new CountDownLatch(THREAT_COUNT);
        for (int i = 0; i < THREAT_COUNT; i++) {
            final int threadNum = i;
            executorService.execute(() -> {
                try {
                    test(threadNum);
                } catch (InterruptedException e) {
                    log.error("==={}===", e);
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        /* 第一种: 持续阻塞等待完成 */
        countDownLatch.await();

        /* 第二种:超过时间则不再阻塞i等待  await可以指定等待的时间 超过这个时间以后将不再等待 */
        countDownLatch.await(10, TimeUnit.MILLISECONDS);

        log.info("===finish===");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(100);
        log.info("{}", threadNum);
        TimeUnit.MILLISECONDS.sleep(100);

    }
}


Semaphore

Semaphore可以很容易控制某个资源可悲同时访问的线程个数,和CountDownLatch使用有些类似,提供acquire和release两个方法,acquire是获取一个许可,如果没有就等待,release是在操作完成后释放许可出来。Semaphore维护了当前访问的线程的个数,提供同步机制来控制同时访问的个数,Semaphore可以实现有限大小的链表,重入锁(如ReentrantLock)也可以实现这个功能,但是实现上比较复杂。

Semaphore使用场景:适用于仅能提供有限资源,如数据库连接数

代码示例1:

package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @author lisanwei24282
 */
@Slf4j
public class SemaphoreExample1 {
    /**
     * 指定一个初始变量
     */
    private final static Integer THREAT_COUNT = 20;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        /* 声明总计数器   */
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < THREAT_COUNT; i++) {
            final int threadNum = i;
            executorService.execute(() -> {
                try {
                    /* 第一种 */
                    // 试图获取一个许可
                    boolean flag = semaphore.tryAcquire();
                    if (flag) {
                        log.info("===试图获取一个许可成功===");
                        // 执行具体业务
                        test(threadNum);
                        // 执行业务完毕之后,释放许可
                        semaphore.release();
                    } else {
                        log.info("===试图获取一个许可失败===");
                    }


                    /* 第二种  指定时间内获取一个许可试图获取一个许可*/
                    boolean flagA = semaphore.tryAcquire(1, TimeUnit.SECONDS);
                    if (flagA) {
                        log.info("===试图获取一个许可成功===");
                        // 执行具体业务
                        test(threadNum);
                        // 执行业务完毕之后,释放许可
                        semaphore.release();
                    } else {
                        log.info("===试图获取一个许可失败===");
                    }


                    /* 第三种 */
                    // 实际获取一个许可
                    semaphore.acquire();
                    // 执行具体业务
                    test(threadNum);
                    // 执行业务完毕之后,释放许可
                    semaphore.release();

                } catch (InterruptedException e) {
                    log.error("==={}===", e.getMessage());
                    e.printStackTrace();
                }
            });
        }
        log.info("===finish===");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(100);
        log.info("{}", threadNum);
        TimeUnit.MILLISECONDS.sleep(100);

    }
}


CyclicBarrier

与CountDownLatch相似,都是通过计数器实现,当某个线程调用await方法,该线程就进入等待状态,且计数器进行加1操作,当计数器的值达到设置的初始值,进入await等待的线程会被唤醒,继续执行他们后续的操作。由于CyclicBarrier在释放等待线程后可以重用,所以又称循环屏障。使用场景和CountDownLatch相似,可用于并发运算。

CyclicBarrier和CountDownLatch区别:

1 CountDownLatch计数器只能使用一次,CyclicBarrier的计数器可以使用reset方法重置循环使用

2 CountDownLatch主要是视线1个或n个线程需要等待其他线程完成某项操作才能继续往下执行,CyclicBarrier主要是实现多个线程之间相互等待知道所有线程都满足了条件之后才能继续执行后续的操作,CyclicBarrier能处理更复杂的场景

代码示例:

package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @author lisanwei24282
 */
@Slf4j
public class CyclicBarrierExample1 {
    /**
     * 指定一个初始变量
     */
    private final static Integer THREAT_COUNT = 10;

    /**
     * 0到5
     */
    public static final CyclicBarrier CYCLICBARRIER = new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        /* 声明总计数器   */

        for (int i = 0; i < THREAT_COUNT; i++) {
            final int threadNum = i;
            executorService.execute(() -> {
                try {
                    Thread.sleep(1000);
                    race(threadNum);

                } catch (InterruptedException | BrokenBarrierException e) {
                    log.error("===执行异常{}===", e.getMessage());
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
    }

    private static void race(int threadNum) throws InterruptedException, BrokenBarrierException {
        TimeUnit.MILLISECONDS.sleep(1000);
        log.info("{} id ready-->await之前,到达现场,开始准备环节", threadNum);

        /* 当某个线程调用await方法,该线程就进入等待状态,且计数器进行加1操作,当计数器的值达到设置的初始值,进入await等待的线程会被唤醒
         * 继续执行他们后续的操作。由于CyclicBarrier在释放等待线程后可以重用,所以又称循环屏障。*/
        CYCLICBARRIER.await();

        log.info("{} continue-->await之后,准备环节结束,开始表演", threadNum);

    }
}


ReentrantLock

reentrantLock(可重入锁)和synchronized区别

1 可重入性:同一线程可以重入获得相同的锁,计数器加1,释放锁计数器减1

synchronized也是可重入锁

2 锁的实现:synchronized依赖jvm实现(操作系统级别的实现),reentrantLock是jdk实现的(用户自己编程实现)

3 性能区别:synchronized在优化前性能比reentrantLock差,优化后性能有了恨到提升,相同条件下优先使用synchronized

4 功能区别:1)便利性方面,synchronized使用简单,reentrantLock需要手工加锁和释放锁2)锁的细粒度和灵活度方面,reentrantLock优于synchronized

5 reentrantlock独有的功能:1)可指定是公平锁还是非公平锁,synchronized只能是非公平锁 2)提供了一个Condition类,可以分组唤醒需要唤醒的线程 3)能够提供中断等待锁的线程机制,lock.lockInterruptibly()

代码示例:

package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author lisanwei24282
 */
@Slf4j
public class ReentrantLockExample1 {
    /**
     * 同时并发执行
     */
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        /* 声明总计数器   */
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(threadTotal);
        for (int i = 0; i < threadTotal; i++) {
            executorService.execute(() -> {
                try {
                    /*获取一个许可*/
                    semaphore.acquire();
                    log.info("===试图获取一个许可成功===");
                    // 执行具体业务
                    add();
                    // 执行业务完毕之后,释放许可
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("==={}===", e.getMessage());
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        log.info("===finish===");
        countDownLatch.await();
        executorService.shutdown();
        log.info("===count:{}===", count);
    }

    private static void add() throws InterruptedException {
        lock.lock();
        try {
            count++;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}


ReentrantReadWriteLock

悲观写锁,即当所有读锁释放之后,才能加写锁,对于读多写少的程序,会引起堵塞或者死锁

代码示例:

package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 悲观写锁,即当所有读锁释放之后,才能加写锁,对于读多写少的程序,会引起堵塞或者死锁
 *
 * @author lisanwei24282
 */
@Slf4j
public class ReentrantReadWriteLockExample1 {
    private final Map<String, Object> map = new HashMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();


    /**
     * 读锁
     *
     * @param key key
     * @return 结果集
     */
    public Object get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }
        return null;
    }

    /**
     * 写锁
     *
     * @param key key
     * @return 结果集
     */
    public Object put(String key, Object data) {
        writeLock.lock();
        try {
            Object res = map.put(key, data);
            return data;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }
        return null;
    }
}

Condition

多线程建协调通信的工具类

代码示例:

package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author lisanwei24282
 */
@Slf4j
public class ConditionExample1 {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        /**
         * 当前线程阻塞,等待其他线程执行完毕,再唤醒当前线程
         */
        new Thread(() -> {
            lock.lock();
            try {
                log.info("===wait signal 阻塞等待唤醒===");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("===get signal 成功唤醒===");
                lock.unlock();
            }
        }).start();

        /**
         * 当前线程执行业务完毕,唤醒全部阻塞等待线程
         */
        new Thread(() -> {
            lock.lock();
            log.info("===get Lock===");
            try {
                log.info("===模拟执行业务===");
                log.info("================================");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                condition.signalAll();
                lock.unlock();
            }
        }).start();
    }
}


FutureTask

Callable与Runnable接口对比

Feature接口,可以得到任务的返回值

FeatureTask父类是RunnableFeature,RunnableFeature继承了Runnable和Feature两个接口

代码示例1:

package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author lisanwei24282
 */
@Slf4j
public class CallableExample1 {
    /**
     * 执行结果类
     */
    static class MyCallable implements Callable<String> {

        private final Integer num;

        public MyCallable(Integer num) {
            this.num = num;
        }

        @Override
        public String call() throws Exception {
            log.info("=== do something in Callable");
            Thread.sleep(3000);
            return "Done 业务执行完毕[" + num + "]";
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<String>> futureList = new ArrayList<>();

        for (int i = 0; i < 5; i++) {
            Future<String> submit = executorService.submit(new MyCallable(i));
            futureList.add(submit);
        }

        log.info("=== do something in main线程");
        Thread.sleep(1000);

        try {
            for (Future<String> stringFuture : futureList) {
                String res = stringFuture.get();
                log.info("=== res:{}==", res);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

}


package com.example.demo.function;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author lisanwei24282
 */
@Slf4j
public class FutureTaskExample1 {
    public static void main(String[] args) {
        FutureTask<String> stringFutureTask = new FutureTask<>(new Callable<String>() {

            @Override
            public String call() throws Exception {
                log.info("=== do something in callable");
                Thread.sleep(1000);
                return "Done";
            }
        });

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(stringFutureTask);

        try {
            String res = stringFutureTask.get();
            log.info("===执行完毕:{} ==", res);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        /**
         * 集合实际业务使用方式
         */
        List<FutureTask<String>> futureTaskList = new ArrayList<>();

        for (int i = 0; i < 5; i++) {
            int finalI = i;
            FutureTask<String> stringFutureTaskRes = new FutureTask<>(new Callable<String>() {

                @Override
                public String call() throws Exception {
                    log.info("=== 执行功能业务 ===");
                    return "Done [" + finalI + "]";
                }
            });
            executorService.submit(stringFutureTaskRes);
            futureTaskList.add(stringFutureTaskRes);
        }

        try {
            for (FutureTask<String> futureTask : futureTaskList) {
                String s = futureTask.get();
                log.info("FutureTask = " + s);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        log.info("======即将关闭======");
        /* 关闭 */
        executorService.shutdown();
    }
}


Fork/Join框架

将大人物切分成多个小任务并行执行,最后将结果汇总,思想和mapreduce类似。采用工作窃取算法,充分利用线程并行计算

BlockingQueue——阻塞队列

当队列满进行入队操作,线程阻塞,当队列空时进行出队操作,将会阻塞

线程安全,应用场景:生产者、消费者

线程池

new Thread弊端:

1 每次new Thread新建对象,性能差

2 线程缺乏统一的管理,肯无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或者OOM

3 缺少更多功能,如更多执行、定期执行、线程中断

线程池的好处:

1 重用存在的线程,减少对象创建、消亡的开销,性能佳

2 可有效控制最大并发的线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞

3 提供定时执行、定期执行、单线程、并发数控制等功能

线程池——ThreadPoolExecutor

ThreadPoolExecutor参数:

1 corePoolSize:核心线程数

2 maximumPoolSize:最大线程数

3 workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响

如果当前系统运行的线程数量小于corePoolSize,直接新建线程执行处理任务,即使线程池中的其他线程是空闲的。如果当前系统运行的线程数量大于或等于corePoolSize,且小于maximumPoolSize,只有当workQueue满的时候才创建新的线程去处理任务,如果设置corePoolSize和maximumPoolSize相同的话,那么创建的线程池大小是固定的,这时如果有新任务提交,当workQueue没满时,把请求放进workQueue中,等待有空闲的线程从workQueue中取出任务去处理。如果运行的线程数量大于maximumPoolSize,这时如果workQueue满,根据拒绝策略去处理。

4 keepAliveTime:线程没有任务执行时最多保持多久的时间终止

5 unit:keepAliveTime的时间单位

6 threadFactory:线程工厂,用来创建线程

7 rejectHandler:当拒绝处理任务时的策略

线程池状态:






image.png









上一篇下一篇

猜你喜欢

热点阅读