手写并发框架(一)

2019-01-26  本文已影响13人  Theodore的技术站

并发框架是为了将业务代码和多线程代码隔离开来,为了可以让不懂并发的人员也可以开发使用这个框架。

框架中提供线程池、任务储存容器、使用者需要实现的任务接口、提交给框架执行的工作实体类、任务返回结果实体类、有可能还需要缓存定期清理已完成的任务。

框架业务示意图:

不论那种类型的任务都需要兼容,支持用户注册提交任务,查询任务进度和结果。


框架流程图:

框架内部跟业务代码是松耦合的,业务人员不需要了解内部代码,用线程池支持并发,提供给业务人员注册任务,并提供查询任务结果方法。内部需要并发容器储存任务,已经完成的任务放入队列中过期清除。


废话不多说,直接上代码:

这个接口需要业务人员实现,实现想要执行的任务

/*
*要求框架使用者实现的任务接口。
* data 是方法使用的业务数据类型
* return 方法执行后业务返回的结果
* */
public interface ITaskProcesser<T,R> {
    TaskResult<R> taskExcutor(T data);
}

工作实体类:

用泛型实现,提供了任务计数器,任务计数器用到了原子类型,避免内存重排序导致的错误。
提供了放回任务结果的方法供开发人员查看。
提供了将完成的任务放入队列的方法。
其中任务计数器是不能让业务人员随意操作的,所以构造的时候没有传入参数。

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

/*
* 提交给框架执行的工作实体类
* */
public class JobInfo<R> {
    //工作名字
    private final String jobName;
    //工作的任务个数
    private final int jobLength;
    //工作任务处理器
    private final ITaskProcesser<?,?> taskProcesser;
    //任务计数器
    private AtomicInteger successCount;//原子类型
    private AtomicInteger taskProcesserCount;
    //放入已完成的队列
    private LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
    //完成任务超时 定时器 超时清除
    private final long expireTime;

    //构造方法
    public JobInfo(String jobName, int jobLength, ITaskProcesser<?, ?> taskProcesser, long expireTime) {
        this.jobName = jobName;
        this.jobLength = jobLength;
        this.taskProcesser = taskProcesser;
        //开发人员不应该修改任务的计数 由框架来控制
        this.successCount = new AtomicInteger();
        this.taskProcesserCount = new AtomicInteger();
        this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);
        this.expireTime = expireTime;
    }

    public ITaskProcesser<?, ?> getTaskProcesser() {
        return taskProcesser;
    }

    //返回成功处理的结果数
    public int getSuccessCount() {
        return successCount.get();
    }

    //返回当前已处理的结果数
    public int getTaskProcesserCount() {
        return taskProcesserCount.get();
    }

    //提供工作中失败的次数
    public int getFailCount() {
        return taskProcesserCount.get() - successCount.get();
    }


    public String getTotalProcess() {
        return "Success["+successCount.get()+"]/Current["
                +taskProcesserCount.get()+"] Total["+jobLength+"]";
    }

    //获取每个任务结果,从头部获取
    public List<TaskResult<R>> getTaskDetail(){
        List<TaskResult<R>> taskList = new LinkedList<>();
        TaskResult<R> taskResult;
        while((taskResult = taskDetailQueue.pollFirst()) != null){
            taskList.add(taskResult);
        }
        return  taskList;
    }

    //从业务角度来讲,保证最终一致性就行,不需要加锁,影响性能。已经用了源自操作和并发安全队列
    public void addTaskResult(TaskResult<R> result, CheckJobProcesser checkJob){
        if (TaskResultType.Success.equals(result.getResultType())){
            successCount.incrementAndGet();
        }
        taskDetailQueue.addLast(result);//结果从尾部添加
        taskProcesserCount.incrementAndGet();
        if (taskProcesserCount.get() == jobLength){
            checkJob.putJob(jobName,expireTime);
        }

    }
}

任务结果类

提供返回任务结果和原因的方法。

/*
* 任务返回结果实体类
* */
public class TaskResult<R> {
    private final TaskResultType resultType;
    private final R returnValue;//业务结果数据
    private final String reason;//方法失败原因

    public TaskResult(TaskResultType resultType, R returnValue, String reason) {
        this.resultType = resultType;
        this.returnValue = returnValue;
        this.reason = reason;
    }

    public TaskResult(TaskResultType resultType, R returnValue) {
        this.resultType = resultType;
        this.returnValue = returnValue;
        this.reason = "Success";
    }

    public TaskResultType getResultType() {
        return resultType;
    }

