OkDownload源码分析

2018-12-14  本文已影响0人  Drc15H

OkDownload是一款多线程断点续传下载引擎,它的功能完整,性能高,可配置性高,可以注入自定义组件来修改下载策略、替换网络请求框架等等,而且在项目中已有成熟应用(英语流利说),是一个很不错的开源下载框架。

项目地址:https://github.com/lingochamp/okdownload

OkDownload的简单使用

OkDownload的使用非常简单:
1.引入该开源库:
implementation 'com.liulishuo.okdownload:okhttp:1.0.5' (提供okhttp连接,ps:如果使用的话,需要引入okhttp网络请求库)
implementation 'com.liulishuo.okdownload:okdownload:1.0.5' (下载核心库)
implementation 'com.liulishuo.okdownload:sqlite:1.0.5' (存储断点信息的数据库)
2.开始一个任务:

task = new DownloadTask.Builder(url, parentFile)
         .setFilename(filename)
         // 下载进度回调的间隔时间(毫秒)
         .setMinIntervalMillisCallbackProcess(30)
         // 任务过去已完成是否要重新下载
         .setPassIfAlreadyCompleted(false)
         .build();
//异步执行任务
task.enqueue(listener);
// 取消任务
task.cancel();
// 同步执行任务
task.execute(listener);

当然也可以同时异步执行多个任务

DownloadTask.enqueue(tasks, listener);

3.任务队列的构建、开始和停止

DownloadContext.Builder builder = new DownloadContext.QueueSet()
        .setParentPathFile(parentFile)
        .setMinIntervalMillisCallbackProcess(150)
        .commit();
builder.bind(url1);
builder.bind(url2).addTag(key, value);
builder.bind(url3).setTag(tag);
builder.setListener(contextListener);

DownloadTask task = new DownloadTask.Builder(url4, parentFile)
        .setPriority(10).build();
builder.bindSetTask(task);

DownloadContext context = builder.build();

context.startOnParallel(listener);

// stop
context.stop();

4.获取任务状态

Status status = StatusUtil.getStatus(task)

status = StatusUtil.getStatus(url, parentPath, null);
status = StatusUtil.getStatus(url, parentPath, filename);

boolean isCompleted = StatusUtil.isCompleted(task);
isCompleted = StatusUtil.isCompleted(url, parentPath, null);
isCompleted = StatusUtil.isCompleted(url, parentPath, filename);

Status completedOrUnknown = StatusUtil.isCompletedOrUnknown(task);

5.获取断点信息

// 注意:任务完成后,断点信息将会被删除 
BreakpointInfo info = OkDownload.with().breakpointStore().get(id);
info = StatusUtil.getCurrentInfo(url, parentPath, null);
info = StatusUtil.getCurrentInfo(url, parentPath, filename);
// 断点信息将被缓存在任务对象中,即使任务已经完成了
info = task.getInfo();

6.设置任务监听
可以为任务设置五种不同类型的监听器,同时,也可以给任务和监听器建立1对1、1对多、多对1、多对多的关联。



给一个任务设置多种监听:

DownloadListener listener1 = new DownloadListener1();
DownloadListener listener2 = new DownloadListener2();

DownloadListener combinedListener = new DownloadListenerBunch.Builder()
                   .append(listener1)
                   .append(listener2)
                   .build();

DownloadTask task = new DownloadTask.build(url, file).build();
task.enqueue(combinedListener);

为多个任务动态设置监听:

UnifiedListenerManager manager = new UnifiedListenerManager();
DownloadListener listener1 = new DownloadListener1();
DownloadListener listener2 = new DownloadListener2();
DownloadListener listener3 = new DownloadListener3();
DownloadListener listener4 = new DownloadListener4();

DownloadTask task = new DownloadTask.build(url, file).build();
manager.attachListener(task, listener1);
manager.attachListener(task, listener2);
manager.detachListener(task, listener2);

// 当一个任务结束时,这个任务的所有监听器都被移除
manager.addAutoRemoveListenersWhenTaskEnd(task.getId());

// enqueue task to start.
manager.enqueueTaskWithUnifiedListener(task, listener3);
manager.attachListener(task, listener4);

