多线程-基础

2020-12-04  本文已影响0人  麦大大吃不胖

by shihang.mai

1. 创建线程&停止线程

1.1 创建线程

  1. extends Thread->new Class().start();
  2. implements Runnable->new Thread(new Class()).start();
  3. Lambda表达式
  4. 线程池new ThreadPoolExecutor()
public class Testmsh {

    public static void main(String[] args) {

        //1. 继承Thread
        new MyThread().start();
        //2. 实现Runnable接口
        new Thread(new MyRun()).start();
        //3. lambda表达式
        new Thread(()->{
            System.out.println("i am lambda");
        }).start();
        //4. 线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolExecutor.submit(new MyRun());
    }

    static class MyRun implements Runnable {
        @Override
        public void run() {
            System.out.println("Hello MyRun!");
        }
    }

    static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("Hello MyThread!");
        }
    }
}

1.2 停止线程

  1. interrupt
  2. 标志位

2. 线程状态

线程状态迁移
  1. new Thread()进入NEW状态
  2. start()进入RUNNABLE状态,而RUNNABLE状态包括READY和RUNNING状态,它们之间通过cpu调度切换,或者RUNNING状态的线程主动掉用yield()进入READY状态
  3. 线程执行完毕就进入TEMINATED销毁状态
  4. RUNNABLE状态可通过

3. 多线程的某写类用法

3.1 ReentrantLock

基本和sync一样,写sync的地方直接用lock即可.注意必须手动unlock释放锁。

public class T02_ReentrantLock2 {
    Lock lock = new ReentrantLock();

    void m1() {
        try {
            lock.lock(); //synchronized(this)
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1);

                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    void m2() {
        try {
            lock.lock();
            System.out.println("m2 ...");
        } finally {
            lock.unlock();
        }

    }

    public static void main(String[] args) {
        T02_ReentrantLock2 rl = new T02_ReentrantLock2();
        new Thread(rl::m1).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(rl::m2).start();
    }
}

特点:

  1. 有尝试锁操作

    lock.tryLock(5, TimeUnit.SECONDS);
    
  2. 可被打断的锁

    Thread t2 = new Thread(()->{
             try {
                 //lock.lock();
                 lock.lockInterruptibly(); //可以对interrupt()方法做出响应
                 System.out.println("t2 start");
                 TimeUnit.SECONDS.sleep(5);
                 System.out.println("t2 end");
             } catch (InterruptedException e) {
                 System.out.println("interrupted!");
             } finally {
                 lock.unlock();
             }
         });
         t2.start();
         
         try {
             TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         t2.interrupt(); 
    
  3. 可设置公平锁

ReentrantLock lock=new ReentrantLock(true);

公平锁:去麦当劳买早餐

非公平锁:去麦当劳买早餐

3.2 CountDownLatch

倒数器

CountDownLatch latch = new CountDownLatch(100);
//减1
latch.countDown();
//阻塞,等待减到为0,代码继续向下走
latch.await();
...............

3.3 CyclicBarrier

栅栏

CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
barrier.await();

3.4 Phaser

栅栏组

public class T09_TestPhaser2 {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();


    static void milliSleep(int milli) {
        try {
            TimeUnit.MILLISECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        phaser.bulkRegister(7);

        for(int i=0; i<5; i++) {

            new Thread(new Person("p" + i)).start();
        }

        new Thread(new Person("新郎")).start();
        new Thread(new Person("新娘")).start();

    }



    static class MarriagePhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {

            switch (phase) {
                case 0:
                    System.out.println("所有人到齐了!" + registeredParties);
                    System.out.println();
                    return false;
                case 1:
                    System.out.println("所有人吃完了!" + registeredParties);
                    System.out.println();
                    return false;
                case 2:
                    System.out.println("所有人离开了!" + registeredParties);
                    System.out.println();
                    return false;
                case 3:
                    System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
                    return true;
                default:
                    return true;
            }
        }
    }


    static class Person implements Runnable {
        String name;

        public Person(String name) {
            this.name = name;
        }

        public void arrive() {

            milliSleep(r.nextInt(1000));
            System.out.printf("%s 到达现场!\n", name);
            phaser.arriveAndAwaitAdvance();
        }

        public void eat() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 吃完!\n", name);
            phaser.arriveAndAwaitAdvance();
        }

        public void leave() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 离开!\n", name);


            phaser.arriveAndAwaitAdvance();
        }

        private void hug() {
            if(name.equals("新郎") || name.equals("新娘")) {
                milliSleep(r.nextInt(1000));
                System.out.printf("%s 洞房!\n", name);
                phaser.arriveAndAwaitAdvance();
            } else {
                phaser.arriveAndDeregister();
                //phaser.register()
            }
        }

        @Override
        public void run() {
            arrive();


            eat();


            leave();


            hug();

        }
    }
}

3.5 ReadWriteLock

readLock-共享锁:当是读锁时,其他读线程可以读

writeLock-互斥锁,排他锁:当是写锁时,其他的写和读都不能进行

