音视频开发之旅(49)-边缓存边播放之AndroidVideoC
目录
- 背景
- AndroidVideoCache简单使用
- 实现原理
- 源码分析
- AndroidVideoCache的不足
- 资料
- 收获
一、背景
播放音视频时,播放器数据的请求是由播放器内部发起的,我们只是提供了一个url,而不能控制数据的请求过程,
都是要先进行下载,下载到一定量之后播放器再开始播放,当下载进度减去播放进度小于一定阀值,进入缓冲状态。
比如MediaPlayer的最小缓存大小是4M,最大20M
//framework/av/media/libdatasource/include/datasource/NuCachedSource2.h:30
enum {
kPageSize = 65536,
//缓冲 最大阀值 20M
kDefaultHighWaterThreshold = 20 * 1024 * 1024,
//缓冲 最小阀值 4M
kDefaultLowWaterThreshold = 4 * 1024 * 1024,
// Read data after a 15 sec timeout whether we're actively
// fetching or not.
kDefaultKeepAliveIntervalUs = 15000000,
};
这样的设计有如下两个弊端:
- 造成首帧时长、卡顿恢复时长,都会比较高,影响用户体验。
- 每次都要重新跟进url重新下载视频,造成了严重的流量(真金白银)浪费。
这就需要一种自定义播放器结合边下边播的策略,对下载、解码、播放进行控制。我们今天分析的开源项目AndroidVideoCache给我们提供了一种很好的思路,我们一起来分析学习吧。
二、AndroidVideoCache简单使用
public void setDataSource(String path ){
...
// 获取APP单例的proxy
HttpProxyCacheServer proxy = MyApplication.getProxy();
//把网络的url转为代理的url
String proxyUrl = proxy.getProxyUrl(path);
//内部触发请求,socketServer根据host和port监听有socket连接进行代理请求下载音视频流数据
mediaPlayer.setDataSource(proxyUrl);
...
}
public class MyApplication extend Application
public static HttpProxyCacheServer getProxy() {
return getInstance().proxy == null ? (getInstance().proxy = getInstance().newProxy()) : getInstance().proxy;
}
private HttpProxyCacheServer newProxy() {
return new HttpProxyCacheServer.Builder(mContext)
//设置缓存路径
.cacheDirectory(CacheUtils.getVideoCacheDir(mContext))
//设置缓存的名称
.fileNameGenerator(new MyMd5FileNameGenerator())
.build();
}
}
三、实现原理
在业务层和播放器层直接加入本地代理,通过Socket的的方式,首先建立本地的socketServer,监听local host和指定(bind的时候指定让系统来分配一个可用的)端口的请求。每次数据的请求都发给local host,socketSrever监听到有Socket连接时,由 socketServer来代理视频数据的请求,请求到的数据不返回给播放器,而是直接写入到文件缓存中,再从改文件缓存中读取buffer数据给到播放器。
四、源码分析
主流程图
下面我们结合源码进行分析,我们从HttpProxyCacheServer获取本地代理以及转换请求地址的getProxyUrl方法开始入手具体分析下。
1. HttpProxyCacheServer.Builder通过构造器来生成本地代理服务器。
public HttpProxyCacheServer build() {
Config config = buildConfig();
return new HttpProxyCacheServer(config);
}
private Config buildConfig() {
//cacheRoot: 设置缓存路径
//fileNameGenerator: 设置文件名,一般用url的md5或者唯一表示的业务id/hash
//diskUsage: 缓存的lru策略,有个touch方法,用于更新文件的修改时间(这个的实现也很有意思)。
// 支持设置缓存总大小以及缓存总个数的阀值。也可以自行扩展比如设置缓存的有效期
//sourceInfoStorage : 缓存信息的存储,根据唯一表示存储/查询对应的缓存路径等信息
return new Config(cacheRoot, fileNameGenerator, diskUsage, sourceInfoStorage);
}
2. HttpProxyCacheServer构造方法
private static final String PROXY_HOST = "127.0.0.1";
private HttpProxyCacheServer(Config config) {
this.config = checkNotNull(config);
try {
//根据host生成本地代理服务器的地址
InetAddress inetAddress = InetAddress.getByName(PROXY_HOST);
//创建ServerSocket,最大可于8个client进行连接
this.serverSocket = new ServerSocket(0, 8, inetAddress);
//有系统自动分配一个端口
this.port = serverSocket.getLocalPort();
IgnoreHostProxySelector.install(PROXY_HOST, port);
//等待waitConnectionThread线程启动
CountDownLatch startSignal = new CountDownLatch(1);
//开启一个线程接收socket连接
this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
this.waitConnectionThread.start();
//阻塞当前线程,直到startSignal.countDown();
startSignal.await(); // freeze thread, wait for server starts
this.pinger = new Pinger(PROXY_HOST, port);
} catch (IOException | InterruptedException e) {
socketProcessor.shutdown();
throw new IllegalStateException("Error starting local proxy server", e);
}
}
3. WaitRequestsRunnable:开启一个线程,在线程中轮训
private final class WaitRequestsRunnable implements Runnable {
private final CountDownLatch startSignal;
public WaitRequestsRunnable(CountDownLatch startSignal) {
this.startSignal = startSignal;
}
@Override
public void run() {
startSignal.countDown();
//开启一个线程,在线程中轮训
waitForRequest();
}
}
4. waitForRequest
private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
private void waitForRequest() {
try {
//如果线程没有interrupt,不断的轮询,用于检测是否有新的socket连接
while (!Thread.currentThread().isInterrupted()) {
//阻塞的方法 用于socket连接
//socketServer通过监听本地host:port,如果有对应的请求触发就进行一个socket连接
Socket socket = serverSocket.accept();
//线程池,同时最大可以有8个socket连接
// 每个socket独占一个线程,最大可以有8个并发连接
// submit一个runnable进行处理socket
socketProcessor.submit(new SocketProcessorRunnable(socket));
}
} catch (IOException e) {
onError(new ProxyCacheException("Error during waiting connection", e));
}
}
5. 等到有看下getProxyUrl调用,serverSocket的accept就会收到socket连接走到SocketProcessorRunnable,我们先看下getProxyUrl的实现。
public String getProxyUrl(String url, boolean allowCachedFileUri) {
if (allowCachedFileUri && isCached(url)) {
File cacheFile = getCacheFile(url);
touchFileSafely(cacheFile);
return Uri.fromFile(cacheFile).toString();
}
return isAlive() ? appendToProxyUrl(url) : url;
}
private boolean isAlive() {
return pinger.ping(3, 70); // 70+140+280=max~500ms
}
private String appendToProxyUrl(String url) {
return String.format(Locale.US, "http://%s:%d/%s", PROXY_HOST, port, ProxyCacheUtils.encode(url));
}
6. 接着继续看SocketProcessorRunnable:处理这个socket连接
private final class SocketProcessorRunnable implements Runnable {
private final Socket socket;
public SocketProcessorRunnable(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
//处理这个socket连接
processSocket(socket);
}
}
7. processSocket:获取 HttpProxyCacheServerClients ,并进行request处理
//HttpProxyCacheServer#processSocket
private void processSocket(Socket socket) {
try {
//通过输入流(即请求转换过的url等信息)生成GetRequest对象
GetRequest request = GetRequest.read(socket.getInputStream());
String url = ProxyCacheUtils.decode(request.uri);
//url是"ping" 返回200,可以ping通
if (pinger.isPingRequest(url)) {
pinger.responseToPing(socket);
} else {
//获取 HttpProxyCacheServerClients ,并进行request处理
HttpProxyCacheServerClients clients = getClients(url);
clients.processRequest(request, socket);
}
} catch (SocketException e) {
// There is no way to determine that client closed connection http://stackoverflow.com/a/10241044/999458
// So just to prevent log flooding don't log stacktrace
} catch (ProxyCacheException | IOException e) {
onError(new ProxyCacheException("Error processing request", e));
} finally {
//socket处理完毕之后,在finally中,关闭socket连接释放资源
releaseSocket(socket);
}
}
8. HttpProxyCacheServerClients#processRequest: 构造proxyCache,并进行请求
//HttpProxyCacheServerClients#processRequest
public void processRequest(GetRequest request, Socket socket) throws ProxyCacheException, IOException {
//proxyCache的初始化,如果没有则重新newHttpProxyCache,否则复用即可
startProcessRequest();
try {
//原子操作用于记录当前有多少个socketClient
clientsCount.incrementAndGet();
//缓存代理开始处理
proxyCache.processRequest(request, socket);
} finally {
//结束
finishProcessRequest();
}
}
private synchronized void startProcessRequest() throws ProxyCacheException {
proxyCache = proxyCache == null ? newHttpProxyCache() : proxyCache;
}
private synchronized void finishProcessRequest() {
if (clientsCount.decrementAndGet() <= 0) {
//sourceReaderThread中断
//FileChannel关闭
//touch下文件
proxyCache.shutdown();
proxyCache = null;
}
}
9.1 HttpProxyCacheServerClients#newHttpProxyCache:进行httpProxyCache的初始化
private HttpProxyCache newHttpProxyCache() throws ProxyCacheException {
//HttpUrlSource 持有url,开启HttpUrlConnetcion来获取inputStream
HttpUrlSource source = new HttpUrlSource(url, config.sourceInfoStorage, config.headerInjector);
//缓存总以.download存在,缓存完后更名,并会进行一次touch
FileCache cache = new FileCache(config.generateCacheFile(url), config.diskUsage);
HttpProxyCache httpProxyCache = new HttpProxyCache(source, cache);
httpProxyCache.registerCacheListener(uiCacheListener);
return httpProxyCache;
}
9.2 HttpProxyCache#processRequest:这个方法是边缓存边播放的关键
把数据先以流的方式 写入到缓存,在通过socket的outStream给到播放器
public void processRequest(GetRequest request, Socket socket) throws IOException, ProxyCacheException {
//socket.getOutputStream() 就是clientSocket需要的stream(会以流的方式,先缓存到本地再给到播放器)
OutputStream out = new BufferedOutputStream(socket.getOutputStream());
//先添加 响应头
//HTTP/1.1 200 OK
//Accept-Ranges: bytes
//Content-Length: 4585263
//Content-Type: audio/mpeg
String responseHeaders = newResponseHeaders(request);
out.write(responseHeaders.getBytes("UTF-8"));
long offset = request.rangeOffset;
//判断是否需要缓存,TODO 这里的可以进行优化,否则一旦seek后就可能不会在缓存了
//要处理seek后继续缓存就要考虑文件空洞的以及merge的事情
if (isUseCache(request)) {
//如果使用缓存,先把请求数据写入缓存文件,再返回给播放器
responseWithCache(out, offset);
} else {
responseWithoutCache(out, offset);
}
}
private boolean isUseCache(GetRequest request) throws ProxyCacheException {
//原始长度
long sourceLength = source.length();
boolean sourceLengthKnown = sourceLength > 0;
//已经缓存的长度
long cacheAvailable = cache.available();
// do not use cache for partial requests which too far from available cache. It seems user seek video.
return !sourceLengthKnown || !request.partial || request.rangeOffset <= cacheAvailable + sourceLength * NO_CACHE_BARRIER;
}
10. HttpProxyCache#responseWithCache: 每次从网络六种读取8192个字节,先写入到缓存文件,再从缓存文件中取出给到播放器
static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
private void responseWithCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int readBytes;
//这里的read方法,每次读取8192个字节,直到读完为止
while ((readBytes = read(buffer, offset, buffer.length)) != -1) {
out.write(buffer, 0, readBytes);
offset += readBytes;
}
out.flush();
}
11. ProxyCache#read
/**
* 这个是边缓存边播放的关键,先往文件中写入数据,直到写完(整个文件写完或者8192个写完)或者中断。
* buffer:一次读取的buffer
* offset:当前的已有缓存的偏移
* lenght: 一次读取buffer的大小
*/
public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
ProxyCacheUtils.assertBuffer(buffer, offset, length);
//如果没有缓存完,并且缓存的大小小于需要缓存的大小(一次8192个字节),并且sourceReaderThread线程没有停止
while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
//异步的读取数据, 这里为什么要这样设计呐??(本来已经在子线程了,为什么还要在开启线程进行读取网络数据呐?sourceReaderThread)
readSourceAsync();
//等待,最大时长1s秒钟,每过1s中检查是否有错误发生
waitForSourceData();
checkReadSourceErrorsCount();
}
//从缓存中读取最大的8192个字节数据给到播放器
int read = cache.read(buffer, offset, length);
if (cache.isCompleted() && percentsAvailable != 100) {
percentsAvailable = 100;
onCachePercentsAvailableChanged(100);
}
return read;
}
private void waitForSourceData() throws ProxyCacheException {
synchronized (wc) {
try {
wc.wait(1000);
} catch (InterruptedException e) {
throw new ProxyCacheException("Waiting source data is interrupted!", e);
}
}
}
12. ProxyCache#readSourceAsync: 如果已经还没有停止,并且 还没有缓存完 并且 没有在读取中 则开启新的数据读取线程 线程
private synchronized void readSourceAsync() throws ProxyCacheException {
boolean readingInProgress = sourceReaderThread != null && sourceReaderThread.getState() != Thread.State.TERMINATED;
//如果已经还没有停止,并且 还没有缓存完 并且 没有在读取中 则开启新的数据读取线程 线程
if (!stopped && !cache.isCompleted() && !readingInProgress) {
//在这个SourceReaderRunnable中进行
sourceReaderThread = new Thread(new SourceReaderRunnable(), "Source reader for " + source);
sourceReaderThread.start();
}
}
13. 下面再来看下SourceReaderRunnable的的run中的ProxyCache#readSource
从网络连接的HttpUrlConnetion拿到inputStream,不断的读取数据(每次8192个字节),直到读完。
private void readSource() {
long sourceAvailable = -1;
long offset = 0;
try {
//已经缓存的大小
offset = cache.available();
//开启 HttpUrlConnetion,获取一个inputStream
source.open(offset);
//文件的大小
sourceAvailable = source.length();
byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
int readBytes;
//HttpUrlSource.read,不断的读取数据从inputstream
while ((readBytes = source.read(buffer)) != -1) {
synchronized (stopLock) {
if (isStopped()) {
return;
}
//往缓存文件中写入数据,一次写入8192字节
cache.append(buffer, readBytes);
}
offset += readBytes;
notifyNewCacheDataAvailable(offset, sourceAvailable);
}
tryComplete();
onSourceRead();
} catch (Throwable e) {
//如果读取过程中发生了错误,则进行原子加操作,每过1s秒会检查该标记位
readSourceErrorsCount.incrementAndGet();
onError(e);
} finally {
closeSource();
notifyNewCacheDataAvailable(offset, sourceAvailable);
}
}
14.HttpUrlSource#read
这里的inputStream就是HttpUrlconnection的输入
//
@Override
public int read(byte[] buffer) throws ProxyCacheException {
if (inputStream == null) {
throw new ProxyCacheException("Error reading data from " + sourceInfo.url + ": connection is absent!");
}
try {
return inputStream.read(buffer, 0, buffer.length);
} catch (InterruptedIOException e) {
throw new InterruptedProxyCacheException("Reading source " + sourceInfo.url + " is interrupted", e);
} catch (IOException e) {
throw new ProxyCacheException("Error reading data from " + sourceInfo.url, e);
}
}
为什么使用HttpUrlconnection而不是OKHttp呐,这里完全可以使用OKHttp替换。可以结合自己业务的实际情况来进行切换。
截图来自:performance-okhttp-vs.-httpurlconnection
主要流程到这里基本上就分析完了
在请求远程url时将文件写到本地缓存中,然后从这个本地缓存中读数据,写入到客户端socket里面。服务器Socket主要还是一个代理的作用,从中间拦截掉网络请求,然后实现对socket的读取和写入。
五、AndroidVideoCache的不足
5.1 Seek的场景
Seek后有可能就不缓存了
我们在上一小节的4.9.2的HttpProxyCache#processRequest的isUseCache就是来判断是否进行缓存。
private boolean isUseCache(GetRequest request) throws ProxyCacheException {
//原始长度
long sourceLength = source.length();
boolean sourceLengthKnown = sourceLength > 0;
//已经缓存的长度
long cacheAvailable = cache.available();
// do not use cache for partial requests which too far from available cache. It seems user seek video.
return !sourceLengthKnown || !request.partial || request.rangeOffset <= cacheAvailable + sourceLength * NO_CACHE_BARRIER;
}
这个不符合我们的预期,seek后也应该进行缓存,这是缓存文件之间可能存在空洞,需要针对这种情况做些特殊处理。下面一篇我们来分析下另外一个开源项目是如何处理这种情况的。
5.2 预缓存(脱离播放器实现缓存)
提前下载,无论视频是否下载完成,都可以将这提前下载好的部分作为视频缓存使用
参考上一小节的4.7,进行下扩展。根据url创建GetRequest,然后调用HttpProxyCacheServerClients#processRequest即可
HttpProxyCacheServerClients clients = getClients(url);
clients.processRequest(request);
5.3 线程管理
开启线程过多,过多线程的内存消耗以及状态同步是一个需要注意点。可以把线程改为线程池的方式实现。但是要特别并发和状态同步。这个后面也会有单独一篇再来分析
有哪些线程?
- HttpProxyCacheServer.WaitRequestsRunnable—》等待socket连接
- HttpProxyCacheServer.SocketProcessorRunnable—》处理单个socket连接
- ProxyCache.SourceReaderRunnable —>分块(8192个字节)读取网络数据流写入到缓存文件并且返回给clientSocket 【这个线程要重点分析】
5.4 缓存是根据url来进行区分,对于大的视频,没有进行分片下载,节省流量
可以参考m3u8的方式,给一个视频进行分片。这个后面再分析另外一个开源项目是再来一些拆解。
5.5 AndroidVideoCache采用数据库进行存储缓存的信息,可以不使用,减少IO操作
5.6 如果我们的有其他代理,那么这个socket方式拿url就会出问题,因为我们拿到的也是一个代理url,所以在开发时需要考虑代理用户提供兼容性处理。
六、资料
- AndroidVideoCache-视频边播放边缓存的代理策略
- 网易云音乐-音视频播放
- [QQ空间十亿级视频播放技术优化揭秘王辉终稿2.key]
- Android MediaPlayer buffer大小
- Android主流视频播放及缓存实现原理调研
- Qzone视频下载如何做到多快好省?
- AndroidVideoCache优化
- Android 平台视频边下边播技术
七、收获
通过本篇的学习实践,
- 理解边下边播的必要性以其实现原理
- 分析AndroidVideoCache源码,从整体和重要流程上进行拆解分析
- AndroidVideoCache存在的一些不足,以及对应的方案。
感谢你的阅读
下一篇我们对seek的场景如何实现边缓存边播放进行分析和实现,欢迎关注公众号“音视频开发之旅”,一起学习成长。
欢迎交流