下面我们来分析一下这个下载框架的源码:

OkDownload

首先看一下OkDownload这个类,这个类定义了所有的下载策略,我们可以自定义一些下载策略,可以通过OkDownload的Builder构造自定义的一个OkDownload实例,再通过OkDownload.setSingletonInstance进行设置:

OkDownload.Builder builder = new OkDownload.Builder(context)
        .downloadStore(downloadStore) //断点信息存储的位置,默认是SQLite数据库 
        .callbackDispatcher(callbackDispatcher) //监听回调分发器,默认在主线程回调 
        .downloadDispatcher(downloadDispatcher) //下载管理机制,最大下载任务数、同步异步执行下载任务的处理
        .connectionFactory(connectionFactory) //选择网络请求框架,默认是OkHttp 
        .outputStreamFactory(outputStreamFactory) //构建文件输出流DownloadOutputStream,是否支持随机位置写入
        .downloadStrategy(downloadStrategy) //下载策略,文件分为几个线程下载
        .processFileStrategy(processFileStrategy) //多文件写文件的方式,默认是根据每个线程写文件的不同位置,支持同时写入。 
        .monitor(monitor); //下载状态监听 
OkDownload.setSingletonInstance(builder.build());

DownloadTask

DownloadTask下载任务类,可通过它的Builder来构造一个下载任务,我们看它是如何执行的:

public void execute(DownloadListener listener) {
        this.listener = listener;
        OkDownload.with().downloadDispatcher().execute(this);
    }

public void enqueue(DownloadListener listener) {
        this.listener = listener;
        OkDownload.with().downloadDispatcher().enqueue(this);
    }

可以看到都是通过downloadDispatcher来执行下载任务的,默认的downloadDispatcher是一个DownloadDispatcher实例,我们以同步执行一个下载任务为例,看它是如何下载的:

    public void execute(DownloadTask task) {
        Util.d(TAG, "execute: " + task);
        final DownloadCall call;

        synchronized (this) {
            if (inspectCompleted(task)) return;
            if (inspectForConflict(task)) return;

            call = DownloadCall.create(task, false, store);
            runningSyncCalls.add(call);
        }

        syncRunCall(call);
    }

void syncRunCall(DownloadCall call) {
        call.run();
    }

在execute方法里将一个DownloadTask实例又封装为了一个DownloadCall对象,然后在syncRunCall方法里执行了DownloadCall对象的run方法。通过看DownloadCall源码可以知道该类继承自NamedRunnable,而NamedRunnable实现了Runnable,在run方法里调用了execute方法。(enqueue执行任务最终则是调用 getExecutorService().execute(call);来异步执行的)



那我们看一下DownloadCall这个类。

DownloadCall

先看一下DownloadCall是如何实现execute方法的,该方法比较长,首先执行的是inspectTaskStart:



先看一下这个store是什么:




通过看OkDownload这个类的源码可以知道,DownloadCall的store是调用BreakpointStoreOnSQLite的createRemitSelf方法生成的一个实例:



可以看到是RemitStoreOnSQLite的一个实例,其主要用来保存任务及断点信息至本地数据库。RemitStoreOnSQLite里持有BreakpointStoreOnSQLite对象,BreakpointStoreOnSQLite里面包含了BreakpointSQLiteHelper(用于操作数据)和BreakpointStoreOnCache(用于做数据操作之前的数据缓存)。

@Override public void syncCacheToDB(int id) throws IOException {
        sqLiteHelper.removeInfo(id);

        final BreakpointInfo info = sqliteCache.get(id);
        if (info == null || info.getFilename() == null || info.getTotalOffset() <= 0) return;

        sqLiteHelper.insert(info);
    }

最终会调用上述syncCacheToDB方法,先删除数据库中的任务信息,若缓存(创建BreakpointStoreOnCache对象时,会调用loadToCache方法将数据库中所有任务信息进行缓存)

this.onCache = new BreakpointStoreOnCache(helper.loadToCache(),
                helper.loadResponseFilenameToMap());

