Android Video Cache 源码分析

2021-07-20  本文已影响0人  风中的理想乡

0.总结

缓存框架的思路是使用一个本地的代理服务器来代理掉目标请求,以此来添加缓存实现或者进行一些hook操作。
服务器(HttpProxyCacheServer)启动后会阻塞等待socket连接进来;socket进来后会跟根据request的url来创建对应的客户端实例(HttpProxyCacheServerClients),需要注意的是这个部分的操作都是需要保证线程安全的;客户端根据request判断是否使用缓存然后使用代理缓存(HttpProxyCache)写回socket,这里有个小细节就是客户端支持多连接访问,当访问的会话(或者说客户端)都结束后就会关闭自己(所以我们可以实现XX%下载后停止预存的逻辑);如果使用缓存的话代理缓存类就会分发一下数据,协调从缓存(FileCache)里取数据,如果缓存没有可用数据了就会阻塞等待从数据源(HttpUrlSource)里取数据直到缓存可用继续读取。
还有很多小细节(比如,咋处理seek to这种断点请求的,怎么记录资源信息的,咋处理ping请求,咋处理https证书等),直接看分析吧。

└── src
    └── main
        └── java
            └── com
                └── danikula
                    └── videocache
                        ├── ByteArrayCache.java // 字节数组缓存实现,实现了读字节流内容,追加字节流内容等能力
                        ├── ByteArraySource.java // 字节数组数据源实现,简单封装了下字节数组io流的读取
                        ├── Cache.java   // 缓存接口
                        ├── CacheListener.java  // 开放的回调监听
                        ├── Config.java // 配置类持有了缓存目录、文件名生成器、磁盘使用规则、数据源信息存储实现、请求header注入
                        ├── GetRequest.java // get请求模型,封装get请求
                        ├── HttpProxyCache.java  // 代理类的子类,扩展了请求头的写入、处理资源的加载是否使用缓存
                        ├── HttpProxyCacheServer.java  // 代理服务器对象,启动代理服务器,获取代理地址,获取缓存的入口
                        ├── HttpProxyCacheServerClients.java  // 缓存客户端,代表一个目标资源请求,可以接入多个客户端使用目标资源。
                        ├── HttpUrlSource.java   // 网络地址数据源实现,内部使用了httpURLConnection进行网络请求。单独拉出来分析
                        ├── IgnoreHostProxySelector.java
                        ├── InterruptedProxyCacheException.java
                        ├── Pinger.java  // 用来处理 ping 请求
                        ├── Preconditions.java  // 一些断言
                        ├── ProxyCache.java  // 数据源的代理类,桥接了数据源(Source)和缓存(cache)
                        ├── ProxyCacheException.java
                        ├── ProxyCacheUtils.java
                        ├── Source.java  // 缓存数据源接口,主要是提供数据源的读取能力
                        ├── SourceInfo.java // 数据源信息实体类,存储了数据源的url,数据长度,mime类型
                        ├── StorageUtils.java
                        ├── file
                        │   ├── DiskUsage.java
                        │   ├── FileCache.java
                        │   ├── FileNameGenerator.java
                        │   ├── Files.java
                        │   ├── LruDiskUsage.java
                        │   ├── Md5FileNameGenerator.java
                        │   ├── TotalCountLruDiskUsage.java
                        │   ├── TotalSizeLruDiskUsage.java
                        │   └── UnlimitedDiskUsage.java
                        ├── headers
                        │   ├── EmptyHeadersInjector.java  // 空实现
                        │   └── HeaderInjector.java  // header信息注入接口,提供给HttpUrlSource注入header的能力
                        └── sourcestorage
                            ├── DatabaseSourceInfoStorage.java  // 数据库数据源信息,继承自SQLiteOpenHelper,使用数据库记录数据
                            ├── NoSourceInfoStorage.java  // 空实现
                            ├── SourceInfoStorage.java  // 数据源信息存储接口,读/写/释放
                            └── SourceInfoStorageFactory.java  // 简单工厂,产出数据库数据源信息对象、空实现对象

