FixedThreadPool

2022-03-08  本文已影响0人  程序员札记

java.util.concurrent.Executors

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

适合:任务量已知,相对耗时的任务

简要说明,FixedThreadPool,也就是可重用固定线程数的线程池。 它corePoolSize和 maximumPoolSize是一样的。并且他的keepAliveTime=0, 也就是当线程池中的线程数大于corePoolSize, 多余的空闲线程会被立即终止。

它的基本执行过程如下
1, 如果当前运行的线程数少于corePoolSize, 会立刻创建新线程执行任务。
2,当线程数到达corePoolSize后,将任务加入到LinkedBlockingQueue中。
3,当线程执行完任务后,会循环从LinkedBlockingQueue中获取任务来执行。


image.png

FixedThreadPool使用了LinkedBlockingQueue, 也就是无界队列(队列最大可容纳Integer.MAX_VALUE), 因此会造成以下影响:
a, 线程池线程数到达corePoolSize后,任务会被存放在LinkedBlockingQueue中
b, 因为无界队列,运行中(未调用shutdown()或者shutdownNow()方法)的不会拒绝任务(队列无界,可以放入"无限"任务)

LinkedBlockingQueue

很多人问为啥不用ConcurrentLinkedQueue,这里用了LinkedBlockingQueue。主要是因为LinkedBlockingQueue 是阻塞算法, 实现起来比较简单。

有时候我们把并发包下面的所有容器都习惯叫作并发容器,但是严格来讲,类似 ConcurrentLinkedQueue 这种“Concurrent”容器,才是真正代表并发。不知道你有没有注意到,java.util.concurrent 包提供的容器(Queue、List、Set)、Map,从命名上可以大概区分为 Concurrent、CopyOnWrite和 Blocking等三类,同样是线程安全容器,可以简单认为:

关于ConcurrentLinkedQueue和LinkedBlockingQueue:

package com.dxz.queue.linked;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConcurrnetLinkedQueue implements Runnable{  
      
    //容器  
    private final ConcurrentLinkedQueue<Bread> queue;
    private final CountDownLatch cdl;
      
    public ProducerConcurrnetLinkedQueue(ConcurrentLinkedQueue<Bread> queue, CountDownLatch cdl){  
        this.queue = queue;  
        this.cdl = cdl;
    }  
  
    @Override  
    public void run() {  
        for(int i=0;i<100000; i++){  
            produce(i);  
        }
        cdl.countDown();
    }  
      
    public void produce(int i){  
        /** 
         * put()方法是如果容器满了的话就会把当前线程挂起 
         * offer()方法是容器如果满的话就会返回false。 
         */  
        try {  
            Bread bread = new Bread();
            bread.setName(""+i);
            queue.offer(bread);
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}  

package com.dxz.queue.linked;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ClientPut {  
  
    public static void main(String[] args) throws InterruptedException {  
        int capacity = 9000000;
        //testArray(capacity);    //put in ArrayBlockingQueue size:=1000000,use time:=624
        //testLinked(capacity);    //put in LinkedBlockingQueue size:=1000000,use time:=289
        testConcurrentLinked(); //put in ConcurrentLinkedQueue size:=1000000,use time:=287
        
    }
    
    private static void testArray(int capacity) throws InterruptedException {
        ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(capacity);  
        CountDownLatch cdl = new CountDownLatch(10);
        ExecutorService es = Executors.newFixedThreadPool(10);
        long start = System.currentTimeMillis();
        for(int i = 0; i < 10;i++) {
            es.submit(new ProducerArray(queue, cdl));
        }
        cdl.await();
        long end = System.currentTimeMillis();
        es.shutdown();
        System.out.println("put in ArrayBlockingQueue size:="+queue.size() +",use time:="+(end-start));
    }

    private static void testLinked(int capacity) throws InterruptedException {
        LinkedBlockingQueue<Bread> queue = new LinkedBlockingQueue<Bread>(capacity);
        CountDownLatch cdl = new CountDownLatch(10);
        ExecutorService es = Executors.newFixedThreadPool(10);
        long start = System.currentTimeMillis();
        for(int i = 0; i < 10;i++) {
            es.submit(new ProducerLinked(queue, cdl));
        }
        cdl.await();
        long end = System.currentTimeMillis();
        es.shutdown();
        System.out.println("put in LinkedBlockingQueue size:="+queue.size() +",use time:="+(end-start));
    }
    
    private static void testConcurrentLinked() throws InterruptedException {
        ConcurrentLinkedQueue<Bread> queue = new ConcurrentLinkedQueue<Bread>();
        CountDownLatch cdl = new CountDownLatch(10);
        ExecutorService es = Executors.newFixedThreadPool(10);
        long start = System.currentTimeMillis();
        for(int i = 0; i < 10;i++) {
            es.submit(new ProducerConcurrnetLinkedQueue(queue, cdl));
        }
        cdl.await();
        long end = System.currentTimeMillis();
        es.shutdown();
        System.out.println("put in ConcurrentLinkedQueue size:="+queue.size() +",use time:="+(end-start));
    }
  
}

可能的问题

因为使用了 newFixedThreadPool 线程池,而它的工作机制是,固定了N个线程,而提交给线程池的任务队列是不限制大小的,如果Kafka发消息被阻塞或者变慢,那么显然队列里面的内容会越来越多,但是永远不会触及到reject handler。造成OOM。

比如下面一段代码

private static ExecutorService executor = Executors.newFixedThreadPool(10);

public void push2Kafka(String message){
  executor.execute(new KafkaMessageWriteTask(message));

}

这段代码的功能是:每次线上调用,都会把计算结果的日志打到 Kafka,Kafka消费方再继续后续的逻辑。从表面上看,这个代码是没有任何问题,但是如果kafka 出现连接问题,Kafka发消息被阻塞或者变慢,那么显然[队列]里面的内容会越来越多,也就会导致线程的积压问题。

上一篇 下一篇

猜你喜欢

热点阅读