中有该任务,则检查任务信息是否合法,若合法则再次将该任务及断点信息保存在本地数据库中。
inspectTaskStart方法结束后,会进入一个do-while循环,首先做一些下载前的准备工作:

        do {
            //1.判断当前任务的下载链接长度是否大于0,否则就抛出异常;
            if (task.getUrl().length() <= 0) {
                this.cache = new DownloadCache.PreError(
                        new IOException("unexpected url: " + task.getUrl()));
                break;
            }

            if (canceled) break;

            //2.从缓存中获取任务的断点信息,若没有断点信息,则创建断点信息并保存至数据库;
            @NonNull final BreakpointInfo info;
            try {
                BreakpointInfo infoOnStore = store.get(task.getId());
                if (infoOnStore == null) {
                    info = store.createAndInsert(task);
                } else {
                    info = infoOnStore;
                }
                setInfoToTask(info);
            } catch (IOException e) {
                this.cache = new DownloadCache.PreError(e);
                break;
            }
            if (canceled) break;

            // 3.创建带缓存的下载输出流;
            @NonNull final DownloadCache cache = createCache(info);
            this.cache = cache;

            // 4.访问下载链接判断断点信息是否合理;
            final BreakpointRemoteCheck remoteCheck = createRemoteCheck(info);
            try {
                remoteCheck.check();
            } catch (IOException e) {
                cache.catchException(e);
                break;
            }

            //5.确定文件路径后等待文件锁释放;
            fileStrategy.getFileLock().waitForRelease(task.getFile().getAbsolutePath());

            // 6. 判断缓存中是否有相同的任务,若有则复用缓存中的任务的分块信息;
            OkDownload.with().downloadStrategy()
                    .inspectAnotherSameInfo(task, info, remoteCheck.getInstanceLength());

            try {
                //7.检查断点信息是否是可恢复的,若不可恢复,则根据文件大小进行分块,重新下载,否则继续进行下一步;
                if (remoteCheck.isResumable()) {
                    // 8.判断断点信息是否是脏数据(文件存在且断点信息正确且下载链接支持断点续传);
                    final BreakpointLocalCheck localCheck = createLocalCheck(info,
                            remoteCheck.getInstanceLength());
                    localCheck.check();
                    // 9.若是脏数据则根据文件大小进行分块,重新开始下载,否则从断点位置开始下载;
                    if (localCheck.isDirty()) {
                        Util.d(TAG, "breakpoint invalid: download from beginning because of "
                                + "local check is dirty " + task.getId() + " " + localCheck);
                        fileStrategy.discardProcess(task);
                        assembleBlockAndCallbackFromBeginning(info, remoteCheck,
                                localCheck.getCauseOrThrow());
                    } else {
                        okDownload.callbackDispatcher().dispatch()
                                .downloadFromBreakpoint(task, info);
                    }
                } else {
                    Util.d(TAG, "breakpoint invalid: download from beginning because of "
                            + "remote check not resumable " + task.getId() + " " + remoteCheck);
                    fileStrategy.discardProcess(task);
                    assembleBlockAndCallbackFromBeginning(info, remoteCheck,
                            remoteCheck.getCauseOrThrow());
                }
            } catch (IOException e) {
                cache.setUnknownError(e);
                break;
            }

            // 10. 开始下载
            start(cache, info);

            if (canceled) break;

            // 11. 错误重试机制
            if (cache.isPreconditionFailed()
                    && retryCount++ < MAX_COUNT_RETRY_FOR_PRECONDITION_FAILED) {
                store.remove(task.getId());
                retry = true;
            } else {
                retry = false;
            }
        } while (retry);

1.判断当前任务的下载链接长度是否大于0,否则就抛出异常;2.从缓存中获取任务的断点信息,若没有断点信息,则创建断点信息并保存至数据库;3.创建带缓存的下载输出流;4.访问下载链接判断断点信息是否合理;5.确定文件路径后等待文件锁释放; 6. 判断缓存中是否有相同的任务,若有则复用缓存中的任务的分块信息;7.检查断点信息是否是可恢复的,若不可恢复,则根据文件大小进行分块,重新下载,否则继续进行下一步;8.判断断点信息是否是脏数据(文件存在且断点信息正确且下载链接支持断点续传);9.若是脏数据则根据文件大小进行分块,重新开始下载,否则从断点位置开始下载;10.开始下载。