1.HttpUrlSource分析

网络地址数据源实现,内部使用了httpURLConnection进行网络请求,提供了读取网络数据源io流的能力,重定向拦截、“断点续传”也是在这里实现的。

        // 构造方法
    // 空源信息的重载,只有ping服务器的请求用到了
        public HttpUrlSource(String url) {
        this(url, SourceInfoStorageFactory.newEmptySourceInfoStorage());
    }

    public HttpUrlSource(String url, SourceInfoStorage sourceInfoStorage) {
        this(url, sourceInfoStorage, new EmptyHeadersInjector());
    }

        // HttpProxyCacheServerClients.newHttpProxyCache()使用了这个构造
        // HttpProxyCacheServer.Builder提供了一个SourceInfoStorage数据库实现对象,headerInjector类似的也是builder内的参数
    public HttpUrlSource(String url, SourceInfoStorage sourceInfoStorage, HeaderInjector headerInjector) {
        this.sourceInfoStorage = checkNotNull(sourceInfoStorage);
        this.headerInjector = checkNotNull(headerInjector);
        // 内部实现是从数据库里读取了url的数据组装成一个SourceInfo
        SourceInfo sourceInfo = sourceInfoStorage.get(url);
        this.sourceInfo = sourceInfo != null ? sourceInfo :
                new SourceInfo(url, Integer.MIN_VALUE, ProxyCacheUtils.getSupposablyMime(url));//没有查到数据就构造一个,根据url地址获取一个Mime类型
    }
    
    // ProxyCacheUtils:
    static String getSupposablyMime(String url) {
        MimeTypeMap mimes = MimeTypeMap.getSingleton();
        String extension = MimeTypeMap.getFileExtensionFromUrl(url); // 截取url的文件后缀
        return TextUtils.isEmpty(extension) ? null : mimes.getMimeTypeFromExtension(extension);//根据后缀信息找Mime类型
    }

有两个场景会打开连接:
1.请求获取内容信息

private void fetchContentInfo() throws ProxyCacheException {
        LOG.debug("Read content info from " + sourceInfo.url);
        HttpURLConnection urlConnection = null;
        InputStream inputStream = null;
        try {
            urlConnection = openConnection(0, 10000);
            long length = getContentLength(urlConnection); // connection.getHeaderField("Content-Length")获取头内的长度信息
            String mime = urlConnection.getContentType();
            inputStream = urlConnection.getInputStream();
            this.sourceInfo = new SourceInfo(sourceInfo.url, length, mime);
            this.sourceInfoStorage.put(sourceInfo.url, sourceInfo); // 将源信息存入数据库
            LOG.debug("Source info fetched: " + sourceInfo);
        } catch (IOException e) {
            LOG.error("Error fetching info from " + sourceInfo.url, e);
        } finally {
            ProxyCacheUtils.close(inputStream);
            if (urlConnection != null) {
                urlConnection.disconnect();
            }
        }
    }

2.打开信息源

    @Override
    public void open(long offset) throws ProxyCacheException {
        try {
            connection = openConnection(offset, -1); // 留着连接的引用在read的时候用
            String mime = connection.getContentType();
            inputStream = new BufferedInputStream(connection.getInputStream(), DEFAULT_BUFFER_SIZE); // 留着io流的引用
            long length = readSourceAvailableBytes(connection, offset, connection.getResponseCode()); // 读取可用的字节大小
            this.sourceInfo = new SourceInfo(sourceInfo.url, length, mime);
            this.sourceInfoStorage.put(sourceInfo.url, sourceInfo); // 保存源信息到数据库
        } catch (IOException e) {
            throw new ProxyCacheException("Error opening connection for " + sourceInfo.url + " with offset " + offset, e);
        }
    }

    // 算出来视频的总长度
    private long readSourceAvailableBytes(HttpURLConnection connection, long offset, int responseCode) throws IOException {
        long contentLength = getContentLength(connection); // connection.getHeaderField("Content-Length")获取头内的长度信息
        // 如果请求的内容不是完整的视频,则视频真实的大小是偏移量 + 当前请求内容的长度
        return responseCode == HTTP_OK ? contentLength
                : responseCode == HTTP_PARTIAL ? contentLength + offset : sourceInfo.length;
    }

