Java线程池的使用

2019-01-04  本文已影响0人  yehongyu_2018

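线程池类型:

  1. 固定大小线程池(Executors.newFixedThreadPool)
  2. cached线程池(Executors.newCachedThreadPool)
  3. 定时调度线程池(Executors.newScheduledThreadPool)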

固定线程池使用

public class WorkerService {
private ExecutorService executorService;
Map<Integer, Worker> workerMap = new HashMap();
public boolean init() {
    int workerNum = 3;
    executorService = 
    Executors.newFixedThreadPool(workerNum);
    IntStream.range(0, workerNum).forEach(id -> {
                    Worker worker = new Worker(id);
                    workerMap.put(id, worker);
                    executorService.submit(worker);
                }
        );
    }
public void close() {
        workerMap.forEach((k, v) -> v.stop());
        //shutdown the consumer thread
        if (executorService != null) {
            // disable new tasks from being submitted
            executorService.shutdown();
            try {
                // wait a while for existing tasks to terminate
                if (!executorService.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
                    LOGGER.error("Still waiting...");
                    executorService.shutdownNow(); // cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                        LOGGER.error("Pool did not terminate");
                    }
                }
            } catch (InterruptedException e) {
                LOGGER.error("force to interrupted");
                return;
            }
            LOGGER.info("Exiting normally...");
        }
    }
}
private class Worker implements Runnable {
        int id;
        private volatile boolean isRunning;
        Worker(int i) {
            id = i;
            isRunning = true;
        }
        @Override
        public void run() {
                while (isRunning) {
                    try {
                          // TODO
                     } catch (Exception e) {
                     }
                }
          }
        public void stop() {
            isRunning = false;
        }
    }

cached线程池使用

public class ConsumingExecutor {
    private Map<String, List<ConsumingWorker>>       workerMap = new HashMap<>();
    private ExecutorService executor;
    public ConsumingExecutor() {
    }
    public void start(int workerNum) {
        int workerNum = 10;
        ThreadFactory tf = new ThreadFactoryBuilder()
                .setNameFormat("ConsumingWorker-thread-%d")
                .setUncaughtExceptionHandler(new LoggingExceptionHandler())
                .build();
        executor = Executors.newCachedThreadPool(tf);
        Runtime.getRuntime().addShutdownHook(new Thread(ConsumingExecutor.this::shutdown));
    }

    public void addWorker(String key, KafkaTopic topic, ElementBlockingQueue queue, PipelineHolder.PipelineType pipelineType) {
        for (int i = 0; i < CONSUMING_WORKER_NUM; ++i) {
            ConsumingWorker consumingWorker = new ConsumingWorker(i, topic, queue, pipelineType);
            executor.submit(consumingWorker);
            if (!workerMap.containsKey(key)) {
                List<ConsumingWorker> workers = new ArrayList<>();
                workerMap.put(key, workers);
            }
           workerMap.get(key).add(consumingWorker);
        }
    }
    public void removeWorker(String key) {
        if (workerMap.containsKey(key)) {
            List<ConsumingWorker> workers = workerMap.get(key);
            workers.forEach(worker -> {
                worker.shutdown();
            });
            workerMap.remove(key);
        }
    }

    public void updateWorker(String key, KafkaTopic topic, ElementBlockingQueue queue, PipelineHolder.PipelineType pipelineType) {
        removeWorker(key);
        addWorker(key, topic, queue, pipelineType);
    }
    public void shutdown() {
        workerMap.entrySet().stream().forEach(workers -> {
            if (workers.getValue() != null) {
                workers.getValue().forEach(worker -> {
                    worker.shutdown();
                });
            }
        });

        if (null != executor && !executor.isTerminated() && !executor.isShutdown()) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SEC, TimeUnit.SECONDS)) {
                    executor.shutdown();
                    if (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SEC, TimeUnit.SECONDS)) {
                        LOGGER.error("ConsumingExecutor did not terminate");
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Waiting active task termination fails.");
            }
        }
    }

}

定时调度线程池使用

/**
 * Scheduled thread pool example: runs two cleaner tasks repeatedly with a
 * fixed delay between the end of one run and the start of the next.
 */
public class CleanService {
    private ScheduledExecutorService executor;

    /**
     * Creates the scheduled pool and registers both cleaner tasks.
     *
     * NOTE(review): cleanerNumber, expiryDate, clientExpiryDate, offset,
     * initalDelay and scanInterval are not declared in the visible code;
     * presumably they are derived from {@code serviceConfig} — confirm.
     */
    public void init(ServiceConfig serviceConfig) {
        executor = Executors.newScheduledThreadPool(cleanerNumber);

        ACleanerTask aCleaner = new ACleanerTask(
                expiryDate.longValue(),
                clientExpiryDate.longValue(),
                offset.longValue());
        executor.scheduleWithFixedDelay(aCleaner, initalDelay, scanInterval, TimeUnit.SECONDS);

        BCleanerTask bCleaner = new BCleanerTask(expiryDate);
        executor.scheduleWithFixedDelay(bCleaner, initalDelay, scanInterval, TimeUnit.SECONDS);
    }

    /** Cancels the scheduled tasks immediately, unless the pool is absent or already stopped. */
    public void close() {
        if (executor == null || executor.isTerminated() || executor.isShutdown()) {
            return;
        }
        LOGGER.info("close clean executor ..");
        executor.shutdownNow();
    }
}
上一篇下一篇

猜你喜欢

热点阅读