读必须加锁,如果读不加锁的话,那么在读期间就可能进行写,那么读就会发生"脏读"

public class T10_TestReadWriteLock {
    static Lock lock = new ReentrantLock();
    private static int value;

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock writeLock = readWriteLock.writeLock();

    public static void read(Lock lock) {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("read over!");
            //模拟读取操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void write(Lock lock, int v) {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = v;
            System.out.println("write over!");
            //模拟写操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }





    public static void main(String[] args) {
        //Runnable readR = ()-> read(lock);
        Runnable readR = ()-> read(readLock);

        //Runnable writeR = ()->write(lock, new Random().nextInt());
        Runnable writeR = ()->write(writeLock, new Random().nextInt());

        for(int i=0; i<18; i++) new Thread(readR).start();
        for(int i=0; i<2; i++) new Thread(writeR).start();


    }

3.6 Semaphore

信号量,限流用

public class T11_TestSemaphore {
    public static void main(String[] args) {
        //Semaphore s = new Semaphore(2);
        Semaphore s = new Semaphore(2, true);
        //允许一个线程同时执行
        //Semaphore s = new Semaphore(1);

        new Thread(()->{
            try {
                s.acquire();

                System.out.println("T1 running...");
                Thread.sleep(200);
                System.out.println("T1 running...");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }).start();

        new Thread(()->{
            try {
                s.acquire();

                System.out.println("T2 running...");
                Thread.sleep(200);
                System.out.println("T2 running...");

                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

3.7 Exchanger

交换器,线程间交换值

public class T12_TestExchanger {

    static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        new Thread(()->{
            String s = "T1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t1").start();


        new Thread(()->{
            String s = "T2";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t2").start();


    }
}

3.8 LockSupport

//停止当前线程
LockSupport.park()
//放行线程
LockSupport.unpark(线程对象) 

4. 多线程使用

问题1:
实现一个容器,提供两个方法,add,size
写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束

//有问题的解法
public class T03_NotifyHoldingLock { //wait notify

    //添加volatile,使t2能够得到通知
  //-----volatile一般别修饰引用类型,因为修改引用里面的值,并不可见------
    volatile List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
    
    public static void main(String[] args) {
        T03_NotifyHoldingLock c = new T03_NotifyHoldingLock();
        
        final Object lock = new Object();
        
        new Thread(() -> {
            synchronized(lock) {
                System.out.println("t2启动");
                if(c.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t2 结束");
            }
            
        }, "t2").start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1启动");
            synchronized(lock) {
                for(int i=0; i<10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);
                    
                    if(c.size() == 5) {
                        lock.notify();
                    }
                    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "t1").start();
        
        
    }
}
/*t2启动
t1启动
add 0
add 1
add 2
add 3
add 4
add 5
add 6
add 7
add 8
add 9
t2 结束
*/

上面代码不行,因为notify()并不会释放所,而lock.wait()后必须重新获取锁才能继续执行

/**
 * 问题1: wait()& notify()
 * 实现一个容器,提供两个方法,add,size
 * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
 */
public class Question1 {

    private List list = new ArrayList();

    public void add(Object o){
        list.add(o);
    }

    public int size(){
        return list.size();
    }

    public static void main(String[] args) {
        Question1 question1= new Question1();

        final Object lock =new Object();

        Thread thread1 =new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    for (int i = 0; i < 10; i++) {
                        question1.add(i);
                        System.out.println("线程1加入"+i);
                        if(question1.size()==5){
                            lock.notify();
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        });



        Thread thread2 =new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程2结束");
                    lock.notify();
                }
            }
        });

        thread2.start();
        thread1.start();

    }

}
/**
 * 问题1: CountDownLatch
 * 实现一个容器,提供两个方法,add,size
 * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
 */
public class Question1_2 {

    private List list = new ArrayList();

    public void add(Object o){
        list.add(o);
    }

    public int size(){
        return list.size();
    }

    public static void main(String[] args) {
        Question1_2 question1_2= new Question1_2();
        CountDownLatch lock1 = new CountDownLatch(1);
        CountDownLatch lock2 = new CountDownLatch(1);

        Thread thread1 =new Thread(new Runnable() {
            @Override
            public void run() {
                    for (int i = 0; i < 10; i++) {
                        question1_2.add(i);
                        System.out.println("线程1加入"+i);
                        if(question1_2.size()==5){
                            lock1.countDown();
                            try {
                                lock2.await();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
        });



        Thread thread2 =new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程2结束");
                lock2.countDown();
            }
        });

        thread2.start();
        thread1.start();

    }



}
/**
 * 问题1: LockSupport
 * 实现一个容器,提供两个方法,add,size
 * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
 */
public class Question1_3 {

    private List list = new ArrayList();

    public void add(Object o){
        list.add(o);
    }

    public int size(){
        return list.size();
    }

    static Thread thread1,thread2 = null;


    public static void main(String[] args) {
        Question1_3 question1_3= new Question1_3();

        thread1 =new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    question1_3.add(i);
                    System.out.println("线程1加入"+i);
                    if(question1_3.size()==5){
                        LockSupport.unpark(thread2);
                        LockSupport.park();
                    }
                }
            }
        });



        thread2 =new Thread(new Runnable() {
            @Override
            public void run() {
                LockSupport.park();
                System.out.println("线程2结束");
                LockSupport.unpark(thread1);
            }
        });

        thread2.start();
        thread1.start();



    }
}

问题2:
写一个固定容量同步容器,拥有put和get方法,以及getCount方法
能够支持2个生产者线程以及10个消费者线程的阻塞调用

/**
 * 问题2: synchronized
 * 写一个固定容量同步容器,拥有put和get方法,以及getCount方法
 * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
 * 1. 看put方法。因为,当用if时,生产线程A进入判断条件,调用this.wait()让出,自己等待
 * 2. 消费线程C取走一个数据,数量降为9,叫醒线程,生产线程B向里面加入值,达到10
 * 3. 线程A继续向下走,数据已经超过规定的10个,出问题。
 */
public class Question2<T> {
    private LinkedList<T> list = new LinkedList();

    private int max= 10;

    public Question2(){}

    public Question2(int capcity){
        max = capcity;
    }


    public synchronized void put(T t) throws InterruptedException {
        while(list.size()==max){
            System.out.println("生产者:"+Thread.currentThread().getName()+"阻塞,让出锁");
            this.wait();
        }
        list.add(t);
        System.out.println("生产者:"+Thread.currentThread().getName()+"向容器加入值"+t);
        this.notifyAll();
    }

    public synchronized T get() throws InterruptedException {
        while(list.size()==0){
            System.out.println("消费者:"+Thread.currentThread().getName()+"阻塞,让出锁");
            this.wait();
        }
        T t = list.removeFirst();
        System.out.println("消费者:"+Thread.currentThread().getName()+"从容器获得值"+t);
        this.notifyAll();
        return t;
    }

    public static void main(String[] args) {
        Question2<String> c = new Question2<>();
        //启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        while (true){
                            c.get();
                            TimeUnit.SECONDS.sleep(1);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }).start();
        }

        //启动生产者线程
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 25; j++) {
                        try {
                            c.put(String.valueOf(j));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }


}
/**
 * 问题2: ReentrantLock&Condition
 * 写一个固定容量同步容器,拥有put和get方法,以及getCount方法
 * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
 *
 * 用synchronized,在生产者满数据的情况下,也会叫醒另外的一个生产者。存在极端情况,CPU一直被两个生产者占用,导致消费者消费不了
 * 用ReentrantLock&Condition,那么生产者满数据,叫醒的只是消费者,消费者没数据时,只叫醒生产者
 * Condition的本质是不同的等待队列。这个例子有两个等待队列,一个生产者等待队列,另一个是消费者等待队列
 */
public class Question2_1<T> {

    private LinkedList<T> list = new LinkedList();

    private Lock lock = new ReentrantLock();

    private Condition producer = lock.newCondition();

    private Condition consumer = lock.newCondition();

    private int max= 10;

    public Question2_1(){}

    public Question2_1(int capcity){
        max = capcity;
    }

    public void put(T t) throws InterruptedException {
        try{
            lock.lock();
            while(list.size()==max){
                System.out.println("生产者:"+Thread.currentThread().getName()+"阻塞,让出锁");
                producer.await();
            }
            list.add(t);
            System.out.println("生产者:"+Thread.currentThread().getName()+"向容器加入值"+t);
            consumer.signalAll();
        }finally {
            lock.unlock();
        }

    }

    public T get() throws InterruptedException {
        try {
            lock.lock();
            while(list.size()==0){
                System.out.println("消费者:"+Thread.currentThread().getName()+"阻塞,让出锁");
                consumer.await();
            }
            T t = list.removeFirst();
            System.out.println("消费者:"+Thread.currentThread().getName()+"从容器获得值"+t);
            producer.signalAll();
            return t;
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Question2_1<String> c = new Question2_1<>();
        //启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        while (true){
                            c.get();
                            TimeUnit.SECONDS.sleep(1);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }).start();
        }

        //启动生产者线程
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 25; j++) {
                        try {
                            c.put(String.valueOf(j));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }

}

5. 线程池最多能开线程数

2 ^ 29 -1

6. 多线程之间的通讯

  1. 共享内存

    运行时数区

  2. 消息传递

    通过queue,wait notify等等

7. as-if-serial&happens-before

  1. as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同步的多线程程序的执行结果不被改变。

  2. as-if-serial语义给编写单线程程序的程序员创造了一个幻境:单线程程序是按程序的顺序来执行的。happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:正确同步的多线程程序是按happens-before指定的顺序来执行的。

  3. as-if-serial语义和happens-before这么做的目的,都是为了在不改变程序执行结果的前提下,尽可能地提高程序执行的并行度

上一篇下一篇

猜你喜欢

热点阅读