具体的开启连接的代码,这里有关键代码-断点下载

    private HttpURLConnection openConnection(long offset, int timeout) throws IOException, ProxyCacheException {
        HttpURLConnection connection;
        boolean redirected;
        int redirectCount = 0;
        String url = this.sourceInfo.url;
        do {
            LOG.debug("Open connection " + (offset > 0 ? " with offset " + offset : "") + " to " + url);
            connection = (HttpURLConnection) new URL(url).openConnection();
            injectCustomHeaders(connection, url);// 注入自定义头
            // 关键代码:塞入断点位置,这样请求的数据就是从偏移量开始的。
            if (offset > 0) {
                connection.setRequestProperty("Range", "bytes=" + offset + "-");
            }
            if (timeout > 0) {
                connection.setConnectTimeout(timeout);
                connection.setReadTimeout(timeout);
            }
            int code = connection.getResponseCode();
            redirected = code == HTTP_MOVED_PERM || code == HTTP_MOVED_TEMP || code == HTTP_SEE_OTHER;
            if (redirected) {
                url = connection.getHeaderField("Location");
                redirectCount++;
                connection.disconnect();
            }
            if (redirectCount > MAX_REDIRECTS) {
                throw new ProxyCacheException("Too many redirects: " + redirectCount);
            }
        } while (redirected); // 这个while是用来处理重定向的
        return connection;
    }

    // 注入自定义头
    private void injectCustomHeaders(HttpURLConnection connection, String url) {
        Map<String, String> extraHeaders = headerInjector.addHeaders(url);
        for (Map.Entry<String, String> header : extraHeaders.entrySet()) {
            connection.setRequestProperty(header.getKey(), header.getValue());
        }
    }

最后就剩一个简单的方法了,read 从io流里读出来字节数组

    @Override
    public int read(byte[] buffer) throws ProxyCacheException {
        if (inputStream == null) {
            throw new ProxyCacheException("Error reading data from " + sourceInfo.url + ": connection is absent!");
        }
        try {
            return inputStream.read(buffer, 0, buffer.length); // 读个字节数组完事儿
        } catch (InterruptedIOException e) {
            throw new InterruptedProxyCacheException("Reading source " + sourceInfo.url + " is interrupted", e);
        } catch (IOException e) {
            throw new ProxyCacheException("Error reading data from " + sourceInfo.url, e);
        }
    }

2.FileCache分析

文件缓存实现,负责持有文件引用和提供文件读写的能力。
分析前先看下RandomAccessFile是干啥的,jdk注释:

Instances of this class support both reading and writing to a random access file. A random access file behaves like a large array of bytes stored in the file system. There is a kind of cursor, or index into the implied array, called the file pointer; input operations read bytes starting at the file pointer and advance the file pointer past the bytes read. If the random access file is created in read/write mode, then output operations are also available; output operations write bytes starting at the file pointer and advance the file pointer past the bytes written. Output operations that write past the current end of the implied array cause the array to be extended. The file pointer can be read by the getFilePointer method and set by the seek method.
It is generally true of all the reading routines in this class that if end-of-file is reached before the desired number of bytes has been read, an EOFException (which is a kind of IOException) is thrown. If any byte cannot be read for any reason other than end-of-file, an IOException other than EOFException is thrown. In particular, an IOException may be thrown if the stream has been closed.
Since: JDK1.0
Author: unascribed

