Java学习

Java:线程池Executors.newFixedThread

2021-08-24  本文已影响0人  xiaogp

摘要:Java多线程线程池

多线程编程和线程池概述

(1)多线程程序:

计算机可以实现多任务 ( multitasking ),在同一刻运行多个程序,多线程程序在较低的层次上扩展了多任务的概念,即一个程序同时执行多个任务,每一个任务称为一个线程 ( thread ) 。可以同时运行一个以上线程的程序称为多线程程序 ( multithreaded ) 。和多进程相比,多线程之间是共享变量的,这使得线程之间的通信比进程之间的通信更有效更容易,另外线程更轻量级, 创建撤销一个线程比启动新进程的开销要小得多

(2)线程池

构建一个新的线程是有一定代价的 , 因为涉及与操作系统的交互 。 如果程序中创建了大量的生命期很短的线程 , 应该使用线程池 (thread pool) 。 一个线程池中包含许多准备运行的空闲线程,线程执行完任务后不会死亡, 而是在池中准备为下一个请求提供服务,另一个使用线程池的理由是减少并发线程的数目,创建大量线程会大大降低性能甚至使机器崩溃 ,提交的任务数多于空闲的线程数, 那么把得不到服务的任务放置到队列中,总结线程池的出现的目的:


Java线程池Executors.newFixedThreadPool简单使用

实现线程池的一种常用方法是调用Executors.newFixedThreadPool静态方法,execute() 没有返回值;而 submit() 有返回值

public static void main(String[] args) {
        ExecutorService executorService1 = Executors.newFixedThreadPool(3);
        for (int i = 1; i < 20; i++) {
            final int finalI = i;
            executorService1.execute(() -> {
                try {
                    System.out.println(finalI);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService1.shutdown();
public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService1 = Executors.newFixedThreadPool(3);
        for (int i = 1; i < 10; i++) {
            int finalI = i;
            Future<?> sub = executorService1.submit(() -> {
                try {
                    System.out.println(finalI);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            System.out.println(sub.get());
        }
        executorService1.shutdown();
    }

Java线程池工程案例

写一个完成的Java多线程任务,分为6个步骤

package com.example.Multithreading;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.fastjson.JSONObject;
import com.example.Multithreading.utils.Config;
import com.example.Multithreading.utils.FileUtils;
import com.example.Multithreading.utils.SolrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {
    private static final Logger LOGGER = LoggerFactory.getLogger(Config.class);

    public static void main(String[] args) {
        List<JSONObject> data = FileUtils.readDate();
        LOGGER.info("初始数据量{}", data.size());
        long startTime =  System.currentTimeMillis();
        ExecutorService threadPoolExecutor = null;
        AtomicInteger finishBatch = new AtomicInteger(0);
        AtomicBoolean error = new AtomicBoolean(false);
        try {
            // 线程池配置
            int workCount = Integer.parseInt(Config.getString("executorWorkCount", "5"));
            threadPoolExecutor = Executors.newFixedThreadPool(workCount);
            List<JSONObject> errorList = new ArrayList<>();
            for (JSONObject jsonObject : data) {
                threadPoolExecutor.execute(() -> {
                    try {
                        String entName = jsonObject.getString("entName");
                        long entDocumentCount = SolrUtils.getEntDocumentCount(entName);
                        if (entDocumentCount != 0) {
                            String recent = SolrUtils.getEntRecentDate(entName);
                            if (recent != null) {
                                jsonObject.put("count", entDocumentCount);
                                jsonObject.put("recent", recent);
                            } else {
                                errorList.add(jsonObject);
                            }
                        } else {
                            errorList.add(jsonObject);
                        }
                    } catch (Exception e) {
                        error.set(true);
                        e.printStackTrace();
                    } finally {
                        finishBatch.getAndIncrement();
                    }
                });
            }
            // 等待所有查询完
            while (finishBatch.get() != data.size()) {
                TimeUnit.MILLISECONDS.sleep(1000);
                LOGGER.info("完成" + finishBatch.get() * 100 / data.size() + "%");
                if (error.get()) {
                    LOGGER.error("执行线程发生异常退出!");
                    System.exit(1);
                }
            }
            if (error.get()) {
                LOGGER.error("执行线程发生异常退出!");
                System.exit(1);
            }
            // 过滤
            if (!errorList.isEmpty()) {
                data.removeAll(errorList);
            }
            LOGGER.info("过滤后数据量{}", data.size());
            // 结果写入本地
            FileUtils.writeDate(data);
            LOGGER.info("写入本地完成,任务结束");
            long endTime =  System.currentTimeMillis();
            LOGGER.info("耗时{}秒", (endTime - startTime) / 1000);  // 耗时371秒

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != threadPoolExecutor) {
                threadPoolExecutor.shutdownNow();
                try {
                    while (!threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                        LOGGER.info("线程池还未关闭");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

输出日志如下

2021-08-24 18:26:29 INFO  Config:61 - 完成99%
2021-08-24 18:26:30 INFO  Config:61 - 完成99%
2021-08-24 18:26:31 INFO  Config:61 - 完成100%
2021-08-24 18:26:31 INFO  Config:75 - 过滤后数据量371
2021-08-24 18:26:31 INFO  Config:78 - 写入本地完成,任务结束
2021-08-24 18:26:31 INFO  Config:80 - 耗时412秒

配置线程池的大小

一般需要根据任务的类型来配置线程池大小:

在《Java并发编程实践》中,是这样来计算线程池的线程数目的:
给定下列定义:
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1,期望Ucpu等于1
W/C = 等待时间与计算时间的比率,如果以IO密集为主,W/C接近1,如果以计算密集为主,W/C接近0

为保持处理器达到期望的使用率,最优的池的大小等于:

Nthreads = Ncpu x Ucpu x (1 + W/C)

如果以计算密集为主,W/C等于0,假设目标CPU使用率为1,则Nthreads=Ncpu+1的目的是防止恰好在某时因为发生一个页错误或者因其他原因而暂停,刚好有一个“额外”的线程,可以确保在这种情况下CPU周期不会中断工作
如果以IO密集为主,W/C等于1,假设目标CPU使用率为1,则Nthreads=2Ncpu

结论:线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程
参考:https://cloud.tencent.com/developer/article/1806245

上一篇下一篇

猜你喜欢

热点阅读