直播音视频开发经验之路直播相关知识

移动端直播开发(三)RTMP推流

2017-02-22  本文已影响1906人  前世小书童

写在前面的话

前面一篇文章已经对移动端数据源采集与编码进行了说明,接下来就是将之前采集的数据上传给我们的视频服务器了,通过视频服务器的转发,可以在web端,app端观看我们采集的数据,从而实现直播效果,对于上传直播数据,我们一般采用RTMP推流方式,那么首先我们要了解一下RTMP协议。

一.RTMP协议

RTMP协议是Real Time Message Protocol(实时信息传输协议)的缩写,它是由Adobe公司提出的一种应用层的协议,用来解决多媒体数据传输流的多路复用(Multiplexing)和分包(packetizing)的问题。

1.简要介绍

RTMP协议是应用层协议,是要靠底层可靠的传输层协议(通常是TCP)来保证信息传输的可靠性的。在基于传输层协议的链接建立完成后,一个RTMP协议的流媒体推流需要经过以下几个步骤:握手,建立连接,建立流,推流。RTMP连接都是以握手作为开始的。建立连接阶段用于建立客户端与服务器之间的“网络连接”;建立流阶段用于建立客户端与服务器之间的“网络流”;推流阶段用于传输视音频数据。

接下来就简单介绍下这一过程

2.握手

在rtmp连接建立后,服务端与客户端需要通过3次交换报文完成握手,握手其他的协议不同,是由三个静态大小的块,而不是可变大小的块组成的,客户端与服务器发送相同的三个chunk,客户端发送c0,c1,c2,服务端发送s0,s1,s2。

发送规则

数据格式

C0与S0
C0和S0的长度是一个字节,在 S0 中这个字段表示服务器选择的 RTMP 版本。rtmp1.0规范所定义的版本是 3;0-2 是早期产品所用的,已被丢弃;4-31保留在未来使用;32-255 不允许使用(为了区分其他以某一字符开始的文本协议)。如果服务无法识别客户端请求的版本,应该返回 3 。客户端可以选择减到版本 3 或选择取消握手。

C1与S1
C1 和 S1 有 1536 字节长,由下列字段组成:
时间:4 字节 本字段包含时间戳。该时间戳应该是发送这个数据块的端点的后续块的时间起始点。可以是 0,* 或其他的 任何值。为了同步多个流,端点可能发送其块流的当前值。
零:4 字节 本字段必须是全零。
随机数据:1528 字节。 本字段可以包含任何值。 因为每个端点必须用自己初始化的握手和对端初始化的握 手来区分身份,所以这个数据应有充分的随机性。但是并不需要加密安全的随机值,或者动态值

C2与S2
C2 和 S2 消息有 1536 字节长。只是 S1 和 C1 的回复。本消息由下列字段组成。
时间:4 字节 本字段必须包含对等段发送的时间(对 C2 来说是 S1,对 S2 来说是 C1)。
时间 2:4 字节 本字段必须包含先前发送的并被对端读取的包的时间戳。
随机回复:1528 字节 本字段必须包含对端发送的随机数据字段(对 C2 来说是 S1,对 S2 来说是 C1) 。 每个对等端可以用时间和时间 2 字段中的时间戳来快速地估计带宽和延迟。 但这样做可 能并不实用。

RTMP握手的这个过程就是完成了两件事:1. 校验客户端和服务器端RTMP协议版本号,2. 是发了一堆数据,猜想应该是测试一下网络状况,看看有没有传错或者不能传的情况。

3.建立网络连接

注意:

  1. 这里面的connect 命令消息,命令里面包含什么东西,协议中没有说,真实通信中要指定一些编解码的信息,这些信息是以AMF格式发送的, 其中audioCodecs和videoCodecs这两个指定音视频编码信息的不能少的。

  2. Window Acknowledgement Size 是设置接收端消息窗口大小,一般是2500000字节,即告诉客户端你在收到我设置的窗口大小的这么多数据之后给我返回一个ACK消息,告诉我你收到了这么多消息。在实际做推流的时候推流端要接收很少的服务器数据,远远到达不了窗口大小,所以基本不用考虑这点。而对于服务器返回的ACK消息一般也不做处理,我们默认服务器都已经收到了这么多消息。

  3. 服务器返回的_result命令类型消息的payload length一般不会大于128字节,但是在最新的nginx-rtmp中返回的消息长度会大于128字节,所以一定要做好收包,组包的工作。