构造方法:

    public FileCache(File file, DiskUsage diskUsage) throws ProxyCacheException {
        try {
            if (diskUsage == null) {
                throw new NullPointerException();
            }
            this.diskUsage = diskUsage; // 注入磁盘使用接口,将缓存策略抛出
            // 然后是操作文件的基本操作
            File directory = file.getParentFile();
            Files.makeDir(directory);
            boolean completed = file.exists();  
            // 判断文件是否存在,不存的话创建一个name.ex.download的缓存文件来下载数据
            this.file = completed ? file : new File(file.getParentFile(), file.getName() + TEMP_POSTFIX);  
            Log.i("file", file.getAbsolutePath());
            this.dataFile = new RandomAccessFile(this.file, completed ? "r" : "rw");  // 控制下随机访问文件的访问权限,如果下好了就只读就OK了。
        } catch (IOException e) {
            throw new ProxyCacheException("Error using file " + file + " as disc cache", e);
        }
    }

读写方法:

@Override
    public synchronized int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        try {
                // 随机读写文件的api,操作读
            dataFile.seek(offset);
            return dataFile.read(buffer, 0, length);
        } catch (IOException e) {
            String format = "Error reading %d bytes with offset %d from file[%d bytes] to buffer[%d bytes]";
            throw new ProxyCacheException(String.format(format, length, offset, available(), buffer.length), e);
        }
    }

    @Override
    public synchronized void append(byte[] data, int length) throws ProxyCacheException {
        try {
            if (isCompleted()) {
                throw new ProxyCacheException("Error append cache: cache file " + file + " is completed!");
            }
            // 随机读写文件的api,操作写
            dataFile.seek(available());
            dataFile.write(data, 0, length);
        } catch (IOException e) {
            String format = "Error writing %d bytes to %s from buffer with size %d";
            throw new ProxyCacheException(String.format(format, length, dataFile, data.length), e);
        }
    }

文件下完后:

        @Override
    public synchronized void complete() throws ProxyCacheException {  // 这里记一下,看看后面啥时候掉的它完成文件下载的
        if (isCompleted()) {
            return;
        }

        close();
        // 去掉.download后缀
        String fileName = file.getName().substring(0, file.getName().length() - TEMP_POSTFIX.length());
        File completedFile = new File(file.getParentFile(), fileName);
        boolean renamed = file.renameTo(completedFile);
        if (!renamed) {
            throw new ProxyCacheException("Error renaming file " + file + " to " + completedFile + " for completion!");
        }
        file = completedFile;
        try {
                // 重新搞一个随机访问文件对象
            dataFile = new RandomAccessFile(file, "r");
            // todo
            diskUsage.touch(file);
        } catch (IOException e) {
            throw new ProxyCacheException("Error opening " + file + " as disc cache", e);
        }
    }

3.GetRequest分析

get请求模型,主要是三个字段:是否是断点下载、断点位置、请求地址

    public GetRequest(String request) {
        checkNotNull(request);
        long offset = findRangeOffset(request);
        this.rangeOffset = Math.max(0, offset);
        this.partial = offset >= 0;
        this.uri = findUri(request);
    }

    // 构造一个 GetRequest 对象,只读取header信息即可
    public static GetRequest read(InputStream inputStream) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
        StringBuilder stringRequest = new StringBuilder();
        String line;
        while (!TextUtils.isEmpty(line = reader.readLine())) { // until new line (headers ending)
            stringRequest.append(line).append('\n');
        }
        return new GetRequest(stringRequest.toString());
    }

    // 正则查找偏移量位置
    private long findRangeOffset(String request) {
        Matcher matcher = RANGE_HEADER_PATTERN.matcher(request);
        if (matcher.find()) {
            String rangeValue = matcher.group(1);
            return Long.parseLong(rangeValue);
        }
        return -1;
    }

    // 正则查找url位置
    private String findUri(String request) {
        Matcher matcher = URL_PATTERN.matcher(request);
        if (matcher.find()) {
            return matcher.group(1);
        }
        throw new IllegalArgumentException("Invalid request `" + request + "`: url not found!");
    }

4.ProxyCache分析

ProxyCache是一个代理类,代理了数据源,并给数据源提供了缓存支持。
核心字段

    private final Source source;  // 数据源
    private final Cache cache;  // 缓存实现
    private final Object wc = new Object();  // 等待缓存写入的锁 
    private final Object stopLock = new Object();  // 关闭io流使用的锁
    private final AtomicInteger readSourceErrorsCount;  // 错误重试次数(默认最大1次)
    private volatile Thread sourceReaderThread;  //  读数据的现成
    private volatile boolean stopped;
    private volatile int percentsAvailable = -1;