文件分成多少块进行下载由DownloadStrategy决定的:

    // 1 connection: [0, 1MB)
    private static final long ONE_CONNECTION_UPPER_LIMIT = 1024 * 1024; // 1MiB
    // 2 connection: [1MB, 5MB)
    private static final long TWO_CONNECTION_UPPER_LIMIT = 5 * 1024 * 1024; // 5MiB
    // 3 connection: [5MB, 50MB)
    private static final long THREE_CONNECTION_UPPER_LIMIT = 50 * 1024 * 1024; // 50MiB
    // 4 connection: [50MB, 100MB)
    private static final long FOUR_CONNECTION_UPPER_LIMIT = 100 * 1024 * 1024; // 100MiB

    public ResumeAvailableResponseCheck resumeAvailableResponseCheck(
            DownloadConnection.Connected connected,
            int blockIndex,
            BreakpointInfo info) {
        return new ResumeAvailableResponseCheck(connected, blockIndex, info);
    }

    public int determineBlockCount(@NonNull DownloadTask task, long totalLength) {
        if (task.getSetConnectionCount() != null) return task.getSetConnectionCount();

        if (totalLength < ONE_CONNECTION_UPPER_LIMIT) {
            return 1;
        }

        if (totalLength < TWO_CONNECTION_UPPER_LIMIT) {
            return 2;
        }

        if (totalLength < THREE_CONNECTION_UPPER_LIMIT) {
            return 3;
        }

        if (totalLength < FOUR_CONNECTION_UPPER_LIMIT) {
            return 4;
        }

        return 5;
    }

文件大小在0-1MB、1-5MB、5-50MB、50-100MB、100MB以上时分别开启1、2、3、4、5个线程进行下载。

我们重点看一下下载部分的源码,也就是start(cache,info)方法:

void start(final DownloadCache cache, BreakpointInfo info) throws InterruptedException {
        final int blockCount = info.getBlockCount();
        final List<DownloadChain> blockChainList = new ArrayList<>(info.getBlockCount());
        for (int i = 0; i < blockCount; i++) {
            final BlockInfo blockInfo = info.getBlock(i);
            if (Util.isCorrectFull(blockInfo.getCurrentOffset(), blockInfo.getContentLength())) {
                continue;
            }

            Util.resetBlockIfDirty(blockInfo);
            blockChainList.add(DownloadChain.createChain(i, task, info, cache, store));
        }

        if (canceled) {
            return;
        }

        startBlocks(blockChainList);
    }

可以看到它是分块下载的,每一个分块都是一个DownloadChain实例,DownloadChain实现了Runnable接口,继续看startBlocks方法:



对于每一个分块任务,都调用了submitChain方法,由一个线程池去处理每一个DownloadChain分块,核心代码就在这里:

    void start() throws IOException {
        final CallbackDispatcher dispatcher = OkDownload.with().callbackDispatcher();
        // 处理请求拦截链
        final RetryInterceptor retryInterceptor = new RetryInterceptor();
        final BreakpointInterceptor breakpointInterceptor = new BreakpointInterceptor();
        connectInterceptorList.add(retryInterceptor);
        connectInterceptorList.add(breakpointInterceptor);
        connectInterceptorList.add(new RedirectInterceptor());
        connectInterceptorList.add(new HeaderInterceptor());
        connectInterceptorList.add(new CallServerInterceptor());

        connectIndex = 0;
        final DownloadConnection.Connected connected = processConnect();
        if (cache.isInterrupt()) {
            throw InterruptException.SIGNAL;
        }

        dispatcher.dispatch().fetchStart(task, blockIndex, getResponseContentLength());
        // 获取数据拦截链
        final FetchDataInterceptor fetchDataInterceptor =
                new FetchDataInterceptor(blockIndex, connected.getInputStream(),
                        getOutputStream(), task);
        fetchInterceptorList.add(retryInterceptor);
        fetchInterceptorList.add(breakpointInterceptor);
        fetchInterceptorList.add(fetchDataInterceptor);

        fetchIndex = 0;
        final long totalFetchedBytes = processFetch();
        dispatcher.dispatch().fetchEnd(task, blockIndex, totalFetchedBytes);
    }