4.建立网络流

创建完网络连接之后就可以创建网络流了

解析服务器返回的消息会得到一个stream ID, 这个ID也就是以后和服务器通信的 message stream ID, 一般返回的是1,不固定。

5.推流命令

推流准备工作的最后一步是 Publish Stream,即向服务器发一个publish命令,这个命令的message stream ID 就是上面 create stream 之后服务器返回的stream ID,发完这个命令一般不用等待服务器返回的回应,直接下一步发送音视频数据。有些rtmp库 还会发setMetaData消息,这个消息可以发也可以不发,里面包含了一些音视频编码的信息。

当以上工作都完成的时候,就可以发送音视频了。

二.RTMP推流的实现流程

前面已经介绍了RTMP协议推流的流程,那么我们如何在Android上面实现推流呢?一般采用FFmpeg来进行推流的,我这里采用的是一款纯Java的推流库yasea,之所以选择这个推流第三方库,主要是为了了解上述的RTMP协议推流的流程。

接下来就结合代码来分析下实现推流的功能

1.握手
public void connect(String url) throws IOException {
    int port;
    String host;
    Matcher matcher = rtmpUrlPattern.matcher(url);
    if (matcher.matches()) {
        tcUrl = url.substring(0, url.lastIndexOf('/'));
        swfUrl = "";            
        pageUrl = "";            
        host = matcher.group(1);
        String portStr = matcher.group(3);
        port = portStr != null ? Integer.parseInt(portStr) : 1935;
        appName = matcher.group(4);
        streamName = matcher.group(6);
    } else {
        throw new IllegalArgumentException("Invalid RTMP URL. Must be in format: rtmp://host[:port]/application[/streamName]");
    }

    // socket connection
    Log.d(TAG, "connect() called. Host: " + host + ", port: " + port + ", appName: " + appName + ", publishPath: " + streamName);
    socket = new Socket();
    SocketAddress socketAddress = new InetSocketAddress(host, port);
    socket.connect(socketAddress, 3000);
    BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
    BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream());
    Log.d(TAG, "connect(): socket connection established, doing handhake...");
    handshake(in, out);
    active = true;
    Log.d(TAG, "connect(): handshake done");
    rtmpSessionInfo = new RtmpSessionInfo();
    readThread = new ReadThread(rtmpSessionInfo, in, this);
    writeThread = new WriteThread(rtmpSessionInfo, out, this);
    readThread.start();
    writeThread.start();

    // Start the "main" handling thread
    new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                Log.d(TAG, "starting main rx handler loop");
                handleRxPacketLoop();
            } catch (IOException ex) {
                Logger.getLogger(RtmpConnection.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }).start();
}

private void handshake(InputStream in, OutputStream out) throws IOException {
    Handshake handshake = new Handshake();
    handshake.writeC0(out);
    handshake.writeC1(out); // Write C1 without waiting for S0
    out.flush();
    handshake.readS0(in);
    handshake.readS1(in);
    handshake.writeC2(out);
    handshake.readS2(in);
}

这里首先匹配我们需要上传的服务器地址进行匹配,接下来连接到视频服务器,接下来通过handshake方法来进行握手协议,接下来开启了两个线程,这两个线程是用来进行读写操作的,读是读取服务器返回的指令,写是向服务器发送指令,或者音视频信息,最后开启一个线程里面是handleRxPacketLoop方法,这个方法不断的读取服务器返回的指令。

