Android进阶之路Android开发Android技术知识

多线程详解

2020-05-27  本文已影响0人  A_si

进程和线程

进程和线程都是一个时间段的描述,是CPU工作时间段的描述,不过是颗粒大小不同。

来自知乎的图片
他们主要区别是:进程不共享内存,线程可以共享内存。
引用知乎地址

线程:

Thread和Runnable

Java中线程的创建有两种方式:
1.通过继承Thread类,重写Thread的run()方法,将线程运行的逻辑放在其中。

2.通过实现Runnable接口,实例化Thread类。

我们通常使用第二种,因为可以复用Runnable,更容易实现资源共享,能多个线程同时处理一个资源。

// 1
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("this is a Runnable");
    }
}
// 2
public class MyThread extends Thread {
    @Override
    public void run() {
        super.run();
        System.out.println("this is thread");
    }
}

// 具体使用
public class Main {
    public static void main(String[] args) {
        // 第一种
        Thread thread1 = new Thread(new MyRunnable());
        thread1.start();
        // 第二种
        MyThread thread2 = new MyThread();
        thread2.start();
    }
}

而实际Android开发工作中,以上两种都不用,我们通常使用Android提供的Handler和java.util包里的Executor。

Executor

Executor 是一个接口,execute执行Runnable。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

看下使用:

      val executor: Executor = Executors.newCachedThreadPool()
        executor.execute { }

点进去newCachedThreadPool,发现返回的是一个ExecutorService。ExecutorService就是Executor的实现了。

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

ExecutorService

ExecutorService有两个方法:

void shutdown();是指不再添加任务,执行完已有任务后结束。
List<Runnable> shutdownNow();是立即调用线程的interrupt()结束所有的线程。

ThreadPoolExecutor

上面看到Executors里面new的是ThreadPoolExecutor,我们看下ThreadPoolExecutor的构造方法:

//五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

核心线程:在创建完线程池之后,核心线程先不创建,在接到任务之后创建核心线程。并且会一直存在于线程池中(即使这个线程啥都不干),有任务要执行时,如果核心线程没有被占用,会优先用核心线程执行任务。数量一般情况下设置为CPU核数的二倍即可。

线程总数=核心线程数+非核心线程数。

非核心线程:简单理解,即核心线程都被占用,但还有任务要做,就创建非核心线程。

这个参数可以理解为,任务少,但池中线程多,非核心线程不能白养着,超过这个时间不工作的就会被干掉,但是核心线程会保留。

TimeUnit是一个枚举类型,其包括:

NANOSECONDS:1微毫秒 = 1微秒 / 1000
MICROSECONDS:1微秒 = 1毫秒 / 1000
MILLISECONDS:1毫秒 = 1秒 /1000
SECONDS:秒
MINUTES:分
HOURS:小时
DAYS:天

默认情况下,任务进来之后先分配给核心线程执行,核心线程如果都被占用,并不会立刻开启非核心线程执行任务,而是将任务插入任务队列等待执行,核心线程会从任务队列取任务来执行,任务队列可以设置最大值,一旦插入的任务足够多,达到最大值,才会创建非核心线程执行任务。

常见的workQueue有四种:

  1. SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。

  2. LinkedBlockingQueue:这个队列接收到任务的时候,如果当前已经创建的核心线程数小于线程池的核心线程数上限,则新建线程(核心线程)处理任务;如果当前已经创建的核心线程数等于核心线程数上限,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize

  3. ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误,或是执行实现定义好的饱和策略。

  4. DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

可以用线程工厂给每个创建出来的线程设置名字。一般情况下无须设置该参数。

这是当任务队列和线程池都满了时所采取的应对策略,默认是AbordPolicy。

AbordPolicy:表示无法处理新任务,并抛出 RejectedExecutionException 异常。此外还有3种策略,它们分别如下。

CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

DiscardPolicy:不能执行的任务,并将该任务删除。

DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。

四种线程池