读数据相关代码:

        public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        ProxyCacheUtils.assertBuffer(buffer, offset, length);
                // 未结束、缓存未完毕、当前缓存大小不够这次要读取的长度的话(偏移量+要读取的长度=文件需要缓存的长度),就死循环等待数据的读取
        while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
            readSourceAsync();  // 尝试异步读取资源
            waitForSourceData();  // 来个锁锁上,wait等待数据源的写入
            checkReadSourceErrorsCount();
        }
        int read = cache.read(buffer, offset, length);
        // 缓存写入完毕的时候回调一下
        if (cache.isCompleted() && percentsAvailable != 100) {
            percentsAvailable = 100;
            onCachePercentsAvailableChanged(100);
        }
        return read;
    }
    
    private synchronized void readSourceAsync() throws ProxyCacheException {
        boolean readingInProgress = sourceReaderThread != null && sourceReaderThread.getState() != Thread.State.TERMINATED;
        // 如果没有停止、缓存未缓存完毕、也没有线程在运行,就创建一个读取资源的线程并启动
        if (!stopped && !cache.isCompleted() && !readingInProgress) {
            sourceReaderThread = new Thread(new SourceReaderRunnable(), "Source reader for " + source);
            sourceReaderThread.start();
        }
    }

        // 上锁然后将线程wait掉,等待有可用的缓存
    private void waitForSourceData() throws ProxyCacheException {
        synchronized (wc) {
            try {
                wc.wait(1000);
            } catch (InterruptedException e) {
                throw new ProxyCacheException("Waiting source data is interrupted!", e);
            }
        }
    }

开启的线程是如何读入数据的:

        private void readSource() {
        long sourceAvailable = -1;
        long offset = 0;
        try {
            offset = cache.available();
            // 参考http source 的实现,打开了一个httpURLConnection
            source.open(offset);
            // 然后就是io流基操
            sourceAvailable = source.length();
            byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
            int readBytes;
            while ((readBytes = source.read(buffer)) != -1) {
                    // 这个地方使用锁的目的是防止正在写入缓存的时候结束读取
                synchronized (stopLock) {
                    if (isStopped()) {
                        return;
                    }
                    // 将数据写入到缓存里,goto FileCache分析
                    cache.append(buffer, readBytes);
                }
                offset += readBytes;
                // todo
                notifyNewCacheDataAvailable(offset, sourceAvailable);  // 主要就是计算了下缓存可用的百分比回调给onCachePercentsAvailableChanged(注这是个空的方法)
            }
            tryComplete();  // 检查下文件是否下完了,goto tryComplete()分析
            onSourceRead();  // 回调一下缓存可用了
        } catch (Throwable e) {
            readSourceErrorsCount.incrementAndGet();  // 报错了就增加一次错误计数
            onError(e);
        } finally {
            closeSource(); // 调用了source.close(),go to HttpUrlSource.close()
            notifyNewCacheDataAvailable(offset, sourceAvailable);
        }
    }
    
    private void notifyNewCacheDataAvailable(long cacheAvailable, long sourceAvailable) {
        onCacheAvailable(cacheAvailable, sourceAvailable);

        synchronized (wc) {
            wc.notifyAll();
        }
    }
    
    private void tryComplete() throws ProxyCacheException {
            // 同样的完成缓存的时候也要防止关闭线程
        synchronized (stopLock) {
            if (!isStopped() && cache.available() == source.length()) {
                cache.complete();  // goto FileCache.complete,这个地方就是.download文件变完整文件后缀名的地方。
            }
        }
    }

HttpProxyCache分析

