多线程:3线程间通信
线程间通信:《多线程编程核心技术》笔记
1.等待/通知机制
1.1什么是等待通知机制
1.厨师做完一道菜的时间不确定,将菜放到传菜台上的时间也不确定
2.服务员取到菜的时间取决于厨师,所以服务员就有wait的状态
3.厨师将菜放到台上的时候,通知服务员,就是一种notify通知机制
1.2wait/notify的实现
wait的作用是使当前执行代码的线程进行等待,wait是Object类的方法,该方法用来将当前线程置入“预执行队列”,并且在wait所在的代码处停止执行,知道接到通知或者中断为止,wait之前,线程必须获得该对象的对象级别的锁,也就是只能在同步方法或者同步代码块中调用wait方法,执行wait只会当前线程释放锁。
如果调用wait时没有获得适当的锁,则会抛出IllegalMonitorStateException,他是RuntimeException的一个子类,因此不需要trycatch
notify方法
该方法也要在同步代码块或者同步方法中调用,即调用之前需要取得该对象的对象级别锁,如果调用notify时没有取得合适的锁,会抛出IllegalMonitorStateException,该方法用来通知那些可能等待该对象对象锁的其他线程,如果多个线程等待,则由县城规划器随机挑选其中一个呈wait状态的线程,对其发出通知notify。
要注意:notify之后,当前线程不会马上释放锁,呈wait状态的也不是马上获取锁,要等待执行notify的线程将程序执行完成,也就是退出同步代码块之后,当前线程才会释放锁,wait状态的线程才会获取锁,当一个wait线程由于notifiy获取到锁执行完成之后会释放锁,但是如果他没有调用notify,即便该所对象已经空闲,其他wait状态等待的线程由于没有得到通知也会继续wait下去。
public class Run {
public static void main(String[] args) throws InterruptedException{
String str = new String("");
str.wait();
}
}
运行结果:没有在同步代码块中执行wait会报错
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Unknown Source)
at t1.Run.main(Run.java:8)
public class Run {
public static void main(String[] args) throws InterruptedException{
String lock = "abc";
System.out.println("befor sync");
synchronized (lock) {
System.out.println("before wait");
lock.wait();
System.out.println("after wait");
}
System.out.println("after sync");
}
}
运行结果:没有结束,一直处于wait状态
befor sync
before wait
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock){
super();
this.lock = lock;
}
@Override
public void run() {
super.run();
try {
synchronized (lock) {
System.out.println("开始 wait time = "+ System.currentTimeMillis());
lock.wait();
System.out.println("结束 wait time = "+System.currentTimeMillis());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock){
super();
this.lock = lock;
}
@Override
public void run() {
super.run();
synchronized (lock) {
System.out.println("开始 notify time = "+ System.currentTimeMillis());
lock.notify();
System.out.println("结束 notify time = "+System.currentTimeMillis());
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
ThreadA ta = new ThreadA(lock);
ta.start();
Thread.sleep(3000);
ThreadB tb = new ThreadB(lock);
tb.start();
}
}
运行结果:
开始 wait time = 1472717309090//wait进入等待状态
开始 notify time = 1472717312092//notify之后并没有立马释放锁
结束 notify time = 1472717312092//notify执行完成之后才释放锁
结束 wait time = 1472717312092//wait之后的代码执行
更改上面的程序如下,验证wait和notify的对象必须是同一个对象,而且必须是锁对象
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Object lock2 = new Object();
ThreadA ta = new ThreadA(lock);
ta.start();
Thread.sleep(3000);
ThreadB tb = new ThreadB(lock2);
tb.start();
}
}
运行结果:lock2.notify执行了,没有找到有这个对象锁里的wait线程,线程a是由lock对象锁的,没有相应的notify,所以一直处于wait状态
开始 wait time = 1472717601015
开始 notify time = 1472717604018
结束 notify time = 1472717604018
wait方法可以使调用该方法的线程释放共享资源的锁,然后从运行状态进入等待状态,直到被唤醒
notify方法可以随机唤醒一个处于等待队列中等待同一个共享资源的一个线程,并使这个线程由等待状态进入运行状态,(注意不是立马运行,要等到调用notify方法的线程里的任务执行完成)
notifyall可以使所有在等待队列中等待同一个共享资源的全部线程从等待状态进入运行状态,优先级高的先执行,也可能随机执行,取决于jvm
public class MyList {
private static List list = new ArrayList();
public static void add(){
list.add("anyThing");
}
public static int size(){
return list.size();
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock){
super();
this.lock = lock;
}
@Override
public void run() {
super.run();
try {
synchronized (lock) {
if(MyList.size()!=5){
System.out.println("wait begin "+System.currentTimeMillis());
lock.wait();
System.out.println("wait end "+System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock){
super();
this.lock = lock;
}
@Override
public void run() {
try {
super.run();
synchronized (lock) {
for(int i=0;i<10;i++){
MyList.add();
System.out.println("添加了"+(i+1)+"个元素");
if(MyList.size() == 5){
lock.notify();
System.out.println("已发出通知");
}
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
ThreadA ta = new ThreadA(lock);
ta.start();//线程a先执行,wait之后处于等待状态
Thread.sleep(50);
ThreadB tb = new ThreadB(lock);
tb.start();
//线程b后执行,在list的size等于5时,发出notifiy,
//这时b不会立马释放锁,而是继续将线程内的任务执行完成,然后释放锁,a获取到锁执行wait之后的代码
}
}
运行结果:
wait begin 1472718552983
添加了1个元素
添加了2个元素
添加了3个元素
添加了4个元素
添加了5个元素
已发出通知
添加了6个元素
添加了7个元素
添加了8个元素
添加了9个元素
添加了10个元素
wait end 1472718563036
1.3wait立马释放锁,notify需要执行完成才会释放锁
线程a获取锁,执行wait之后释放锁,线程b才能执行,wait之后也释放锁,然后一直处于wait状态
public class Service {
public void testMethod(Object lock){
try {
synchronized (lock) {
System.out.println("begin wait");
lock.wait();
System.out.println("end wait");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadA extends Thread {
private Object lock;
private Service service;
public ThreadA(Object lock,Service service){
super();
this.lock = lock;
this.service = service;
}
@Override
public void run() {
service.testMethod(lock);
}
}
public class ThreadB extends Thread {
private Object lock;
private Service service;
public ThreadB(Object lock,Service service){
super();
this.lock = lock;
this.service = service;
}
@Override
public void run() {
service.testMethod(lock);
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Service service = new Service();
ThreadA ta = new ThreadA(lock, service);
ThreadB tb = new ThreadB(lock, service);
ta.start();
Thread.sleep(200);
tb.start();
}
}
运行结果:
begin wait
begin wait
1.4sleep不释放锁
上面的代码做如下修改,
运行结果:执行完成才释放锁
begin sleep
end sleep
begin sleep
end sleep
public class Service {
public void testMethod(Object lock){
try {
synchronized (lock) {
System.out.println("begin sleep");
Thread.sleep(1000);
System.out.println("end sleep");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1.5interrupt遇上wait
当线程处于wait状态时,调用线程对象的interrupt方法会出现InterruptedException异常
public class Service {
public void testMethod(Object lock){
try {
synchronized (lock) {
System.out.println("begin wait");
lock.wait();
System.out.println("end wait");
}
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("出现异常,wait状态的线程被interrupt了");
}
}
}
public class ThreadA extends Thread {
private Object lock;
private Service service;
public ThreadA(Object lock,Service service){
super();
this.lock = lock;
this.service = service;
}
@Override
public void run() {
service.testMethod(lock);
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Service service = new Service();
ThreadA ta = new ThreadA(lock, service);
ta.start();
Thread.sleep(4000);
ta.interrupt();
}
}
运行结果:
begin wait
java.lang.InterruptedException
出现异常,wait状态的线程被interrupt了
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Unknown Source)
at c3.Service.testMethod(Service.java:8)
at c3.ThreadA.run(ThreadA.java:14)
1.6 只通知一个线程:notify一次只能通知一个被wait的线程
public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName="
+ Thread.currentThread().getName());
lock.wait();
System.out.println(" end wait() ThreadName="
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class ThreadC extends Thread {
private Object lock;
public ThreadC(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class NotifyThread extends Thread {
private Object lock;
public NotifyThread(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println("call notify");
lock.notify();
System.out.println("call notify");
lock.notify();
System.out.println("call notify");
lock.notify();
System.out.println("notify thread end");
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
ThreadB b = new ThreadB(lock);
b.start();
ThreadC c = new ThreadC(lock);
c.start();
Thread.sleep(3000);
NotifyThread notifyThread = new NotifyThread(lock);
notifyThread.start();
}
}
运行结果:
begin wait() ThreadName=Thread-2
begin wait() ThreadName=Thread-0
begin wait() ThreadName=Thread-1
call notify
call notify
call notify
notify thread end
end wait() ThreadName=Thread-2
end wait() ThreadName=Thread-1
end wait() ThreadName=Thread-0
1.7唤醒所有线程:notifyAll能唤醒所有wait的线程
对上面的代码做如下修改
public class NotifyThread extends Thread {
private Object lock;
public NotifyThread(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println("before notify all");
lock.notifyAll();
System.out.println("after notify all");
}
}
}
运行结果:
begin wait() ThreadName=Thread-0
begin wait() ThreadName=Thread-1
begin wait() ThreadName=Thread-2
before notify all
after notify all
end wait() ThreadName=Thread-2
end wait() ThreadName=Thread-1
end wait() ThreadName=Thread-0
1.8wait(long)的使用:等待某一段时间,如果这段时间内有线程唤醒,那么就唤醒,如果没有线程来唤醒,就自己唤醒自己
public class MyRunnable {
static private Object lock = new Object();
static private Runnable runnable1 = new Runnable() {
@Override
public void run() {
try {
synchronized (lock) {
System.out.println("wait begin timer="
+ System.currentTimeMillis());
lock.wait(5000);
System.out.println("wait end timer="
+ System.currentTimeMillis());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
static private Runnable runnable2 = new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("notify begin timer="
+ System.currentTimeMillis());
lock.notify();
System.out.println("notify end timer="
+ System.currentTimeMillis());
}
}
};
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(runnable1);
t1.start();
// Thread.sleep(3000);
// Thread t2 = new Thread(runnable2);
// t2.start();
}
}
注释运行:
wait begin timer=1472733885521
wait end timer=1472733890531
不注释运行:
wait begin timer=1472733995233
notify begin timer=1472733998244
notify end timer=1472733998244
wait end timer=1472733998244
1.9先notify,wait进入之后就会一直处于wati状态
public class MyRun {
private String lock = new String("");
private boolean isFirstRunB = false;
private Runnable runnableA = new Runnable() {
@Override
public void run() {
try {
synchronized (lock) {
while (isFirstRunB == false) {
System.out.println("begin wait");
lock.wait();
System.out.println("end wait");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
private Runnable runnableB = new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("begin notify");
lock.notify();
System.out.println("end notify");
isFirstRunB = true;
}
}
};
public static void main(String[] args) throws InterruptedException {
MyRun run = new MyRun();
Thread b = new Thread(run.runnableB);
b.start();
Thread.sleep(100);
Thread a = new Thread(run.runnableA);
a.start();
}
}
运行结果:
begin notify
end notify
1.10生产者消费者
1.10.1一生产,一消费:操作值
//生产者
public class P {
private String lock;
public P(String lock) {
super();
this.lock = lock;
}
public void setValue() {
try {
synchronized (lock) {
if (!ValueObject.value.equals("")) {
System.out.println("有值生产者等待");
lock.wait();
}
String value = System.currentTimeMillis() + "_"
+ System.nanoTime();
System.out.println("set的值是" + value);
ValueObject.value = value;
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//消费者
public class C {
private String lock;
public C(String lock) {
super();
this.lock = lock;
}
public void getValue() {
try {
synchronized (lock) {
if (ValueObject.value.equals("")) {
System.out.println("值为空消费者等待");
lock.wait();
}
System.out.println("get的值是" + ValueObject.value);
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ValueObject {
public static String value = "";
}
public class ThreadP extends Thread {
private P p;
public ThreadP(P p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.setValue();
}
}
}
public class ThreadC extends Thread {
private C r;
public ThreadC(C r) {
super();
this.r = r;
}
@Override
public void run() {
while (true) {
r.getValue();
}
}
}
public class Run {
public static void main(String[] args) {
String lock = new String("");
P p = new P(lock);
C r = new C(lock);
ThreadP pThread = new ThreadP(p);
ThreadC rThread = new ThreadC(r);
pThread.start();
rThread.start();
}
}
运行流程分析:
pThread,cThread执行,pThread先获取锁,然后在循环中执行setValue方法,最初value为空设置一个值,之后notify,然后pThread在执行的时候value不为空,进入等待,同时cThread被唤醒getvalue,然后notifiy,pthread被唤醒的同时,cThread进入wait
1.10.2多生产,多消费:操作值,假死
同类唤醒,也就是说生产者唤醒生产者,消费者唤醒消费者,会造成假死
//生产者
public class P {
private String lock;
public P(String lock) {
super();
this.lock = lock;
}
public void setValue() {
try {
synchronized (lock) {
while (!ValueObject.value.equals("")) {
System.out.println("生产者 "
+ Thread.currentThread().getName() + " WAITING了★");
lock.wait();
}
System.out.println("生产者 " + Thread.currentThread().getName()
+ " RUNNABLE了");
String value = System.currentTimeMillis() + "_"
+ System.nanoTime();
ValueObject.value = value;
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//消费者
public class C {
private String lock;
public C(String lock) {
super();
this.lock = lock;
}
public void getValue() {
try {
synchronized (lock) {
while (ValueObject.value.equals("")) {
System.out.println("消费者 "
+ Thread.currentThread().getName() + " WAITING了☆");
lock.wait();
}
System.out.println("消费者 " + Thread.currentThread().getName()
+ " RUNNABLE了");
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ValueObject {
public static String value = "";
}
public class ThreadC extends Thread {
private C r;
public ThreadC(C r) {
super();
this.r = r;
}
@Override
public void run() {
while (true) {
r.getValue();
}
}
}
public class ThreadP extends Thread {
private P p;
public ThreadP(P p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.setValue();
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
String lock = new String("");
P p = new P(lock);
C r = new C(lock);
ThreadP[] pThread = new ThreadP[2];
ThreadC[] rThread = new ThreadC[2];
for (int i = 0; i < 2; i++) {
pThread[i] = new ThreadP(p);
pThread[i].setName("生产者" + (i + 1));
rThread[i] = new ThreadC(r);
rThread[i].setName("消费者" + (i + 1));
pThread[i].start();
rThread[i].start();
}
Thread.sleep(5000);
Thread[] threadArray = new Thread[Thread.currentThread()
.getThreadGroup().activeCount()];
Thread.currentThread().getThreadGroup().enumerate(threadArray);
for (int i = 0; i < threadArray.length; i++) {
System.out.println(threadArray[i].getName() + " "
+ threadArray[i].getState());
}
}
}
运行结果:
main RUNNABLE
生产者1 WAITING
消费者1 WAITING
生产者2 WAITING
消费者2 WAITING
1.10.1多生产,多消费:解决假死问题
将上面的代码中notify改成notifyAll,通知所有。
1.10.1一生产,一消费:操作栈
public class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
if (list.size() == 1) {
this.wait();
}
list.add("anyString=" + Math.random());
this.notify();
System.out.println("push=" + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
if (list.size() == 0) {
System.out.println("pop操作中的:"
+ Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
returnValue = "" + list.get(0);
list.remove(0);
this.notify();
System.out.println("pop=" + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
public class P {
private MyStack myStack;
public P(MyStack myStack) {
super();
this.myStack = myStack;
}
public void pushService() {
myStack.push();
}
}
public class C {
private MyStack myStack;
public C(MyStack myStack) {
super();
this.myStack = myStack;
}
public void popService() {
System.out.println("pop=" + myStack.pop());
}
}
public class P_Thread extends Thread {
private P p;
public P_Thread(P p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.pushService();
}
}
}
public class C_Thread extends Thread {
private C r;
public C_Thread(C r) {
super();
this.r = r;
}
@Override
public void run() {
while (true) {
r.popService();
}
}
}
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
P p = new P(myStack);
C r = new C(myStack);
P_Thread pThread = new P_Thread(p);
C_Thread rThread = new C_Thread(r);
pThread.start();
rThread.start();
}
}
1.10.1一生产,多消费操作栈:问题
public class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
if (list.size() == 1) {
System.out.println("list.size==1 wait");
this.wait();
}
list.add("anyString=" + Math.random());
this.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
if (list.size() == 0) {
System.out.println("pop操作中的:"+ Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
System.out.println("pop操作中的:"+ Thread.currentThread().getName() + " 线程呈wake up状态");
}
System.out.println(Thread.currentThread().getName()+"------>");
returnValue = "" + list.get(0);
list.remove(0);
this.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
public class P {
private MyStack myStack;
public P(MyStack myStack) {
super();
this.myStack = myStack;
}
public void pushService() {
myStack.push();
}
}
public class C {
private MyStack myStack;
public C(MyStack myStack) {
super();
this.myStack = myStack;
}
public void popService() {
System.out.println("pop=" + myStack.pop());
}
}
public class P_Thread extends Thread {
private P p;
public P_Thread(P p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.pushService();
}
}
}
public class C_Thread extends Thread {
private C r;
public C_Thread(C r) {
super();
this.r = r;
}
@Override
public void run() {
while (true) {
r.popService();
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
MyStack myStack = new MyStack();
P p = new P(myStack);
C r1 = new C(myStack);
C r2 = new C(myStack);
C r3 = new C(myStack);
C r4 = new C(myStack);
C r5 = new C(myStack);
P_Thread pThread = new P_Thread(p);
pThread.start();
C_Thread cThread1 = new C_Thread(r1);
C_Thread cThread2 = new C_Thread(r2);
C_Thread cThread3 = new C_Thread(r3);
C_Thread cThread4 = new C_Thread(r4);
C_Thread cThread5 = new C_Thread(r5);
cThread1.start();
cThread2.start();
cThread3.start();
cThread4.start();
cThread5.start();
}
}
运行结果分析:异常出现的原因是判断条件用if,从wait之后醒来的线程不会在进行判断了
list.size==1 wait
Thread-5------>
pop=anyString=0.9249108450015674
pop操作中的:Thread-4 线程呈wait状态
pop操作中的:Thread-3 线程呈wait状态
pop操作中的:Thread-2 线程呈wait状态
pop操作中的:Thread-1 线程呈wait状态
pop操作中的:Thread-5 线程呈wait状态
list.size==1 wait
pop操作中的:Thread-4 线程呈wake up状态//在wait之后执行
Thread-4------>
pop=anyString=0.512932045510578
pop操作中的:Thread-3 线程呈wake up状态//在wait之后执行
Thread-3------>
pop操作中的:Thread-4 线程呈wait状态//从头开始
Exception in thread "Thread-3" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(Unknown Source)
at java.util.ArrayList.get(Unknown Source)
at entity.MyStack.pop(MyStack.java:31)
at service.C.popService(C.java:15)
at extthread.C_Thread.run(C_Thread.java:17)
1.10.1解决问题一:
public class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
while (list.size() == 1) {
System.out.println("list.size==1 wait");
this.wait();
}
list.add("anyString=" + Math.random());
this.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
while (list.size() == 0) {
System.out.println("pop操作中的:"+ Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
System.out.println("pop操作中的:"+ Thread.currentThread().getName() + " 线程呈wake up状态");
}
System.out.println(Thread.currentThread().getName()+"------>");
returnValue = "" + list.get(0);
list.remove(0);
this.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
运行结果:假死问题
list.size==1 wait
Thread-5------>
pop=anyString=0.48747145827806926
pop操作中的:Thread-4 线程呈wait状态
pop操作中的:Thread-3 线程呈wait状态
pop操作中的:Thread-2 线程呈wait状态
pop操作中的:Thread-1 线程呈wait状态
pop操作中的:Thread-5 线程呈wait状态
list.size==1 wait
pop操作中的:Thread-4 线程呈wake up状态
Thread-4------>
pop=anyString=0.7843307752651489
pop操作中的:Thread-3 线程呈wake up状态
pop操作中的:Thread-3 线程呈wait状态
pop操作中的:Thread-4 线程呈wait状态
1.10.1解决问题二:notifyAll
public class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
while (list.size() == 1) {
System.out.println("list.size==1 wait");
this.wait();
}
list.add("anyString=" + Math.random());
this.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
while (list.size() == 0) {
System.out.println("pop操作中的:"+ Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
System.out.println("pop操作中的:"+ Thread.currentThread().getName() + " 线程呈wake up状态");
}
System.out.println(Thread.currentThread().getName()+"------>");
returnValue = "" + list.get(0);
list.remove(0);
this.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
1.11通过管道进行线程间通信
管道流是一种特殊的流,用于在不同的线程之间直接传送数据,一个线程发送数据到输出管道,另一线程从数据管道中读入数据,通过管道,实现不同线程间通信,而无须借助于类似于临时文件的东西
PipedInputStream和PipedOutputStream
PipedReader和PipedWriter
1.11.1字节流
public class ReadData {
public void readMethod(PipedInputStream input) {
try {
System.out.println("read :");
byte[] byteArray = new byte[5];
System.out.println(" before read 1");
int readLength = input.read(byteArray);
System.out.println(" after read 1");
while (readLength != -1) {
String newData = new String(byteArray, 0, readLength);
System.out.print(newData);
readLength = input.read(byteArray);
System.out.println(" after read 2");
}
System.out.println();
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class WriteData {
public void writeMethod(PipedOutputStream out) {
try {
System.out.println("write :");
for (int i = 0; i < 54; i++) {
String outData = "" + (i + 1);
out.write(outData.getBytes());
System.out.print(outData);
}
System.out.println();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ThreadRead extends Thread {
private ReadData read;
private PipedInputStream input;
public ThreadRead(ReadData read, PipedInputStream input) {
super();
this.read = read;
this.input = input;
}
@Override
public void run() {
read.readMethod(input);
}
}
public class ThreadWrite extends Thread {
private WriteData write;
private PipedOutputStream out;
public ThreadWrite(WriteData write, PipedOutputStream out) {
super();
this.write = write;
this.out = out;
}
@Override
public void run() {
write.writeMethod(out);
}
}
public class Run {
public static void main(String[] args) {
try {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
inputStream.connect(outputStream);
//outputStream.connect(inputStream);
ThreadRead threadRead = new ThreadRead(readData, inputStream);
threadRead.start();
Thread.sleep(2000);
ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
threadWrite.start();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:readThread先执行,执行read之后阻塞在那里,writeThread写入数据,完成之后,readThread读数据
connect使两个stream产生链接
read :
before read 1
write :
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
after read 1
12345 after read 2
67891 after read 2
01112 after read 2
13141 after read 2
51617 after read 2
18192 after read 2
02122 after read 2
23242 after read 2
52627 after read 2
28293 after read 2
03132 after read 2
33343 after read 2
53637 after read 2
38394 after read 2
04142 after read 2
43444 after read 2
54647 after read 2
48495 after read 2
05152 after read 2
5354 after read 2
1.11.2字符流
public class ReadData {
public void readMethod(PipedReader input) {
try {
System.out.println("read :");
char[] byteArray = new char[20];
int readLength = input.read(byteArray);
while (readLength != -1) {
String newData = new String(byteArray, 0, readLength);
System.out.print(newData);
readLength = input.read(byteArray);
}
System.out.println();
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class WriteData {
public void writeMethod(PipedWriter out) {
try {
System.out.println("write :");
for (int i = 0; i < 66; i++) {
String outData = "" + (i + 1);
out.write(outData);
System.out.print(outData);
}
System.out.println();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ThreadRead extends Thread {
private ReadData read;
private PipedReader input;
public ThreadRead(ReadData read, PipedReader input) {
super();
this.read = read;
this.input = input;
}
@Override
public void run() {
read.readMethod(input);
}
}
public class ThreadWrite extends Thread {
private WriteData write;
private PipedWriter out;
public ThreadWrite(WriteData write, PipedWriter out) {
super();
this.write = write;
this.out = out;
}
@Override
public void run() {
write.writeMethod(out);
}
}
public class Run {
public static void main(String[] args) {
try {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedReader inputStream = new PipedReader();
PipedWriter outputStream = new PipedWriter();
// inputStream.connect(outputStream);
outputStream.connect(inputStream);
ThreadRead threadRead = new ThreadRead(readData, inputStream);
threadRead.start();
Thread.sleep(2000);
ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
threadWrite.start();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
read :
write :
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
1.12等待通知交叉备份
public class DBTools {
volatile private boolean prevIsA = false;
synchronized public void backupA() {
try {
while (prevIsA == true) {
wait();
}
for (int i = 0; i < 5; i++) {
System.out.println("★★★★★");
}
prevIsA = true;
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public void backupB() {
try {
while (prevIsA == false) {
wait();
}
for (int i = 0; i < 5; i++) {
System.out.println("☆☆☆☆☆");
}
prevIsA = false;
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class BackupA extends Thread {
private DBTools dbtools;
public BackupA(DBTools dbtools) {
super();
this.dbtools = dbtools;
}
@Override
public void run() {
dbtools.backupA();
}
}
public class BackupB extends Thread {
private DBTools dbtools;
public BackupB(DBTools dbtools) {
super();
this.dbtools = dbtools;
}
@Override
public void run() {
dbtools.backupB();
}
}
public class Run {
public static void main(String[] args) {
DBTools dbtools = new DBTools();
for (int i = 0; i < 20; i++) {
BackupB output = new BackupB(dbtools);
output.start();
BackupA input = new BackupA(dbtools);
input.start();
}
}
}
2.方法join的使用
在很多情况下,主线程创建并启动子线程,如果子线程要进行大量的耗时计算,主线程往往将早于子线程结束,如果主线程想等待子线程执行完成之后再结束,就要用到join方法,方法join的作用是等待线程对象销毁
2.1join方法前的铺垫
public class MyThread extends Thread {
@Override
public void run() {
try {
int secondValue = (int) (Math.random() * 10000);
System.out.println(secondValue);
Thread.sleep(secondValue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
MyThread threadTest = new MyThread();
threadTest.start();
// Thread.sleep(?)
System.out.println("我想当threadTest对象执行完毕后我再执行");
System.out.println("但上面代码中的sleep()中的值应该写多少呢?");
System.out.println("答案是:根据不能确定:)");
}
}
2.2用join方法来解决
方法join的作用是使所属的线 程对象x正常执行run方法,而是当前线程z进行阻塞,等到线程x销毁后再去执行z后面的代码
方法join具有使线程排队运行的作用,join与sychronized方法的却别是,join在内部使用wait机制进行等待,而sychronized是使用对象监视器原理作为同步
public class MyThread extends Thread {
@Override
public void run() {
try {
int secondValue = (int) (Math.random() * 10000);
System.out.println(secondValue);
Thread.sleep(secondValue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
try {
MyThread threadTest = new MyThread();
threadTest.start();
threadTest.join();
System.out.println("我想当threadTest对象执行完毕后我再执行,我做到了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
7307
我想当threadTest对象执行完毕后我再执行,我做到了
2.3方法join与interrupt相遇会有异常
public class ThreadA extends Thread {
@Override
public void run() {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
String newString = new String();
Math.random();
}
}
}
public class ThreadB extends Thread {
@Override
public void run() {
try {
ThreadA a = new ThreadA();
a.start();
a.join(); //线程b等待a执行完成再执行
System.out.println("线程B在run end处打印了");
} catch (InterruptedException e) {
System.out.println("线程B在catch处打印了");
e.printStackTrace();
}
}
}
public class ThreadC extends Thread {
private ThreadB threadB;
public ThreadC(ThreadB threadB) {
super();
this.threadB = threadB;
}
@Override
public void run() {
threadB.interrupt();
}
}
public class Run {
public static void main(String[] args) {
try {
ThreadB b = new ThreadB();
b.start();
Thread.sleep(500);
ThreadC c = new ThreadC(b);
c.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Unknown Source)
at java.lang.Thread.join(Unknown Source)
at extthread.ThreadB.run(ThreadB.java:10)
线程B在catch处打印了
2.4方法join(long)的使用
join(long)只等long时间长,如果这时间内线程没有运行完成,就不等待了
如果long时间大于线程的运行时间,线程运行完成就继续执行,不用等够long时间
public class MyThread extends Thread {
@Override
public void run() {
try {
System.out.println("begin Timer=" + System.currentTimeMillis());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
try {
MyThread threadTest = new MyThread();
threadTest.start();
threadTest.join(2000);// 只等2秒
//Thread.sleep(2000);
System.out.println(" end timer=" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
threadTest.join(2000);// 只等2秒
Thread.sleep(2000);
两个效果一样
2.5join(long)和sleep(long)的区别
join的源码
join(long)内部是wait(long)实现的,所以join(long)方法具有释放锁的特点
sleep(long)不释放锁
/**
* Blocks the current Thread (<code>Thread.currentThread()</code>) until the
* receiver finishes its execution and dies or the specified timeout
* expires, whatever happens first.
*
* <p>
* A timeout of zero means the calling thread should wait forever unless
* interrupted.
*
* @param millis
* The maximum time to wait (in milliseconds).
* @param nanos
* Extra nanosecond precision
* @throws InterruptedException
* if the current thread has been interrupted. The interrupted
* status of the current thread will be cleared before the
* exception is thrown.
* @see Object#notifyAll
* @see java.lang.ThreadDeath
*/
public final void join(long millis, int nanos) throws InterruptedException {
if (millis < 0 || nanos < 0 || nanos >= NANOS_PER_MILLI) {
throw new IllegalArgumentException("bad timeout: millis=" + millis
+ ",nanos=" + nanos);
}
// avoid overflow: if total > 292,277 years, just wait forever
boolean overflow = millis >= (Long.MAX_VALUE - nanos) / NANOS_PER_MILLI;
boolean forever = (millis | nanos) == 0;
if (forever | overflow) {
join();
return;
}
synchronized (lock) {
if (!isAlive()) {
return;
}
// guaranteed not to overflow
long nanosToWait = millis * NANOS_PER_MILLI + nanos;
// wait until this thread completes or the timeout has elapsed
long start = System.nanoTime();
while (true) {
lock.wait(millis, nanos);//wait方法会释放锁
if (!isAlive()) {
break;
}
long nanosElapsed = System.nanoTime() - start;
long nanosRemaining = nanosToWait - nanosElapsed;
if (nanosRemaining <= 0) {
break;
}
millis = nanosRemaining / NANOS_PER_MILLI;
nanos = (int) (nanosRemaining - millis * NANOS_PER_MILLI);
}
}
}
join的内部实现
public final void join() throws InterruptedException {
synchronized (lock) {
while (isAlive()) {
lock.wait();
}
}
}
sleep不释放锁
bService方法用的是threadB线程对象,threadA的run方法内部用的也是threadB锁对象,这两个方法会同步,如果sleep释放锁的话,就不会等够5秒后再执行bService方法
public class ThreadA extends Thread {
private ThreadB b;
public ThreadA(ThreadB b) {
super();
this.b = b;
}
@Override
public void run() {
try {
synchronized (b) {
System.out.println("a run start");
b.start();
Thread.sleep(6000);
// Thread.sleep()不释放锁!
System.out.println("a run end");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
@Override
public void run() {
try {
System.out.println(" b run begin timer="
+ System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(" b run end timer="
+ System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public void bService() {
System.out.println("打印了bService timer=" + System.currentTimeMillis());
}
}
public class ThreadC extends Thread {
private ThreadB threadB;
public ThreadC(ThreadB threadB) {
super();
this.threadB = threadB;
}
@Override
public void run() {
threadB.bService();
}
}
public class Run {
public static void main(String[] args) {
try {
ThreadB b = new ThreadB();
ThreadA a = new ThreadA(b);
a.start();
Thread.sleep(1000);
ThreadC c = new ThreadC(b);
c.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:同步效果,不释放锁
a run start
b run begin timer=1472799749986
b run end timer=1472799754986
a run end
打印了bService timer=1472799755986
join释放锁
public class ThreadA extends Thread {
private ThreadB b;
public ThreadA(ThreadB b) {
super();
this.b = b;
}
@Override
public void run() {
try {
synchronized (b) {
b.start();
b.join();// 说明join释放锁了!
System.out.println("join 后面的代码");
for (int i = 0; i < Integer.MAX_VALUE; i++) {
String newString = new String();
Math.random();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
@Override
public void run() {
try {
System.out.println(" b run begin timer="
+ System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(" b run end timer="
+ System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public void bService() {
System.out.println("打印了bService timer=" + System.currentTimeMillis());
}
}
public class ThreadC extends Thread {
private ThreadB threadB;
public ThreadC(ThreadB threadB) {
super();
this.threadB = threadB;
}
@Override
public void run() {
threadB.bService();
}
}
public class Run {
public static void main(String[] args) {
try {
ThreadB b = new ThreadB();
ThreadA a = new ThreadA(b);
a.start();
Thread.sleep(1000);
ThreadC c = new ThreadC(b);
c.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
b.join();释放了锁,bService方法运行的同时a线程会等待b运行完成之后才执行后面的代码
b run begin timer=1472799836470
打印了bService timer=1472799837471
b run end timer=1472799841469
join 后面的代码
3.ThreadLocal类的使用
3.1get与null
public class Run {
public static ThreadLocal tl = new ThreadLocal();
public static void main(String[] args) {
if (tl.get() == null) {
System.out.println("从未放过值");
tl.set("我的值");
}
System.out.println(tl.get());
System.out.println(tl.get());
}
}
运行结果:
从未放过值
我的值
我的值
3.2线程变量的隔离性
public class Tools {
public static ThreadLocal tl = new ThreadLocal();
}
public class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
if (Tools.tl.get() == null) {
Tools.tl.set("ThreadA" + (i + 1));
} else {
System.out.println("ThreadA get Value=" + Tools.tl.get());
}
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
if (Tools.tl.get() == null) {
Tools.tl.set("ThreadB" + (i + 1));
} else {
System.out.println("ThreadB get Value=" + Tools.tl.get());
}
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
try {
ThreadA a = new ThreadA();
ThreadB b = new ThreadB();
a.start();
b.start();
for (int i = 0; i < 5; i++) {
if (Tools.tl.get() == null) {
Tools.tl.set("Main" + (i + 1));
} else {
System.out.println("Main get Value=" + Tools.tl.get());
}
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Tools {
public static ThreadLocal<Date> tl = new ThreadLocal<Date>();
}
public class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
if (Tools.tl.get() == null) {
Tools.tl.set(new Date());
}
System.out.println("A " + Tools.tl.get().getTime());
Thread.sleep(100);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
if (Tools.tl.get() == null) {
Tools.tl.set(new Date());
}
System.out.println("B " + Tools.tl.get().getTime());
Thread.sleep(100);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
try {
ThreadA a = new ThreadA();
a.start();
Thread.sleep(1000);
ThreadB b = new ThreadB();
b.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
隔离性
4.InheritableThreadLocal类的使用
4.1InheritableThreadLocal可以让子线程从父线程中取得值
public class InheritableThreadLocalExt extends InheritableThreadLocal {
@Override
protected Object initialValue() {
return new Date().getTime();
}
}
public class Tools {
public static InheritableThreadLocalExt tl = new InheritableThreadLocalExt();
}
public class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
System.out.println("在ThreadA线程中取值=" + Tools.tl.get());
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
try {
for (int i = 0; i < 5; i++) {
System.out.println(" 在Main线程中取值=" + Tools.tl.get());
Thread.sleep(200);
}
Thread.sleep(5000);
ThreadA a = new ThreadA();
a.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
在Main线程中取值=1472802419879
在Main线程中取值=1472802419879
在Main线程中取值=1472802419879
在Main线程中取值=1472802419879
在Main线程中取值=1472802419879
在ThreadA线程中取值=1472802419879
在ThreadA线程中取值=1472802419879
在ThreadA线程中取值=1472802419879
在ThreadA线程中取值=1472802419879
在ThreadA线程中取值=1472802419879
4.2取得之后再修改
public class InheritableThreadLocalExt extends InheritableThreadLocal {
@Override
protected Object initialValue() {
return new Date().getTime();
}
@Override
protected Object childValue(Object parentValue) {
return parentValue + " 我在子线程加的~!";
}
}
public class Tools {
public static InheritableThreadLocalExt tl = new InheritableThreadLocalExt();
}
public class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
System.out.println("在ThreadA线程中取值=" + Tools.tl.get());
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
try {
for (int i = 0; i < 5; i++) {
System.out.println(" 在Main线程中取值=" + Tools.tl.get());
Thread.sleep(200);
}
Thread.sleep(5000);
ThreadA a = new ThreadA();
a.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
在Main线程中取值=1472802724324
在Main线程中取值=1472802724324
在Main线程中取值=1472802724324
在Main线程中取值=1472802724324
在Main线程中取值=1472802724324
在ThreadA线程中取值=1472802724324 我在子线程加的~!
在ThreadA线程中取值=1472802724324 我在子线程加的~!
在ThreadA线程中取值=1472802724324 我在子线程加的~!
在ThreadA线程中取值=1472802724324 我在子线程加的~!
在ThreadA线程中取值=1472802724324 我在子线程加的~!