线程池+队列并发处理

2019-01-08  本文已影响0人  舒尔诚

package com.mqtt.thread;

import java.util.Hashtable;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

/**

// Result table passed to every DBThread built by getNewThread(); each worker
// stores its topic -> msg pair here when its run() completes.
private  Hashtable<String, Object>  tables=new Hashtable<String, Object>();

/* @Autowired
DBThread dBThread;
*/

/**
 * Returns the result table that is handed to newly created workers.
 *
 * @return the live table (not a copy)
 */
public Hashtable<String, Object> getTables() {
    return this.tables;
}

/**
 * Replaces the result table handed to workers created after this call.
 *
 * @param tables the table that subsequently created workers will write into
 */
public void setTables(Hashtable<String, Object> tables) {
    this.tables = tables;
}

/**
 * Creates a manager whose workers report into the supplied table.
 *
 * @param tables the table that workers created by this manager will write into
 */
public ThreadPoolManager(Hashtable<String, Object>  tables ) {
    this.tables=tables;
}

public ThreadPoolManager() {
    super();
}


// Rejection handler used when the pool can take no more work (all threads busy
// and the bounded work queue full): the rejected task is parked in msgQueue so
// the scheduled dispatcher can resubmit it later.
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        final String rejectedTask = r.toString();
        final int parkedCount = msgQueue.size();
        final int poolQueueCount = threadPool.getQueue().size();
        System.out.println("太忙了,把该订单交给调度线程池逐一处理" + rejectedTask + "   "
                + "--size=" + parkedCount + "---tsize=" + poolQueueCount);

        // Keep the task itself (not just its payload) so it can be re-executed as-is.
        msgQueue.offer(r);
    }
};

// Worker pool for order tasks. The work queue is bounded by WORK_QUEUE_SIZE;
// tasks that do not fit are passed to the rejection handler.
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
        TimeUnit.SECONDS,
        // Typed queue instead of the raw ArrayBlockingQueue (removes the unchecked warning).
        new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE),
        this.handler);

// Scheduler pool. This kind of pool supports delayed and periodic task execution.
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

// Periodic dispatcher for parked tasks. Every 10 ms it checks msgQueue; when a
// task is waiting and the pool's work queue has spare capacity, one task is
// taken from msgQueue and resubmitted to the pool.
final ScheduledFuture<?> taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        if (msgQueue.isEmpty()) {
            return;
        }
        if (threadPool.getQueue().size() >= WORK_QUEUE_SIZE) {
            // Pool queue is still full; try again on the next tick.
            return;
        }

        Object pending = msgQueue.poll();
        if (pending == null) {
            return;
        }
        // The rejection handler parks arbitrary Runnables, and the queue is exposed
        // as Queue<Object>. Casting to DBThread would throw ClassCastException for
        // anything else, and an exception thrown here suppresses every later run of
        // this periodic task, so only the Runnable contract is required.
        if (!(pending instanceof Runnable)) {
            System.err.println("Dropping non-runnable entry from msgQueue: " + pending);
            return;
        }

        System.out.print("调度:");
        threadPool.execute((Runnable) pending);
    }
}, 0, 10, TimeUnit.MILLISECONDS);

/**
 * Stops the periodic dispatcher, then shuts down the scheduler pool and the
 * worker pool. Prints whether the dispatcher was cancelled.
 */
public void shutdown() {
    // cancel(false): a dispatcher run that is already in progress is allowed to
    // finish; cancel(true) would interrupt it instead.
    final boolean cancelled = taskHandler.cancel(false);
    System.out.println(cancelled);

    scheduler.shutdown();
    threadPool.shutdown();
}

/**
 * Returns the queue in which tasks rejected by the worker pool are parked.
 *
 * @return the live queue (not a copy)
 */
public Queue<Object> getMsgQueue() {
    return this.msgQueue;
}


/**
 * Submits an order to the worker pool unless the same order id has already
 * been registered in cacheMap.
 *
 * <p>Registration uses a single putIfAbsent call instead of a separate
 * get()/put() pair, so two callers racing on the same id cannot both pass the
 * check and submit the order twice.
 * NOTE(review): this is only atomic if cacheMap is a concurrent or synchronized
 * map; its declaration is not visible here, so confirm.
 *
 * @param orderId the order id; used as both topic and message of the worker
 */
public void processOrders(String orderId) {
    if (cacheMap.putIfAbsent(orderId, new Object()) != null) {
        // Already registered by an earlier (or concurrent) call.
        return;
    }

    DBThread dt = this.getNewThread(orderId, orderId);
    threadPool.execute(dt);
}


/**
 * Submits a task to the worker pool without any duplicate check and without
 * returning a status.
 *
 * @param topic the topic assigned to the worker
 * @param msg   the message assigned to the worker
 */
public void processOrders2(String topic,String msg ) {
    threadPool.execute(this.getNewThread(topic, msg));
}