继承自ProxyCache,扩展了三点能力:1.解析get请求来向socket写入数据,进行数据转发 2.判断是否使用缓存 3.回调缓存可用信息。
入口,解析请求内容判断如何响应请求

    public synchronized void processRequest(GetRequest request, Socket socket) throws IOException, ProxyCacheException {
        OutputStream out = new BufferedOutputStream(socket.getOutputStream());
        String responseHeaders = newResponseHeaders(request);  // 构建一个响应头
        out.write(responseHeaders.getBytes("UTF-8")); // 写入响应头
        long offset = request.rangeOffset;
        // 然后就该判断是否要用缓存了
        if (isUseCache(request)) {
            responseWithCache(out, offset);
        } else {
            responseWithoutCache(out, offset);
        }
    }

    // 数据源长度大于0、是断点请求、并且请求的偏移量大于阈值(目前可用的缓存+数据源总长度的20%)=》不使用缓存
    // 看上去是特指seek to 很后边的情况,这样下来的数据也不好写入文件,索性就直接不用缓存了?
    private boolean isUseCache(GetRequest request) throws ProxyCacheException {
        long sourceLength = source.length();
        boolean sourceLengthKnown = sourceLength > 0;
        long cacheAvailable = cache.available();
        // do not use cache for partial requests which too far from available cache. It seems user seek video.
        return !sourceLengthKnown || !request.partial || request.rangeOffset <= cacheAvailable + sourceLength * NO_CACHE_BARRIER;
    }

    // 创建一个响应头,注意断点信息是在Content-Range打入的
    private String newResponseHeaders(GetRequest request) throws IOException, ProxyCacheException {
        String mime = source.getMime();
        boolean mimeKnown = !TextUtils.isEmpty(mime);
        long length = cache.isCompleted() ? cache.available() : source.length();
        boolean lengthKnown = length >= 0;
        long contentLength = request.partial ? length - request.rangeOffset : length;
        boolean addRange = lengthKnown && request.partial;
        return new StringBuilder()
                .append(request.partial ? "HTTP/1.1 206 PARTIAL CONTENT\n" : "HTTP/1.1 200 OK\n")
                .append("Accept-Ranges: bytes\n")
                .append(lengthKnown ? format("Content-Length: %d\n", contentLength) : "")
                .append(addRange ? format("Content-Range: bytes %d-%d/%d\n", request.rangeOffset, length - 1, length) : "")
                .append(mimeKnown ? format("Content-Type: %s\n", mime) : "")
                .append("\n") // headers end
                .toString();
    }

用缓存的响应:

    private void responseWithCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
        // 调用自己的read方法(go to ProxyCache),基本io操作
        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
        int readBytes;
        while ((readBytes = read(buffer, offset, buffer.length)) != -1) {
            out.write(buffer, 0, readBytes);
            offset += readBytes;
        }
        out.flush();
    }

不用缓存的响应:

    // 实际上就是单独开了个请求去拉这种偏移量很靠后的数据,同时并不会写入缓存
    private void responseWithoutCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
        HttpUrlSource newSourceNoCache = new HttpUrlSource(this.source);
        try {
            newSourceNoCache.open((int) offset);
            byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
            int readBytes;
            while ((readBytes = newSourceNoCache.read(buffer)) != -1) {
                out.write(buffer, 0, readBytes);
                offset += readBytes;
            }
            out.flush();
        } finally {
            newSourceNoCache.close();
        }
    }

重写了回调方法:

@Override
    protected void onCachePercentsAvailableChanged(int percents) {
        if (listener != null) {
            listener.onCacheAvailable(cache.file, source.getUrl(), percents);
        }
    }

5.HttpProxyCacheServerClients分析