可以看到它主要使用责任链模式进行了两个链式调用:处理请求拦截链和获取数据拦截链。
处理请求拦截链包含了RetryInterceptor重试拦截器、BreakpointInterceptor断点拦截器、RedirectInterceptor重定向拦截器、HeaderInterceptor头部信息处理拦截器、CallServerInterceptor请求拦截器,该链式调用过程会逐个调用拦截器的interceptConnect方法:

public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {

    @NonNull @Override
    public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
        final DownloadCache cache = chain.getCache();

        // 如果产生了RetryException,则重新执行该链式调用
        while (true) {
            try {
                if (cache.isInterrupt()) {
                    throw InterruptException.SIGNAL;
                }
                return chain.processConnect();
            } catch (IOException e) {
                if (e instanceof RetryException) {
                    chain.resetConnectForRetry();
                    continue;
                }

                chain.getCache().catchException(e);
                throw e;
            }
        }
    }
......
}
public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {

    private static final String TAG = "BreakpointInterceptor";

    @NonNull @Override
    public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
        final DownloadConnection.Connected connected = chain.processConnect();
        final BreakpointInfo info = chain.getInfo();

        if (chain.getCache().isInterrupt()) {
            throw InterruptException.SIGNAL;
        }

        if (info.getBlockCount() == 1 && !info.isChunked()) {
            // 当只有一个线程进行下载文件时,如果断点信息中保存的文件长度和服务端返回的文件长度不一致,则以服务端返回的为准重新进行下载
            final long blockInstanceLength = getExactContentLengthRangeFrom0(connected);
            final long infoInstanceLength = info.getTotalLength();
            if (blockInstanceLength > 0 && blockInstanceLength != infoInstanceLength) {
                Util.d(TAG, "SingleBlock special check: the response instance-length["
                        + blockInstanceLength + "] isn't equal to the instance length from trial-"
                        + "connection[" + infoInstanceLength + "]");
                final BlockInfo blockInfo = info.getBlock(0);
                boolean isFromBreakpoint = blockInfo.getRangeLeft() != 0;

                final BlockInfo newBlockInfo = new BlockInfo(0, blockInstanceLength);
                info.resetBlockInfos();
                info.addBlock(newBlockInfo);

                if (isFromBreakpoint) {
                    final String msg = "Discard breakpoint because of on this special case, we have"
                            + " to download from beginning";
                    Util.w(TAG, msg);
                    throw new RetryException(msg);
                }
                OkDownload.with().callbackDispatcher().dispatch()
                        .downloadFromBeginning(chain.getTask(), info, CONTENT_LENGTH_CHANGED);
            }
        }

        // update for connected.
        final DownloadStore store = chain.getDownloadStore();
        try {
            if (!store.update(info)) {
                throw new IOException("Update store failed!");
            }
        } catch (Exception e) {
            throw new IOException("Update store failed!", e);
        }

        return connected;
    }
......
}
public class RedirectInterceptor implements Interceptor.Connect {

    //最大重定向次数
    static final int MAX_REDIRECT_TIMES = 10;

    private static final int HTTP_TEMPORARY_REDIRECT = 307;
  
    private static final int HTTP_PERMANENT_REDIRECT = 308;

    @NonNull @Override
    public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
        int redirectCount = 0;

