造轮子之 Android 多线程多任务断点续传下载器(实现篇)
上一篇 分析了实现一个 Android 多任务多线程下载器需要考虑的问题,这一篇就开始分析具体的代码实现了
Demo地址:https://github.com/4dcity/DownloadDemo
1. 下载请求 DownloadRequest
这是一个单纯的保存网络请求的不可变类,采用Builder模式,方便后期扩展更多的请求参数:
public class DownloadRequest{
@Expose private final String downloadUrl;
@Expose private final String downloadDir;
@Expose private final String downloadName;
private DownloadRequest(Builder builder) {
downloadUrl = builder.downloadUrl;
downloadDir = builder.downloadDir;
downloadName = builder.downloadName;
}
//省略Builder、getter、setter代码...
}
@Expose 注解是为了使用Gson序列化
2. 下载记录 DownloadRecord
这个类也比较简单,主要用来记录跟下载任务相关的数据。不过有几个字段 downloadState
,currentLength
,completedSubTask
会有并发读写的情况,所以需要做同步处理。另外实现了 Comparable 可以按照创建时间对任务进行排序
public class DownloadRecord implements Comparable<DownloadRecord>{
@Expose private final DownloadRequest request; //下载请求
@Expose private int downloadState; //下载状态
@Expose private int currentLength; // 已经下载的数据大小
@Expose private int fileLength; // 文件总大小
@Expose private int completedSubTask; // 完成的子任务数
@Expose private List<SubTask> subTaskList; //记录子任务的列表
@Expose private long createTime;// 任务创建时间,可以用来排序
DownloadRecord(DownloadRequest request) {
this.request = request;
subTaskList = new ArrayList<>();
downloadState = DownloadUtil.STATE_INITIAL;
createTime = System.currentTimeMillis();
}
synchronized public int getCurrentLength() {
return currentLength;
}
synchronized public int getDownloadState() {
return downloadState;
}
synchronized void setDownloadState(int downloadState) {
this.downloadState = downloadState;
}
synchronized boolean completeSubTask(){
completedSubTask++;
if(completedSubTask == subTaskList.size()){
return true;
}
return false;
}
synchronized void increaseLength(int length) {
currentLength+=length;
}
public int getProgress(){
return Math.round(getCurrentLength() / (getFileLength() * 1.0f) * 100);
}
synchronized void reset(){
currentLength = 0;
fileLength = 0;
completedSubTask = 0;
downloadState = DownloadUtil.STATE_INITIAL;
subTaskList.clear();
}
void linkSubTask(){
for (SubTask subTask : subTaskList) {
subTask.setRecord(this);
}
}
@Override
public int compareTo(@NonNull DownloadRecord o) {
if(createTime < o.getCreateTime()) return -1;
if(createTime > o.getCreateTime()) return 1;
return 0;
}
// 省略一些 getter,setter方法
...
}
3. 开启一个下载任务 DownloadTask
继承一个 AsyncTask,实现获取文件长度,并开启子线程开始下载的功能。这两步是串行操作,因为必须要获取文件长度才知道如何分配子任务。
public class DownloadTask extends AsyncTask<DownloadRecord, Integer, DownloadRecord> {
@Override
protected DownloadRecord doInBackground(DownloadRecord... params) {
DownloadRecord record = params[0];
try {
URL url = new URL(record.getDownloadUrl());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Charset", "UTF-8");
conn.setConnectTimeout(DownloadUtil.TIME_OUT);
conn.connect();
int fileLength = conn.getContentLength(); // 这里获取到文件长度
RandomAccessFile file = new RandomAccessFile(record.getFilePath(), "rwd");
file.setLength(fileLength);
record.setFileLength(fileLength); // 保存文件长度到 DownloadRecord
DownloadUtil.get().fileLengthSet(record);
return record;
} catch (IOException e) {
DownloadUtil.get().downloadFailed(record, "Get filelength failed!");
e.printStackTrace();
}
return null;
}
@Override
protected void onPostExecute(DownloadRecord record) {
if (record != null) {
//把要下载的文件分成N段,N为设置的下载子线程的数量
int blockSize = record.getFileLength() / DownloadUtil.sThreadNum;
// 初始化 N 个子任务,丢到线程池里开始执行真正的下载
for (int i = 0; i < DownloadUtil.sThreadNum; i++) {
int startL = i * blockSize;
int endL = (i + 1) * blockSize;
if (i == DownloadUtil.sThreadNum - 1)
endL = record.getFileLength();
SubTask subTask = new SubTask(record, startL, endL);
record.getSubTaskList().add(subTask);
DownloadUtil.sExecutor.execute(subTask);
}
//保存一下当前 record 的状态数据到本地存储
DownloadUtil.get().saveRecord(record);
}
}
}
4. 下载的子任务 SubTask
这里是真正执行文件下载的部分,实现为 Runnable 方便在线程池执行,并且通过在循环里判断任务状态实现任务的暂停:
public class SubTask implements Runnable {
private DownloadRecord record;
@Expose private int startLocation; // 下载文件的起点位置
@Expose private int endLocation; // 下载文件的终点位置
private InputStream is;
private RandomAccessFile file;
public SubTask(DownloadRecord record, int startLocation, int endLocation) {
this.record = record;
this.startLocation = startLocation;
this.endLocation = endLocation;
}
void setRecord(DownloadRecord record) {
this.record = record;
}
@Override
public void run() {
try {
URL url = new URL(record.getDownloadUrl());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty("Range", "bytes=" + startLocation + "-" + endLocation);
conn.setRequestMethod("GET");
conn.setRequestProperty("Charset", "UTF-8");
conn.setConnectTimeout(DownloadUtil.TIME_OUT);
conn.setReadTimeout(30 * 1000);
is = conn.getInputStream();
file = new RandomAccessFile(record.getFilePath(), "rwd");
file.seek(startLocation); // 指定开始写文件的位置
byte[] buffer = new byte[4096];
int len;
// 为了实现任务的中断,必须在循环写入的时候,判断当前任务的状态,
// 如果状态不是 STATE_DOWNLOADING,需要立即跳出循环,以实现暂停下载的功能
while (record.getDownloadState() == DownloadUtil.STATE_DOWNLOADING
&& (len = is.read(buffer)) != -1) {
file.write(buffer, 0, len);
startLocation += len;
record.increaseLength(len); // 每成功写入一段,就修改已下载的文件长度
DownloadUtil.get().progressUpdated(record);
}
// 满足这个条件,代表该子任务下载完了自己那部分的数据,需要把 DownloadRecord 里记录已完成子任务数的变量值+1
// 如果自己是最后一个完成的,那么表示整个下载任务完成
if (record.getDownloadState() == DownloadUtil.STATE_DOWNLOADING) {
if (record.completeSubTask()) {
DownloadUtil.get().taskFinished(record);
}
}
} catch (IOException exception) {
DownloadUtil.get().downloadFailed(record, "subtask failed!");
} finally {
try {
DownloadUtil.get().saveRecord(record);
file.close();
is.close();
} catch (IOException | NullPointerException e) {
e.printStackTrace();
}
}
}
}
5. 任务调度者 TaskDispatcher
维护一个无界的阻塞队列,不断的从队列头部取出待执行的任务,如果没有任务就阻塞,知道有新的任务被加入队列。
但是从队列取出任务之后还不能立即开始执行,比如可能设置了最大同时下载的任务数是5,现在达到这个上限了,那么新加入的下载任务就必须等待,直到有任务完成后,会释放信号量(后面 DownloadUtil 部分会讲)
public class TaskDispatcher extends Thread {
private BlockingQueue<DownloadRecord> mRecordQueue;
private volatile boolean mQuit = false;
public TaskDispatcher() {
mRecordQueue = new LinkedBlockingQueue<>();
}
public void quit() {
mQuit = true;
interrupt();
}
@Override
public void run() {
while (!isInterrupted()) {
try {
DownloadRecord record = mRecordQueue.take(); // 取出一个下载任务,如果队列为空就阻塞在这里
DownloadUtil.sDownloadPermit.acquire(); // 获取信号量,获取不到就阻塞在这里,直到有下载完成的任务释放一个信号量
if (record.getDownloadState() == DownloadUtil.STATE_REENQUEUE) {
// 这里对应暂停后重新启动的情形,详见后面 DownloadUtil 的分析
DownloadUtil.get().resume(record.getId());
} else {
// 这里对应新开始的任务的情形
DownloadUtil.get().start(record);
}
} catch (InterruptedException e) {
e.printStackTrace();
if (mQuit) {
return;
}
}
}
}
public void enqueueRecord(DownloadRecord record) {
mRecordQueue.add(record);
}
}
6. 持久化存储 DataHelper
这个没什么好说的,用 Gson 实现序列化和反序列化,需要注意用 @Expose 标记需要序列化的字段。也可以改用数据库实现。
7. 大总管 DownloadUtil
这个类主要有以下几个功能:
- init() 初始化各种变量
- sRecordMap 保存所有的下载任务
- 暴露接口给调用者发起下载请求(直接调用 enqueue() 方法)
- 通过任务 id 控制任务的执行状态(pasue(),resume() 等)
- 供调用者注册回调,监听任务状态
有以下需要注意的地方:
- setMaxConcurrentTask() 可以设置最多同时下载的任务数,但是必须在 init() 方法之前调用,不支持初始化后动态修改。因为这涉及到信号量的动态增减,会增加很多复杂性。
- 退出的时候(比如在 Activity 的 onDestroy() 方法里)一定要调用 destroy() 方法,不然很多静态对象都没法释放
public class DownloadUtil {
// 省略一大堆常量 ...
static ExecutorService sExecutor;
static Map<String, DownloadRecord> sRecordMap;
static Semaphore sDownloadPermit;
static int sThreadNum;
private static DownloadUtil instance;
private TaskDispatcher mTaskDispatcher;
private LocalBroadcastManager mBroadcastManager;
private DataHelper mDataHelper;
private boolean initialized = false;
private BroadcastReceiver mReceiver;
/**
* 使用前必须调用初始化方法
*/
public DownloadUtil init(Context context) {
if (initialized) return instance; // 如果已经调用过就直接返回
mBroadcastManager = LocalBroadcastManager.getInstance(context.getApplicationContext());
mDataHelper = new DataHelper(context.getApplicationContext());
loadAll();
initialized = true;
return instance;
}
/**
* 必须在init之前调用
* @param number 能够同时下载的任务数
*/
public DownloadUtil setMaxConcurrentTask(int number) {
if (initialized) return instance;
sDownloadPermit = new Semaphore(number);
return instance;
}
private DownloadUtil() {
sRecordMap = new LinkedHashMap<>();
sExecutor = Executors.newCachedThreadPool();
sDownloadPermit = new Semaphore(DEFAULT_TASK_AMOUNT);
sThreadNum = DEFAULT_THREAD_AMOUNT;
mTaskDispatcher = new TaskDispatcher();
mTaskDispatcher.start();
}
/**
* 单例模式
*/
public static DownloadUtil get() {
if (instance == null) {
synchronized (DownloadUtil.class) {
if (instance == null)
instance = new DownloadUtil();
}
}
return instance;
}
/**
* 发起一个下载请求,发起成功会返回对应下载任务的id
* @param request
* @return
*/
public String enqueue(DownloadRequest request) {
if (!checkRequest(request)) {
return null;
}
DownloadRecord record = new DownloadRecord(request);
sRecordMap.put(request.getId(), record);
mTaskDispatcher.enqueueRecord(record);
DownloadUtil.get().newTaskAdd(record);
return request.getId();
}
/**
* 只针对 STATE_REENQUEUE 和 STATE_DOWNLOADING 的任务有效,
* 其他情况返回 false
*/
public boolean pause(String taskId) {
DownloadRecord record = sRecordMap.get(taskId);
if (record != null) {
if (record.getDownloadState() == STATE_INITIAL
|| record.getDownloadState() == STATE_PAUSED
|| record.getDownloadState() == STATE_CANCELED
|| record.getDownloadState() == STATE_FAILED
|| record.getDownloadState() == STATE_FINISHED)
return false;
sRecordMap.get(taskId).setDownloadState(STATE_PAUSED);
sDownloadPermit.release();
Intent intent = new Intent(ACTION_PAUSED);
intent.putExtra(EXTRA_TASK_ID, taskId);
mBroadcastManager.sendBroadcast(intent);
return true;
}
return false;
}
public boolean resume(String taskId) {
if (sRecordMap.get(taskId) != null) {
DownloadRecord record = sRecordMap.get(taskId);
record.setDownloadState(STATE_DOWNLOADING);
Intent intent = new Intent(ACTION_RESUME);
intent.putExtra(EXTRA_TASK_ID, taskId);
mBroadcastManager.sendBroadcast(intent);
for (int i = 0; i < record.getSubTaskList().size(); i++) {
sExecutor.execute(record.getSubTaskList().get(i));
}
return true;
}
return false;
}
/**
* 注册回调监听,通过内部的 BroadcastReceiver 分发事件
* 也可以调用者自己注册 BroadcastReceiver 监听事件
*/
public void registerListener(Context context, final DownloadListener listener) {
if (listener == null)
return;
IntentFilter filter = new IntentFilter();
filter.addAction(ACTION_NEW_TASK_ADD);
filter.addAction(ACTION_START);
filter.addAction(ACTION_FILE_LENGTH_GET);
filter.addAction(ACTION_PROGRESS);
filter.addAction(ACTION_PAUSED);
filter.addAction(ACTION_REENQUEUE);
filter.addAction(ACTION_RESUME);
filter.addAction(ACTION_FINISHED);
filter.addAction(ACTION_FAILED);
mReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
DownloadRecord record = DownloadUtil.parseRecord(intent);
switch (intent.getAction()) {
case ACTION_NEW_TASK_ADD:
listener.onNewTaskAdd(record);
break;
case ACTION_START:
listener.onStart(record);
break;
case ACTION_FILE_LENGTH_GET:
listener.onFileLengthGet(record);
break;
case ACTION_PROGRESS:
listener.onProgress(record);
break;
case ACTION_PAUSED:
listener.onPaused(record);
break;
case ACTION_RESUME:
listener.onResume(record);
break;
case ACTION_REENQUEUE:
listener.onReEnqueue(record);
break;
case ACTION_CANCELED:
listener.onCanceled(record);
break;
case ACTION_FAILED:
listener.onFailed(record, intent.getStringExtra(EXTRA_ERROR_MSG));
break;
case ACTION_FINISHED:
listener.onFinish(record);
break;
}
}
};
LocalBroadcastManager.getInstance(context).registerReceiver(mReceiver, filter);
}
public void destroy() {
mBroadcastManager.unregisterReceiver(mReceiver);
mTaskDispatcher.quit();
stopAllTask();
saveAll();
sRecordMap.clear();
sRecordMap = null;
sDownloadPermit = null;
sExecutor = null;
instance = null;
initialized = false;
}
//省略一大堆其他方法 ...
}
8. 总结
实现一个多任务多线程支持断点续传的下载管理器,最大的收获是熟悉了很多多线程中的概念,同步,信号量,线程池,阻塞队列,线程中断等,真的是掌握多线程的一个完美的练手项目。
最后项目还有很多需要完善的地方,比如可以把 DownloadUtil 封装到 Service 里面,以及一些健壮性上的考虑,后期再继续完善吧。