Executors类为我们提供的四种简单创建线程池的方法:

private val fix = Executors.newFixedThreadPool(4)
private val cache = Executors.newCachedThreadPool()
private val single = Executors.newSingleThreadExecutor()
private val scheduled = Executors.newScheduledThreadPool(4)

其实就是调用不同的ThreadPoolExecutor的构造方法。下面一个一个分析:

  1. FixedThreadPool

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    FixedThreadPool的corePoolSize和maximumPoolSize都设置为参数nThreads,也就是只有固定数量的核心线程,不存在非核心线程。keepAliveTime为0L表示多余的线程立刻终止,因为不会产生多余的线程,所以这个参数是无效的,也就是说线程不会被回收一直保存在线程池。FixedThreadPool的任务队列采用的是LinkedBlockingQueue。一般我们设置为cpu核心数+1。

    private val fix = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1)

    FixThreadPool其实就像一堆人排队上公厕一样,可以无数多人排队,但是厕所位置就那么多,而且没人上时,厕所闲置着也不会搬走。

  1. SingleThreadPool

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    我们可以看到总线程数和核心线程数都是1,所以就只有一个核心线程。该线程池才用链表阻塞队列LinkedBlockingQueue,先进先出原则,所以保证了任务的按顺序逐一进行。

    SingleThreadPool可以理解为公厕里只有一个坑位,先来先上。

  2. CachedThreadPool

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    CachedThreadPool的corePoolSize是0,maximumPoolSize是Int的最大值,也就是说CachedThreadPool没有核心线程,全部都是非核心线程,并且没有上限。keepAliveTime是60秒,就是说空闲线程等待新任务60秒,超时则销毁。此处用到的队列是阻塞队列SynchronousQueue,这个队列没有缓冲区,所以其中最多只能存在一个元素,有新的任务则阻塞等待。

    适用于频繁IO的操作,因为他们的任务量小,但是任务基数非常庞大,使用核心线程处理的话,数量创建方面就很成问题。

CachedThreadPool有点像去冲浪,因为海洋无限大,随时去都有位置冲浪,一个人冲完60秒内可以免费给下一个人玩。超过60秒冲浪板就被商家回收。

  1. ScheduledThreadPool
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

可以看出corePoolSize是传进来的固定值,maximumPoolSize无限大,因为采用的队列DelayedWorkQueue是无解的,所以maximumPoolSize参数无效。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务会放在队列的前面。在跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中的time变量改为下次要执行的时间并放回到DelayedWorkQueue中。

ScheduledThreadPool主要用于执行定时任务以及有固定周期的重复任务。

Callable

Callable是java1.5添加进来的一个增强版本。类似于Runnable,却又有差异:

  1. Runnable是自从java1.1就有了,而Callable是1.5之后才加上去的。
  2. Callable规定的方法是call(),Runnable规定的方法是run()。
  3. Callable的任务执行后可返回值,而Runnable的任务是不能返回值(是void)。
  4. call方法可以抛出异常,run方法不可以。
  5. 运行Callable任务可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。通过Future对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。
  6. 加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用submit方法。

下面看下使用:

    val executor: ExecutorService = Executors.newSingleThreadExecutor()
    val future: Future<String> = executor.submit(MyCallable())
    try {
        val string: String = future.get()
    } catch (e: ExecutionException) {

    }
    executor.shutdown()
    class MyCallable() : Callable<String> {
        override fun call(): String {
            return "done"
        }
    }

线程安全

JMM

因为硬件架构,会导致一些问题,特别在多线程的时候更为突出:

线程间通信必须要经过主内存。

如下,如果线程A与线程B之间要通信的话,必须要经历下面2个步骤:


1)线程A把本地内存A中更新过的共享变量刷新到主内存中去。

2)线程B到主内存中去读取线程A之前已更新过的共享变量。

当对象和变量被存放在计算机中各种不同的内存区域中时,就可能会出现一些具体的问题。Java内存模型建立所围绕的问题:在多线程并发过程中,如何处理多线程读同步问题与可见性(多线程缓存与指令重排序)、多线程写同步问题与原子性。

