后端应用技术Android知识Java学习笔记

Java 并发入门

2017-07-15  本文已影响863人  SeanMa

一、并发

进程:每个进程都拥有自己的一套变量

线程:线程之间共享数据

1.线程

Java中为多线程任务提供了很多的类。包括最基础的Thread类、Runnable等接口,用于线程同步的锁、阻塞队列、同步器,使用线程池的执行器、执行框架,还有可以在多线程中使用的线程安全集合等。

(1)使用多线程给其他任务提供机会

创建线程:

public interface Runnable{
    void run();
}
Thread t = new Thread(r);
t.start();

注意:不要调用Thread或Runnable对象中的run方法,只会执行同一个线程中的任务,不会启动新的线程,应该使用Thread的start方法。


二、中断线程

没有终止线程的方法,只能通过interrupt方法来请求中断。

Thread或Runnable对象的run()方法包装了新线程中执行的代码,在run()方法中遇到下面的情况,线程会终止。

强制结束:

//栗子

public class InterruptTest {  
    public static void main(String[] args) throws InterruptedException {  
        MyThread t = new MyThread("MyThread");  
        t.start();  
        Thread.sleep(100);// 睡眠100毫秒  
        t.interrupt();// 中断t线程  
    }  
}  
class MyThread extends Thread {  
    int i = 0;  
    public MyThread(String name) {  
        super(name);  
    }  
    public void run() {  
        while(!isInterrupted()) {// 当前线程没有被中断,则执行  
            System.out.println(getName() + getId() + "执行了" + ++i + "次");  
        }  
    }  
}  

void interrupt()方法和InterruptedException特别说明

Thread的static boolean interrupted():

Thread的boolean isInterrupted():

可见如果不设置中断,InterruptedException肯定不会出现,而只要抛出InterruptedException,设置的中断状态肯定已经被清理了,这种情况只有InterruptedException这个异常是我们知道有中断请求的唯一标识了,因此我们要向外层通知有中断发生,千万不要再把这个异常压制住,否则怎么调用interrupt()方法请求中断都不会有作用,线程中外层的代码压根不知道有中断这回事,照常运行。将这个中断请求通知给外层有两种方式:

public class Erupt {
    static class MyInterruptableExceptionTask implements Runnable
    {
        private int begin=0;
        public MyInterruptableExceptionTask(int s){begin=s;}
        @Override
        public void run() {
            try {
                int end=begin+10;
                for(int i=begin; i<end; i++){
                    System.out.println("sub: "+i);
                    Thread.sleep(1000);    //如果设置中断时正在sleep,或设置完中断后一个循环里遇到sleep,都会抛出InterruptedException异常,不需要再手动检测中断状态了
                }
            } catch (InterruptedException e) {
                System.out.println("the call Thread.sleep(n) is interrupted by InterruptedExcetpion");
                Thread.currentThread().interrupt();    //产生InterruptedException异常时中断状态被清除,所以要重新设置中断或将中断异常向外抛出供后续代码检测是否发生了中断
            }

            if(Thread.currentThread().isInterrupted())
                System.out.println("sub thread is interrupted");
            else
                System.out.println("sub natural stop");
        }
    }

    public static void main(String[] args) {
        Thread t=new Thread(new MyInterruptableExceptionTask(111));
        t.start();

        for(int i=0; i<10; i++){
            System.out.println("main: "+i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(i==5)
                t.interrupt();
        }
    }


}

三、线程状态

线程有6中状态:

image

BlockingQueue 是个接口,你需要使用它的实现之一来使用BlockingQueue,Java.util.concurrent包下具有以下 BlockingQueue 接口的实现类:

demo:

package blocking_queue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author Sean
 * @version 1.0
 * @date 创建时间:2017/7/15 14:29
 * @parameter
 * @return
 */
public class BlockingQueueTest {
    public static class Producer implements Runnable{

        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        private Random random;


        public Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            flag = false;
            random = new Random();
        }