/**
 * Stores the bean factory supplied through this callback.
 *
 * @param beanFactory the factory to keep
 * @throws BeansException declared by the overridden method; not thrown here
 */
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    this.factory = beanFactory;
}

/**
 * Builds a worker bound to this manager's result table.
 *
 * @param topic the topic to assign to the worker
 * @param msg   the message to assign to the worker
 * @return a new, not yet submitted worker
 */
public DBThread getNewThread(String topic,String msg ){
    final DBThread worker = new DBThread(tables);
    worker.setTopic(topic);
    worker.setMsg(msg);
    return worker;
}

/**
 * Entry point for running the manual smoke test.
 *
 * @param agrs command-line arguments (unused)
 */
public static void main(String[] agrs){

noReturnTest();

}



/**
 * Manual smoke test: submits {@code size} tasks through processOrders2, polls
 * once per second until every task has written its entry into the shared
 * table, prints the entries and the elapsed time, then shuts the pools down.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored and
 * the method stops waiting. The pools are shut down in either case.
 */
public static void noReturnTest(){

    Hashtable<String, Object> tables = new Hashtable<String, Object>();
    ThreadPoolManager tmp2 = new ThreadPoolManager(tables);
    int size = 1000;
    long start = System.currentTimeMillis();

    try {
        // Simulate `size` records arriving concurrently.
        for (int i = 0; i < size; i++) {
            tmp2.processOrders2("for-" + i, i + "");
        }

        // Wait until the number of processed entries equals the number submitted.
        while (tables.size() < size) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag for callers and stop waiting; continuing the loop
                // would make every following sleep() fail immediately.
                Thread.currentThread().interrupt();
                return;
            }

            System.out.println("----等待完成,已处理:"+tables.size()+"~~~"+tmp2.getTables().size());
        }

        for (Map.Entry<String, Object> entry : tables.entrySet()) {
            System.out.println("key---------"+entry.getKey()+"  "+"value--------"+entry.getValue());
        }
        long end = System.currentTimeMillis();

        System.out.println("-----------处理完毕----------------"+(end-start));
    } finally {
        // Always release the pool threads, even on interruption or failure.
        tmp2.shutdown();
    }
    System.out.println("----------------00000000000000000--------------");

}

}

-----------------线程类--------------------------------------------
package com.mqtt.thread;

import java.io.IOException;
import java.util.Hashtable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import com.mqtt.AlibabaMQTTSendRecive;

// Worker task executed by the thread pool.
@Component
@Scope("prototype") // Spring prototype scope: not a shared singleton
public class DBThread implements Runnable {

    // One logger for the class instead of one per worker instance.
    private static final Logger log = LoggerFactory.getLogger(DBThread.class);

    private String msg;
    private String topic;
    // Table this worker reports into when run() finishes.
    private Hashtable<String, Object> tables = new Hashtable<String, Object>();

    /**
     * Creates a worker that reports into the given table.
     *
     * @param tables the table that receives the topic -> msg entry
     */
    public DBThread(Hashtable<String, Object> tables) {
        this.tables = tables;
    }

    /** Creates a worker with no topic or message and a private result table. */
    public DBThread() {
    }

    /**
     * Creates a worker with a private result table.
     *
     * @param msg   the message to process
     * @param topic the topic the message belongs to
     */
    public DBThread(String msg, String topic) {
        this.msg = msg;
        this.topic = topic;
    }

    /** @return the topic assigned to this worker, possibly {@code null} */
    public String getTopic() {
        return topic;
    }

    /** @param topic the topic to assign to this worker */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * Logs the message, sleeps for three seconds as a stand-in for real work
     * (the original comment says it simulates a database insert), then records
     * topic -> msg in the result table.
     *
     * <p>If interrupted during the sleep, the interrupt flag is restored and the
     * result is still recorded. If the table, topic or message is {@code null},
     * the result is skipped with a warning, because {@code Hashtable.put}
     * rejects null keys and values.
     */
    @Override
    public void run() {
        log.info("发送消息线程进程 DBThread= {} ---- ->{}", this, msg);
        System.out.println("发送消息线程进程 DBThread= "+ this.toString()+ " ---- ->" + msg+"");

        try {
            Thread.sleep(3000);
            // TODO do the real work here
        } catch (InterruptedException e) {
            // Preserve the interrupt for the executor instead of swallowing it.
            Thread.currentThread().interrupt();
            log.warn("DBThread interrupted while processing topic={}", topic, e);
        }

        if (tables == null || topic == null || msg == null) {
            log.warn("Skipping result feedback: tables, topic or msg is null (topic={}, msg={})", topic, msg);
            return;
        }
        // Work finished: publish the result for whoever is watching the table.
        tables.put(topic, msg);
    }

    /** @return the message assigned to this worker, possibly {@code null} */
    public String getMsg() {
        return msg;
    }

    /** @param msg the message to assign to this worker */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}

上一篇 下一篇

猜你喜欢

热点阅读