多线程第一阶段课程
2020-09-05 本文已影响0人
jiahzhon
- Java应用程序的main函数是一个线程,是被JVM启动的时候调用,线程的名字叫main。
- 实现一个线程,必须创建Thread实例,override run 方法,并且调用start方法。
- 当JVM启动后,实际上有多个线程,但是至少有一个非守护线程。
- 当你调用一个线程start方法的时候,此时至少有两个线程,一个是调用你的线程,还有一个执行run方法的线程。
- 线程的生命周期分为new,runnable,running,block,terminate.
- run 和 start 的关系很像模板方法模式。run的内容不同,但是最后start都启动了。
- 相比继承方法创建线程的思想,使用接口创建线程的思想把具体算法抽离开来,很像策略模式。(定义一个总的算法接口,再编写接口具体实现类,使用类使用时时创建下转型对象,再把策略放入具体需要的对象构造函数中)。
- 如果构造线程对象时未传入ThreadGroup,Thread会默认获取父线程的ThreadGroup作为该线程的ThreadGroup。ThreadGroup有很多实用的Api,如获取当前线程组在运行线程的数量。
守护线程
-
在Java中有两类线程:User Thread(用户线程)、Daemon Thread(守护线程)
-
用个比较通俗的比如,任何一个守护线程都是整个JVM中所有非守护线程的保姆:
-
只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作。
-
Daemon的作用是为其他线程的运行提供便利服务,守护线程最典型的应用就是 GC (垃圾回收器),它就是一个很称职的守护者。
-
User和Daemon两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果 User Thread已经全部退出运行了,只剩下Daemon Thread存在了,虚拟机也就退出了。 因为没有了被守护者,Daemon也就没有工作可做了,也就没有继续运行程序的必要了。
join
- 可设置join后的等待时间
-
Thread,currentThread.join()
这句话的意思是当前线程等待自己死亡,但是自己又不死,所有会一直挂着。
interupt
- 可以打断wait join,sleep。
- 以优雅的方式打断:
public class ThreadCloseGraceful {
private static class Worker extends Thread {
private volatile boolean start = true;
@Override
public void run() {
while (start) {
//
}
}
public void shutdown() {
this.start = false;
}
}
public static void main(String[] args) {
Worker worker = new Worker();
worker.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
worker.shutdown();
}
}
public class ThreadCloseGraceful2 {
private static class Worker extends Thread {
@Override
public void run() {
while (true) {
if (Thread.interrupted())
break;
}
//-------------
//-------------
//-------------
}
}
public static void main(String[] args) {
Worker worker = new Worker();
worker.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
worker.interrupt();
}
}
- 以暴力方式中断(原是用stop方法,但是已过时。基本思想:把任务线程设置成执行线程的守护线程)。
public class ThreadService {
private Thread executeThread;
private boolean finished = false;
public void execute(Runnable task) {
executeThread = new Thread() {
@Override
public void run() {
Thread runner = new Thread(task);
runner.setDaemon(true);
runner.start();
try {
//executeThread 执行线程一定要等 runner任务线程结束
runner.join();
finished = true;
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
};
executeThread.start();
}
public void shutdown(long mills) {
long currentTime = System.currentTimeMillis();
while (!finished) {
if ((System.currentTimeMillis() - currentTime) >= mills) {
System.out.println("任务超时,需要结束他!");
executeThread.interrupt();
break;
}
try {
executeThread.sleep(1);
} catch (InterruptedException e) {
System.out.println("执行线程被打断!");
break;
}
}
finished = false;
}
}
synchronize
- 模拟三个窗口同时接受500个号码
public class Bank {
public static void main(String[] args) {
final SynchronizedRunnable ticketWindow = new SynchronizedRunnable();
Thread windowThread1 = new Thread(ticketWindow, "一号窗口");
Thread windowThread2 = new Thread(ticketWindow, "二号窗口");
Thread windowThread3 = new Thread(ticketWindow, "三号窗口");
windowThread1.start();
windowThread2.start();
windowThread3.start();
}
}
- 使用同步代码块(syn写在死循环里,写在外面会导致只有一个窗口执行)
public class SynchronizedRunnable implements Runnable {
private int index = 1;
private final static int MAX = 500;
private final Object MONITOR = new Object();
@Override
public void run() {
while (true) {
//1
synchronized (MONITOR) {
if (index > MAX)
break;
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 的号码是:" + (index++));
}
//2
}
}
}
- 使用同步方法(这种写法会导致只有一个窗口运行)
public class SynchronizedRunnable implements Runnable{
private int index = 1;
//read only shared data.
private final static int MAX = 500;
//this
@Override
public synchronized void run() {
while (true) {
//1. getField
if (index > MAX)
break;
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//index++=>index = index+1
//1. get Field index
//2. index = index+1
//3. put field index
System.out.println(Thread.currentThread() + " 的号码是:" + (index++));
}
}
}
- 解决方案(把方法抽取出来,放在死循环里)
public class SynchronizedRunnable implements Runnable {
private int index = 1;
//read only shared data.
private final static int MAX = 500;
//this
@Override
public void run() {
while (true) {
if (ticket())
break;
}
}
private synchronized boolean ticket() {
//1. getField
if (index > MAX)
return true;
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//index++=>index = index+1
//1. get Field index
//2. index = index+1
//3. put field index
System.out.println(Thread.currentThread() + " 的号码是:" + (index++));
return false;
}
}
this锁(T1,T2并不会同时输出)
public class SynchronizedThis {
public static void main(String[] args) {
ThisLock thisLock = new ThisLock();
new Thread("T1") {
@Override
public void run() {
thisLock.m1();
}
}.start();
new Thread("T2") {
@Override
public void run() {
thisLock.m2();
}
}.start();
}
}
class ThisLock {
public synchronized void m1() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void m2() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 同步方法默认的锁为this锁,如果把同步方法设置为静态,则默认使用Class锁。
死锁
- 简易死锁(两个线程的方法调用对方的方法,sleep模拟让线程获取锁的时间变长点就可以了。不加sleep报的是栈溢出)
public class DeadLock {
private OtherService otherService;
public DeadLock(OtherService otherService) {
this.otherService = otherService;
}
private final Object lock = new Object();
public void m1() {
try {
synchronized (lock) {
System.out.println("m1");
Thread.sleep(3000);
otherService.s1();
}
}catch (InterruptedException e){
}
}
}
public class OtherService {
private final Object lock = new Object();
private DeadLock deadLock;
public void s1() {
try {
synchronized (lock) {
System.out.println("s1==========");
Thread.sleep(3000);
deadLock.m1();
}
}catch (InterruptedException e){
}
}
public void setDeadLock(DeadLock deadLock) {
this.deadLock = deadLock;
}
}
public class DeadLockTest {
public static void main(String[] args) throws InterruptedException{
OtherService otherService = new OtherService();
DeadLock deadLock = new DeadLock(otherService);
otherService.setDeadLock(deadLock);
new Thread() {
@Override
public void run() {
while (true) {
deadLock.m1();
}
}
}.start();
new Thread() {
@Override
public void run() {
while (true)
otherService.s1();
}
}.start();
}
}
- 查看死锁
- 命令窗口敲jps,查看对应号码
-
jstack 对应号码
image.png
生产者-消费者模式
- 线程间没有通信的情况下(生产者生产了一大堆,消费者没有消费完)
public class ProduceConsumerVersion1 {
private int i = 1;
final private Object LOCK = new Object();
private void produce() {
synchronized (LOCK) {
System.out.println("P->" + (i++));
}
}
private void consume() {
synchronized (LOCK) {
System.out.println("C->" + i);
}
}
public static void main(String[] args) {
ProduceConsumerVersion1 pc = new ProduceConsumerVersion1();
new Thread("P") {
@Override
public void run() {
while (true)
pc.produce();
}
}.start();
new Thread("C") {
@Override
public void run() {
while (true)
pc.consume();
}
}.start();
}
}
- 用wait,notify来进行线程的通信。在一个生产者一个消费者的情况下运行正常。多生产者多消费者会出现假死情况(全部线程wait)
public class ProduceConsumerVersion2 {
private int i = 0;
final private Object LOCK = new Object();
private volatile boolean isProduced = false;
public void produce() {
synchronized (LOCK) {
if (isProduced) {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
i++;
System.out.println("P->" + i);
LOCK.notify();
isProduced = true;
}
}
}
public void consume() {
synchronized (LOCK) {
if (isProduced) {
System.out.println("C->" + i);
LOCK.notify();
isProduced = false;
} else {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProduceConsumerVersion2 pc = new ProduceConsumerVersion2();
Stream.of("P1", "P2").forEach(n ->
new Thread(n) {
@Override
public void run() {
while (true)
pc.produce();
}
}.start()
);
Stream.of("C1", "C2").forEach(n ->
new Thread(n) {
@Override
public void run() {
while (true)
pc.consume();
}
}.start()
);
}
}
-
造成假死的根本原因:notify是随机从wait队列中唤起一个。例(两个生产者处于wait状态,其中一个消费者消费完产品唤醒的却是消费者2,消费者2发现没有产品也进入wait)
-
多生产者消费者版本(解决方案:把notify改成notifyall,把判断标识从If改成while循环)
- if要改成while的原因:当两个生产者都处于wait的时候,被消费者notifyall唤醒。启动一条会抢到锁生产。但是另一条生产并没有再去判断,也会生产商品。造成生产方过多生产。
public class ProduceConsumerVersion3 {
private int i = 0;
final private Object LOCK = new Object();
private volatile boolean isProduced = false;
public void produce() {
synchronized (LOCK) {
while (isProduced) {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
i++;
System.out.println("P->" + i);
LOCK.notifyAll();
isProduced = true;
}
}
public void consume() {
synchronized (LOCK) {
while (!isProduced) {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("C->" + i);
LOCK.notifyAll();
isProduced = false;
}
}
public static void main(String[] args) {
ProduceConsumerVersion3 pc = new ProduceConsumerVersion3();
Stream.of("P1", "P2", "P3").forEach(n ->
new Thread(n) {
@Override
public void run() {
while (true) {
pc.produce();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start()
);
Stream.of("C1", "C2", "C3", "C4").forEach(n ->
new Thread(n) {
@Override
public void run() {
while (true) {
pc.consume();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start()
);
}
}
sleep和wait的区别
- sleep是Thread的方法,wait是object的方法
- sleep不需要monitor,wait一定需要同步
- sleep不会释放锁,wait会
- sleep不需要被唤醒,wait需要
简易线程池
public class SimpleThreadPool {
private final int size;
private final static int DEFAULT_SIZE = 10;
private static volatile int seq = 0;
private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";
private final static ThreadGroup GROUP = new ThreadGroup("POOL_GROUP");
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<Runnable>();
private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<WorkerTask>();
public SimpleThreadPool() {
this(DEFAULT_SIZE);
}
public SimpleThreadPool(int size){
this.size = size;
init();
}
private void init() {
for(int i = 0; i<size; i++) {
createWorkTask();
}
}
public void submit(Runnable runnable) {
synchronized (TASK_QUEUE) {
TASK_QUEUE.addLast(runnable);
TASK_QUEUE.notifyAll();
}
}
private void createWorkTask(){
WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX + (seq++));
task.start();
THREAD_QUEUE.add(task);
}
private enum TaskState{
FREE,RUNNING,BLOCKED,DEAD
}
private static class WorkerTask extends Thread {
private volatile TaskState taskState = TaskState.FREE;
public WorkerTask(ThreadGroup group, String name) {
super(group,name);
}
public TaskState getTaskState(){
return this.taskState;
}
public void run() {
OUTER:
while (this.taskState != TaskState.DEAD) {
Runnable runnable;
synchronized (TASK_QUEUE) {
while (TASK_QUEUE.isEmpty()) {
try {
taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
}catch (InterruptedException e) {
break OUTER;
}
}
runnable = TASK_QUEUE.removeFirst();
}
if(runnable != null) {
taskState = TaskState.RUNNING;
runnable.run();
taskState = TaskState.FREE;
}
}
}
public void close() {
this.taskState = TaskState.DEAD;
}
}
public static void main(String[] args) throws InterruptedException {
SimpleThreadPool threadPool = new SimpleThreadPool();
for (int i = 0; i < 40; i++) {
threadPool.submit(new Runnable() {
public void run() {
System.out.println("The runnable be serviced by " + Thread.currentThread() + " start.");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The runnable be serviced by " + Thread.currentThread() + " finished.");
}
});
}
}
}