Java内存模型(即Java Memory Model,简称JMM)本身是一种抽象的概念,并不真实存在,它描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。

  1. 多线程读同步与可见性
    线程对共享变量修改的可见性。当一个线程修改了共享变量的值,其他线程能够立刻得知这个修改。

  2. 原子性
    指一个操作是按原子的方式执行的。要么该操作不被执行;要么以原子方式执行,即执行过程中不会被其它线程中断。

  3. 有序性
    有序性是指对于单线程的执行代码,我们总是认为代码的执行是按顺序依次执行的,这样的理解并没有毛病,毕竟对于单线程而言确实如此,但对于多线程环境,则可能出现乱序现象,因为程序编译成机器码指令后可能会出现指令重排现象,重排后的指令与原指令的顺序未必一致,要明白的是,在Java程序中,倘若在本线程内,所有操作都视为有序行为,如果是多线程环境下,一个线程中观察另外一个线程,所有操作都是无序的,前半句指的是单线程内保证串行语义执行的一致性,后半句则指指令重排现象和工作内存与主内存同步延迟现象。

volatile

volatile关键字有如下两个作用

  1. 保证被volatile修饰的共享变量对所有线程总数可见的,也就是当一个线程修改了一个被volatile修饰共享变量的值,新值总数可以被其他线程立即得知。
  2. 禁止指令重排序优化。
//线程1
boolean stop = false;
while(!stop){
    doSomething();
}
 
//线程2
stop = true;

如果线程2改变了stop的值,线程1一定会停止吗?不一定。当线程2更改了stop变量的值之后,但是还没来得及写入主存当中,线程2转去做其他事情了,那么线程1由于不知道线程2对stop变量的更改,因此还会一直循环下去。

但是用volatile修饰之后就变得不一样了:

//线程1
volatile boolean stop = false;
while(!stop){
    doSomething();
}
 
//线程2
stop = true;

第一:使用volatile关键字会强制将修改的值立即写入主存;

第二:使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量stop的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效);

第三:由于线程1的工作内存中缓存变量stop的缓存行无效,所以线程1再次读取变量stop的值时会去主存读取。

那么在线程2修改stop值时(当然这里包括2个操作,修改线程2工作内存中的值,然后将修改后的值写入内存),会使得线程1的工作内存中缓存变量stop的缓存行无效,然后线程1读取时,发现自己的缓存行无效,它会等待缓存行对应的主存地址被更新之后,然后去对应的主存读取最新的值。

那么线程1读取到的就是最新的正确的值

这也就是内存模型JMM的内存可见性。

   private volatile int inc = 0;

    void count() {
        inc++;
    }

    void add() {
        new Thread() {
            @Override
            public void run() {
                for (int j = 0; j < 100_00_00; j++) {
                    count();
                }
                System.out.println(inc);
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                for (int j = 0; j < 100_00_00; j++) {
                    count();
                }
                System.out.println(inc);
            }
        }.start();

    }

看这段代码,2个线程分别加一百万次。结果会打印出两百万次吗?不会的。可能有的人就会有疑问,不对啊,上面是对变量inc进行自增操作,由于volatile保证了可见性,那么在每个线程中对inc自增完之后,在其他线程中都能看到修改后的值啊,所以有两个线程分别进行了一百万次操作,那么最终inc的值应该是两百万啊。

这里面就有一个误区了,volatile关键字能保证可见性没有错,但是上面的程序错在没能保证原子性。可见性只能保证每次读取的是最新的值,但是volatile没办法保证对变量的操作的原子性。

inc++; 其实是两个步骤,先加加,然后再赋值。不是原子性操作,所以volatile不能保证线程安全。

synchronized

