多线程应用设计 Master-Worker模式
2017-06-03 本文已影响56人
ThingLin
Master-Worker模式,Master线程接收任务分配给Worker线程并统计Worker线程执行结果,Worker线程真正处理任务。
image.pngTask定义
package cn.thinglin.mw;
public abstract class Task {
private String taskId;
public String getTaskId(){
return this.taskId;
}
}
Worker的实现
package cn.thinglin.mw;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Worker
* @author ThingLin
*
* @param <T> 任务
* @param <V> 结果
*/
public abstract class Worker<T extends Task,V> implements Runnable {
/* 任务队列 */
protected ConcurrentLinkedQueue<T> tasks = new ConcurrentLinkedQueue<T>();
/* 结果集 */
protected ConcurrentHashMap<String,V> results = new ConcurrentHashMap<String,V>();
@Override
public void run() {
while(true){
T task = this.tasks.poll(); //取出任务
if(null == task){ //没有任务可执行跳出循环
break;
}
V v = dispose(task);
this.results.put(task.getTaskId(), v);
}
}
public void setTasks(ConcurrentLinkedQueue<T> tasks){
this.tasks = tasks;
}
public void setResults(ConcurrentHashMap<String,V> results){
this.results = results;
}
/**
* 处理任务
* @param task
* @return
*/
abstract V dispose(T task);
}
Master的实现
package cn.thinglin.mw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Master
* @author ThingLin
*
* @param <T> 任务
* @param <V> 结果
*/
public abstract class Master<T extends Task,V> {
/* 任务队列 */
protected ConcurrentLinkedQueue<T> tasks = new ConcurrentLinkedQueue<T>();
/* Worker管理 */
protected HashMap<Integer,Thread> workers = new HashMap<Integer,Thread>();
/* 结果集 */
protected ConcurrentHashMap<String,V> results = new ConcurrentHashMap<String,V>();
public Master(Worker<T,V> worker,int workerCount){
worker.setTasks(this.tasks);
worker.setResults(this.results);
for(int i=0;i<workerCount;i++){
this.workers.put(i, new Thread(worker,String.format("%s,%d","worker",i)));
}
}
public void submit(T task){
tasks.add(task);
}
public void excute(){
for(Map.Entry<Integer,Thread> item : workers.entrySet()){
item.getValue().start();
}
}
/**
*
* @return 线程全部执行完成时任务执行完true
*/
public boolean isComplete(){
for(Map.Entry<Integer,Thread> item : workers.entrySet()){
if(item.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
/**
* 结果集计算
* @return
*/
abstract V result();
}
测试:
package cn.thinglin.mw;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 测试
* @author ThingLin
*
*/
public class Main {
/* 线程数 */
private static final int process = Runtime.getRuntime().availableProcessors();
public static void main(String[] args) {
System.out.println("线程数量:"+process);
//创建Master
Master<MyTask,Integer> master = new Master<MyTask,Integer>(new Worker<MyTask, Integer>() {
@Override
Integer dispose(MyTask task) {
try {
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return task.getData();
}
}, process) {
@Override
Integer result() {
int result = 0;
int i=0;
for(Map.Entry<String,Integer> item : this.results.entrySet()){
result += item.getValue();
i++;
}
System.out.println("有"+i+"个结果");
return result;
}
};
//提交任务
for(int i=0;i<1000;i++){
master.submit(new MyTask(UUID.randomUUID().toString(),i));
}
//执行任务
long beginTime = System.currentTimeMillis();
master.excute();
while(true){
if(master.isComplete()){
System.out.println("耗时 :"+(System.currentTimeMillis() - beginTime));
System.out.println("结果:"+master.result());
break;
}
}
}
}
/**
* 任务
*/
class MyTask extends Task{
private String taskId;
private int data;
public MyTask(String taskId,int data){
this.taskId = taskId;
this.data = data;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public int getData() {
return data;
}
public void setData(int data) {
this.data = data;
}
}
testmw.gif