2.建立网络连接
private void rtmpConnect() throws IOException, IllegalStateException {
if (fullyConnected || connecting) {
    throw new IllegalStateException("Already connected or connecting to RTMP server");
}

// Mark session timestamp of all chunk stream information on connection.
ChunkStreamInfo.markSessionTimestampTx();

Log.d(TAG, "rtmpConnect(): Building 'connect' invoke packet");
ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_COMMAND_CHANNEL);
Command invoke = new Command("connect", ++transactionIdCounter, chunkStreamInfo);
invoke.getHeader().setMessageStreamId(0);
AmfObject args = new AmfObject();
args.setProperty("app", appName);
args.setProperty("flashVer", "LNX 11,2,202,233"); // Flash player OS: Linux, version: 11.2.202.233
args.setProperty("swfUrl", swfUrl);
args.setProperty("tcUrl", tcUrl);
args.setProperty("fpad", false);
args.setProperty("capabilities", 239);
args.setProperty("audioCodecs", 3575);
args.setProperty("videoCodecs", 252);
args.setProperty("videoFunction", 1);
args.setProperty("pageUrl", pageUrl);
args.setProperty("objectEncoding", 0);
invoke.addData(args);
writeThread.send(invoke);

connecting = true;
mHandler.onRtmpConnecting("connecting");
}

这里配置了connect命令,前面也说到这个命令里面包含了很多东西
接下来就是服务器的返回信息分别是窗口大小与带宽信息,这些信息则由前面说的handleRxPacketLoop来读取并进行相关设置与反馈

private void handleRxPacketLoop() throws IOException {
    // Handle all queued received RTMP packets
    while (active) {
        while (!rxPacketQueue.isEmpty()) {
            RtmpPacket rtmpPacket = rxPacketQueue.poll();
            //Log.d(TAG, "handleRxPacketLoop(): RTMP rx packet message type: " + rtmpPacket.getHeader().getMessageType());
            switch (rtmpPacket.getHeader().getMessageType()) {
                 ...
                case WINDOW_ACKNOWLEDGEMENT_SIZE:
                    WindowAckSize windowAckSize = (WindowAckSize) rtmpPacket;
                    int size = windowAckSize.getAcknowledgementWindowSize();
                    Log.d(TAG, "handleRxPacketLoop(): Setting acknowledgement window size: " + size);
                    rtmpSessionInfo.setAcknowledgmentWindowSize(size);
                    // Set socket option
                    socket.setSendBufferSize(size);
                    break;
                case SET_PEER_BANDWIDTH:
                    int acknowledgementWindowsize = rtmpSessionInfo.getAcknowledgementWindowSize();
                    final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_CONTROL_CHANNEL);
                    Log.d(TAG, "handleRxPacketLoop(): Send acknowledgement window size: " + acknowledgementWindowsize);
                    writeThread.send(new WindowAckSize(acknowledgementWindowsize, chunkStreamInfo));
                    break;
                case COMMAND_AMF0:
                    handleRxInvoke((Command) rtmpPacket);
                    break;
                default:
                    Log.w(TAG, "handleRxPacketLoop(): Not handling unimplemented/unknown packet of type: " + rtmpPacket.getHeader().getMessageType());
                    break;
            }
        }
        // Wait for next received packet
        synchronized (rxPacketLock) {
            try {
                rxPacketLock.wait(500);
            } catch (InterruptedException ex) {
                Log.w(TAG, "handleRxPacketLoop: Interrupted", ex);
            }
        }
    }
}

这里我们看到收到WINDOW_ACKNOWLEDGEMENT_SIZE这个命令后,将socket的BufferSize设置为指定的size了,收到SET_PEER_BANDWIDTH,writeThread发送了窗口大小的消息给服务器了,接下来服务器就会返回上面说的结果命令了,结果命令的处理为handleRxInvoke方法


