多线程-基础
2020-12-04 本文已影响0人
麦大大吃不胖
by shihang.mai
1. 创建线程&停止线程
1.1 创建线程
- extends Thread->new Class().start();
- implements Runnable->new Thread(new Class()).start();
- Lambda表达式
- 线程池new ThreadPoolExecutor()
public class Testmsh {
public static void main(String[] args) {
//1. 继承Thread
new MyThread().start();
//2. 实现Runnable接口
new Thread(new MyRun()).start();
//3. lambda表达式
new Thread(()->{
System.out.println("i am lambda");
}).start();
//4. 线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.submit(new MyRun());
}
static class MyRun implements Runnable {
@Override
public void run() {
System.out.println("Hello MyRun!");
}
}
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Hello MyThread!");
}
}
}
1.2 停止线程
- interrupt
- 标志位
2. 线程状态
线程状态迁移- new Thread()进入NEW状态
- start()进入RUNNABLE状态,而RUNNABLE状态包括READY和RUNNING状态,它们之间通过cpu调度切换,或者RUNNING状态的线程主动掉用yield()进入READY状态
- 线程执行完毕就进入TEMINATED销毁状态
- RUNNABLE状态可通过
- Thread.sleep(time),o.wait(time),t.join(time)变为TIMEWAITING状态,时间结束回到RUNNABLE状态
- o.wait(),t.join()变为WAITING状态,o.notify(),o.notifyAll()回到RUNNABLE状态
- 等待锁变为BLOCKED状态,获得锁回到RUNNABLE状态
3. 多线程的某写类用法
3.1 ReentrantLock
基本和sync一样,写sync的地方直接用lock即可.注意必须手动unlock释放锁。
public class T02_ReentrantLock2 {
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock(); //synchronized(this)
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
void m2() {
try {
lock.lock();
System.out.println("m2 ...");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
T02_ReentrantLock2 rl = new T02_ReentrantLock2();
new Thread(rl::m1).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(rl::m2).start();
}
}
特点:
-
有尝试锁操作
lock.tryLock(5, TimeUnit.SECONDS);
-
可被打断的锁
Thread t2 = new Thread(()->{ try { //lock.lock(); lock.lockInterruptibly(); //可以对interrupt()方法做出响应 System.out.println("t2 start"); TimeUnit.SECONDS.sleep(5); System.out.println("t2 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t2.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t2.interrupt();
-
可设置公平锁
ReentrantLock lock=new ReentrantLock(true);
公平锁:去麦当劳买早餐
- 如果没人排队,直接去买
- 如果有人排队,自己排在后面
非公平锁:去麦当劳买早餐
- 如果没人排队,直接去买
- 就算有人排队,我先去插一下队
- 如果插到,就买
- 如果插不到,自己排在队后
3.2 CountDownLatch
倒数器
CountDownLatch latch = new CountDownLatch(100);
//减1
latch.countDown();
//阻塞,等待减到为0,代码继续向下走
latch.await();
...............
3.3 CyclicBarrier
栅栏
CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
barrier.await();
3.4 Phaser
栅栏组
public class T09_TestPhaser2 {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();
static void milliSleep(int milli) {
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
phaser.bulkRegister(7);
for(int i=0; i<5; i++) {
new Thread(new Person("p" + i)).start();
}
new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐了!" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("所有人吃完了!" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("所有人离开了!" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
public Person(String name) {
this.name = name;
}
public void arrive() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 到达现场!\n", name);
phaser.arriveAndAwaitAdvance();
}
public void eat() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 吃完!\n", name);
phaser.arriveAndAwaitAdvance();
}
public void leave() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 离开!\n", name);
phaser.arriveAndAwaitAdvance();
}
private void hug() {
if(name.equals("新郎") || name.equals("新娘")) {
milliSleep(r.nextInt(1000));
System.out.printf("%s 洞房!\n", name);
phaser.arriveAndAwaitAdvance();
} else {
phaser.arriveAndDeregister();
//phaser.register()
}
}
@Override
public void run() {
arrive();
eat();
leave();
hug();
}
}
}
3.5 ReadWriteLock
readLock-共享锁:当是读锁时,其他读线程可以读
writeLock-互斥锁,排他锁:当是写锁时,其他的写和读都不能进行
读必须加锁,如果读不加锁的话,那么在读期间就可能进行写,那么读就会发生"脏读"
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over!");
//模拟写操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
//Runnable readR = ()-> read(lock);
Runnable readR = ()-> read(readLock);
//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = ()->write(writeLock, new Random().nextInt());
for(int i=0; i<18; i++) new Thread(readR).start();
for(int i=0; i<2; i++) new Thread(writeR).start();
}
3.6 Semaphore
信号量,限流用
public class T11_TestSemaphore {
public static void main(String[] args) {
//Semaphore s = new Semaphore(2);
Semaphore s = new Semaphore(2, true);
//允许一个线程同时执行
//Semaphore s = new Semaphore(1);
new Thread(()->{
try {
s.acquire();
System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}).start();
new Thread(()->{
try {
s.acquire();
System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
3.7 Exchanger
交换器,线程间交换值
public class T12_TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(()->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
}
3.8 LockSupport
//停止当前线程
LockSupport.park()
//放行线程
LockSupport.unpark(线程对象)
4. 多线程使用
问题1:
实现一个容器,提供两个方法,add,size
写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
//有问题的解法
public class T03_NotifyHoldingLock { //wait notify
//添加volatile,使t2能够得到通知
//-----volatile一般别修饰引用类型,因为修改引用里面的值,并不可见------
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
public static void main(String[] args) {
T03_NotifyHoldingLock c = new T03_NotifyHoldingLock();
final Object lock = new Object();
new Thread(() -> {
synchronized(lock) {
System.out.println("t2启动");
if(c.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");
}
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
new Thread(() -> {
System.out.println("t1启动");
synchronized(lock) {
for(int i=0; i<10; i++) {
c.add(new Object());
System.out.println("add " + i);
if(c.size() == 5) {
lock.notify();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
/*t2启动
t1启动
add 0
add 1
add 2
add 3
add 4
add 5
add 6
add 7
add 8
add 9
t2 结束
*/
上面代码不行,因为notify()并不会释放所,而lock.wait()后必须重新获取锁才能继续执行
/**
* 问题1: wait()& notify()
* 实现一个容器,提供两个方法,add,size
* 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
*/
public class Question1 {
private List list = new ArrayList();
public void add(Object o){
list.add(o);
}
public int size(){
return list.size();
}
public static void main(String[] args) {
Question1 question1= new Question1();
final Object lock =new Object();
Thread thread1 =new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock){
for (int i = 0; i < 10; i++) {
question1.add(i);
System.out.println("线程1加入"+i);
if(question1.size()==5){
lock.notify();
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
});
Thread thread2 =new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2结束");
lock.notify();
}
}
});
thread2.start();
thread1.start();
}
}
/**
* 问题1: CountDownLatch
* 实现一个容器,提供两个方法,add,size
* 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
*/
public class Question1_2 {
private List list = new ArrayList();
public void add(Object o){
list.add(o);
}
public int size(){
return list.size();
}
public static void main(String[] args) {
Question1_2 question1_2= new Question1_2();
CountDownLatch lock1 = new CountDownLatch(1);
CountDownLatch lock2 = new CountDownLatch(1);
Thread thread1 =new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
question1_2.add(i);
System.out.println("线程1加入"+i);
if(question1_2.size()==5){
lock1.countDown();
try {
lock2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
Thread thread2 =new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2结束");
lock2.countDown();
}
});
thread2.start();
thread1.start();
}
}
/**
* 问题1: LockSupport
* 实现一个容器,提供两个方法,add,size
* 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
*/
public class Question1_3 {
private List list = new ArrayList();
public void add(Object o){
list.add(o);
}
public int size(){
return list.size();
}
static Thread thread1,thread2 = null;
public static void main(String[] args) {
Question1_3 question1_3= new Question1_3();
thread1 =new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
question1_3.add(i);
System.out.println("线程1加入"+i);
if(question1_3.size()==5){
LockSupport.unpark(thread2);
LockSupport.park();
}
}
}
});
thread2 =new Thread(new Runnable() {
@Override
public void run() {
LockSupport.park();
System.out.println("线程2结束");
LockSupport.unpark(thread1);
}
});
thread2.start();
thread1.start();
}
}
问题2:
写一个固定容量同步容器,拥有put和get方法,以及getCount方法
能够支持2个生产者线程以及10个消费者线程的阻塞调用
/**
* 问题2: synchronized
* 写一个固定容量同步容器,拥有put和get方法,以及getCount方法
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
* 1. 看put方法。因为,当用if时,生产线程A进入判断条件,调用this.wait()让出,自己等待
* 2. 消费线程C取走一个数据,数量降为9,叫醒线程,生产线程B向里面加入值,达到10
* 3. 线程A继续向下走,数据已经超过规定的10个,出问题。
*/
public class Question2<T> {
private LinkedList<T> list = new LinkedList();
private int max= 10;
public Question2(){}
public Question2(int capcity){
max = capcity;
}
public synchronized void put(T t) throws InterruptedException {
while(list.size()==max){
System.out.println("生产者:"+Thread.currentThread().getName()+"阻塞,让出锁");
this.wait();
}
list.add(t);
System.out.println("生产者:"+Thread.currentThread().getName()+"向容器加入值"+t);
this.notifyAll();
}
public synchronized T get() throws InterruptedException {
while(list.size()==0){
System.out.println("消费者:"+Thread.currentThread().getName()+"阻塞,让出锁");
this.wait();
}
T t = list.removeFirst();
System.out.println("消费者:"+Thread.currentThread().getName()+"从容器获得值"+t);
this.notifyAll();
return t;
}
public static void main(String[] args) {
Question2<String> c = new Question2<>();
//启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true){
c.get();
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 25; j++) {
try {
c.put(String.valueOf(j));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
}
/**
* 问题2: ReentrantLock&Condition
* 写一个固定容量同步容器,拥有put和get方法,以及getCount方法
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
*
* 用synchronized,在生产者满数据的情况下,也会叫醒另外的一个生产者。存在极端情况,CPU一直被两个生产者占用,导致消费者消费不了
* 用ReentrantLock&Condition,那么生产者满数据,叫醒的只是消费者,消费者没数据时,只叫醒生产者
* Condition的本质是不同的等待队列。这个例子有两个等待队列,一个生产者等待队列,另一个是消费者等待队列
*/
public class Question2_1<T> {
private LinkedList<T> list = new LinkedList();
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
private int max= 10;
public Question2_1(){}
public Question2_1(int capcity){
max = capcity;
}
public void put(T t) throws InterruptedException {
try{
lock.lock();
while(list.size()==max){
System.out.println("生产者:"+Thread.currentThread().getName()+"阻塞,让出锁");
producer.await();
}
list.add(t);
System.out.println("生产者:"+Thread.currentThread().getName()+"向容器加入值"+t);
consumer.signalAll();
}finally {
lock.unlock();
}
}
public T get() throws InterruptedException {
try {
lock.lock();
while(list.size()==0){
System.out.println("消费者:"+Thread.currentThread().getName()+"阻塞,让出锁");
consumer.await();
}
T t = list.removeFirst();
System.out.println("消费者:"+Thread.currentThread().getName()+"从容器获得值"+t);
producer.signalAll();
return t;
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
Question2_1<String> c = new Question2_1<>();
//启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true){
c.get();
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 25; j++) {
try {
c.put(String.valueOf(j));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
}
5. 线程池最多能开线程数
2 ^ 29 -1
6. 多线程之间的通讯
-
共享内存
运行时数区
-
消息传递
通过queue,wait notify等等
7. as-if-serial&happens-before
-
as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同步的多线程程序的执行结果不被改变。
-
as-if-serial语义给编写单线程程序的程序员创造了一个幻境:单线程程序是按程序的顺序来执行的。happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:正确同步的多线程程序是按happens-before指定的顺序来执行的。
-
as-if-serial语义和happens-before这么做的目的,都是为了在不改变程序执行结果的前提下,尽可能地提高程序执行的并行度