synchronized是Java中的关键字,是利用锁的机制来实现同步的。Synchronized的作用主要有三个:

  1. 原子性:确保线程互斥的访问同步代码;
  2. 可见性:保证共享变量的修改能够及时可见,其实是通过Java内存模型中的 “对一个变量unlock操作之前,必须要同步到主内存中;如果对一个变量进行lock操作,则将会清空工作内存中此变量的值,在执行引擎使用此变量前,需要重新从主内存中load操作或assign操作初始化变量值” 来保证的;
  3. 有序性:有效解决重排序问题,即 “一个unlock操作先行发生(happen-before)于后面对同一个锁的lock操作”;

synchronized 可以修饰方法和代码块,进入synchronized修饰的方法或者代码块的线程,就会获取monitor对象,monitor也就是Java里的对象锁。

下面看下经典的卖票案例:

class Ticket implements Runnable {
    /* 五百张票 */
    private int tickets = 500;

    @Override
    public void run() {

        while (true) {
            //同步锁
            synchronized (this) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }
}
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        Thread thread1= new Thread(ticket);
        Thread thread2 = new Thread(ticket);
        Thread thread3 = new Thread(ticket);
        thread1.start();
        thread2.start();
        thread3.start();
    }

3个线程卖500张票。利用synchronized实现线程安全,下面修改下实现:

class Ticket  {
    /* 五百张票 */
    private int tickets = 500;

    public void sellTckets() {
        while (true) {
            //同步锁
            synchronized (this) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }
}
public static void main(String[] args) {
        final Ticket ticket = new Ticket();
        Thread thread1= new Thread(){
            @Override
            public void run() {
                ticket.sellTckets();
            }
        };
        Thread thread2 = new Thread(){
            @Override
            public void run() {
                ticket.sellTckets();
            }
        };
        Thread thread3 = new Thread(){
            @Override
            public void run() {
                ticket.sellTckets();
            }
        };
        thread1.start();
        thread2.start();
        thread3.start();
    }

一样的线程安全,多线程卖票,但是现在我不仅要卖票,还要订餐,卖票和订餐是两个互不干涉的操作,但是因为 synchronized (this)拿到的是同一个对象锁,所以如果线程1在卖票,那么线程2就不能拿到对象锁去订餐:

class Ticket  {
    /* 二百张票 */
    private int tickets = 200;
    /* 二百份盒饭 */
    private int foods = 200;

    public void sell​​Tckets() {
        while (true) {
            //同步锁
            synchronized (this) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口车票已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }

    public void sellFoods() {
        while (true) {
            //同步锁
            synchronized (this) {
                if (foods > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d份盒饭!\n", Thread.currentThread().getName(), foods--);
                } else {
                    System.out.printf("%s窗口盒饭已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }

那么怎么能多线程订票的同时,别的线程也可以订餐呢?用不同的对象即可:

class Ticket {
    private int tickets = 200;
 
    private int foods = 200;
    Object object1 = new Object();
    Object object2 = new Object();

    public void sellTickets() {
        while (true) {
            //同步锁
            synchronized (object1) {
                if (tickets > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d张票!\n", Thread.currentThread().getName(), tickets--);
                } else {
                    System.out.printf("%s窗口车票已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }

    public void sellFoods() {
        while (true) {
            //同步锁
            synchronized (object2) {
                if (foods > 0) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("%s窗口正在卖出第%d份盒饭!\n", Thread.currentThread().getName(), foods--);
                } else {
                    System.out.printf("%s窗口盒饭已售罄\n", Thread.currentThread().getName());
                    System.exit(0);
                }
            }
        }
    }
}

这就像你家里2个卧室,门锁是一样的锁所以都用同一把钥匙。老王拿着钥匙进入主卧反锁了门睡觉,你想去次卧睡,但是钥匙被老王拿进主卧了。你去不了次卧。只能等他出来把钥匙给你。怎么能你俩都去睡觉呢?那就配两把钥匙。老王拿着主卧的钥匙去了主卧,你拿着次卧的钥匙去次卧睡。

上一篇下一篇

猜你喜欢

热点阅读