多线程笔记 三

2018-05-11  本文已影响46人  骑着乌龟追小兔

1. ThreadPoolExecutor

ThreadPoolExecutor是一种比较常见的处理多线程的执行器。你可以配置线程池的最小线程数,当执行器没有太多的任务要处理的时候。亦可以配置最大线程size,如果有很多任务需要处理。一旦当工作负载降下来,线程池就会慢慢的减少线程数量,知道线程数量达到最小值。

    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, // keep at least one thread ready,
            // even if no Runnables are executed
            5, // at most five Runnables/Threads
            // executed in parallel
            1, TimeUnit.MINUTES, // idle Threads terminated after one
            // minute, when min Pool size exceeded
            new ArrayBlockingQueue<Runnable>(10)); // outstanding Runnables are kept here
         pool.execute(new Runnable() {
            @Override 
            public void run () {
                //code to run
            }
    });
    
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)

如果线程池的数量大于corePoolSize并且小于maximumPoolSize,那么只有在线程池队列满了的情况下才会创建新线程。

线程池的优点:

  1. BlockingQueue 能够避免内存溢出的场景。应用的表现不会被队列的尺寸限制。

  2. 可以采用不同的Rejection Handler 策略。

    1. 默认策略:抛出RejectedExecutionException
    2. CallerRunsPolicy :如果线程池没有被关闭,那么就执行,否则丢弃
    3. DiscardPolicy: 无法执行,直接丢弃
    4. DiscardOldestPolicy :丢弃队首的任务。
  3. 配置自定义的ThreadFactory的好处.

    1. 定义更有描述性的名称
    2. 设置进程的状态
    3. 设置线程优先权

2.获取计算任务的值-callable

// Submit a callable for execution
    ExecutorService pool = anExecutorService;
    Future<Integer> future = pool.submit(new Callable<Integer>() {
    @Override public Integer call() {
    //do some computation
    return new Random().nextInt();
    }
    });
