java

简单实现一个初级线程池

2017-10-11  本文已影响53人  柠檬乌冬面

前言

面试中经常会有考官问道,让你自己手写是实现一个线程池。这里我就按照网上的一些参考来进行实现一个简单的线程池。主要目的是为了理解和记忆实现过程中遇到的类 以及实现过程。

首先我们来看一些具体的成员变量有哪些:

成员变量

终于知道为什么面试官喜欢这个了吧,有并发锁,有阻塞队列,有volatile关键字。很多和多线程并发的东西都会在线程池中涉及。
那么面试官就会围绕这些知识点去展开,线程池可能只是开始(套路,全是套路!)

RUNNING的作用是标记线程池的整体状态是否在工作
lock是为了在线程池内部的一些操作上加上并发锁,来保证程序不出错。
workers是一个工作集,用来存放工人。而且是hashSet类型,这就代表是一个没有重复worker的集合。
queue 阻塞队列,用来存放线程池将用执行的任务。使用的是并发包下的阻塞队列,可以保证在任务的存取上是线程安全的。
threads是一个简易的线程工厂,源码中相对复杂。用来存放生成的线程
poolsize代表核心线程数 就是这个线程池中主要大部分情况下有多少线程
coreSize 代表正在线程池中工作的线程数

关于这些Size的变量,源码中比这个多。而且不好理解,这里就简单的这样认为。

shutdown是标记线程池停止运行的标记

下面是整个线程池的最主要方法 execute,是执行任务的入口

execute方法

这个方法我们看,当任务为空,抛出异常。如果当前线程池中的空闲线程小于核心线程数的话就增加线程进入addThread方法,否则就会直接加入阻塞队列去等待。

下面是addThread方法

增加线程

我们看到,这里创建一个工人进行工作,然后把工人加入到工作集中。创建一个线程去执行工人的工作。线程启动。整个过程在并发锁的保护下进行。

下面是shutdown方法


shutdown

首先把运行标志记为false,然后把工作集中的工人都停止手中工作 阻塞掉。然后阻塞完成,改变线程池的停止状态为真。

下面是线程池的内部类 worker的简易实现


worker

这里我直接把工人获得的任务放入阻塞队列中,然后每次执行都从里面去拿。和源码的实现有点不一样(源码会先去执行每个工人自己拿到的任务,之后才去阻塞队列中拿取)

worker的run方法

run方法

在线程池正常运行的状态下,一直获取阻塞队列中的任务并且执行

下面是用于线程池停止时的方法

interrupt

当线程池停止工作时,就对所有线程发出阻塞指令不再继续工作。

完整代码:

package com.Thread.ThreadPoolExecutor;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class MyThreadPoolExecutor {
    private volatile boolean RUNNING = true;// 是否正在运行
    private final ReentrantLock lock = new ReentrantLock();// 并发锁
    private final HashSet<Worker> workers = new HashSet<>();// 不重复的工作集
    private static BlockingQueue<Runnable> queue = null;// 任务阻塞队列
    private final ArrayList<Thread> threads = new ArrayList<>();// 线程工厂
    private volatile int poolsize;// 线程池的核心线程数
    private volatile int coresize;// 当前线程池中的线程数
    private volatile boolean shutdown = false;// 是否停止工作

    public MyThreadPoolExecutor(int poolsize) {
        // TODO Auto-generated constructor stub
        this.poolsize = poolsize;
        queue = new ArrayBlockingQueue<>(poolsize);
    }

    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }

        if (coresize < poolsize) {
            addThread(command);
        } else {
            try {
                queue.put(command);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    private void addThread(Runnable task) {

        lock.lock();
        try {
            coresize++;
            Worker worker = new Worker(task);
            workers.add(worker);
            Thread thread = new Thread(worker);
            threads.add(thread);
            thread.start();
        } finally {
            // TODO: handle finally clause
            lock.unlock();
        }

    }

    public void shutdown() {
        RUNNING = false;
        if (!workers.isEmpty()) {
            for (Worker worker : workers) {
                worker.interruptIfIdle();
            }
        }
        shutdown = true;
        Thread.currentThread().interrupt();
    }

    private final class Worker implements Runnable {

        public Worker(Runnable task) {
            // TODO Auto-generated constructor stub
            queue.offer(task);
        }

        public Runnable getTask() throws InterruptedException {
            return queue.take();
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            while (true && RUNNING) {
                if (shutdown) {
                    Thread.interrupted();
                }
                Runnable task = null;
                try {
                    task = getTask();
                    task.run();
                } catch (InterruptedException e) {

                }
            }
        }

        public void interruptIfIdle() {
            for (Thread thread : threads) {
                System.out.println(thread.getName() + " interrupt");
                thread.interrupt();
            }
        }

    }

    public static void main(String[] args) {

    }
}

class Main {
    public static void main(String[] args) {
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(3);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {

                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    System.out.println("线程" + Thread.currentThread().getName() + "在工作....");
                }
            });
        }

        executor.shutdown();
    }
}

上一篇 下一篇

猜你喜欢

热点阅读