    public R getReturnValue() {
        return returnValue;
    }

    public String getReason() {
        return reason;
    }
}

任务执行的结果类:

/*
*
* 任务执行的结果类
* */
public enum TaskResultType {
    Success,Failure,Exception;
    //返回业务人员需要的结果
    //返回业务人员不需要的结果
    //返回异常
}

查询结果类:

主要实现将完成的任务放入队列,供查询,如果一点时间过了就将任务清除,防止占用大量内存。
使用了单例模式,也是为了节省内存。
启用线程清理任务,设置为守护线程。

/*
* 任务完成后,在一定的时间供查询,之后为释放资源节约内存,需要定期处理过期的任务
* */
public class CheckJobProcesser {

    private static DelayQueue<ItemVo<String>> queue = new DelayQueue<>();//存放已完成任务,超时过期

    //单例模式------
    private CheckJobProcesser(){}

    private static class ProcesserHolder{
        public static CheckJobProcesser processer = new CheckJobProcesser();
    }

    public static CheckJobProcesser getInstance(){
        return ProcesserHolder.processer;
    }
    //单例模式------

    //处理队列中到期任务的实行
    private static class FetchJob implements Runnable{

        @Override
        public void run() {
            while (true){
                try{
                    ItemVo<String> item = queue.take();
                    String jobName = (String)item.getDate();
                    PendingJobPool.getMap().remove(jobName);
                    System.out.println(jobName + "is out of date , remove from map");
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    /*任务完成后,放入队列,经过expireTime时间后,从整个框架中移除*/
    public void putJob(String jobName,long expireTime){
        ItemVo<String> item = new ItemVo<>(expireTime,jobName);
        queue.offer(item);
        System.out.println("job[" + jobName + "已经放入过期检查缓存,过期时长:" + expireTime);
    }

    //类初始化的时候就运行线程
    static {
        Thread thread = new Thread(new FetchJob());
        thread.setDaemon(true);
        thread.start();
        System.out.println("开启守护线程");
    }
}

框架主体:

这里使用了线程池,保守估计线程个数,使用的个数和cpu数量相同,这个可以根据业务需求修改。

线程池没有用 JDK 提供的那几个,主要考虑到想要用有界队列,就自己定义线程池。
用 ConcurrentHashMap 存放任务。

提供了注册任务方法、提交任务方法、获得任务结果等。

对工作中的任务进行了包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
同样使用了单例模式。

import com.enjoy.MultiThread.ch8a.vo.ITaskProcesser;
import com.enjoy.MultiThread.ch8a.vo.JobInfo;
import com.enjoy.MultiThread.ch8a.vo.TaskResult;
import com.enjoy.MultiThread.ch8a.vo.TaskResultType;

import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
 * 框架的主体类,也是调用者主要使用的类
 */
public class PendingJobPool {

    //保守估计
    private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();

    //有界队列
    private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5000);

    //创建固定大小有界队列线程池
    private static ExecutorService taskExecutor
            = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT,60, TimeUnit.SECONDS,taskQueue);

    //job 存放容器
    private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap = new ConcurrentHashMap<>();

    public static Map<String, JobInfo<?>> getMap(){
        return jobInfoMap;
    }

    private static CheckJobProcesser checkJob
            = CheckJobProcesser.getInstance();

    //单例模式---
    private PendingJobPool(){}

    private static class JobPoolHolder{
        public static PendingJobPool pool = new PendingJobPool();
    }

    public static PendingJobPool getInstance(){
        return JobPoolHolder.pool;
    }
    //单例模式---

    //调用者注册工作,如工作名,任务的处理器等等
    public<R> void registerJob(String jobName, int jobLength, ITaskProcesser<?,?> taskProcesser,long expireTime){
        JobInfo<R> jobInfo = new JobInfo(jobName,jobLength,taskProcesser,expireTime);
        if (jobInfoMap.putIfAbsent(jobName,jobInfo) != null){
            throw new RuntimeException("当前任务已经注册");
        }

    }

    //调用者提交任务
    public <T,R> void putTask(String jobName,T t){
        JobInfo<R> jobInfo = getJob(jobName);
        PendingTask<T,R> task = new PendingTask<>(jobInfo,t);
        taskExecutor.execute(task);
    }

    //根据工作名称检索工作
    private <R> JobInfo<R> getJob(String jobName){
        JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
        if (null == jobInfo){
            throw new RuntimeException(jobName + "是个非法任务");
        }
        return jobInfo;
    }

    //对工作中的任务进行包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
    private static class PendingTask<T,R> implements Runnable{
        private JobInfo<R> jobInfo;
        private T processData;

        public PendingTask(JobInfo<R> jobInfo,T processData){
            this.jobInfo = jobInfo;
            this.processData = processData;
        }

        @Override
        public void run(){
            R r = null;
            ITaskProcesser<T,R> taskProcesser = (ITaskProcesser<T,R>) jobInfo.getTaskProcesser();
            TaskResult<R> result = null;
            //调用业务员人员实现的方法
            result = taskProcesser.taskExcutor(processData);
            //要做检查,防止异常

            try{
                if (result == null){
                    result = new TaskResult<R>(TaskResultType.Exception,r,"result id null");
                }
                if (result.getResultType() == null) {
                    if (result.getReason() == null) {
                        result = new TaskResult<R>(TaskResultType.Exception, r, "reason is null");
                    } else {
                        result = new TaskResult<R>(TaskResultType.Exception, r,
                                "result is null,but reason:" + result.getReason());
                    }
                }
            }catch(Exception e){
                    e.printStackTrace();
                    result = new TaskResult<R>(TaskResultType.Exception,r,e.getMessage());
            }finally {
                jobInfo.addTaskResult(result,checkJob);
            }
        }
    }

    //获得每个任务的处理详情
    public <R> List<TaskResult<R>> getTaskDetail(String jobName){
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTaskDetail();
    }

    //获得工作的整体处理进度
    public <R> String getTaskProgess(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTotalProcess();
    }
}

实现了自己的任务测试一下整个框架,构造了成功失败异常的情况:

import com.enjoy.MultiThread.ch8a.vo.ITaskProcesser;
import com.enjoy.MultiThread.ch8a.vo.TaskResult;
import com.enjoy.MultiThread.ch8a.vo.TaskResultType;
import com.enjoy.MultiThread.tools.SleepTools;

import java.util.Random;
/**
 *类说明:一个实际任务类,将数值加上一个随机数,并休眠随机时间
 */
public class MyTask implements ITaskProcesser<Integer,Integer> {