        @Override
        public void run() {
            while (!flag){
                int info = random.nextInt(100);

                try {
                    blockingQueue.put(info);
                    System.out.println(Thread.currentThread().getName()+" procuce "+info);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void shutDown(){
            flag = true;
        }
    }

    public static class Consumer implements Runnable{
        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
        public void run() {
            while(!flag){
                int info;
                try {
                    info = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()+" consumer "+info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        public void shutDown(){
            flag=true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
        Producer producer=new Producer(blockingQueue);
        Consumer consumer=new Consumer(blockingQueue);
        //创建5个生产者,5个消费者
        for(int i=0;i<10;i++){
            if(i<5){
                new Thread(producer,"producer"+i).start();
            }else{
                new Thread(consumer,"consumer"+(i-5)).start();
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        producer.shutDown();
        consumer.shutDown();

    }
}

java.util.concurrent包提供了以下几种阻塞队列:

package blockingQueue;

import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueTest {
    private int size = 20;
    private ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(size);

    public static void main(String[] args)  {
        BlockingQueueTest test = new BlockingQueueTest();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();

        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{
        @Override
        public void run() {
            while(true){
                try {
                    //从阻塞队列中取出一个元素
                    blockingQueue.take();
                    System.out.println("队列剩余" + blockingQueue.size() + "个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread{
        @Override
        public void run() {
            while (true) {
                try {
                    //向阻塞队列中插入一个元素
                    blockingQueue.put(1);
                    System.out.println("队列剩余空间:" + (size - blockingQueue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

在以上代码中,我们有一个生产者线程不断地向一个阻塞队列中插入元素,同时消费者线程从这个队列中取出元素。若生产者生产的比较快,消费者取的比较慢导致队列满,此时生产者再尝试插入时就会阻塞在put方法中,直到消费者取出一个元素;反过来,若消费者消费的比较快,生产者生产的比较慢导致队列空,此时消费者尝试从中取出时就会阻塞在take方法中,直到生产者插入一个元素。


七、Callable与Future

Callable和Future,它俩很有意思的,一个产生结果,一个拿到结果。

我们之前提到了创建线程的两种方式,它们有一个共同的缺点,那就是异步方法run没有返回值,也就是说我们无法直接获取它的执行结果,只能通过共享变量或者线程间通信等方式来获取。好消息是通过使用Callable和Future,我们可以方便的获得线程的执行结果。
Callable接口与Runnable接口类似,区别在于它定义的异步方法call有返回值。Callable接口的定义如下:

public interface Callable<V> {
    V call() throws Exception;
}

类型参数V即为异步方法call的返回值类型。

来看个简单栗子:

public class CallableAndFuture {
    public static void main(String[] args) {
        Callable<Integer> callable = new Callable<Integer>() {
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        };
        FutureTask<Integer> future = new FutureTask<Integer>(callable);
        new Thread(future).start();
        try {
            Thread.sleep(5000);// 可能做一些事情
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future得到,岂不美哉!

Future可以对具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成以及获取结果。可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。Future接口的定义如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中声明了5个方法,每个方法的作用如下:

public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,这个接口的定义如下:

public interface RunnableFuture<V> implements Runnable, Future<V> {
    void run();
}

可以看到RunnableFuture接口扩展了Runnable接口和Future接口。

FutureTask类有如下两个构造器:

public FutureTask(Callable<V> callable) 
public FutureTask(Runnable runnable, V result) 


FutureTask通常与线程池配合使用,通常会创建一个包装了Callable对象的FutureTask实例,并用submit方法将它提交到一个线程池去执行,我们可以通过FutureTask的get方法获取返回结果。

public class CallableAndFuture {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        Future<Integer> future = threadPool.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });
        try {
            Thread.sleep(5000);// 可能做一些事情
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

九、执行器

创建一个新线程涉及和操作系统的交互,因此会产生一定的开销。在有些应用场景下,我们会在程序中创建大量生命周期很短的线程,这时我们应该使用线程池(thread pool)。通常,一个线程池中包含一些准备运行的空闲线程,每次将Runnable对象交给线程池,就会有一个线程执行run方法。当run方法执行完毕时,线程不会进入Terminated
状态,而是在线程池中准备等下一个Runnable到来时提供服务。使用线程池统一管理线程可以减少并发线程的数目,线程数过多往往会在线程上下文切换上以及同步操作上浪费过多时间。

执行器类(java.util.concurrent.Executors)提供了许多静态工厂方法来构建线程池。

1.线程池

在Java中,线程池通常指一个ThreadPoolExecutor对象,ThreadPoolExecutor类继承了AbstractExecutorService类,而AbstractExecutorService抽象类实现了ExecutorService接口,ExecutorService接口又扩展了Executor接口。也就是说,Executor接口是Java中实现线程池的最基本接口。我们在使用线程池时通常不直接调用ThreadPoolExecutor类的构造方法,二回使用Executors类提供给我们的静态工厂方法,这些静态工厂方法内部会调用ThreadPoolExecutor的构造方法,并为我们准备好相应的构造参数。

Executor是类中的以下三个方法会返回一个实现了ExecutorService接口的ThreadPoolExecutor类的对象:


newCachedThreadPool() //返回一个带缓存的线程池,该池在必要的时候创建线程,在线程空闲60s后终止线程
newFixedThreadPool(int threads) //返回一个线程池,线程数目由threads参数指明
newSingleThreadExecutor() //返回只含一个线程的线程池,它在一个单一的线程中依次执行各个任务
newScheduledThreadPool()//包含预定执行而构建的线程池

以下方法可将一个Runnable对象或Callable对象提交给线程池:

Future<T> submit(Callable<T> task)
Future<T> submit(Runnable task, T result)
Future<?> submit(Runnable task)

调用submit方法会返回一个Future对象,可通过这个对象查询该任务的状态。我们可以在这个Future对象上调用isDone、cancle、isCanceled等方法(Future接口会在下面进行介绍)。第一个submit方法提交一个Callable对象到线程池中;第二个方法提交一个Runnable对象,并且Future的get方法在完成的时候返回指定的result对象。

当我们使用完线程池时,就调用shutdown方法,该方法会启动该线程池的关闭例程。被关闭的线程池不能再接受新的任务,当关闭前已存在的任务执行完毕后,线程池死亡。shutdownNow方法可以取消线程池中尚未开始的任务并尝试中断所有线程池中正在运行的线程。

在使用线程池时,我们通常应该按照以下步骤来进行:

package test;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
public class ThreadPoolExecutorTest {  
 public static void main(String[] args) {  
  ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);  
  for (int i = 0; i < 10; i++) {  
   final int index = i;  
   fixedThreadPool.execute(new Runnable() {  
    public void run() {  
     try {  
      System.out.println(index);  
      Thread.sleep(2000);  
     } catch (InterruptedException e) {  
      e.printStackTrace();  
     }  
    }  
   });  
  }  
 }  
} 

2.预定执行

ScheduledExecutorService接口含有为预定执行(Scheduled Execution)或重复执行的任务专门设计的方法。Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法会返回实现了ScheduledExecutorService接口的对象。可以使用以下方法来预定执行的任务:

ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUnit unit)
ScheduledFuture<?> schedule(Runnable task, long time, TimeUnit unit)
//以上两个方法预定在指定时间过后执行任务
SchedukedFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) //在指定的延迟(initialDelay)过后,周期性地执行给定任务
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) //在指定延迟(initialDelay)过后周期性的执行任务,每两个任务间的间隔为delay指定的时间
package test;  
import java.util.concurrent.Executors;  
import java.util.concurrent.ScheduledExecutorService;  
import java.util.concurrent.TimeUnit;  
public class ThreadPoolExecutorTest {  
 public static void main(String[] args) {  
  ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);  
  scheduledThreadPool.schedule(new Runnable() {  
   public void run() {  
    System.out.println("delay 3 seconds");  
   }  
  }, 3, TimeUnit.SECONDS);  
 }  
}  

3.控制任务组

对ExecutorService对象调用invokeAny方法可以把一个Callable对象集合提交到相应的线程池中执行,并返回某个已经完成的任务的结果,该方法的定义如下:

T invokeAny(Collection<Callable<T>> tasks)
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

该方法可以指定一个超时参数。这个方法的不足在于我们无法知道它返回的结果是哪个任务执行的结果。如果集合中的任意Callable对象的执行结果都能满足我们的需求的话,使用invokeAny方法是很好的。

invokeAll方法也会提交Callable对象集合到相应的线程池中,并返回一个Future对象列表,代表所有任务的解决方案。该方法的定义如下:

List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

上一篇 下一篇

猜你喜欢

热点阅读