代理服务器的一个客户端,对应了一个目标资源请求和多个需要请求缓存的客户端。
最核心的部分,处理server分发过来的请求:

    public void processRequest(GetRequest request, Socket socket) throws ProxyCacheException, IOException {
        startProcessRequest();
        try {
            // 记录下接入的客户端数量
            clientsCount.incrementAndGet();
            // 开始处理请求,goto ProxyCache分析
            proxyCache.processRequest(request, socket);
        } finally {
            finishProcessRequest(); // 上面会同步读数据直到读完数据
        }
    }

    // 如果有代理类就不用创建了
    private synchronized void startProcessRequest() throws ProxyCacheException {
        proxyCache = proxyCache == null ? newHttpProxyCache() : proxyCache;
    }

    // 请求结束了减少一个客户端,然后判断下是否没得客户端了,关闭资源
    // 因为在 finally 代码块,如果socket特殊情况断掉了也会走这里。所以我们可以实现X%后cancel掉请求来达到缓存目标大小的资源的逻辑。
    private synchronized void finishProcessRequest() {
        if (clientsCount.decrementAndGet() <= 0) {
            proxyCache.shutdown();
            proxyCache = null;
        }
    }

其他代码:

    // 关闭的封装,防止内存泄漏,清空客户端数量    
    public void shutdown() {
        listeners.clear();
        if (proxyCache != null) {
            proxyCache.registerCacheListener(null);
            proxyCache.shutdown();
            proxyCache = null;
        }
        clientsCount.set(0);
    }

    public int getClientsCount() {
        return clientsCount.get();
    }

    // 创建资源和缓存,HttpProxyCache、HttpUrlSource、FileCache、sourceInfoStorage、headerInjector在这串联起来了。
    private HttpProxyCache newHttpProxyCache() throws ProxyCacheException {
        HttpUrlSource source = new HttpUrlSource(url, config.sourceInfoStorage, config.headerInjector);
        FileCache cache = new FileCache(config.generateCacheFile(url), config.diskUsage);
        HttpProxyCache httpProxyCache = new HttpProxyCache(source, cache);
        httpProxyCache.registerCacheListener(uiCacheListener);
        return httpProxyCache;
    }

6.HttpProxyCacheServer分析

代理服务器具体实现,主要承载了代理服务的入口,创建一个本地的代理服务器,处理客户端请求的分发,参数初始化。
构造方法,启动了一个本地socket服务器:

    private HttpProxyCacheServer(Config config) {
        this.config = checkNotNull(config);
        try {
            InetAddress inetAddress = InetAddress.getByName(PROXY_HOST);
            // 创建一个ServerSocket
            this.serverSocket = new ServerSocket(0, 8, inetAddress);
            this.port = serverSocket.getLocalPort();
            IgnoreHostProxySelector.install(PROXY_HOST, port);  // todo 
            CountDownLatch startSignal = new CountDownLatch(1);  // 锁
            this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
            this.waitConnectionThread.start();
            startSignal.await(); // freeze thread, wait for server starts    // todo 这个锁应该线程一启动就解开了,不知道干啥用的
            this.pinger = new Pinger(PROXY_HOST, port);
            LOG.info("Proxy cache server started. Is it alive? " + isAlive());
        } catch (IOException | InterruptedException e) {
            socketProcessor.shutdown();
            throw new IllegalStateException("Error starting local proxy server", e);
        }
    }

