
The Right Way to Use Thread Pools

2019-05-11  LandHu

Why Executors is not recommended

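Under the hood, pools created by factory methods such as Executors.newFixedThreadPool are backed by a LinkedBlockingQueue. When no capacity is specified, that queue is effectively unbounded: its maximum length is Integer.MAX_VALUE. If tasks are submitted faster than the workers can finish them, they pile up in the queue without limit (the problem is the number of queued tasks, not the number of threads), and the JVM can eventually run out of memory with an error like this: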

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
   at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
   at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
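
The unbounded queue can be checked directly. The following is a minimal sketch, assuming the standard OpenJDK implementation, where Executors.newFixedThreadPool returns a ThreadPoolExecutor that can be cast and inspected. The class name QueueCapacityCheck and the helper fixedPoolQueue are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class QueueCapacityCheck {

    /**
     * Returns the work queue behind Executors.newFixedThreadPool.
     * Assumption: the returned pool is a ThreadPoolExecutor (true for OpenJDK).
     */
    static BlockingQueue<Runnable> fixedPoolQueue(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            return ((ThreadPoolExecutor) pool).getQueue();
        } finally {
            // No tasks were submitted, so shutting down right away is safe.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = fixedPoolQueue(2);
        // Print the queue implementation and whether its capacity is Integer.MAX_VALUE.
        System.out.println(queue.getClass().getSimpleName());
        System.out.println(queue.remainingCapacity() == Integer.MAX_VALUE);
    }
}
```

A remaining capacity of Integer.MAX_VALUE on an empty queue means nothing stops the queue from growing until memory runs out.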

The right way to create a thread pool

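import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder; // from Guava

public class ExecutorsDemo {
   // Named threads are easier to find in logs and thread dumps.
   private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
       .setNameFormat("demo-pool-%d").build();

   // Bounded queue (1024) plus AbortPolicy: once the queue is full and all 200
   // threads are busy, execute() throws RejectedExecutionException instead of
   // letting tasks accumulate until the JVM runs out of memory.
   private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
       0L, TimeUnit.MILLISECONDS,
       new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

   public static void main(String[] args) {
       // SubThread is a Runnable defined elsewhere (not shown here).
       for (int i = 0; i < Integer.MAX_VALUE; i++) {
           pool.execute(new SubThread());
       }
   }
}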
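
To make the effect of the bounded queue and AbortPolicy concrete, here is a small sketch that uses only the JDK (a hand-written ThreadFactory stands in for Guava's ThreadFactoryBuilder). The class BoundedPoolDemo, the method countRejections, and the deliberately tiny pool sizes (1 core thread, 2 max threads, queue capacity 2) are illustrative assumptions, not values from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    /** Submits `tasks` tasks that block until released and returns how many were rejected. */
    static int countRejections(int tasks) {
        AtomicInteger threadIds = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, "demo-pool-" + threadIds.getAndIncrement());

        // 1 core thread, at most 2 threads, and room for 2 queued tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), factory, new ThreadPoolExecutor.AbortPolicy());

        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until all submissions are done
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // AbortPolicy: pool and queue are both full
            }
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + countRejections(10));
    }
}
```

With these sizes the pool can hold at most 4 tasks at once (2 running plus 2 queued), so submitting 10 blocking tasks leads to 6 rejections. The caller sees the overload immediately as an exception rather than later as an OutOfMemoryError.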

Shutting down the thread pool

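long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
    pool.execute(new Job());
}
// Stop accepting new tasks; tasks already submitted still run to completion.
pool.shutdown();
// Check once per second until every task has finished.
while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
    LOGGER.info("Threads are still running...");
}
long end = System.currentTimeMillis();
LOGGER.info("Total elapsed time: [{}] ms", (end - start));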
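
The loop above waits indefinitely. A common variation is to wait for a bounded time and then fall back to shutdownNow(). The sketch below assumes that behaviour is acceptable for the workload; the class GracefulShutdown and the helper shutdownAndAwait are hypothetical names, not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdown {

    /**
     * Shuts the pool down and waits up to the given timeout.
     * Returns true if all tasks finished in time, false if shutdownNow() was needed
     * or the waiting thread was interrupted.
     */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // no new tasks; submitted tasks keep running
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks, drop queued ones
            pool.awaitTermination(timeout, unit);
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger done = new AtomicInteger();
        ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(8));
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> done.incrementAndGet());
        }
        boolean clean = shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        System.out.println("clean=" + clean + ", tasks done=" + done.get());
    }
}
```

Whether interrupting tasks on timeout is appropriate depends on the tasks themselves; for work that must not be cut short, the polling loop shown earlier is the safer choice.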

Using a thread pool in Spring Boot

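//Thread pool configuration
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ThreadPoolConfig {
    /**
     * Thread pool for the queue consumers.
     * @return a fixed-size pool of 5 threads with a bounded queue of 5 tasks
     */
    @Bean(value = "consumerQueueThreadPool")
    public ExecutorService buildConsumerQueueThreadPool() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-queue-thread-%d").build();

        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

        return pool;
    }
}

//Using the thread pool
@Resource(name = "consumerQueueThreadPool")
private ExecutorService consumerQueueThreadPool;

@Override
public void execute() {
    //consume the queue
    for (int i = 0; i < 5; i++) {
        consumerQueueThreadPool.execute(new ConsumerQueueThread());
    }
}

Thread pool isolation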

Isolation with Hystrix
First, define two thread pools: one for executing order tasks and one for handling user tasks.

/**
 * Function: order service
 *
 * @author crossoverJie
 *         Date: 2018/7/28 16:43
 * @since JDK 1.8
 */
public class CommandOrder extends HystrixCommand<String> {

    private final static Logger LOGGER = LoggerFactory.getLogger(CommandOrder.class);

    private String orderName;

    public CommandOrder(String orderName) {


        super(Setter.withGroupKey(
                //command group key
                HystrixCommandGroupKey.Factory.asKey("OrderGroup"))
                //thread pool key
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderPool"))

                //thread pool configuration
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))

                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        )
        ;
        this.orderName = orderName;
    }


    @Override
    public String run() throws Exception {

        LOGGER.info("orderName=[{}]", orderName);

        TimeUnit.MILLISECONDS.sleep(100);
        return "OrderName=" + orderName;
    }


}


/**
 * Function: user service
 *
 * @author crossoverJie
 *         Date: 2018/7/28 16:43
 * @since JDK 1.8
 */
public class CommandUser extends HystrixCommand<String> {

    private final static Logger LOGGER = LoggerFactory.getLogger(CommandUser.class);

    private String userName;

    public CommandUser(String userName) {


        super(Setter.withGroupKey(
                //command group key
                HystrixCommandGroupKey.Factory.asKey("UserGroup"))
                //thread pool key
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserPool"))

                //thread pool configuration
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))

                //isolate by thread pool
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        )
        ;
        this.userName = userName;
    }


    @Override
    public String run() throws Exception {

        LOGGER.info("userName=[{}]", userName);

        TimeUnit.MILLISECONDS.sleep(100);
        return "userName=" + userName;
    }
}

Simulated run:

public static void main(String[] args) throws Exception {
    CommandOrder commandPhone = new CommandOrder("phone");
    CommandOrder command = new CommandOrder("TV");

    //blocking execution
    String execute = commandPhone.execute();
    LOGGER.info("execute=[{}]", execute);

    //asynchronous execution; get() then waits up to 200 ms for the result
    Future<String> queue = command.queue();
    String value = queue.get(200, TimeUnit.MILLISECONDS);
    LOGGER.info("value=[{}]", value);

    CommandUser commandUser = new CommandUser("Zhang San");
    String name = commandUser.execute();
    LOGGER.info("name=[{}]", name);
}

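How it works: a Map stores the thread pool that belongs to each business area.
Note: a custom Command is not a singleton. A new instance must be created for every execution; otherwise Hystrix reports: This instance can only be executed once. Please instantiate a new instance.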
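
The Map-based idea can be sketched in plain Java. This is not Hystrix's actual code, only an illustration of the principle under simple assumptions: the class PoolRegistry, the methods poolFor and shutdownAll, and the pool sizes are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolRegistry {

    // One pool per business key, created lazily on first use.
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

    /** Returns the pool for the given key, creating it if it does not exist yet. */
    ExecutorService poolFor(String key) {
        return pools.computeIfAbsent(key, k -> new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10),
                r -> new Thread(r, k + "-worker"),
                new ThreadPoolExecutor.AbortPolicy()));
    }

    /** Shuts down every pool that has been created so far. */
    void shutdownAll() {
        pools.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        PoolRegistry registry = new PoolRegistry();
        ExecutorService orderPool = registry.poolFor("OrderPool");
        ExecutorService userPool = registry.poolFor("UserPool");

        // Same key, same pool; different keys, separate pools.
        System.out.println(orderPool == registry.poolFor("OrderPool"));
        System.out.println(orderPool != userPool);

        registry.shutdownAll();
    }
}
```

Because each key has its own threads and its own bounded queue, a slow or overloaded order service can only exhaust OrderPool and leaves UserPool untouched.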

Reference: 如何优雅的使用和理解线程池 (How to use and understand thread pools gracefully)



Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 3.0. Please credit the source when reposting.
