Android网络请求框架核心-线程池处理高并发
最近公司开发一个新的SDK,需要配合他们写一些上层代码,下载时不能缺少的,当然,既然是SDK就不能引用其他第三方的开源网络框架,只能自己写了,这个SDK对网络高并发有严格要求,所以需要特殊处理,查了些资料,模仿了volley编写了自己的高并发框架,下面就来简单介绍。
一、设置线程池
线程池是高并发的核心,所以我们先搞懂他。这里只是入门理解理解,毕竟能力有限:
1、系统封装的线程池:Executors类
Executors类里面封装了几个常用的线程池:
1、newCachedThreadPool:可缓存的线程池
创建一个可缓存线程池,如果线程池中线程数量超过核心线程数,可灵活回收空闲线程,若无可回收,则新建线程。
这种类型的线程池特点是:
工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2、newFixedThreadPool
创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程(因为他设置的回收时间为0),还会占用一定的系统资源。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3、newSingleThreadExecutor
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定FIFO(先进先出)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4、newScheduleThreadPool
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行
以上四种就是系统封装的,那我们可以参考他们来封装一下,直接上ThreadPoolManager的代码
import android.util.Log;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolManager {
private static ThreadPoolManager instence = null;
private ThreadPoolExecutor mThreadPoolExecutor;
//线程数量,核心线程数
private int corePoolSize = 4;
//池中允许的最大线程数
private int maximumPoolSize = 10;
//设置空闲线程的空闲时间,如果一共有6个线程,那么超出核心线程数的两个线程就会被记录时间,超过该时间就会被杀死,如果没有超过核心线程数,那么线程是不会被倒计时的。
private long keepAliveTime = 10;
//等待执行的容器容量大小
private int capacity = 10;
//拒绝后的执行任务容器--》凉快的地方
private LinkedBlockingQueue taskQueue =new LinkedBlockingQueue();
public synchronized static ThreadPoolManager getInstence() {
if (instence == null) {
synchronized (ThreadPoolManager.class) {
if (instence == instence) {
instence = new ThreadPoolManager();
}
}
}
return instence;
}
/**
* 构造方法里面就初始化线程池
* ArrayBlockingQueue是一个执行任务的容量,当调用mThreadPoolExecutor的execute,容量加1,执行run完后,容量减1
* ArrayBlockingQueue后面传入true就是以FIFO规则存储:先进先出
*/
public ThreadPoolManager(){
if(mThreadPoolExecutor==null){
mThreadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(capacity,true),handler);
}
//开启线程一直循环从等待队列里面取出可执行任务并执行
mThreadPoolExecutor.execute(runnable);
}
/**
* 往队列里面存入可执行任务
* @param runnable
*/
public void putExecutableTasks(Runnable runnable){
try {
taskQueue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* ThreadPoolExecutor的run
*/
private Runnable runnable=new Runnable() {
@Override
public void run() {
//开启循环
while(true){
//取出等待的执行任务
Runnable taskQueueRunnable = null;
try {
Log.d("yanjin","等待队列大小:"+taskQueue.size());
taskQueueRunnable = (Runnable) taskQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(runnable!=null){
mThreadPoolExecutor.execute(taskQueueRunnable);
}
Log.d("yanjin","线程池大小"+mThreadPoolExecutor.getPoolSize());
}
}
};
/**
* 拒绝策略
* 当ArrayBlockingQueue容量过大,就要执行拒绝策略,对来的执行任务说:放不下了,先到一边凉快去,那么就要有一个凉快的容器撞他们
*
*/
private RejectedExecutionHandler handler = new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
taskQueue.put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
名字取ThreadPoolManager,
public synchronized static ThreadPoolManager getInstence() {
if (instence == null) {
synchronized (ThreadPoolManager.class) {
if (instence == instence) {
instence = new ThreadPoolManager();
}
}
}
return instence;
}
这个代表他是个单例。在他的构造方法里面我们就初始化线程池,这样我们的线程池也只有唯一一份了
/**
* 构造方法里面就初始化线程池
* ArrayBlockingQueue是一个执行任务的容量,当调用mThreadPoolExecutor的execute,容量加1,执行run完后,容量减1
* ArrayBlockingQueue后面传入true就是以FIFO规则存储:先进先出
*/
public ThreadPoolManager(){
if(mThreadPoolExecutor==null){
mThreadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(capacity,true),handler);
}
//开启线程一直循环从等待队列里面取出可执行任务并执行
mThreadPoolExecutor.execute(runnable);
}
ThreadPoolExecutor是系统的线程池,之前我们介绍的四种系统封装的都是对他的一个分装而来的,我们点进去,看看他的参数含义,他有好几个构造方法不要弄错了
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
corePoolSize代表核心线程数、maximumPoolSize是池中允许的最大线程数、keepAliveTime是设置空闲线程的空闲时间,如果一共有6个线程,那么超出核心线程数的两个线程就会被记录时间,超过该时间就会被杀死,如果没有超过核心线程数,那么线程是不会被倒计时的。workQueue用于保存任务的队列,我们每添加一个,就执行一个,这里我们用new ArrayBlockingQueue<Runnable>(capacity,true),ArrayBlockingQueue是使用数组保的方法,capacity是容量大小,true代表按照FIFA的顺序(先进先出)执行,handler代表拒绝策略,一旦当前ArrayBlockingQueue容器满了,我们定义的是10个,那么就会调用走到handler里面。
/**
* 拒绝策略
* 当ArrayBlockingQueue容量过大,就要执行拒绝策略,对来的执行任务说:放不下了,先到一边凉快去,那么就要有 一个凉快的容器装他们
*
*/
private RejectedExecutionHandler handler = new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
taskQueue.put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
这个拒绝策略,我们只是把被拒绝的任务存储起来,taskQueue就是这个容器是LinkedBlockingQueue类型(遵循FIFA先进先出顺序的阻塞是容器),那么我们就需要开一个线程一直循环去taskQueue取出执行任务。还记得构造方法里面我们初始化了线程池,并且执行了,对!这个就是他开线程循环取的地方,就是我们已初始化就开始轮询了
//开启线程一直循环从等待队列里面取出可执行任务并执行
mThreadPoolExecutor.execute(runnable);
..........
..........
..........
/**
* ThreadPoolExecutor的run
*/
private Runnable runnable=new Runnable() {
@Override
public void run() {
//开启循环
while(true){
//取出等待的执行任务
Runnable taskQueueRunnable = null;
try {
Log.d("yanjin","等待队列大小:"+taskQueue.size());
//take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞
taskQueueRunnable = (Runnable) taskQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(runnable!=null){
mThreadPoolExecutor.execute(taskQueueRunnable);
}
Log.d("yanjin","线程池大小"+mThreadPoolExecutor.getPoolSize());
}
}
};
这里我们用到了taskQueue.take();,,take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞。那么就是有数据就执行线面的mThreadPoolExecutor.execute(taskQueueRunnable);,没有数据就一直停留。这里我记得有一个惊群效应就是外面是while循环这里面还有一个while循环判断,不能用if简单判断,take已经为我们做好了。
/**
* 往队列里面存入可执行任务
* @param runnable
*/
public void putExecutableTasks(Runnable runnable){
try {
taskQueue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
这是插入执行任务。上面的轮询会监听taskQueue是否有数据,那么我们插入一个taskQueue.put(runnable);,就会排队等待轮询。
二、自定义处理类
public class TaskImp implements Runnable {
private int age = 0;
private String name = null;
private int current_number;
public TaskImp(int age, String name, int number) {
this.age = age;
this.name = name;
this.current_number = number;
}
@Override
public void run() {
Log.d("yanjin", "第" + current_number + "个 " + name + "的年龄是" + age);
}
}
他肯定是实现Runable接口上面mThreadPoolExecutor.execute(taskQueueRunnable);执行的Runnable 其实是在这个处理类的run里面,这样我们就能在这里写了,也没有那么耦合了。
三、客户端添加执行任务
/**
* 测试并发
* @param view
*/
public void test(View view) {
ThreadPoolManager threadPoolManager = ThreadPoolManager.getInstence();
for(int x= 0; x<1000;x++){
if((x+1)%2==0){
TaskImp taskImp=new TaskImp(16,"yanjin",x+1);
threadPoolManager.putExecutableTasks(taskImp);
}else{
Task2Imp taskImp=new Task2Imp(14,"zj",x+1);
threadPoolManager.putExecutableTasks(taskImp);
}
}
}
这里我创了1000个,偶数和奇数区分传入,只要一调用threadPoolManager.putExecutableTasks(taskImp);,那就立马执行。