    @Override
    public TaskResult<Integer> taskExcutor(Integer data) {
        Random r = new Random();
        int flag = r.nextInt(500);
        SleepTools.ms(flag);
        if(flag<=300) {//正常处理的情况
            Integer returnValue = data.intValue()+flag;
            return new TaskResult<Integer>(TaskResultType.Success,returnValue);
        }else if(flag>301&&flag<=400) {//处理失败的情况
            return new TaskResult<Integer>(TaskResultType.Failure,-1,"Failure");
        }else {//发生异常的情况
            try {
                throw new RuntimeException("异常发生了!!");
            } catch (Exception e) {
                return new TaskResult<Integer>(TaskResultType.Exception,
                        -1,e.getMessage());
            }
        }
    }
}

测试代码:

import com.enjoy.MultiThread.ch8a.PendingJobPool;
import com.enjoy.MultiThread.ch8a.vo.TaskResult;
import com.enjoy.MultiThread.tools.SleepTools;

import java.util.List;
import java.util.Random;

public class AppTest {
    private final static String JOB_NAME = "计算数值";
    private final static int JOB_LENGTH = 1000;

    //查询任务进度的线程
    private static class QueryResult implements Runnable{

        private PendingJobPool pool;

        public QueryResult(PendingJobPool pool) {
            super();
            this.pool = pool;
        }

        @Override
        public void run() {
            int i=0;//查询次数
            while(i<350) {
                List<TaskResult<String>> taskDetail = pool.getTaskDetail(JOB_NAME);
                if(!taskDetail.isEmpty()) {
                    System.out.println(pool.getTaskProgess(JOB_NAME));
                    System.out.println(taskDetail);
                }
                SleepTools.ms(100);
                i++;
            }
        }
    }

    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        //拿到框架的实例
        PendingJobPool pool = PendingJobPool.getInstance();
        //注册job
        pool.registerJob(JOB_NAME, JOB_LENGTH, myTask,1000*5);
        Random r = new Random();
        for(int i=0;i<JOB_LENGTH;i++) {
            //依次推入Task
            pool.putTask(JOB_NAME, r.nextInt(1000));
        }
        Thread t = new Thread(new QueryResult(pool));
        t.start();
    }
}

测试结果:

结果比较多没有放全,可以自己本地跑一下。


image.png

代码 github 地址:

https://github.com/theodore816/javastudy/tree/master/com/enjoy/MultiThread/ch8a

上一篇下一篇

猜你喜欢

热点阅读