第一版本线程池-生产者消费者

2020-06-01  本文已影响0人  陈桐Caliburn

用到知识点

1、shutdown 优雅关闭线程
2、notify wait 实现生产者消费者

/**
 * 线程池技术
 * @author chentong
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

    private static final int MAX_WORKER_NUM = 10;
    private static final int DEFAULT_WORKER_NUM = 5;
    private static final int MIN_WORKER_NUM = 1;

    private final List<Worker> workers = Collections.synchronizedList( new ArrayList<Worker>() );

    //lock同步
    private final Object lock = new Object();

    //原子变量操作
    private volatile AtomicInteger workerNum = new AtomicInteger( 0 );

    private final JobQueue<Job> queue = new JobQueue<>();

    public DefaultThreadPool() {
        addWorkers( DEFAULT_WORKER_NUM );
    }

    @Override
    public void execute(Job job) {
        if (job != null) {
            queue.enqueue( job );
        }
    }

    @Override
    public void addWorkers(int num) {

        synchronized (lock) {
            if (num + this.workerNum.get() > MAX_WORKER_NUM) {
                num = MAX_WORKER_NUM - this.workerNum.get();
                if (num <= 0) return;
            }

            for (int i = 0; i < num; i++) {
                Worker worker = new Worker( queue );
                workers.add( worker );
                Thread thread = new Thread( worker, "ThreadPool-Worker-" + workerNum.incrementAndGet() );
                thread.start();
            }
        }

    }

    @Override
    public void removeWorker(int num) {

        synchronized (lock) {
            //线程池最小线程数
            if (num >= this.workerNum.get()) {
                num = this.workerNum.get() - MIN_WORKER_NUM;
                if (num <= 0) return;
            }

            int count = 0;

            while (count < num) {

                Worker worker = workers.get( count );
                if (workers.remove( worker )) {
                    worker.shutdown();
                    count++;
                }
            }
            //减少线程
            workerNum.getAndAdd( -num );
        }

    }

    @Override
    public void shutdown() {
        synchronized (lock){
            for (Worker worker : workers) {
                worker.shutdown();
            }
            workers.clear();
        }
    }

    @Override
    public int getJobSize() {
        return queue.getJobSize();
    }

}

//优雅关闭线程
class Worker implements Runnable {

    private volatile boolean running = true;

    private JobQueue queue;

    public Worker(JobQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (running) {
            Runnable job = queue.dequeue();
            if (job != null) {
                try {
                    job.run();
                } catch (Exception e) {
                }
            }
        }
    }

    public void shutdown() {
        running = false;
    }
}

//工作队列
//2、notify wait 实现生产者消费者
class JobQueue<Job extends Runnable> {

    private final LinkedList<Job> jobs = new LinkedList<>();

    private final Object lock = new Object();

    public void enqueue(Job job) {
        synchronized (lock) {
            lock.notifyAll();
            jobs.addLast( job );
        }
    }

    public Job dequeue() {

        synchronized (lock) {
            while (jobs.isEmpty()) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            Job job = jobs.removeFirst();
            return job;
        }
    }

    public int getJobSize() {
        synchronized (lock) {
            return jobs.size();
        }
    }

}

//线程池
interface ThreadPool<Job extends Runnable> {

    //执行job
    void execute(Job job);

    //关闭连接池
    void shutdown();

    //增加works
    void addWorkers(int num);

    //减少工作线程
    void removeWorker(int num);

    int getJobSize();

}

如何设置线程数

线程数的设定
上一篇下一篇

猜你喜欢

热点阅读