多线程实现与同步工具包详解
多线程学习
概念
简述如下:
并发:指一个CPU可以异步的处理多个进程
并行:则是一个CPU同时处理多个进程
进程:程序运行的执行过程,是一个程序的实例。每个进程都有自己的虚拟地址空间和控制线程
线程:是进程的一个执行单元,是操作系统调度器(Schduler)分配处理器时间的基础单元。
一句话总结:
线程是程序执行时的最小单位,它是进程的一个执行流,是CPU调度和分派的基本单位,一个进程可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行。同样多线程也可以实现并发操作,每个请求分配一个线程来处理。
Java代码实现方式
- 方式一、继承Thread类,因为Java 是单继承,继承Thread的类不能再继承其他类,并且继承Thread其本身就是执行线程,每个子线程之间相互独立,资源不共享
- 方式二、实现Runnable接口,资源共享,多个线程可以对共享数据进行操作
- 方式三、实现Callable接口通过FutureTask包装器来创建Thread线程,可以接收一个返回值
- 方式四、使用ExecutorService、Callable、Future实现有返回结果的线程
完整代码如下:
import java.util.concurrent.*;
public class Demo1 {
/**
* 第一种实现方式:
* 继承Thread类创建线程
*/
public class MyThread extends Thread{
private int ticket = 10;
@Override
public void run() {
for (int i=0;i<=10;i++){
if(ticket>0){
System.out.println(Thread.currentThread().getName()+"工号 还有 "+ticket--+" 张传单需要派发");
}else{
System.out.println(Thread.currentThread().getName()+"工号 的传单全部派发完了,可以下班了");
}
}
}
}
/**
* 第二种实现方式:
* 实现Runnable接口创建线程
*/
public class MyRunable implements Runnable{
private int ticket = 10;
@Override
public void run() {
for (int i=0;i<10;i++){
//加锁互斥
synchronized (this){
if(ticket>0){
System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");
}else {
System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");
break;
}
}
}
}
}
/**
* 第三种实现方式:
* 实现Callable接口通过FutureTask包装器来创建Thread线程
*/
public class MyCallRunable implements Callable<Integer>{
@Override
public Integer call() throws Exception {
int sum=0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}
/**
* 第四种方式:
* 线程池的方式,使用ExecutorService、Callable、Future实现有返回结果的线程
*/
public class OddCallRunable implements Callable<Integer>{
@Override
public Integer call() throws Exception {
int sum=0;
for (int i = 0; i <= 100; i++) {
if(i%2 == 0)
sum += i;
}
return sum;
}
}
public class EvenCallRunable implements Callable<Integer>{
@Override
public Integer call() throws Exception {
int sum=0;
for (int i = 0; i <= 100; i++) {
if(i%2 == 1)
sum += i;
}
return sum;
}
}
/**
* 总结:
* 方式一、继承Thread类,因为Java 是单继承,继承Thread的类不能再继承其他类,并且继承Thread 其本身就是执行线程,每个子线程之间相互独立,资源不共享
* 方式二、实现Runnable接口,资源共享,多个线程可以对共享数据进行操作
* 方式三、实现Callable接口通过FutureTask包装器来创建Thread线程,可以接收一个返回值
* 方式四、使用ExecutorService、Callable、Future实现有返回结果的线程
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("第一种方式:");
MyThread thread1 = new Demo1().new MyThread();
MyThread thread2 = new Demo1().new MyThread();
MyThread thread3 = new Demo1().new MyThread();
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(1000);
System.out.println();
System.out.println("第二种方式,");
MyRunable myRunable = new Demo1().new MyRunable();
Thread threadTwo1 = new Thread(myRunable);
Thread threadTwo2 = new Thread(myRunable);
Thread threadTwo3 = new Thread(myRunable);
threadTwo1.start();
threadTwo2.start();
threadTwo3.start();
Thread.sleep(1000);
System.out.println();
System.out.println("第三种方式:");
MyCallRunable myCallRunable = new Demo1().new MyCallRunable();
FutureTask<Integer> futureTask = new FutureTask<>(myCallRunable);
Thread threadCall = new Thread(futureTask);
threadCall.start();
Integer integer = futureTask.get();
System.out.println("result sum="+integer);
Thread.sleep(1000);
System.out.println();
System.out.println("第四种方式:");
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 创建多个有返回值的任务
OddCallRunable oddCallRunable = new Demo1().new OddCallRunable();
EvenCallRunable evenCallRunable = new Demo1().new EvenCallRunable();
Future<Integer> submit1 = pool.submit(oddCallRunable);
Future<Integer> submit2 = pool.submit(evenCallRunable);
// 关闭线程池
pool.shutdown();
System.out.println("1-100 偶数和:"+submit1.get());
System.out.println("1-100 奇数和:"+submit2.get());
System.out.println("1-100 总和:"+(submit1.get()+submit2.get()));
}
}
线程的类型
- 主线程:
JVM调用程序main()所产生的线程。 - 当前线程:
这个是容易混淆的概念。一般指通过Thread.currentThread()来获取的进程。 - 后台线程:
指为其他线程提供服务的线程,也称为守护线程。JVM的垃圾回收线程就是一个后台线程。用户线程和守护线程的区别在于,是否等待主线程依赖于主线程结束而结束 - 前台线程:
是指接受后台线程服务的线程,其实前台后台线程是联系在一起,就像傀儡和幕后操纵者一样的关系。傀儡是前台线程、幕后操纵者是后台线程。由前台线程创建的线程默认也是前台线程。
可以通过isDaemon()和setDaemon()方法来判断和设置一个线程是否为后台线程。但必须在start()方法前调用,否则会报错
线程的常用方法
- sleep()
强迫一个线程睡眠N毫秒。 - isAlive()
判断一个线程是否存活。 - join()
等待线程终止。 - activeCount()
程序中活跃的线程数。 - enumerate()
枚举程序中的线程。 - currentThread()
得到当前线程。 - isDaemon()
一个线程是否为守护线程。 - setDaemon()
设置一个线程为守护线程。(用户线程和守护线程的区别在于,是否等待主线程依赖于主线程结束而结束) - setName()
为线程设置一个名称。 - wait()
强迫一个线程等待。 - notify()
通知一个线程继续运行。 - setPriority()
设置一个线程的优先级 - stop()
线程中断,弃用,这个方法是有问题。 - interrupt()
给线程设置一个中断状态,然后通过isInterruptd()方法获取是否中断的状态,但是如果线程中调用 join()、wait()、slettp() 等线程阻塞的方法,会清除掉 interrupt状态,所以慎用。
synchronized关键字
由于每个线程执行的过程是不可控的,所以很可能导致最终的结果与实际上的愿望相违背或者直接导致程序出错。
比如,当多个线程同时访问临界资源(一个对象,对象中的属性,一个文件,一个数据库等)时,就可能会产生线程安全问题。
临界资源就是共享资源。比如共享变量等
怎么解决线程安全?专业术语叫序列化访问临界资源
线程安全解决方案
synchronized 就是来解决线程安全的方案之一,互斥锁:顾名思义,能到达到互斥访问目的的锁,即在同一时刻,只能有一个线程访问临界资源,也称作同步互斥访问
如果对临界资源加上互斥锁,当一个线程在访问该临界资源时,其他线程便只能等待。
使用synchronized关键字来标记一个方法或者代码块,当某个线程调用该对象的synchronized方法或者访问synchronized代码块时,这个线程便获得了该对象的锁,其他线程暂时无法访问这个方法,只有等待这个方法执行完毕或者代码块执行完毕,这个线程才会释放该对象的锁,其他线程才能执行这个方法或者代码块。
就比如上述代码实现的方式二,片段代码如下,
public void run() {
for (int i=0;i<10;i++){
//加锁互斥
synchronized (this){
if(ticket>0){
System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");
}else {
System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");
break;
}
}
}
}
synchronized (this)
中的this 就是一把锁,以当前对象作为锁,多个窗口售票,但却保证了卖票的数量不会出错。
上述的例子是修饰一段代码块,同样的也是可以修饰方法,看情况选择,修饰代码块比较灵活,比如一个方法里面可能你只有一小部分需要同步的话,就没必要放在方法上
synchronized的作用域
-
当一个线程正在访问一个对象的synchronized方法,那么其他线程不能访问该对象的其他
synchronized
方法。这个原因很简单,因为一个对象只有一把锁,当一个线程获取了该对象的锁之后,其他线程无法获取该对象的锁,所以无法访问该对象的其他synchronized方法。 -
当一个线程正在访问一个对象的synchronized方法,那么其他线程能访问该对象的
非synchronized
方法。这个原因很简单,访问非synchronized
方法不需要获得该对象的锁,假如一个方法没用synchronized关键字修饰,说明它不会使用到临界资源,那么其他线程是可以访问这个方法的, -
如果一个线程A需要访问对象object1的
synchronized
方法method1
,另外一个线程B需要访问对象object2的synchronized
方法method1
,即使object1和object2是同一类型),也不会产生线程安全问题,因为他们访问的是不同的对象,所以不存在互斥问题。 -
synchronized关键字是不能继承的,也就是说,基类的方法
synchronized method1(){}
在继承类中并不自动是synchronized method1(){}
,而是变成了method1(){}
。继承类需要你显式的指定它的某个方法为synchronized方法;
Lock
从Java 5之后,在java.util.concurrent.locks包下提供了另外一种方式来实现同步访问,那就是Lock。
可以说Lock就是synchronized的增强版,区别:
- synchronized是java内置关键字,在jvm层面,Lock是个java类;
- synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;
- synchronized会自动释放锁,Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
- 用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁可以设置一个超时时间
- synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)
- Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题。
小栗子:
public class Demo2 {
public static void main(String[] args) {
MyRunable myRunable = new MyRunable();
Thread thread1 = new Thread(myRunable);
Thread thread2 = new Thread(myRunable);
Thread thread3 = new Thread(myRunable);
thread1.start();
thread2.start();
thread3.start();
}
}
class MyRunable implements Runnable{
private static int ticket = 10;
private static Lock lock = new ReentrantLock();
@Override
public void run() {
while (true){
//加锁互斥
try {
lock.lock();
if(ticket>0){
System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");
}else {
System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");
break;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
wait()、notify/notifyAll()
这三个方法是用于线程间通信的基础方法,但实际上,它们不是Thread类中的方法,而是Object类中的本地方法
当线程执行wait()方法时候,会释放当前的锁,然后让出CPU,进入等待状态。
只有当 notify/notifyAll() 被执行时候,才会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。
notify方法只唤醒一个等待(对象的)线程并使该线程开始执行。所以如果有多个线程等待一个对象,这个方法只会唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。notifyAll 会唤醒所有等待(对象的)线程,尽管哪一个线程将会第一个处理取决于操作系统的实现
需求:交替打印0-100 的奇偶数
// 打印的数据体
public class Number {
int i=1;
volatile boolean isOdd=false;
public int getI() {
return i;
}
public void setI(int i) {
this.i = i;
}
public boolean isOdd() {
return isOdd;
}
public void setOdd(boolean odd) {
isOdd = odd;
}
}
//奇数线程
public class EvenRunable implements Runnable{
Number number;
int maxNumber;
public EvenRunable(Number number, int maxNumber) {
this.number=number;
this.maxNumber=maxNumber;
}
@Override
public void run() {
while (number.getI() <= maxNumber) {
synchronized (number) {
if (!number.isOdd) {
System.out.println("奇数i=" + number.getI());
number.setI(number.getI()+1);
number.setOdd(true);
//唤醒其他线程
number.notify();
} else {
try {
//释放锁,阻塞
number.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
// 偶数线程
public class OddRunable implements Runnable {
Number number;
int maxNumber;
public OddRunable(Number number, int maxNumber) {
this.number = number;
this.maxNumber = maxNumber;
}
@Override
public void run() {
while (number.getI() <= maxNumber) {
synchronized (number) {
if (number.isOdd) {
System.out.println("偶数i=" + number.getI());
number.setI(number.getI()+1);
number.setOdd(false);
number.notify();
} else {
try {
number.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
// 运行
public class Main {
public static void main(String[] args) {
Number number = new Number();
int maxNumber=100;
EvenRunable evenRunable = new EvenRunable(number,maxNumber);
OddRunable oddRunable = new OddRunable(number,maxNumber);
Thread evenThread = new Thread(evenRunable);
Thread oddThread = new Thread(oddRunable);
oddThread.start();
evenThread.start();
}
}
//输出
奇数i=1
偶数i=2
奇数i=3
.....
偶数i=98
奇数i=99
偶数i=100
JUC(java.util.concurrent
)包同步工具
AtomicInteger
AtomicInteger原子类型,线程安全,如何保证线程安全,它的部分源码如下:
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
我们看到value
使用了volatile
修饰符,volatile
相当于synchronized
的弱实现,也就是说volatile
实现了类似synchronized
的语义,却又没有锁机制。
它确保对volatile字段的更新以可预见的方式告知其他的线程。
例子:多线程顺序打印abc
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo5 implements Runnable{
private static AtomicInteger currentCount = new AtomicInteger(0);
private static final Integer MAX_COUNT = 30;
private static String [] chars = {"a", "b", "c"};
private String name;
public Demo5(String name) {
this.name = name;
}
@Override
public void run() {
while(currentCount.get()<MAX_COUNT){
if(this.name.equals(chars[currentCount.get()%3])){
printAndPlusOne(this.name + "\t" + currentCount);
}
}
}
public void printAndPlusOne(String content){
System.out.println(content);
currentCount.getAndIncrement();
}
public static void main(String [] args){
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Demo5("a"));
executorService.submit(new Demo5("b"));
executorService.submit(new Demo5("c"));
executorService.shutdown();
}
}
CyclicBarrier
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
回环的意思是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
例子:多线程顺序打印abc
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo6 implements Runnable{
//其参数表示屏障拦截的线程数量
private static int threadNum = 3;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum);
private static Integer currentCount = 0;
private static final Integer MAX_COUNT = 30;
private static String [] chars = {"a", "b", "c"};
private String name;
public Demo6(String name) {
this.name = name;
}
@Override
public void run() {
while(currentCount<MAX_COUNT){
while(this.name.equals(chars[currentCount%3]))
printAndPlusOne(this.name + "\t" + currentCount);
try {
//使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。等threadNum 个全部await之后再全部同时执行
cyclicBarrier.await();
}catch (Exception e){
e.printStackTrace();
}
}
}
public void printAndPlusOne(String name){
System.out.println(name);
currentCount ++;
}
public static void main(String [] args){
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Demo6("a"));
executorService.submit(new Demo6("b"));
executorService.submit(new Demo6("c"));
executorService.shutdown();
}
}
参考自:https://blog.csdn.net/u013968384/article/details/82584944
CountDownLatch
倒计时计数器,countDown()
方法调用递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
调用await()方法的线程会被挂起,它会等待直到计数值为0才继续执行
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo8 {
public static void main(String [] args) throws InterruptedException {
//赛道个数,一人一道
int count = 6;
//指挥官
final CountDownLatch masterCountDownLatch = new CountDownLatch(1);
//闭锁,可实现计数器递减
final CountDownLatch countDownLatch = new CountDownLatch(count);
AtomicInteger atomicInteger = new AtomicInteger(1);
System.out.println("校园400米赛跑,即将开始");
Thread.sleep(1*1000);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < count ; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()+"准备好了");
//比赛选择准备
masterCountDownLatch.await();
System.out.println(Thread.currentThread().getName()+"拼命奔跑中");
Thread.sleep((long)(Math.random()*10000));
if(atomicInteger.decrementAndGet() >= 0){
System.out.println("冠军诞生了 "+Thread.currentThread().getName()+"首先到达终点");
}else{
System.out.println("其次 "+Thread.currentThread().getName()+"到达终点");
}
} catch (Exception e) {
e.printStackTrace();
}
//闭锁减一
countDownLatch.countDown();
});
}
Thread.sleep(5*1000);
System.out.println("预备,开始比赛");
// 准备结束,开始比赛,处于等待的线程继续执行任务
masterCountDownLatch.countDown();
countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行
System.out.println("比赛结束");
executorService.shutdown();
}
}
控制台打印:
校园400米赛跑,即将开始
pool-1-thread-1准备好了
pool-1-thread-2准备好了
pool-1-thread-3准备好了
pool-1-thread-4准备好了
pool-1-thread-5准备好了
pool-1-thread-6准备好了
预备,开始比赛
pool-1-thread-1拼命奔跑中
pool-1-thread-2拼命奔跑中
pool-1-thread-6拼命奔跑中
pool-1-thread-5拼命奔跑中
pool-1-thread-3拼命奔跑中
pool-1-thread-4拼命奔跑中
冠军诞生了 pool-1-thread-4首先到达终点
其次 pool-1-thread-6到达终点
其次 pool-1-thread-1到达终点
其次 pool-1-thread-2到达终点
其次 pool-1-thread-5到达终点
其次 pool-1-thread-3到达终点
比赛结束
Semaphore
信号量,是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
构造器参数 permits(许可数),定义资源可以并发访问的最大个数。
常用方法:
- acquire()
获取执行许可,当总计未释放的许可数不超过permits时,允许通行,否则线程阻塞等待,直到获取到许可。 - release()
释放许可,可以让其他线程获得许可
并发访问如下demo:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
public class Demo7{
public static void main(String [] args) throws InterruptedException {
int clientTotal = 12;
// 同时并发执行的线程数,这里
int threadTotal = 5;
int count = 0;
ExecutorService executorService = Executors.newCachedThreadPool();
//信号量,此处用于控制并发的线程数
final Semaphore semaphore = new Semaphore(threadTotal);
//闭锁,可实现计数器递减
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
//执行此方法用于获取执行许可,当总计未释放的许可数不超过threadTotal时,
//允许通行,否则线程阻塞等待,直到获取到许可。
semaphore.acquire();
//任务
goToWC();
//释放许可
semaphore.release();
} catch (Exception e) {
//log.error("exception", e);
e.printStackTrace();
}
//闭锁减一
countDownLatch.countDown();
});
}
countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行
executorService.shutdown();
}
//上厕所
public static void goToWC() throws InterruptedException {
System.out.println(new SimpleDateFormat("HH:mm:ss ").format(new Date())+Thread.currentThread().getName()+"-正在使用厕所 ");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"上完厕所了,回去上班");
}
}
上面的场景是这样的,一个楼层有5个厕所。现在有12 个人要上厕所。只能排队
控制台打印输出,如下
14:12:28 pool-1-thread-5-正在使用厕所
14:12:28 pool-1-thread-7-正在使用厕所
14:12:28 pool-1-thread-3-正在使用厕所
14:12:28 pool-1-thread-2-正在使用厕所
14:12:28 pool-1-thread-1-正在使用厕所
pool-1-thread-5上完厕所了,回去上班
pool-1-thread-1上完厕所了,回去上班
pool-1-thread-2上完厕所了,回去上班
pool-1-thread-3上完厕所了,回去上班
14:12:30 pool-1-thread-9-正在使用厕所
14:12:30 pool-1-thread-6-正在使用厕所
pool-1-thread-7上完厕所了,回去上班
14:12:30 pool-1-thread-10-正在使用厕所
14:12:30 pool-1-thread-11-正在使用厕所
14:12:30 pool-1-thread-4-正在使用厕所
pool-1-thread-6上完厕所了,回去上班
pool-1-thread-9上完厕所了,回去上班
14:12:32 pool-1-thread-8-正在使用厕所
14:12:32 pool-1-thread-12-正在使用厕所
pool-1-thread-4上完厕所了,回去上班
pool-1-thread-10上完厕所了,回去上班
pool-1-thread-11上完厕所了,回去上班
pool-1-thread-8上完厕所了,回去上班
pool-1-thread-12上完厕所了,回去上班
通过前面打印的时间,可知只有threadTotal 个获得许可,其他只能等待获得许可的线程释放许可才能继续执行。
这几个demo算是对多线程有一个了解吧,多线程还有更难的,一起加油!!!