OkDownload源码分析
OkDownload是一款多线程断点续传下载引擎,它的功能完整,性能高,可配置性高,可以注入自定义组件来修改下载策略、替换网络请求框架等等,而且在项目中已有成熟应用(英语流利说),是一个很不错的开源下载框架。
项目地址:https://github.com/lingochamp/okdownload
OkDownload的简单使用
OkDownload的使用非常简单:
1.引入该开源库:
implementation 'com.liulishuo.okdownload:okhttp:1.0.5' (提供okhttp连接,ps:如果使用的话,需要引入okhttp网络请求库)
implementation 'com.liulishuo.okdownload:okdownload:1.0.5' (下载核心库)
implementation 'com.liulishuo.okdownload:sqlite:1.0.5' (存储断点信息的数据库)
2.开始一个任务:
task = new DownloadTask.Builder(url, parentFile)
.setFilename(filename)
// 下载进度回调的间隔时间(毫秒)
.setMinIntervalMillisCallbackProcess(30)
// 任务过去已完成是否要重新下载
.setPassIfAlreadyCompleted(false)
.build();
//异步执行任务
task.enqueue(listener);
// 取消任务
task.cancel();
// 同步执行任务
task.execute(listener);
当然也可以同时异步执行多个任务
DownloadTask.enqueue(tasks, listener);
3.任务队列的构建、开始和停止
DownloadContext.Builder builder = new DownloadContext.QueueSet()
.setParentPathFile(parentFile)
.setMinIntervalMillisCallbackProcess(150)
.commit();
builder.bind(url1);
builder.bind(url2).addTag(key, value);
builder.bind(url3).setTag(tag);
builder.setListener(contextListener);
DownloadTask task = new DownloadTask.Builder(url4, parentFile)
.setPriority(10).build();
builder.bindSetTask(task);
DownloadContext context = builder.build();
context.startOnParallel(listener);
// stop
context.stop();
4.获取任务状态
Status status = StatusUtil.getStatus(task)
status = StatusUtil.getStatus(url, parentPath, null);
status = StatusUtil.getStatus(url, parentPath, filename);
boolean isCompleted = StatusUtil.isCompleted(task);
isCompleted = StatusUtil.isCompleted(url, parentPath, null);
isCompleted = StatusUtil.isCompleted(url, parentPath, filename);
Status completedOrUnknown = StatusUtil.isCompletedOrUnknown(task);
5.获取断点信息
// 注意:任务完成后,断点信息将会被删除
BreakpointInfo info = OkDownload.with().breakpointStore().get(id);
info = StatusUtil.getCurrentInfo(url, parentPath, null);
info = StatusUtil.getCurrentInfo(url, parentPath, filename);
// 断点信息将被缓存在任务对象中,即使任务已经完成了
info = task.getInfo();
6.设置任务监听
可以为任务设置五种不同类型的监听器,同时,也可以给任务和监听器建立1对1、1对多、多对1、多对多的关联。
给一个任务设置多种监听:
DownloadListener listener1 = new DownloadListener1();
DownloadListener listener2 = new DownloadListener2();
DownloadListener combinedListener = new DownloadListenerBunch.Builder()
.append(listener1)
.append(listener2)
.build();
DownloadTask task = new DownloadTask.build(url, file).build();
task.enqueue(combinedListener);
为多个任务动态设置监听:
UnifiedListenerManager manager = new UnifiedListenerManager();
DownloadListener listener1 = new DownloadListener1();
DownloadListener listener2 = new DownloadListener2();
DownloadListener listener3 = new DownloadListener3();
DownloadListener listener4 = new DownloadListener4();
DownloadTask task = new DownloadTask.build(url, file).build();
manager.attachListener(task, listener1);
manager.attachListener(task, listener2);
manager.detachListener(task, listener2);
// 当一个任务结束时,这个任务的所有监听器都被移除
manager.addAutoRemoveListenersWhenTaskEnd(task.getId());
// enqueue task to start.
manager.enqueueTaskWithUnifiedListener(task, listener3);
manager.attachListener(task, listener4);
下面我们来分析一下这个下载框架的源码:
OkDownload
首先看一下OkDownload这个类,这个类定义了所有的下载策略,我们可以自定义一些下载策略,可以通过OkDownload的Builder构造自定义的一个OkDownload实例,再通过OkDownload.setSingletonInstance进行设置:
OkDownload.Builder builder = new OkDownload.Builder(context)
.downloadStore(downloadStore) //断点信息存储的位置,默认是SQLite数据库
.callbackDispatcher(callbackDispatcher) //监听回调分发器,默认在主线程回调
.downloadDispatcher(downloadDispatcher) //下载管理机制,最大下载任务数、同步异步执行下载任务的处理
.connectionFactory(connectionFactory) //选择网络请求框架,默认是OkHttp
.outputStreamFactory(outputStreamFactory) //构建文件输出流DownloadOutputStream,是否支持随机位置写入
.downloadStrategy(downloadStrategy) //下载策略,文件分为几个线程下载
.processFileStrategy(processFileStrategy) //多文件写文件的方式,默认是根据每个线程写文件的不同位置,支持同时写入。
.monitor(monitor); //下载状态监听
OkDownload.setSingletonInstance(builder.build());
DownloadTask
DownloadTask下载任务类,可通过它的Builder来构造一个下载任务,我们看它是如何执行的:
public void execute(DownloadListener listener) {
this.listener = listener;
OkDownload.with().downloadDispatcher().execute(this);
}
public void enqueue(DownloadListener listener) {
this.listener = listener;
OkDownload.with().downloadDispatcher().enqueue(this);
}
可以看到都是通过downloadDispatcher来执行下载任务的,默认的downloadDispatcher是一个DownloadDispatcher实例,我们以同步执行一个下载任务为例,看它是如何下载的:
public void execute(DownloadTask task) {
Util.d(TAG, "execute: " + task);
final DownloadCall call;
synchronized (this) {
if (inspectCompleted(task)) return;
if (inspectForConflict(task)) return;
call = DownloadCall.create(task, false, store);
runningSyncCalls.add(call);
}
syncRunCall(call);
}
void syncRunCall(DownloadCall call) {
call.run();
}
在execute方法里将一个DownloadTask实例又封装为了一个DownloadCall对象,然后在syncRunCall方法里执行了DownloadCall对象的run方法。通过看DownloadCall源码可以知道该类继承自NamedRunnable,而NamedRunnable实现了Runnable,在run方法里调用了execute方法。(enqueue执行任务最终则是调用 getExecutorService().execute(call);来异步执行的)
那我们看一下DownloadCall这个类。
DownloadCall
先看一下DownloadCall是如何实现execute方法的,该方法比较长,首先执行的是inspectTaskStart:
先看一下这个store是什么:
通过看OkDownload这个类的源码可以知道,DownloadCall的store是调用BreakpointStoreOnSQLite的createRemitSelf方法生成的一个实例:
可以看到是RemitStoreOnSQLite的一个实例,其主要用来保存任务及断点信息至本地数据库。RemitStoreOnSQLite里持有BreakpointStoreOnSQLite对象,BreakpointStoreOnSQLite里面包含了BreakpointSQLiteHelper(用于操作数据)和BreakpointStoreOnCache(用于做数据操作之前的数据缓存)。
@Override public void syncCacheToDB(int id) throws IOException {
sqLiteHelper.removeInfo(id);
final BreakpointInfo info = sqliteCache.get(id);
if (info == null || info.getFilename() == null || info.getTotalOffset() <= 0) return;
sqLiteHelper.insert(info);
}
最终会调用上述syncCacheToDB方法,先删除数据库中的任务信息,若缓存(创建BreakpointStoreOnCache对象时,会调用loadToCache方法将数据库中所有任务信息进行缓存)
this.onCache = new BreakpointStoreOnCache(helper.loadToCache(),
helper.loadResponseFilenameToMap());
中有该任务,则检查任务信息是否合法,若合法则再次将该任务及断点信息保存在本地数据库中。
inspectTaskStart方法结束后,会进入一个do-while循环,首先做一些下载前的准备工作:
do {
//1.判断当前任务的下载链接长度是否大于0,否则就抛出异常;
if (task.getUrl().length() <= 0) {
this.cache = new DownloadCache.PreError(
new IOException("unexpected url: " + task.getUrl()));
break;
}
if (canceled) break;
//2.从缓存中获取任务的断点信息,若没有断点信息,则创建断点信息并保存至数据库;
@NonNull final BreakpointInfo info;
try {
BreakpointInfo infoOnStore = store.get(task.getId());
if (infoOnStore == null) {
info = store.createAndInsert(task);
} else {
info = infoOnStore;
}
setInfoToTask(info);
} catch (IOException e) {
this.cache = new DownloadCache.PreError(e);
break;
}
if (canceled) break;
// 3.创建带缓存的下载输出流;
@NonNull final DownloadCache cache = createCache(info);
this.cache = cache;
// 4.访问下载链接判断断点信息是否合理;
final BreakpointRemoteCheck remoteCheck = createRemoteCheck(info);
try {
remoteCheck.check();
} catch (IOException e) {
cache.catchException(e);
break;
}
//5.确定文件路径后等待文件锁释放;
fileStrategy.getFileLock().waitForRelease(task.getFile().getAbsolutePath());
// 6. 判断缓存中是否有相同的任务,若有则复用缓存中的任务的分块信息;
OkDownload.with().downloadStrategy()
.inspectAnotherSameInfo(task, info, remoteCheck.getInstanceLength());
try {
//7.检查断点信息是否是可恢复的,若不可恢复,则根据文件大小进行分块,重新下载,否则继续进行下一步;
if (remoteCheck.isResumable()) {
// 8.判断断点信息是否是脏数据(文件存在且断点信息正确且下载链接支持断点续传);
final BreakpointLocalCheck localCheck = createLocalCheck(info,
remoteCheck.getInstanceLength());
localCheck.check();
// 9.若是脏数据则根据文件大小进行分块,重新开始下载,否则从断点位置开始下载;
if (localCheck.isDirty()) {
Util.d(TAG, "breakpoint invalid: download from beginning because of "
+ "local check is dirty " + task.getId() + " " + localCheck);
fileStrategy.discardProcess(task);
assembleBlockAndCallbackFromBeginning(info, remoteCheck,
localCheck.getCauseOrThrow());
} else {
okDownload.callbackDispatcher().dispatch()
.downloadFromBreakpoint(task, info);
}
} else {
Util.d(TAG, "breakpoint invalid: download from beginning because of "
+ "remote check not resumable " + task.getId() + " " + remoteCheck);
fileStrategy.discardProcess(task);
assembleBlockAndCallbackFromBeginning(info, remoteCheck,
remoteCheck.getCauseOrThrow());
}
} catch (IOException e) {
cache.setUnknownError(e);
break;
}
// 10. 开始下载
start(cache, info);
if (canceled) break;
// 11. 错误重试机制
if (cache.isPreconditionFailed()
&& retryCount++ < MAX_COUNT_RETRY_FOR_PRECONDITION_FAILED) {
store.remove(task.getId());
retry = true;
} else {
retry = false;
}
} while (retry);
1.判断当前任务的下载链接长度是否大于0,否则就抛出异常;2.从缓存中获取任务的断点信息,若没有断点信息,则创建断点信息并保存至数据库;3.创建带缓存的下载输出流;4.访问下载链接判断断点信息是否合理;5.确定文件路径后等待文件锁释放; 6. 判断缓存中是否有相同的任务,若有则复用缓存中的任务的分块信息;7.检查断点信息是否是可恢复的,若不可恢复,则根据文件大小进行分块,重新下载,否则继续进行下一步;8.判断断点信息是否是脏数据(文件存在且断点信息正确且下载链接支持断点续传);9.若是脏数据则根据文件大小进行分块,重新开始下载,否则从断点位置开始下载;10.开始下载。
文件分成多少块进行下载由DownloadStrategy决定的:
// 1 connection: [0, 1MB)
private static final long ONE_CONNECTION_UPPER_LIMIT = 1024 * 1024; // 1MiB
// 2 connection: [1MB, 5MB)
private static final long TWO_CONNECTION_UPPER_LIMIT = 5 * 1024 * 1024; // 5MiB
// 3 connection: [5MB, 50MB)
private static final long THREE_CONNECTION_UPPER_LIMIT = 50 * 1024 * 1024; // 50MiB
// 4 connection: [50MB, 100MB)
private static final long FOUR_CONNECTION_UPPER_LIMIT = 100 * 1024 * 1024; // 100MiB
public ResumeAvailableResponseCheck resumeAvailableResponseCheck(
DownloadConnection.Connected connected,
int blockIndex,
BreakpointInfo info) {
return new ResumeAvailableResponseCheck(connected, blockIndex, info);
}
public int determineBlockCount(@NonNull DownloadTask task, long totalLength) {
if (task.getSetConnectionCount() != null) return task.getSetConnectionCount();
if (totalLength < ONE_CONNECTION_UPPER_LIMIT) {
return 1;
}
if (totalLength < TWO_CONNECTION_UPPER_LIMIT) {
return 2;
}
if (totalLength < THREE_CONNECTION_UPPER_LIMIT) {
return 3;
}
if (totalLength < FOUR_CONNECTION_UPPER_LIMIT) {
return 4;
}
return 5;
}
文件大小在0-1MB、1-5MB、5-50MB、50-100MB、100MB以上时分别开启1、2、3、4、5个线程进行下载。
我们重点看一下下载部分的源码,也就是start(cache,info)方法:
void start(final DownloadCache cache, BreakpointInfo info) throws InterruptedException {
final int blockCount = info.getBlockCount();
final List<DownloadChain> blockChainList = new ArrayList<>(info.getBlockCount());
for (int i = 0; i < blockCount; i++) {
final BlockInfo blockInfo = info.getBlock(i);
if (Util.isCorrectFull(blockInfo.getCurrentOffset(), blockInfo.getContentLength())) {
continue;
}
Util.resetBlockIfDirty(blockInfo);
blockChainList.add(DownloadChain.createChain(i, task, info, cache, store));
}
if (canceled) {
return;
}
startBlocks(blockChainList);
}
可以看到它是分块下载的,每一个分块都是一个DownloadChain实例,DownloadChain实现了Runnable接口,继续看startBlocks方法:
对于每一个分块任务,都调用了submitChain方法,由一个线程池去处理每一个DownloadChain分块,核心代码就在这里:
void start() throws IOException {
final CallbackDispatcher dispatcher = OkDownload.with().callbackDispatcher();
// 处理请求拦截链
final RetryInterceptor retryInterceptor = new RetryInterceptor();
final BreakpointInterceptor breakpointInterceptor = new BreakpointInterceptor();
connectInterceptorList.add(retryInterceptor);
connectInterceptorList.add(breakpointInterceptor);
connectInterceptorList.add(new RedirectInterceptor());
connectInterceptorList.add(new HeaderInterceptor());
connectInterceptorList.add(new CallServerInterceptor());
connectIndex = 0;
final DownloadConnection.Connected connected = processConnect();
if (cache.isInterrupt()) {
throw InterruptException.SIGNAL;
}
dispatcher.dispatch().fetchStart(task, blockIndex, getResponseContentLength());
// 获取数据拦截链
final FetchDataInterceptor fetchDataInterceptor =
new FetchDataInterceptor(blockIndex, connected.getInputStream(),
getOutputStream(), task);
fetchInterceptorList.add(retryInterceptor);
fetchInterceptorList.add(breakpointInterceptor);
fetchInterceptorList.add(fetchDataInterceptor);
fetchIndex = 0;
final long totalFetchedBytes = processFetch();
dispatcher.dispatch().fetchEnd(task, blockIndex, totalFetchedBytes);
}
可以看到它主要使用责任链模式进行了两个链式调用:处理请求拦截链和获取数据拦截链。
处理请求拦截链包含了RetryInterceptor重试拦截器、BreakpointInterceptor断点拦截器、RedirectInterceptor重定向拦截器、HeaderInterceptor头部信息处理拦截器、CallServerInterceptor请求拦截器,该链式调用过程会逐个调用拦截器的interceptConnect方法:
public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
final DownloadCache cache = chain.getCache();
// 如果产生了RetryException,则重新执行该链式调用
while (true) {
try {
if (cache.isInterrupt()) {
throw InterruptException.SIGNAL;
}
return chain.processConnect();
} catch (IOException e) {
if (e instanceof RetryException) {
chain.resetConnectForRetry();
continue;
}
chain.getCache().catchException(e);
throw e;
}
}
}
......
}
public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {
private static final String TAG = "BreakpointInterceptor";
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
final DownloadConnection.Connected connected = chain.processConnect();
final BreakpointInfo info = chain.getInfo();
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
if (info.getBlockCount() == 1 && !info.isChunked()) {
// 当只有一个线程进行下载文件时,如果断点信息中保存的文件长度和服务端返回的文件长度不一致,则以服务端返回的为准重新进行下载
final long blockInstanceLength = getExactContentLengthRangeFrom0(connected);
final long infoInstanceLength = info.getTotalLength();
if (blockInstanceLength > 0 && blockInstanceLength != infoInstanceLength) {
Util.d(TAG, "SingleBlock special check: the response instance-length["
+ blockInstanceLength + "] isn't equal to the instance length from trial-"
+ "connection[" + infoInstanceLength + "]");
final BlockInfo blockInfo = info.getBlock(0);
boolean isFromBreakpoint = blockInfo.getRangeLeft() != 0;
final BlockInfo newBlockInfo = new BlockInfo(0, blockInstanceLength);
info.resetBlockInfos();
info.addBlock(newBlockInfo);
if (isFromBreakpoint) {
final String msg = "Discard breakpoint because of on this special case, we have"
+ " to download from beginning";
Util.w(TAG, msg);
throw new RetryException(msg);
}
OkDownload.with().callbackDispatcher().dispatch()
.downloadFromBeginning(chain.getTask(), info, CONTENT_LENGTH_CHANGED);
}
}
// update for connected.
final DownloadStore store = chain.getDownloadStore();
try {
if (!store.update(info)) {
throw new IOException("Update store failed!");
}
} catch (Exception e) {
throw new IOException("Update store failed!", e);
}
return connected;
}
......
}
public class RedirectInterceptor implements Interceptor.Connect {
//最大重定向次数
static final int MAX_REDIRECT_TIMES = 10;
private static final int HTTP_TEMPORARY_REDIRECT = 307;
private static final int HTTP_PERMANENT_REDIRECT = 308;
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
int redirectCount = 0;
String url;
DownloadConnection connection;
while (true) {
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
final DownloadConnection.Connected connected = chain.processConnect();
final int code = connected.getResponseCode();
if (!isRedirect(code)) {
return connected;
}
//若需要重定向,则根据返回的新的url重新进行网络请求
if (++redirectCount >= MAX_REDIRECT_TIMES) {
throw new ProtocolException("Too many redirect requests: " + redirectCount);
}
url = connected.getResponseHeaderField("Location");
if (url == null) {
throw new ProtocolException(
"Response code is " + code + " but can't find Location field");
}
chain.releaseConnection();
connection = OkDownload.with().connectionFactory().create(url);
chain.setConnection(connection);
chain.setRedirectLocation(url);
}
}
private static boolean isRedirect(int code) {
return code == HttpURLConnection.HTTP_MOVED_PERM
|| code == HttpURLConnection.HTTP_MOVED_TEMP
|| code == HttpURLConnection.HTTP_SEE_OTHER
|| code == HttpURLConnection.HTTP_MULT_CHOICE
|| code == HTTP_TEMPORARY_REDIRECT
|| code == HTTP_PERMANENT_REDIRECT;
}
}
public class HeaderInterceptor implements Interceptor.Connect {
private static final String TAG = "HeaderInterceptor";
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
final BreakpointInfo info = chain.getInfo();
final DownloadConnection connection = chain.getConnectionOrCreate();
final DownloadTask task = chain.getTask();
// 添加User-Agent字段
final Map<String, List<String>> userHeader = task.getHeaderMapFields();
if (userHeader != null) Util.addUserRequestHeaderField(userHeader, connection);
if (userHeader == null || !userHeader.containsKey(USER_AGENT)) {
Util.addDefaultUserAgent(connection);
}
//添加Range字段
final int blockIndex = chain.getBlockIndex();
final BlockInfo blockInfo = info.getBlock(blockIndex);
if (blockInfo == null) {
throw new IOException("No block-info found on " + blockIndex);
}
String range = "bytes=" + blockInfo.getRangeLeft() + "-";
range += blockInfo.getRangeRight();
connection.addHeader(RANGE, range);
Util.d(TAG, "AssembleHeaderRange (" + task.getId() + ") block(" + blockIndex + ") "
+ "downloadFrom(" + blockInfo.getRangeLeft() + ") currentOffset("
+ blockInfo.getCurrentOffset() + ")");
// 如果有Etag信息,则添加If-Match字段
final String etag = info.getEtag();
if (!Util.isEmpty(etag)) {
connection.addHeader(IF_MATCH, etag);
}
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
OkDownload.with().callbackDispatcher().dispatch()
.connectStart(task, blockIndex, connection.getRequestProperties());
DownloadConnection.Connected connected = chain.processConnect();
Map<String, List<String>> responseHeaderFields = connected.getResponseHeaderFields();
if (responseHeaderFields == null) responseHeaderFields = new HashMap<>();
OkDownload.with().callbackDispatcher().dispatch().connectEnd(task, blockIndex,
connected.getResponseCode(), responseHeaderFields);
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
// 检查Etag字段是否一致
final DownloadStrategy strategy = OkDownload.with().downloadStrategy();
final DownloadStrategy.ResumeAvailableResponseCheck responseCheck =
strategy.resumeAvailableResponseCheck(connected, blockIndex, info);
responseCheck.inspect();
//获取Content-Length、Content-Range字段信息
final long contentLength;
final String contentLengthField = connected.getResponseHeaderField(CONTENT_LENGTH);
if (contentLengthField == null || contentLengthField.length() == 0) {
final String contentRangeField = connected.getResponseHeaderField(CONTENT_RANGE);
contentLength = Util.parseContentLengthFromContentRange(contentRangeField);
} else {
contentLength = Util.parseContentLength(contentLengthField);
}
chain.setResponseContentLength(contentLength);
return connected;
}
}
public class CallServerInterceptor implements Interceptor.Connect {
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
OkDownload.with().downloadStrategy().inspectNetworkAvailable();
\\进行网络请求,获得响应
return chain.getConnectionOrCreate().execute();
}
}
获取数据拦截链包含了RetryInterceptor重试拦截器、BreakpointInterceptor断点拦截器、RedirectInterceptor重定向拦截器、HeaderInterceptor头部信息处理拦截器、FetchDataInterceptor获取数据拦截器,该链式调用过程会逐个调用拦截器的interceptFetch方法:
public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {
......
@Override
public long interceptFetch(DownloadChain chain) throws IOException {
try {
return chain.processFetch();
} catch (IOException e) {
chain.getCache().catchException(e);
throw e;
}
}
}
public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {
......
@Override
public long interceptFetch(DownloadChain chain) throws IOException {
final long contentLength = chain.getResponseContentLength();
final int blockIndex = chain.getBlockIndex();
final boolean isNotChunked = contentLength != CHUNKED_CONTENT_LENGTH;
long fetchLength = 0;
long processFetchLength;
final MultiPointOutputStream outputStream = chain.getOutputStream();
try {
while (true) {
//循环调用FetchDataInterceptor拦截器读写文件
processFetchLength = chain.loopFetch();
if (processFetchLength == -1) {
break;
}
fetchLength += processFetchLength;
}
} finally {
chain.flushNoCallbackIncreaseBytes();
if (!chain.getCache().isUserCanceled()) outputStream.done(blockIndex);
}
if (isNotChunked) {
outputStream.inspectComplete(blockIndex);
if (fetchLength != contentLength) {
throw new IOException("Fetch-length isn't equal to the response content-length, "
+ fetchLength + "!= " + contentLength);
}
}
return fetchLength;
}
......
public class FetchDataInterceptor implements Interceptor.Fetch {
private final InputStream inputStream;
private final byte[] readBuffer;
private final MultiPointOutputStream outputStream;
private final int blockIndex;
private final DownloadTask task;
private final CallbackDispatcher dispatcher;
public FetchDataInterceptor(int blockIndex,
@NonNull InputStream inputStream,
@NonNull MultiPointOutputStream outputStream,
DownloadTask task) {
this.blockIndex = blockIndex;
this.inputStream = inputStream;
this.readBuffer = new byte[task.getReadBufferSize()];
this.outputStream = outputStream;
this.task = task;
this.dispatcher = OkDownload.with().callbackDispatcher();
}
@Override
public long interceptFetch(DownloadChain chain) throws IOException {
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
// 读取数据
int fetchLength = inputStream.read(readBuffer);
if (fetchLength == -1) {
return fetchLength;
}
//写文件
outputStream.write(blockIndex, readBuffer, fetchLength);
// 判断是否回调下载进度
chain.increaseCallbackBytes(fetchLength);
if (this.dispatcher.isFetchProcessMoment(task)) {
chain.flushNoCallbackIncreaseBytes();
}
return fetchLength;
}
}
每一个DownloadChain都完成后,最终会调用inspectTaskEnd方法,从数据库中删除该任务,并回调通知任务完成。这样,一个完整的下载任务就完成了。总体流程如下:
OkDownload的优势在于:
1.OkDownload内部使用的网络请求框架默认为OkHttp,OkHttp底层使用的IO库为Okio,相较于原生Java IO流,它更加简便高效。
2.使用了数据库缓存+内存缓存的二级缓存模式,操作效率更高。
3.功能更完善,除了多线程断点续传外,还提供了暂停功能,多种回调监听功能,多任务管理功能等。
4.更加可靠:下载前有多重检查机制来判断重新下载还是从断点处下载;每次从断点续传时,都会对比响应信息跟之前是否一致;对重定向做了处理;有错误重试机制。
5.可配置性高,可以注入自定义组件。