Java 线程相关
目录
1.并行与并发
2.进程与线程
---- 2.1 进程
---- 2.2 线程
---- 2.3 进程与线程的区别
---- 2.4 线程调度
3.线程的创建方式
---- 3.1 继承方式
---- 3.2 实现方式
---- 3.3 匿名内部类方式
---- 3.4 继承和实现的区别
---- 3.5 线程资源在内存中的分布
---- 3.6 线程池方式
---- ---- 3.6.1 什么是线程池
---- ---- 3.6.2 线程池的使用
---- ---- 3.3.3 线程池的种类
4.高并发与线程安全
---- 4.1 悲观锁方式解决线程安全问题
---- ---- 4.1.2 问题代码示例
---- ---- 4.1.3 synchronized 关键字 (悲观锁)
---- ---- ---- 4.1.3.1 同步代码块
---- ---- ---- 4.1.3.2 同步方法
---- ---- 4.1.4 Lock
---- ---- 4.1.5 死锁
---- 4.2 乐观锁的方式解决线程安全问题
---- ---- 4.2.1 不可见性
---- ---- 4.2.2 无序性
---- ---- 4.2.3 原子性
---- ---- 4.2.4 原子类
---- ---- 4.2.5 CAS (乐观锁)
---- ---- 4.2.6 原子操作引发的 ABA 问题
---- 4.3 乐观锁与悲观锁的区别
---- 4.4 并发工具包
---- ---- 4.4.1 并发容器
---- ---- ---- CopyOnWriteArrayList
---- ---- ---- CopyOnWriteArraySet
---- ---- ---- ConcurrentHashMap
---- ---- 4.4.2 并发工具类
---- ---- ---- CountDownLatch
---- ---- ---- CyclicBarrier
---- ---- ---- Semaphore
---- ---- ---- Exchanger
5.线程的状态
---- 5.1 产者消费者案例一
---- 5.1 产者消费者案例二
1. 并行与并发
- 并行: 指多个事件在同一时刻发生;
- 并发: 指多个事件在同一个时间段内发生;
在操作系统中, 这装了多个程序, 并发的是在一段时间内看起来有多个程序同时运行, 实际上是由CPU调度的分时交替运行; 如果CPU的调度时间比较长, 那么这个过程就会比较清晰, 在一段时间内只有一个线程在运行;
2. 进程与线程
2.1 进程
进程是程序的一次执行过程,是系统运行程序的基本单位;系统运行一个程序即是一个进程,从创建、运行到消亡的过程;每个进程都有一个独立的内存空间,一个应用程序可以同时运行多个进程;
- 每个进程都有一个独立的内存空间, 进程与进程之间互不影响;
- 一个应用程序可以有多个进程;
2.1 线程
线程是进程中一段程序的不同执行路线流程,是进程中的一个执行单元,负责当前进程中程序的执行;
- 线程是拥有资源和独立运行的最小单位;
- 每个线程都有单独的内存空间;
- 一个进程之后, 可以有多个线程共享进程资源;
一个 Java 程序其实就是一个进程, 而一个进程一次只能执行一条线程, 所以 Java 只有高并发;
2.3 进程与线程的区别
- 进程: 有独立的内存空间, 进程中的数据存放空间是独立的(栈和堆);
- 一个 Java 程序至少有两个线程, main线程和GC(垃圾回收)线程 ;
- 线程: 堆空间是共享的, 栈空间是独立的, 线程通信与转换消耗的资源比进程小很多;
2.4 线程调度
-
分时调度
所有线程轮流使用 CPU 的使用权, 平均分配每个线程占用 CPU 的时间 (这个时间会非常非常的短);
-
抢占式调度
优先让优先级高的线程使用 CPU, 如果线程的优先级相同, 那么会随机选择一个(线程随机性);
Java使用的为抢占式调度;
3.线程的创建方式
3.1 继承方式
// Java 中 Thread 表示一个线程
// Runnable接口表示执行体函数的回调, 而Thread实现了Runnable接口
public class MyThread extends Thread {
// 线程执行体
@Override
public void run() {
}
}
public class ThreadTest {
public static void main(String[] args) {
new MyThread().start();
}
}
3.2 实现方式
// Runnable接口的回调就是线程执行体
public class MyRunnable implements Runnable {
// 线程执行体
@Override
public void run() {
}
}
public class ThreadTest {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
new Thread(runnable).start();
}
}
3.3 匿名内部类方式
public class ThreadTest {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
new Thread(runnable).start();
}
}
3.4 继承和实现的区别
- 实现的方式将线程 (Thread) 和任务 (Runnable的run方法中的内容) 分开了, 而继承是绑在了一起
- Runnable 中的资源可以被多个线程 (Thread) 共享
- 实现解耦操作, 增强程序的健壮性和扩展性
- Java 中线程池只能支持实现的Runnable 或 Callable 类线程, 不支持继承Thread的类
3.5 线程资源在内存中的分布
- 一个 Java 程序至少有两个线程, 一个的 Main 方法所在的线程, 另一个是垃圾回收线程(GC);
- 当线程启动时, 会在栈内存中开辟出一块独立空间 (每个线程都会有一块独立的栈空间);
- 此空间中保存 基本数据类型的对象和引用数据类型对象的引用 (堆空间中的地址), 也就是说基本所有的 线程都共享堆空间;
- 线程中的 静态变量会保存在方法区 (也叫静态区, 包含整个程序中永远唯一的元素, 也就是 class 和 static变量); (data segment 与 code segment , 前面存放静态变量或字符串常量, 后者存放类中的方法 )
- 当线程中的方法被调用时, 该 方法会入此线程所在的栈 空间;
- Main 线程是最后出栈的, 基本代表程序的结束;
3.6 线程池方式
3.6.1 什么是线程池
线程池是 Java 提供的为我们管理和使用线程对象的池, 使用线程池不必再关心线程的频繁创建与回收等等;
Java 里面线程池的顶级接口是 Executor, 但是严格意义上讲 Executor 并不是一个线程池, 而只是一个执行线程的工具; 真正的线程池接口是 ExecutorService;
3.6.2 线程池的使用
示例一:
public class ThreadTest {
public static void main(String[] args) {
// 创建线程池,并指定线程池中初始化线程的个数
ExecutorService es = Executors.newFixedThreadPool(3);
// 提交无返回值的任务,并执行任务
Future<?> submit1 = es.submit(new Runnable() {
@Override
public void run() {
}
});
// 提交有返回值的任务,并执行任务
Future<String> submit = es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "yiGeSiren";
}
});
try {
System.out.println(submit1.get());
System.out.println(submit.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
示例二:
public class ThreadTest {
public static void main(String[] args) {
// 创建线程池,并指定线程池中初始化线程的个数
ExecutorService es = Executors.newFixedThreadPool(3);
Callable<String> runnable = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName()+
"开始: 实现Callable接口的任务...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+
"结束:实现Callable接口的任务...");
return "tiGeSiRen";
}
};
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
}
}
对于上述代码线程池中的线程来说, 当任务多于线程个数时, 任务会等待, 直到有空闲的线程从线程池中释放出来;
3.6.2 线程池的种类
-
1. newCachedThreadPool
创建一个可缓存的线程池, 如果线程池长度超过处理需要, 会终止并从缓存中移除那些已有 60 秒钟未被使用的线程;
如果线程池长度小于任务数, 则会将已完成任务的线程加入缓存, 然后从缓存中取出线程去执行新的任务;
如果缓存中没有空闲进程则会创建一个新的线程;public class ThreadTest { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Callable<String> runnable = new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName()+" 开始: 实现Callable接口的任务..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // System.out.println(Thread.currentThread().getName()+" 结束:实现Callable接口的任务..."); return "tiGeSiRen"; } }; es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.shutdown(); } }
打印结果
pool-1-thread-6 开始: 实现Callable接口的任务...
pool-1-thread-4 开始: 实现Callable接口的任务...
pool-1-thread-7 开始: 实现Callable接口的任务...
pool-1-thread-5 开始: 实现Callable接口的任务...
pool-1-thread-1 开始: 实现Callable接口的任务...
pool-1-thread-3 开始: 实现Callable接口的任务...
pool-1-thread-2 开始: 实现Callable接口的任务...从上面的打印结果可以看出创建了七个线程对象;
-
2. newFixedThreadPool
指定线程个数的线程池, 也就是说当任务数大于线程个数时, 任务只能等待线程空闲出来;
-
3. newScheduledThreadPool
创建一个指定线程个数的线程池, 支持定时及执行周期性任务;
public class ThreadTest { public static void main(String[] args) { ScheduledExecutorService es = Executors.newScheduledThreadPool(2); Callable<String> runnable = new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " 开始: 实现Callable接口的任务..."); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "tiGeSiRen"; } }; es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.shutdown(); } }
pool-1-thread-1 开始: 实现Callable接口的任务...
pool-1-thread-2 开始: 实现Callable接口的任务...
pool-1-thread-1 开始: 实现Callable接口的任务...
pool-1-thread-2 开始: 实现Callable接口的任务...
pool-1-thread-2 开始: 实现Callable接口的任务... -
4. newSingleThreadExecutor
创建一个单线程的线程池; 这个线程池只有一个线程在工作, 也就是相当于单线程串行执行所有任务;
如果这个唯一的线程因为异常结束, 那么会有一个新的线程来替代它, 此线程池保证所有任务的执行顺序按照任务的提交顺序执行;
4 高并发与线程安全
- 高并发: 是指在某个时间点上, 有大量的用户(线程)同时访问同一资源;
- 线程安全: 在某个时间点上, 当大量用户(线程)访问同一资源时, 由于多线程运行机制的原因, 可能会导致被访问的资源出现"数据污染"的问题;
4.1 悲观锁方式的解决线程安全问题
4.1.2 问题代码示例
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"线程一").start();
new Thread(myRunnable,"线程二").start();
new Thread(myRunnable,"线程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 100;
@Override
public void run() {
while (true) {
if (tickets < 1) {
break;
}
System.out.println(Thread.currentThread().getName() + "正在出售第" +
tickets + "张票");
tickets--;
}
}
}
上面的代码会执行的结果会有几个问题:
- 多条线程出现卖重复票
- 会出现卖负数票
- 有些票没有出售
出现这些问题的原因, 是因为在 Java 中线程是抢占式调度, 所以当线程在执行任务的时候, 会被其他线程打断;
解决方式一: 使用 synchronized 关键字加锁
解决方式二: 使用 Lock 加锁
解决方式三: 使用 volatile 关键字加 原子类
4.1.3 synchronized 关键字 (悲观锁)
synchronized 可以作用在 方法中的某个区块 或 函数 上, 表示只对这个区块的资源或函数进行互斥访问; 也就是说, 当有线程访问到这个区块或函数时, 其它的线程只能先等待;
需要注意:
- 锁对象可以是任意对象;
- 但是同步线程加锁的对象要一致;
- 同步函数的锁对象; 如果这个函数是静态函数, 那么锁对象是这个函数所在的 类.class; 如果这个函数是非静态函数, 那么锁对象是调用这个函数的对象, 也就是 this ;
4.1.3.1 同步代码块
使用同步代码块方式, 解决 4.1.2 代码示例中的问题; 代码如下:
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"线程一").start();
new Thread(myRunnable,"线程二").start();
new Thread(myRunnable,"线程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 50;
@Override
public void run() {
while (true) {
synchronized (this){
if (tickets < 1) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第"
+ tickets + "张票");
tickets--;
}
}
}
}
4.1.3.2 同步方法
使用同步方法方式, 解决 4.1.2 代码示例中的问题; 代码如下:
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"线程一").start();
new Thread(myRunnable,"线程二").start();
new Thread(myRunnable,"线程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 50;
@Override
public void run() {
while (true) {
if (sellTickets()) {
break;
}
}
}
public synchronized boolean sellTickets() {
if (tickets < 1) {
return true;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第" + tickets
+ "张票");
tickets--;
return false;
}
}
4.1.4 Lock
Lock 也称同步锁, 是一个接口, 其实现类为 ReentrantLock ;
使用 Lock 锁解决 4.1.2 代码示例中的问题; 代码如下:
public class MyRunnable implements Runnable {
Lock lock = new ReentrantLock();
int tickets = 50;
@Override
public void run() {
while (true) {
// 加锁
lock.lock();
if (tickets < 1) {
// 循环退出 也要解锁
lock.unlock();
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第" +
tickets + "张票");
tickets--;
// 解锁
lock.unlock();
}
}
}
4.1.5 死锁
- 什么是死锁
多线程程序中, 使用了多把锁, 造成线程之间相互等待;- 产生死锁的条件
- 有多个线程;
- 有多把锁
- 有同步代码块嵌套
- 案例:
线程A : 需要获取A锁, 再获取B锁, 才能执行里面的代码
线程B : 需要获取B锁, 再获取A锁, 才能执行里面的代码
代码示例:
public class ThreadTest {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized ("锁A"){
System.out.println("线程A获取到了锁A,等待获取锁B");
synchronized ("锁B"){
System.out.println("线程A获取到了锁A,锁B 开始执行里面的代码");
}
}
}
},"线程A").start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized ("锁B"){
System.out.println("线程B获取到了锁B,等待获取锁A");
synchronized ("锁A"){
System.out.println("线程B获取到了锁B,锁A 开始执行里面的代码");
}
}
}
},"线程B").start();
}
}
打印结果:
线程A获取到了锁A,等待获取锁B
线程B获取到了锁B,等待获取锁A (程序到这里是没有结束的)
4.2 乐观锁的方式解决线程安全问题
使用乐观锁方式解决线程安全问题, 需要解决不可见性, 无序性和非原子操作的问题;
4.2.1 不可见性
代码示例:
public class ThreadTest {
public static void main(String[] args) {
MyThread thread = new MyThread();
// 在子线程中修改 flag 变量为True
thread.start();
while (true){
// 检测到线程中 flag 变量为 true 之后结否死循环
if (MyThread.flag){
System.out.println("结束 Main 线程中的死循环!");
break;
}
}
}
}
public class MyThread extends Thread {
static boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("修改了flag变量为true");
}
}
期望结果:
子线程进行睡眠3秒, 主线程一直死循环;
子线程醒了之后,修改flag=true,子线程结束,主线程结束;
实际结果:
子线程进入睡眠3秒,主线程一直死循环;
子线程醒了之后,修改flag=true,子线程结束,但是主线程结束不了;
原因:
子线程对共享变量flag的值进行了修改,而主线程没有看见(没有获取到修改后的值);
为什么主线程获取不到子线程对共享变量flag修改后的值
- Java 内存模型 (Java Memory Modle) JMM 描述了 Java 程序中各种变量的访问规则 (静态共享变量) , 以及 JVM 在将变量 读取到内存 和将变量 存储到内存 这样的底层细节;
- 线程中的 静态变量会保存在方法区 (也叫静态区, 包含整个程序中永远唯一的元素, 也就是 class 和 static变量); (data segment 与 code segment , 前面存放静态变量或字符串常量, 后者存放类中的方法 )
- 不同的线程会在栈区中有 各自的工作空间;
- 当线程访问共享静态变量时, 线程会将这个变量 拷备一份到自己的工作区 (也就是说, 在上面代码中获取到的变量值是在线程独立的栈区), 进行读写的操作;
- 但是不会将在栈区操作后的变量值 写回主内存(也就是静态区);
volatile关键字可以解决线程中的可见性问题
public class MyThread extends Thread {
// 当共享变量被volatile修饰,会强制让线程每次获取变量的值都从主内存(方法区也就静态区)中去获取;
volatile static boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("修改了flag变量为true");
}
}
4.2.2 无序性
无序性是指 内存访问顺序与得到的字节码顺序 不一样 或者 字节码顺序与实际的执行顺序 不一样; 是由 JIT 动态编译 (会重排序) 优化的原因造成的, 静态编译是不会有这样的问题产生的;
volatile 关键字修饰变量可以禁止变量相关的指令重排序;
https://www.jianshu.com/p/119ffdcef55a
https://baijiahao.baidu.com/s?id=1662251623172268398&wfr=spider&for=pc
4.2.3 原子性
原子性是指在一次或多次操作中, 要么全部执行不被中断, 要么全部不执行;
代码示例如下:
public class ThreadTest {
public static void main(String[] args) {
// 子线程1000000次自增
new MyThread().start();
// main线程1000000次自增
for (int i = 0; i < 1000000; i++) {
MyThread.a++;
}
System.out.println("Main线程的1000000次自增结束了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a的值为" + MyThread.a);
}
}
public class MyThread extends Thread {
volatile static int a = 0;
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
a++;
}
System.out.println("子线程的1000000次自增结束了");
}
}
期望打印结果
Main线程的1000000次自增结束了
子线程的1000000次自增结束了
a的值为2000000
实际打印结果
Main线程的1000000次自增结束了
子线程的1000000次自增结束了
a的值为1803604(实际上小于2000000)
原因: 两个线程对a的自增操作, 产生了覆盖效果; 就是这样一种情况main线程自增一次之后等于1, 而子线程自增加之后也等于1, 而实际上是等于2了;
并且Volatile不能解决原子性问题, 实际上就是两个或两个以上的线程同时对主内存的变量进行了写的操作, 正确的应该是当多个线程同时对主内存的变量时行写操作时, 只有一个线程可以进行写操作, 在等待当前线程写完之后其它的线程需要重要读取变量的值;
4.2.4 原子类
在 java.util.concurrent.atomic 包下提供了一系的具有 原子性的原子类 API ;
它们可以保证对“变量”操作的:原子性、有序性、可见性; 其工作原理是基于 CAS 机制;
以 AtomicInteger 为例验证上面4.2.3示例代码中的原子性问题:
public class ThreadTest {
public static void main(String[] args) {
// 子线程1000000次自增
new MyThread().start();
// main线程1000000次自增
for (int i = 0; i < 1000000; i++) {
MyThread.a.addAndGet(1);
}
System.out.println("Main线程的1000000次自增结束了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a的值为" + MyThread.a);
}
}
public class MyThread extends Thread {
static AtomicInteger a = new AtomicInteger(0);
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
a.addAndGet(1);
}
System.out.println("子线程的1000000次自增结束了");
}
}
Main线程的1000000次自增结束了
子线程的1000000次自增结束了
a的值为2000000
4.2.5 CAS (乐观锁)
CAS (Compare and Swap/Set) 比较并交换, CAS 的算法过程是: 它包含3个参数CAS(V,E,N) ;
- V 表示要更新的变量(内存值), E表示预期值(旧的), N表示新值;
- 当且仅当 V 值等于 E 值是, 才会将 N 值覆给 V 值;
- 如果 V 值不等于 E 值, 表示当前有其它线程正在进行操作, 那么当前线程则什么都不做, 返回 V 的真实值;
- 当多个线程同时使用 CAS 操作一个变量时, 只有一个会胜出, 并成功更新, 其余均会失败; 失败的线程不会被挂起, 仅是被告知失败, 并且允许再次尝试, 当然也允许失败的线程放弃操作;
4.2.6 原子操作引发的 ABA 问题
4.3 乐观锁与悲观锁的区别
- 乐观锁: 乐观思想, 认为读多写少, 遇到并发写的可能性低; 每次去拿数据的时候都认为别人不会修改, 所以不会上锁, 但是在更新的时候会判断一下有么有其它线程在此期间去更新这个数据, 采取在写入数据的时候加锁的操作, 也就是说在 写入数据 时候, 其它线程是可以进行读操作 的;
- 悲观锁: 悲观思想, 认为写多, 遇到并发写的可能性高; 每次去拿数据的时候都认为别人会修改, 所以每次在读写数据的时候都会上锁, 这样别人想读写这个数据就会block直到拿到锁;
4.4 并发工具包
java.util.concurrent 下提供的一系列的处理并发问题的容器和工具类
4.4.1 并发容器
CopyOnWriteArrayList
与 ArrayList 相关操作一致, 不同的是 CopyOnWriteArrayList 就线程安全的, 其内部源码使用了同步代码块的方式, 并且 new 了一个 Object 对象作为锁;
CopyOnWriteArraySet
与 CopyOnWriteArrayList 的相关操作一致, 是依赖 CopyOnWriteArrayList 实现的, 额外加了两个函数 addIfAbsent() 和 addAllAbsent() 来保证元素的唯 一性, 元素已经在列表中, 就不往里面添加了;
ConcurrentHashMap
并发基于分段锁思想;
4.4.2 并发工具类
CountDownLatch
利用这个类可以实现类似计数器的功能; 比如: 线程C 只有在 线程A 和线程B 走完 之后才能执行; 示例示码如下:
public class ThreadTest {
public static void main(String[] args) {
// 初始计数值为 2
CountDownLatch countDownLatch = new CountDownLatch(2);
// 线程c
new Thread(new Runnable() {
@Override
public void run() {
try {
// 当基数值减为0时, 等待的线程C 被唤醒执行
countDownLatch.await();
Thread.sleep(3000);
System.out.println("线程C执行完毕");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程c").start();
// 线程 A
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程A执行完毕");
// 每次调用 countDown 函数计数值都会减一
countDownLatch.countDown();
}
}, "线程A").start();
// 线程 B
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程B执行完毕");
// 每次调用 countDown 函数计数值都会减一
countDownLatch.countDown();
}
}, "线程B").start();
}
}
CyclicBarrier
通过它可以实现让一组线程等待至某个状态之后再全部同时执行, 当所有等待线程都被释放以后,CyclicBarrier 可以被重用;
示例:公司召集5名员工开会,等5名员工都到了,会议开始。
---------创建5个员工线程,1个开会线程,几乎同时启动,使用 CyclicBarrier 保证5名员工线程全部执行后,再执行开会线程;
---------如果是再有5个员工线程, 重新进入, 也可以重新开始开会 ( CyclicBarrier 可被重复利用);
public class ThreadTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
// 当达到解放出阻塞线程的解开条件时,会回调此函数
@Override
public void run() {
System.out.println("可以开始开会了,会议内容是......");
}
});
MyRunnable myRunnable = new MyRunnable(cyclicBarrier);
new Thread(myRunnable,"员工一").start();
new Thread(myRunnable,"员工二").start();
new Thread(myRunnable,"员工三").start();
new Thread(myRunnable,"员工四").start();
new Thread(myRunnable,"员工五").start();
new Thread(myRunnable,"员工六").start();
new Thread(myRunnable,"员工七").start();
new Thread(myRunnable,"员工八").start();
new Thread(myRunnable,"员工九").start();
new Thread(myRunnable,"员工十").start();
}
}
public class MyRunnable implements Runnable {
CyclicBarrier cyclicBarrier;
public MyRunnable(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
if (cyclicBarrier != null) {
System.out.println(Thread.currentThread().getName()+":到达了会议室");
try {
// 阻塞线程继续执行,直到达到放开阻塞的条件
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
// 阻塞放开之后,线程继续往下执行
System.out.println(Thread.currentThread().getName()+":离开了会议室");
}
}
}
Semaphore
字面意思为信号量,Semaphore 可以控制 同时访问的线程个数;
案例: 教室同时只允许至多4名学生进来上课, 出去一名, 才能再进来一名; 示例代码如下:
public class CLassRoom {
// 最大同时访问的线程个数为4
Semaphore sp = new Semaphore(4);
public void into(){
// 获得许可
try {
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(Thread.currentThread().getName()+"正在听课3秒钟");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"听课完成");
// 释放许可
sp.release();
}
}
public class MyRunnable implements Runnable {
ClassRoom cr;
public MyRunnable(ClassRoom cr) {
this.cr = cr;
}
@Override
public void run() {
cr.into();// 进入教室
}
}
public class ThreadTest {
public static void main(String[] args) {
CLassRoom cLassRoom = new CLassRoom();
MyRunnable myRunnable = new MyRunnable(cLassRoom);
new Thread(myRunnable,"学生一").start();
new Thread(myRunnable,"学生二").start();
new Thread(myRunnable,"学生三").start();
new Thread(myRunnable,"学生四").start();
new Thread(myRunnable,"学生五").start();
new Thread(myRunnable,"学生六").start();
new Thread(myRunnable,"学生七").start();
new Thread(myRunnable,"学生八").start();
new Thread(myRunnable,"学生九").start();
new Thread(myRunnable,"学生十").start();
}
}
Exchanger
是一个用于线程间协作的工具类; Exchanger用于进行 线程间的数据交换;
示例代码:
public class ThreadA extends Thread {
Exchanger<String> exchanger;
public ThreadA(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String s = exchanger.exchange("ThreadA");
System.out.println("线程A接收到了线程B交换的数据: "+ s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
Exchanger<String> exchanger;
public ThreadB(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String s = exchanger.exchange("ThreadB");
System.out.println("线程B接收到了线程A交换的数据: "+ s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadTest {
public static void main(String[] args) {
Exchanger<String> stringExchanger = new Exchanger<>();
new ThreadA(stringExchanger).start();
new ThreadB(stringExchanger).start();
}
}
打印结果:
线程A接收到了线程B交换的数据: ThreadB
线程B接收到了线程A交换的数据: ThreadA
5 线程的状态
- 新建(new)
- 运行(runnable)
- 终止(teminated)
- 锁阻塞(blocked)
- 无线等待(waiting)
- 限时等待(time waiting)
线程状态之间的关系:
运行与锁阻塞: 线程未抢到锁对象就会阻塞, 抢到锁对象就会进入运行状态;
运行与无线等待: 运行中的线程调用锁对象的 wait() 函数, 会进入无限等待状态等待被唤醒; 如果无限等待中的线程被唤醒但 没有抢到锁, 那么会进入阻塞状态; 如果无限等待中的线程被唤醒并且 抢到了锁, 那么会进入运行状态;
运行与限时等待第一种情况: 运行中的线程调用锁对象的 wait(数字) 函数, 会进入无限等待状态等待被唤醒;
如果时间到了, 但没有抢到锁, 会进入阻塞状态;
如果时间到了, 并且抢到了锁, 会进入运行状态;
如果时间没到, 却被唤醒, 抢到了锁就会进行运行状态, 没有抢到锁就会进入阻塞状态;
运行与限时等待第二种情况: 运行中的线程调用 sleep() 方法, 由于调用 sleep() 方法并不像调用 wait() 方法那样会释放锁, 所以线程应该是属于限时等待状态, 但是时间到了就会直接进行运行状态;
代码示例
public class ThreadTest {
public static void main(String[] args) {
Object object = new Object();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (object){
try {
System.out.println("线程一进入无限等待状态");
object.wait();
System.out.println("线程一被线程二唤醒进行运行状态");
Thread.sleep(2000);
System.out.println("线程一执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"线程一").start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (object){
try {
System.out.println("线程二准备唤醒线程一");
Thread.sleep(2000);
object.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"线程二").start();
}
}
5.1 生产者消费者案例一
public class ThreadA extends Thread {
Product product;
String name;
public ThreadA(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag) {
try {
System.out.println(name + "进入无限等待");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!product.flag) {
product.flag = true;
System.out.println(name + "正在生产商品");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.notify();
}
}
}
}
}
public class ThreadB extends Thread {
Product product;
String name;
public ThreadB(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (!product.flag) {
try {
System.out.println("------" + name + "进入无限等待");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (product.flag) {
product.flag = false;
System.out.println("------" + name + "正在消费商品");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.notify();
}
}
}
}
}
public class Product {
public boolean flag = false;
}
public class ThreadTest2 {
public static void main(String[] args) {
Product product = new Product();
new ThreadA(product, "生产者线程1").start();
new ThreadB(product, "消费者线程1").start();
}
}
5.1 生产者消费者案例二
public class Product {
public int num = 0;
public boolean flag2 = false;
}
public class ThreadTest2 {
public static void main(String[] args) {
Product product = new Product();
new ThreadA(product, "生产者线程1").start();
new ThreadA(product, "生产者线程2").start();
new ThreadA(product, "生产者线程3").start();
new ThreadA(product, "生产者线程4").start();
new ThreadA(product, "生产者线程5").start();
new ThreadA(product, "生产者线程6").start();
new ThreadA(product, "生产者线程7").start();
new ThreadA(product, "生产者线程8").start();
new ThreadA(product, "生产者线程9").start();
new ThreadB(product, "消费者线程1").start();
new ThreadB(product, "消费者线程2").start();
new ThreadB(product, "消费者线程3").start();
new ThreadB(product, "消费者线程4").start();
new ThreadB(product, "消费者线程5").start();
new ThreadB(product, "消费者线程6").start();
new ThreadB(product, "消费者线程7").start();
new ThreadB(product, "消费者线程8").start();
new ThreadB(product, "消费者线程9").start();
}
}
public class ThreadA extends Thread {
Product product;
String name;
public ThreadA(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (!product.flag2) {
if (product.num < 10) {
product.num++;
System.out.println(name + "正在生产第" + product.num + "商品");
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (product.num == 10) {
product.flag2 = true;
product.notifyAll();
System.out.println(name + "进入无限等待");
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
}
public class ThreadB extends Thread {
Product product;
String name;
public ThreadB(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag2) {
if (product.num <= 10) {
System.out.println("------" + name + "正在消费第" + product.num + "商品");
product.num--;
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (product.num == 0) {
product.flag2 = false;
product.notifyAll();
System.out.println("------" + name + "进入无限等待");
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
}