        String url;
        DownloadConnection connection;
        while (true) {

            if (chain.getCache().isInterrupt()) {
                throw InterruptException.SIGNAL;
            }

            final DownloadConnection.Connected connected = chain.processConnect();
            final int code = connected.getResponseCode();

            if (!isRedirect(code)) {
                return connected;
            }
            //若需要重定向,则根据返回的新的url重新进行网络请求
            if (++redirectCount >= MAX_REDIRECT_TIMES) {
                throw new ProtocolException("Too many redirect requests: " + redirectCount);
            }

            url = connected.getResponseHeaderField("Location");
            if (url == null) {
                throw new ProtocolException(
                        "Response code is " + code + " but can't find Location field");
            }

            chain.releaseConnection();

            connection = OkDownload.with().connectionFactory().create(url);
            chain.setConnection(connection);
            chain.setRedirectLocation(url);

        }
    }

    private static boolean isRedirect(int code) {
        return code == HttpURLConnection.HTTP_MOVED_PERM
                || code == HttpURLConnection.HTTP_MOVED_TEMP
                || code == HttpURLConnection.HTTP_SEE_OTHER
                || code == HttpURLConnection.HTTP_MULT_CHOICE
                || code == HTTP_TEMPORARY_REDIRECT
                || code == HTTP_PERMANENT_REDIRECT;
    }
}
public class HeaderInterceptor implements Interceptor.Connect {
    private static final String TAG = "HeaderInterceptor";

    @NonNull @Override
    public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
        final BreakpointInfo info = chain.getInfo();
        final DownloadConnection connection = chain.getConnectionOrCreate();
        final DownloadTask task = chain.getTask();

        // 添加User-Agent字段
        final Map<String, List<String>> userHeader = task.getHeaderMapFields();
        if (userHeader != null) Util.addUserRequestHeaderField(userHeader, connection);
        if (userHeader == null || !userHeader.containsKey(USER_AGENT)) {
            Util.addDefaultUserAgent(connection);
        }

        //添加Range字段
        final int blockIndex = chain.getBlockIndex();
        final BlockInfo blockInfo = info.getBlock(blockIndex);
        if (blockInfo == null) {
            throw new IOException("No block-info found on " + blockIndex);
        }

        String range = "bytes=" + blockInfo.getRangeLeft() + "-";
        range += blockInfo.getRangeRight();

        connection.addHeader(RANGE, range);
        Util.d(TAG, "AssembleHeaderRange (" + task.getId() + ") block(" + blockIndex + ") "
                + "downloadFrom(" + blockInfo.getRangeLeft() + ") currentOffset("
                + blockInfo.getCurrentOffset() + ")");

        // 如果有Etag信息,则添加If-Match字段
        final String etag = info.getEtag();
        if (!Util.isEmpty(etag)) {
            connection.addHeader(IF_MATCH, etag);
        }

        if (chain.getCache().isInterrupt()) {
            throw InterruptException.SIGNAL;
        }

        OkDownload.with().callbackDispatcher().dispatch()
                .connectStart(task, blockIndex, connection.getRequestProperties());

        DownloadConnection.Connected connected = chain.processConnect();

        Map<String, List<String>> responseHeaderFields = connected.getResponseHeaderFields();
        if (responseHeaderFields == null) responseHeaderFields = new HashMap<>();

        OkDownload.with().callbackDispatcher().dispatch().connectEnd(task, blockIndex,
                connected.getResponseCode(), responseHeaderFields);
        if (chain.getCache().isInterrupt()) {
            throw InterruptException.SIGNAL;
        }

        // 检查Etag字段是否一致
        final DownloadStrategy strategy = OkDownload.with().downloadStrategy();
        final DownloadStrategy.ResumeAvailableResponseCheck responseCheck =
                strategy.resumeAvailableResponseCheck(connected, blockIndex, info);
        responseCheck.inspect();
       
        //获取Content-Length、Content-Range字段信息
        final long contentLength;
        final String contentLengthField = connected.getResponseHeaderField(CONTENT_LENGTH);
        if (contentLengthField == null || contentLengthField.length() == 0) {
            final String contentRangeField = connected.getResponseHeaderField(CONTENT_RANGE);
            contentLength = Util.parseContentLengthFromContentRange(contentRangeField);
        } else {
            contentLength = Util.parseContentLength(contentLengthField);
        }

        chain.setResponseContentLength(contentLength);
        return connected;
    }
}
public class CallServerInterceptor implements Interceptor.Connect {
    @NonNull @Override
    public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
        OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
        OkDownload.with().downloadStrategy().inspectNetworkAvailable();
        \\进行网络请求,获得响应
        return chain.getConnectionOrCreate().execute();
    }
}

