Java

多线程应用设计 Master-Worker模式

2017-06-03  本文已影响56人  ThingLin

Master-Worker模式:Master线程负责接收任务、将任务分配给Worker线程,并汇总各Worker线程的执行结果;Worker线程负责真正处理任务。

image.png

Task定义


package cn.thinglin.mw;

/**
 * Base type for a unit of work identified by a task id.
 *
 * <p>The id is fixed at construction time. Subclasses that manage their own
 * identifier may use the no-argument constructor and override
 * {@link #getTaskId()} instead.
 */
public abstract class Task {

    /* Identifier of this task; null when created through the no-argument constructor. */
    private final String taskId;

    /**
     * Creates a task without an identifier.
     *
     * <p>Kept so that existing subclasses relying on the implicit
     * {@code super()} call continue to compile.
     */
    protected Task() {
        this(null);
    }

    /**
     * Creates a task with the given identifier.
     *
     * @param taskId identifier returned by {@link #getTaskId()}
     */
    protected Task(String taskId) {
        this.taskId = taskId;
    }

    /**
     * Returns the identifier of this task.
     *
     * @return the id passed to the constructor, or {@code null} if none was given
     */
    public String getTaskId(){
        return this.taskId;
    }

}

Worker的实现


package cn.thinglin.mw;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Worker side of the Master-Worker pattern: repeatedly takes a task from the
 * task queue, processes it with {@link #dispose(Task)} and stores the outcome
 * in the result map under the task's id.
 *
 * @author ThingLin
 *
 * @param <T> task type
 * @param <V> result type
 */
public abstract class Worker<T extends Task,V> implements Runnable {

    /* Queue of pending tasks */
    protected ConcurrentLinkedQueue<T> tasks = new ConcurrentLinkedQueue<T>();

    /* Results, keyed by task id */
    protected ConcurrentHashMap<String,V> results = new ConcurrentHashMap<String,V>();

    /**
     * Processes a single task.
     *
     * @param task the task to process
     * @return the result produced for the task
     */
    abstract V dispose(T task);

    /**
     * Replaces the queue this worker takes its tasks from.
     *
     * @param tasks the task queue to poll
     */
    public void setTasks(ConcurrentLinkedQueue<T> tasks){
        this.tasks = tasks;
    }

    /**
     * Replaces the map this worker writes its results to.
     *
     * @param results the result map to fill
     */
    public void setResults(ConcurrentHashMap<String,V> results){
        this.results = results;
    }

    /**
     * Polls the task queue until it yields no task, recording one result per
     * processed task. The method returns as soon as a poll comes back empty.
     */
    @Override
    public void run() {
        for (T next = this.tasks.poll(); next != null; next = this.tasks.poll()) {
            V outcome = dispose(next);
            this.results.put(next.getTaskId(), outcome);
        }
    }

}


Master的实现


package cn.thinglin.mw;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Master side of the Master-Worker pattern: collects submitted tasks in a
 * queue, owns the worker threads that process that queue, and exposes the
 * result map the workers write to.
 *
 * @author ThingLin
 *
 * @param <T> task type
 * @param <V> result type
 */
public abstract class Master<T extends Task,V> {

    /* Queue of submitted tasks */
    protected ConcurrentLinkedQueue<T> tasks = new ConcurrentLinkedQueue<T>();

    /* Worker threads, keyed by their index */
    protected HashMap<Integer,Thread> workers = new HashMap<Integer,Thread>();

    /* Results, keyed by task id */
    protected ConcurrentHashMap<String,V> results = new ConcurrentHashMap<String,V>();

    /**
     * Wires the given worker to this master's task queue and result map and
     * creates {@code workerCount} threads that all run that worker instance.
     * The threads are created but not started.
     *
     * @param worker      the worker logic run by every thread
     * @param workerCount number of threads to create
     */
    public Master(Worker<T,V> worker,int workerCount){
        worker.setTasks(this.tasks);
        worker.setResults(this.results);
        for(int i=0;i<workerCount;i++){
            this.workers.put(i, new Thread(worker,String.format("%s,%d","worker",i)));
        }
    }

    /**
     * Adds a task to the queue.
     *
     * @param task the task to enqueue
     */
    public void submit(T task){
        tasks.add(task);
    }

    /**
     * Starts all worker threads.
     *
     * <p>Correctly spelled entry point. It delegates to {@link #excute()} so
     * that subclasses which override the legacy method keep their behavior.
     */
    public void execute(){
        excute();
    }

    /**
     * Starts all worker threads.
     *
     * <p>Legacy, misspelled name retained for backward compatibility; new
     * code should call {@link #execute()}.
     */
    public void excute(){
        for(Thread thread : workers.values()){
            thread.start();
        }
    }

    /**
     * Reports whether every worker thread has terminated.
     *
     * @return {@code true} when all worker threads are in state
     *         {@link Thread.State#TERMINATED}; {@code false} otherwise,
     *         including before the threads have been started
     */
    public boolean isComplete(){
        for(Thread thread : workers.values()){
            if(thread.getState() != Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    /**
     * Computes the overall result from the collected per-task results.
     *
     * @return the aggregated result
     */
    abstract V result();

}


测试:


package cn.thinglin.mw;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Demo driver: submits 1000 tasks to a master, runs them on one worker
 * thread per available processor, then prints the elapsed time and the sum
 * of the task data.
 *
 * @author ThingLin
 */
public class Main {

    /* Number of worker threads: one per available processor */
    private static final int WORKER_COUNT = Runtime.getRuntime().availableProcessors();

    /**
     * Runs the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        System.out.println("线程数量:"+WORKER_COUNT);

        // Create the master together with the worker logic shared by all threads
        Master<MyTask,Integer> master = new Master<MyTask,Integer>(new Worker<MyTask, Integer>() {

            @Override
            Integer dispose(MyTask task) {
                try {
                    // Simulate a small amount of work per task
                    TimeUnit.MICROSECONDS.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the interruption is not lost
                    Thread.currentThread().interrupt();
                }
                return task.getData();
            }
        }, WORKER_COUNT) {

            @Override
            Integer result() {
                int sum = 0;
                int count = 0;
                for(Integer value : this.results.values()){
                    sum += value;
                    count++;
                }
                System.out.println("有"+count+"个结果");
                return sum;
            }
        };

        // Submit the tasks
        for(int i=0;i<1000;i++){
            master.submit(new MyTask(UUID.randomUUID().toString(),i));
        }

        // Start the workers
        long beginTime = System.currentTimeMillis();
        master.excute();

        // Poll for completion, sleeping between checks so the waiting thread
        // does not spin on a core that the workers could be using
        while(!master.isComplete()){
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Stop waiting and leave the interrupt flag set for the caller
                Thread.currentThread().interrupt();
                return;
            }
        }

        System.out.println("耗时 :"+(System.currentTimeMillis() - beginTime));
        System.out.println("结果:"+master.result());

    }

}


/**
 * Demo task carrying an identifier and an integer payload.
 */
class MyTask extends Task{

    /* Identifier of this task */
    private String taskId;

    /* Integer payload of this task */
    private int data;

    /**
     * Creates a task with the given identifier and payload.
     *
     * @param taskId identifier of the task
     * @param data   integer payload
     */
    public MyTask(String taskId,int data){
        setTaskId(taskId);
        setData(data);
    }

    /**
     * @return the identifier of this task
     */
    public String getTaskId() {
        return this.taskId;
    }

    /**
     * @return the integer payload of this task
     */
    public int getData() {
        return this.data;
    }

    /**
     * @param taskId the new identifier
     */
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    /**
     * @param data the new integer payload
     */
    public void setData(int data) {
        this.data = data;
    }

}

testmw.gif
上一篇下一篇

猜你喜欢

热点阅读