private void handleRxInvoke(Command invoke) throws IOException {
    String commandName = invoke.getCommandName();

    if (commandName.equals("_result")) {
        // This is the result of one of the methods invoked by us
        String method = rtmpSessionInfo.takeInvokedCommand(invoke.getTransactionId());

        Log.d(TAG, "handleRxInvoke: Got result for invoked method: " + method);
        if ("connect".equals(method)) {
            // Capture server ip/pid/id information if any
            String serverInfo = onSrsServerInfo(invoke);
            mHandler.onRtmpConnected("connected" + serverInfo);
            // We can now send createStream commands
            connecting = false;
            fullyConnected = true;
            synchronized (connectingLock) {
                connectingLock.notifyAll();
            }
        }
       ...
}

result信息匹配到是connect命令,会进行一些参数的设置

这里网络连接就已经建立起来了

3.建立网络流
private void createStream() {
    if (!fullyConnected) {
        throw new IllegalStateException("Not connected to RTMP server");
    }
    if (currentStreamId != -1) {
        throw new IllegalStateException("Current stream object has existed");
    }

    Log.d(TAG, "createStream(): Sending releaseStream command...");
    // transactionId == 2
    Command releaseStream = new Command("releaseStream", ++transactionIdCounter);
    releaseStream.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
    releaseStream.addData(new AmfNull());  // command object: null for "createStream"
    releaseStream.addData(streamName);  // command object: null for "releaseStream"
    writeThread.send(releaseStream);

    Log.d(TAG, "createStream(): Sending FCPublish command...");
    // transactionId == 3
    Command FCPublish = new Command("FCPublish", ++transactionIdCounter);
    FCPublish.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
    FCPublish.addData(new AmfNull());  // command object: null for "FCPublish"
    FCPublish.addData(streamName);
    writeThread.send(FCPublish);

    Log.d(TAG, "createStream(): Sending createStream command...");
    ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_COMMAND_CHANNEL);
    // transactionId == 4
    Command createStream = new Command("createStream", ++transactionIdCounter, chunkStreamInfo);
    createStream.addData(new AmfNull());  // command object: null for "createStream"
    writeThread.send(createStream);

    // Waiting for "NetStream.Publish.Start" response.
    synchronized (publishLock) {
        try {
            publishLock.wait(5000);
        } catch (InterruptedException ex) {
            // do nothing
        }
    }
}

这里主要是向服务器发送了releaseStream,FCPublish与createStream三个命令,服务器收到这些命令后会向客户端返回result命令,命令中包含后面通讯用的stream ID