获取数据拦截链包含了RetryInterceptor重试拦截器、BreakpointInterceptor断点拦截器、RedirectInterceptor重定向拦截器、HeaderInterceptor头部信息处理拦截器、FetchDataInterceptor获取数据拦截器,该链式调用过程会逐个调用拦截器的interceptFetch方法:

public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {

......

    @Override
    public long interceptFetch(DownloadChain chain) throws IOException {
        try {
            return chain.processFetch();
        } catch (IOException e) {
            chain.getCache().catchException(e);
            throw e;
        }
    }
}

public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {

......

    @Override
    public long interceptFetch(DownloadChain chain) throws IOException {
        final long contentLength = chain.getResponseContentLength();
        final int blockIndex = chain.getBlockIndex();
        final boolean isNotChunked = contentLength != CHUNKED_CONTENT_LENGTH;

        long fetchLength = 0;
        long processFetchLength;

        final MultiPointOutputStream outputStream = chain.getOutputStream();

        try {
            while (true) {
                //循环调用FetchDataInterceptor拦截器读写文件
                processFetchLength = chain.loopFetch();
                if (processFetchLength == -1) {
                    break;
                }

                fetchLength += processFetchLength;
            }
        } finally {
            chain.flushNoCallbackIncreaseBytes();
            if (!chain.getCache().isUserCanceled()) outputStream.done(blockIndex);
        }

        if (isNotChunked) {
            outputStream.inspectComplete(blockIndex);

            if (fetchLength != contentLength) {
                throw new IOException("Fetch-length isn't equal to the response content-length, "
                        + fetchLength + "!= " + contentLength);
            }
        }

        return fetchLength;
    }

......

public class FetchDataInterceptor implements Interceptor.Fetch {

    private final InputStream inputStream;

    private final byte[] readBuffer;
    private final MultiPointOutputStream outputStream;
    private final int blockIndex;
    private final DownloadTask task;
    private final CallbackDispatcher dispatcher;

    public FetchDataInterceptor(int blockIndex,
                                @NonNull InputStream inputStream,
                                @NonNull MultiPointOutputStream outputStream,
                                DownloadTask task) {
        this.blockIndex = blockIndex;
        this.inputStream = inputStream;
        this.readBuffer = new byte[task.getReadBufferSize()];
        this.outputStream = outputStream;

        this.task = task;
        this.dispatcher = OkDownload.with().callbackDispatcher();
    }

    @Override
    public long interceptFetch(DownloadChain chain) throws IOException {
        if (chain.getCache().isInterrupt()) {
            throw InterruptException.SIGNAL;
        }

        OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
        // 读取数据
        int fetchLength = inputStream.read(readBuffer);
        if (fetchLength == -1) {
            return fetchLength;
        }

        //写文件
        outputStream.write(blockIndex, readBuffer, fetchLength);

        // 判断是否回调下载进度
        chain.increaseCallbackBytes(fetchLength);
        if (this.dispatcher.isFetchProcessMoment(task)) {
            chain.flushNoCallbackIncreaseBytes();
        }

        return fetchLength;
    }
}


每一个DownloadChain都完成后,最终会调用inspectTaskEnd方法,从数据库中删除该任务,并回调通知任务完成。这样,一个完整的下载任务就完成了。总体流程如下:

OkDownload的优势在于:
1.OkDownload内部使用的网络请求框架默认为OkHttp,OkHttp底层使用的IO库为Okio,相较于原生Java IO流,它更加简便高效。
2.使用了数据库缓存+内存缓存的二级缓存模式,操作效率更高。
3.功能更完善,除了多线程断点续传外,还提供了暂停功能,多种回调监听功能,多任务管理功能等。
4.更加可靠:下载前有多重检查机制来判断重新下载还是从断点处下载;每次从断点续传时,都会对比响应信息跟之前是否一致;对重定向做了处理;有错误重试机制。
5.可配置性高,可以注入自定义组件。

上一篇下一篇

猜你喜欢

热点阅读