// ...  perform other tasks while future is executed in a different thread
    try {
    // Blocks current thread until future is completed
    Integer result = future.get();
    catch (InterruptedException || ExecutionException e) {
    // handle appropriately
    }
    try {
    // Blocks current thread for a maximum of 500 milliseconds.
    // If the future finishes before that, result is returned,
    // otherwise TimeoutException is thrown.
    Integer result = future.get(500, TimeUnit.MILLISECONDS);
    catch (InterruptedException || ExecutionException || TimeoutException e) {}
如果计算结果不在需要了,你可以调用Future.cancel(boolean)

3. submit()与execute() 异常处理的区别

示例代码如下:

案例1:用excute 命令来执行 runnable 任务,然后上报异常
import java.util.concurrent .*;
import java.util .*;

    public class ExecuteSubmitDemo {
        public ExecuteSubmitDemo() {
            System.out.println("creating service");
            ExecutorService service = Executors.newFixedThreadPool(2);
//ExtendedExecutor service = new ExtendedExecutor();
            for (int i = 0; i < 2; i++) {
                service.execute(new Runnable() {
                    public void run() {
                        int a = 4, b = 0;
                        System.out.println("a and b=" + a + ":" + b);
                        System.out.println("a/b:" + (a / b));
                        System.out.println("Thread Name in Runnable after divide by
                                zero:"+Thread.currentThread().getName());
                    }
                });
            }
            service.shutdown();
        }

        public static void main(String args[]) {
            ExecuteSubmitDemo demo = new ExecuteSubmitDemo();
        }
    }

    class ExtendedExecutor extends ThreadPoolExecutor {
        public ExtendedExecutor() {
            super(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
        }

        // ...
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {
                    Object result = ((Future<?>) r).get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null)
                System.out.println(t);
        }
    }

输出:

creating service
a and b=4:0
a and b=4:0
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2"
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
案例2:用submit 替换excute ,service.submit(new Runnable() 在这个案例中,异常被框架吃了。

输出

creating service
a and b=4:0
a and b=4:0

案例3 将newFixedThreadPool换成ExtendedExecutor

ExtendedExecutor service = new ExtendedExecutor();

输出:

creating service
a and b=4:0
java.lang.ArithmeticException: / by zero
a and b=4:0
java.lang.ArithmeticException: / by zero

4. 处理拒绝执行

如果:

  1. 你试图向一个一个关闭的Executor 提交任务
  2. 队列已经满了,线程数已经达到最大值

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) // <--
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) // <
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)

5 :发射后不管 - Runnable Tasks

Executors接受一个 java.lang.Runnable 参数对象,用来处理耗时或者计算量较大的任务。

 Executor exec = Executors.newCachedThreadPool();
    exec.ex(new Runnable() {
            @Override public void run () {
    //offloaded work, no need to get result backJava® Notes for Professionals 696
            }
        });
        

注意使用这个excutor ,你获取不到任何数据返回值。在Java8 中你可以应用lamda 来简化代码

    Executor exec = anExecutor;
    exec.execute(() -> {
    //offloaded work, no need to get result back
    });

6:不同类型的并发的构造的使用案例:

  1. ExecutorService
  1. CountDownLatch

使用条件:

  1. ThreadPoolExecutor
  1. 你可能不知道 ForkJoinPool
 public static ExecutorService newWorkStealingPool()

创建work-stealing线程池会根据处理器的并行程度最大程度的利用处理器

上述提到的四种原理互不影响。你可以根据你自己的需求,选用合适的框架进行使用

7. 使用ExecutorService等待所有任务执行完毕

示例代码:

    import java.util.concurrent .*;
    import java.util .*;

    public class InvokeAllDemo {
        public InvokeAllDemo() {
            System.out.println("creating service");
            ExecutorService service =
                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            List<MyCallable> futureList = new ArrayList<MyCallable>();
   
            for (int i = 0; i < 10; i++) {
                MyCallable myCallable = new MyCallable((long) i);
                futureList.add(myCallable);
            }
            System.out.println("Start");
            try {
                List<Future<Long>> futures = service.invokeAll(futureList);
            } catch (Exception err) {
                err.printStackTrace();
            }
            System.out.println("Completed");
            service.shutdown();
        }

        public static void main(String args[]) {
            InvokeAllDemo demo = new InvokeAllDemo();
        }

        class MyCallable implements Callable<Long> {
            Long id = 0L;

            public MyCallable(Long val) {
                this.id = val;
            }

            public Long call() {
// Add your business logic
                return id;
            }
        }
    }

8. 使用不同类型的ExecutorService

Executors 返回不同类型的线程池来满足不同需求

1. 1. public static ExecutorService newSingleThreadExecutor()

创建一个单工作线程来操作一个无界队列

它与 newFixedThreadPool(1) 和 newSingleThreadExecutor()的区别Java doc 是这样说的:

与类似的 newFixedThreadPool(1)相比其不保证重新配置异常线程来使用替代线程。

2. public static ExecutorService newFixedThreadPool(int nThreads)

创建一个固定数量的线程池,复用线程来操作一个共享的无界队列。在任何时刻,大多数线程都会在执行任务的时候被激活。如果提交了额外的任务,在所有线程都是激活状态的时候,那么他们将会阻塞知道有空闲线程。

使用场景:

  1.         可以通过获取cpu的数量来提高线程运行情况。
    
  2.          你可以选择线程池的最大线程数
    

缺点:无界队列有风险

3. public static ExecutorService newCachedThreadPool()

创建一个按需分配的线程池,会重复利用之前已经创建的线程。

使用条件:

  1.   一些执行时间段的异步任务
    
4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个线程池,是的能够在指定时间段之后执行任务,或者每隔一段时间执行

使用场景:

  1.     处理周期性的事件
    

缺点:无界队列有风险。

5. public static ExecutorService newWorkStealingPool()

创建任务偷取型的线程池,取决于处理器的并发水平

使用场景
  1.     将任务分割成很多子任务
    
  2.     对于空闲线程处理能力较高
    

缺点:无界队列有风险

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
    TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
    RejectedExecutionHandler handler)

使用线程池你可以:

  1. 动态控制线程池尺寸
  2. 设置BlockingQueue 容积
  3. 定了拒绝策略
  4. 自定义 CustomThreadFactory 可以有一些附带功能

9. 调度线程在固定的时间执行,或者在延迟一段时间之后,或者重复执行

ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
在固定时间之后开始一个任务
ScheduledFuture<Integer> future = pool.schedule(new Callable<>() {
    @Override 
    public Integer call() {
    // do something
    return 42;
    }
},10, TimeUnit.MINUTES);
以固定的频率来执行任务
ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
    @Override 
    public void run() {
    // do something
    }
},10, 1, TimeUnit.MINUTES);

任务会一直执行到线程池被关闭,future 被去小,或者某个任务发生了异常。


10. 线程池的使用

  1. submit: 执行提交的任务返回一个future 对象
  2. execute: 执行任务并不期望返回任何值
  3. invokeAll:执行一组任务,并且得到一个返回值列表
  4. invokeAny:执行所有任务,获得其中一个正确执行的(没有异常的),其余没有执行的任务或被取消。
上一篇下一篇

猜你喜欢

热点阅读