private void handleRxInvoke(Command invoke) throws IOException {
    String commandName = invoke.getCommandName();

    if (commandName.equals("_result")) {
        // This is the result of one of the methods invoked by us
        String method = rtmpSessionInfo.takeInvokedCommand(invoke.getTransactionId());

        Log.d(TAG, "handleRxInvoke: Got result for invoked method: " + method);
        ...
         else if ("createStream".contains(method)) {
            // Get stream id
            currentStreamId = (int) ((AmfNumber) invoke.getData().get(1)).getValue();
            Log.d(TAG, "handleRxInvoke(): Stream ID to publish: " + currentStreamId);
            if (streamName != null && publishType != null) {
                fmlePublish();
            }
        } 
       ...
}

可以看到最后调用了fmlePublish方法,这个方法是发送推流命令的

4.推流命令
private void fmlePublish() throws IllegalStateException {
    if (!fullyConnected) {
        throw new IllegalStateException("Not connected to RTMP server");
    }
    if (currentStreamId == -1) {
        throw new IllegalStateException("No current stream object exists");
    }

    Log.d(TAG, "fmlePublish(): Sending publish command...");
    // transactionId == 0
    Command publish = new Command("publish", 0);
    publish.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
    publish.getHeader().setMessageStreamId(currentStreamId);
    publish.addData(new AmfNull());  // command object: null for "publish"
    publish.addData(streamName);
    publish.addData(publishType);
    writeThread.send(publish);
}

这里就是向服务器发送推流的命令

到这里就是完成了RTMP推流的协议流程,完成后我们就可以将获取的音视频推流发送到视频服务器了

如下

@Override
public void publishAudioData(byte[] data) throws IllegalStateException {
    if (!fullyConnected) {
        throw new IllegalStateException("Not connected to RTMP server");
    }
    if (currentStreamId == -1) {
        throw new IllegalStateException("No current stream object exists");
    }
    if (!publishPermitted) {
        throw new IllegalStateException("Not get the _result(Netstream.Publish.Start)");
    }
    Audio audio = new Audio();
    audio.setData(data);
    audio.getHeader().setMessageStreamId(currentStreamId);
    writeThread.send(audio);
    mHandler.onRtmpAudioStreaming("audio streaming");
}

@Override
public void publishVideoData(byte[] data) throws IllegalStateException {
    if (!fullyConnected) {
        throw new IllegalStateException("Not connected to RTMP server");
    }
    if (currentStreamId == -1) {
        throw new IllegalStateException("No current stream object exists");
    }
    if (!publishPermitted) {
        throw new IllegalStateException("Not get the _result(Netstream.Publish.Start)");
    }
    Video video = new Video();
    video.setData(data);
    video.getHeader().setMessageStreamId(currentStreamId);
    writeThread.send(video);
    videoFrameCacheNumber.getAndIncrement();
    mHandler.onRtmpVideoStreaming("video streaming");
}

由于我们的视频和音频是分开推流的,那么音视频同步问题怎么解决呢?

一般来说,视频同步指的是视频和音频同步,也就是说播放的声音要和当前显示的画面保持一致。想象以下,看一部电影的时候只看到人物嘴动没有声音传出;或者画面是激烈的战斗场景,而声音不是枪炮声却是人物说话的声音,这是非常差的一种体验。
在视频流和音频流中已包含了其以怎样的速度播放的相关数据,视频的帧率(Frame Rate)指示视频一秒显示的帧数(图像数);音频的采样率(Sample Rate)表示音频一秒播放的样本(Sample)的个数。可以使用以上数据通过简单的计算得到其在某一Frame(Sample)的播放时间,以这样的速度音频和视频各自播放互不影响,在理想条件下,其应该是同步的,不会出现偏差。但,理想条件是什么大家都懂得。如果用上面那种简单的计算方式,慢慢的就会出现音视频不同步的情况。要不是视频播放快了,要么是音频播放快了,很难准确的同步。这就需要一种随着时间会线性增长的量,视频和音频的播放速度都以该量为标准,播放快了就减慢播放速度;播放快了就加快播放的速度。所以呢,视频和音频的同步实际上是一个动态的过程,同步是暂时的,不同步则是常态。以选择的播放速度量为标准,快的等待慢的,慢的则加快速度,是一个你等我赶的过程。

播放速度标准量的的选择一般来说有以下三种:

所以只要我们表示上正确的时间戳(dts),这样视频播放器就会根据这个这个时间戳去做音视频同步

这个时间戳是在上传前添加上去的,时间可以用系统当前时间,也可以做其他设置

yasea中也添加了时间戳如下

while (!writeQueue.isEmpty()) {
    RtmpPacket rtmpPacket = writeQueue.poll();
    ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId());
    chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader());
    rtmpPacket.getHeader().setAbsoluteTimestamp((int) chunkStreamInfo.markAbsoluteTimestampTx());
    rtmpPacket.writeTo(out, rtmpSessionInfo.getTxChunkSize(), chunkStreamInfo);
    Log.d(TAG, "WriteThread: wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength());
    if (rtmpPacket instanceof Command) {
        rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName());
    }
    if (rtmpPacket instanceof Video) {
        publisher.getVideoFrameCacheNumber().getAndDecrement();
        calcFps();
    }
}
out.flush();

到这里就完成了RTMP推流相关的讲解,其实前面提到的yasea也把关于直播相关的内容集成进去了,虽然不明白为什么一个推流库要集成这些东西。。。

写在后面的话

推流已经完成了,我们可以通过前面提到的vlc进行查看,但是我们要做的是移动端直播,所以下一篇就说一下关于移动端的播放与弹幕评论相关的知识,peace~~~

上一篇下一篇

猜你喜欢

热点阅读