看下线程里执行的runnable的执行内容,在这里进行的socket连接的分发:

    private void waitForRequest() {
        try {
            // 线程不死就一直死循环
            while (!Thread.currentThread().isInterrupted()) {
                // 获取当前新的客户端连接
                // 需要注意serverSocket.accept()是一个阻塞方法,没有客户端连接会阻塞线程等待
                Socket socket = serverSocket.accept();
                LOG.debug("Accept new socket " + socket);
                socketProcessor.submit(new SocketProcessorRunnable(socket));
            }
        } catch (IOException e) {
            onError(new ProxyCacheException("Error during waiting connection", e));
        }
    }

    // SocketProcessorRunnable 执行的方法:
    private void processSocket(Socket socket) {
        try {
            // 构造get请求模型,goto GetRequest分析
            GetRequest request = GetRequest.read(socket.getInputStream());
            Log.i("kyle", "Request to cache proxy:" + request);
            LOG.debug("Request to cache proxy:" + request);
            // uri是encode后的字符串没法直接用
            String url = ProxyCacheUtils.decode(request.uri);
            // 如果是ping请求的话响应一下ping成功,知识点:ping请求的uri是"ping"
            if (pinger.isPingRequest(url)) {
                pinger.responseToPing(socket);
            } else {
                HttpProxyCacheServerClients clients = getClients(url);
                clients.processRequest(request, socket);
            }
        } catch (SocketException e) {
            // There is no way to determine that client closed connection http://stackoverflow.com/a/10241044/999458
            // So just to prevent log flooding don't log stacktrace
            LOG.debug("Closing socket… Socket is closed by client.");
        } catch (ProxyCacheException | IOException e) {
            onError(new ProxyCacheException("Error processing request", e));
        } finally {
            // 在这里释放socket
            releaseSocket(socket);
            LOG.debug("Opened connections: " + getClientsCount());
        }
    }

    private HttpProxyCacheServerClients getClients(String url) throws ProxyCacheException {
        // 锁上防止并发创建client
        synchronized (clientsLock) {
            // map里查一下有没有,没有就创建一个新的client
            // todo 已经加锁了还有必要用ConcurrentHashMap吗
            HttpProxyCacheServerClients clients = clientsMap.get(url);
            if (clients == null) {
                clients = new HttpProxyCacheServerClients(url, config);
                clientsMap.put(url, clients);
            }
            return clients;
        }
    }

客户端是如何获取一个可以连接到代理服务器的url呢:

    // 注意第二个参数,true的话会返回 file:// 形式的uri,有的下载器是不支持这种地址的,当然遇到file直接塞给播放器就好了
    public String getProxyUrl(String url, boolean allowCachedFileUri) {
        if (allowCachedFileUri && isCached(url)) {
            File cacheFile = getCacheFile(url);
            touchFileSafely(cacheFile);
            return Uri.fromFile(cacheFile).toString();
        }
        return isAlive() ? appendToProxyUrl(url) : url;
    }

    // 生成一个代理服务器的url映射
    private String appendToProxyUrl(String url) {
        return String.format(Locale.US, "http://%s:%d/%s", PROXY_HOST, port, ProxyCacheUtils.encode(url));  // 其实就是url encode了一下
    }

关闭:

    // 关闭服务器:
    public void shutdown() {
        LOG.info("Shutdown proxy server");
        // 加锁循环关闭client,清空map
        shutdownClients();
        // 释放缓存实现
        config.sourceInfoStorage.release();
        // 关闭线程
        waitConnectionThread.interrupt();
        try {
            // 收工下班
            if (!serverSocket.isClosed()) {
                serverSocket.close();
            }
        } catch (IOException e) {
            onError(new ProxyCacheException("Error shutting down proxy server", e));
        }
    }

注册监听:

    // 找到 url对应的client注册监听
    public void registerCacheListener(CacheListener cacheListener, String url) {
        checkAllNotNull(cacheListener, url);
        synchronized (clientsLock) {
            try {
                getClients(url).registerCacheListener(cacheListener);
            } catch (ProxyCacheException e) {
                LOG.warn("Error registering cache listener", e);
            }
        }
    }

    public void unregisterCacheListener(CacheListener cacheListener, String url) {
        checkAllNotNull(cacheListener, url);
        synchronized (clientsLock) {
            try {
                getClients(url).unregisterCacheListener(cacheListener);
            } catch (ProxyCacheException e) {
                LOG.warn("Error registering cache listener", e);
            }
        }
    }

    // 直接for循环解绑目标监听
    public void unregisterCacheListener(CacheListener cacheListener) {
        checkNotNull(cacheListener);
        synchronized (clientsLock) {
            for (HttpProxyCacheServerClients clients : clientsMap.values()) {
                clients.unregisterCacheListener(cacheListener);
            }
        }
    }

// todo 文件在磁盘上的缓存策略实现(简单来说使用策略模式提供了两种lru缓存大小控制)
// todo2 魔改源码支持m3u8格式视频的预加载

参考资料
1.https://www.cnblogs.com/bylijian/p/7344874.html
2.https://blog.csdn.net/akon_vm/article/details/7429245

上一篇 下一篇

猜你喜欢

热点阅读