WebRTC RTP Video 数据接收及组包分析
-
经过对WebRTC RTP/RTCP协议分析分析,得出BaseChannel类通过信号和RtpTransport建立关系,同时RtpTransport也通过信号和PacketTransportInternal建立关系,数据的接收全部通过信号回调进行触发
rtp_rtcp_rcv_to_call.png
1 Video RTP 接收处理
rtp_video_to_packet_buffer.png- 首先经过call模块处理将rtp视频数据送到RtpVideoStreamReceiver::OnRtpPacket函数,然后将调用RtpVideoStreamReceiver::ReceivePacket函数进行RTP包解析
- 解析完之后的数据,会通过RtpVideoStreamReceiver::OnReceivedPayloadData回调,在该函数中会将rtp数据包打包成VCMPacket包,然后将VCMPacket包插入到packet_buffer_
- 在PacketBuffer::InsertPacket函数中插入完后会调用PacketBuffer::FindFrames函数查找有没有合适的帧
-
video_coding::PacketBuffer的工作原理如下
video_coding::PacketBuffer_001.png - video_coding::PacketBuffer维护两个集合sequence_buffer_用于管理seqNum对应ContinuityInfo结构体
- data_buffer_管理实际的VCMPacket包
- video_coding::PacketBuffer::ContinuityInfo的定义如下
struct ContinuityInfo {
// The sequence number of the packet.
uint16_t seq_num = 0;
// If this is the first packet of the frame.
bool frame_begin = false;
// If this is the last packet of the frame.
bool frame_end = false;
// If this slot is currently used.
bool used = false;
// If all its previous packets have been inserted into the packet buffer.
bool continuous = false;
// If this packet has been used to create a frame already.
bool frame_created = false;
};
- used表示当前位置已经使用
- video_coding::PacketBuffer依赖OnAssembledFrameCallback而OnAssembledFrameCallback由RtpVideoStreamReceiver类实现
- video_coding::PacketBuffer在构造的时候会初始化data_buffer_和sequence_buffer_容器的大小默认为512
- video_coding::PacketBuffer的创建发生在RtpVideoStreamReceiver类的构造阶段,RtpVideoStreamReceiver通过调用PacketBuffer::Create函数创建video_coding::PacketBuffer并将实例对象保存成其私有成员变量packet_buffer_
- video_coding::PacketBuffer支持扩张,其扩张通过ExpandBufferSize函数实现,具体代码如下:
bool PacketBuffer::ExpandBufferSize() {
if (size_ == max_size_) {
RTC_LOG(LS_WARNING) << "PacketBuffer is already at max size (" << max_size_
<< "), failed to increase size.";
return false;
}
size_t new_size = std::min(max_size_, 2 * size_);
std::vector<VCMPacket> new_data_buffer(new_size);
std::vector<ContinuityInfo> new_sequence_buffer(new_size);
for (size_t i = 0; i < size_; ++i) {
if (sequence_buffer_[i].used) {
size_t index = sequence_buffer_[i].seq_num % new_size;
new_sequence_buffer[index] = sequence_buffer_[i];
new_data_buffer[index] = data_buffer_[i];
}
}
size_ = new_size;
sequence_buffer_ = std::move(new_sequence_buffer);
data_buffer_ = std::move(new_data_buffer);
RTC_LOG(LS_INFO) << "PacketBuffer size expanded to " << new_size;
return true;
}
- 首先判断size_是否已经为最大值2048,如果是则返回,因为最大支持2048
- 假设原来的大小为512,经过调用ExpandBufferSize则扩大到1024
- 重新创建new_data_buffer和new_sequence_buffer容器,其大小为新值1024
- 将新容器移动构造到原有容器,并且保留原有容器中的数据
2 Video RTP 数据组包流程
- video rtp包组包发生在PacketBuffer::InsertPacket函数
bool PacketBuffer::InsertPacket(VCMPacket* packet) {
std::vector<std::unique_ptr<RtpFrameObject>> found_frames;
{
rtc::CritScope lock(&crit_);
OnTimestampReceived(packet->timestamp);
uint16_t seq_num = packet->seqNum;
size_t index = seq_num % size_;
if (!first_packet_received_) {
first_seq_num_ = seq_num;
first_packet_received_ = true;
} else if (AheadOf(first_seq_num_, seq_num)) {
// If we have explicitly cleared past this packet then it's old,
// don't insert it, just silently ignore it.
if (is_cleared_to_first_seq_num_) {
delete[] packet->dataPtr;
packet->dataPtr = nullptr;
return true;
}
first_seq_num_ = seq_num;
}
if (sequence_buffer_[index].used) {
// Duplicate packet, just delete the payload.
if (data_buffer_[index].seqNum == packet->seqNum) {
delete[] packet->dataPtr;
packet->dataPtr = nullptr;
return true;
}
// The packet buffer is full, try to expand the buffer.
while (ExpandBufferSize() && sequence_buffer_[seq_num % size_].used) {
}
index = seq_num % size_;
// Packet buffer is still full since we were unable to expand the buffer.
if (sequence_buffer_[index].used) {
// Clear the buffer, delete payload, and return false to signal that a
// new keyframe is needed.
RTC_LOG(LS_WARNING) << "Clear PacketBuffer and request key frame.";
Clear();
delete[] packet->dataPtr;
packet->dataPtr = nullptr;
return false;
}
}
sequence_buffer_[index].frame_begin = packet->is_first_packet_in_frame();
sequence_buffer_[index].frame_end = packet->is_last_packet_in_frame();
sequence_buffer_[index].seq_num = packet->seqNum;
sequence_buffer_[index].continuous = false;
sequence_buffer_[index].frame_created = false;
sequence_buffer_[index].used = true;
data_buffer_[index] = *packet;
packet->dataPtr = nullptr;
UpdateMissingPackets(packet->seqNum);
int64_t now_ms = clock_->TimeInMilliseconds();
last_received_packet_ms_ = now_ms;
if (packet->video_header.frame_type == VideoFrameType::kVideoFrameKey)
last_received_keyframe_packet_ms_ = now_ms;
found_frames = FindFrames(seq_num);
}
for (std::unique_ptr<RtpFrameObject>& frame : found_frames)
assembled_frame_callback_->OnAssembledFrame(std::move(frame));
return true;
}
- 首先根据VCMPacket包的seqNum求容器索引,使用seq_num % size_,假设容器大小为512,第一个seqNumber为2680,那么得出当前要插入到容器中的位置为120,根据rtp包seqNum的连续性可以得出后续如果不出现丢包的情况索引会依次递增+1(120,121,122,123,124.....),使用这种办法取索引是为了丢包重传包插入到该容器的时候确保索引依然连续吧?
- 判断是否为第一次插入VCMPacket,如果是则记录首次VCMPacket对应的seqNum将其赋值给first_seq_num_
- 每次插入会判断first_seq_num_是否在VCMPacket的seqNum的前面,如果first_seq_num__是在VCMPacket的seqNum的前面则根据is_cleared_to_first_seq_num_是true删除VCMPacket的数据部分,用处后续分析
- 如果容器已经满了则进行扩张,扩张原则不超过2048并且按原有容器的双倍大小进行扩展
- data_buffer_[index] = *packet调用移动拷贝构造函数将要插入的VCMPacket的信息赋值给data_buffer_[index]
- 调用UpdateMissingPackets更新missing_packets_容器(未看明白算法)
- 最后回调FindFrames查找是否由合适的帧
bool PacketBuffer::PotentialNewFrame(uint16_t seq_num) const {
size_t index = seq_num % size_;
int prev_index = index > 0 ? index - 1 : size_ - 1;
....
if (sequence_buffer_[index].frame_begin)
return true;
....
if (sequence_buffer_[prev_index].continuous)
return true;
return false;
}
- 判断当前seq_num对应的VCMPackage包是否是潜在的新帧
- 如果当前seq_num对应索引的sequence_buffer_[index].frame_begin为true则表示当前seq_number为新的一帧的开始
- 如果当前seq_num对应索引的前一个索引的sequence_buffer_[prev_index].continuous为true则表示是潜在的新的一帧
- FindFrames函数的查询实现如下:
std::vector<std::unique_ptr<RtpFrameObject>> PacketBuffer::FindFrames(
uint16_t seq_num) {
std::vector<std::unique_ptr<RtpFrameObject>> found_frames;
for (size_t i = 0; i < size_ && PotentialNewFrame(seq_num); ++i) {
size_t index = seq_num % size_;
sequence_buffer_[index].continuous = true;
// If all packets of the frame is continuous, find the first packet of the
// frame and create an RtpFrameObject.
if (sequence_buffer_[index].frame_end) {
size_t frame_size = 0;
int max_nack_count = -1;
uint16_t start_seq_num = seq_num;
RtpPacketInfos::vector_type packet_infos;
// Find the start index by searching backward until the packet with
// the |frame_begin| flag is set.
int start_index = index;
size_t tested_packets = 0;
int64_t frame_timestamp = data_buffer_[start_index].timestamp;
// Identify H.264 keyframes by means of SPS, PPS, and IDR.
bool is_h264 = data_buffer_[start_index].codec() == kVideoCodecH264;
bool has_h264_sps = false;
bool has_h264_pps = false;
bool has_h264_idr = false;
bool is_h264_keyframe = false;
/*使用while true 循环递减组包*/
while (true) {
++tested_packets;
/*计算一帧RTP包的总大小*/
frame_size += data_buffer_[start_index].sizeBytes;
/*.....省略...*/
sequence_buffer_[start_index].frame_created = true;
// Should use |push_front()| since the loop traverses backwards. But
// it's too inefficient to do so on a vector so we'll instead fix the
// order afterwards.
/*将data_buffer_[start_index].packet_info存入到packet_infos容器*/
packet_infos.push_back(data_buffer_[start_index].packet_info);
/*非H264类型的RTP包当检测到sequence_buffer_[start_index].frame_begin为
true,也就是按照下面分析的检测到seqNumber=2680的时候会退出循环
*/
if (!is_h264 && sequence_buffer_[start_index].frame_begin)
break;
if (is_h264 && !is_h264_keyframe) {
const auto* h264_header = absl::get_if<RTPVideoHeaderH264>(
&data_buffer_[start_index].video_header.video_type_header);
if (!h264_header || h264_header->nalus_length >= kMaxNalusPerPacket)
return found_frames;
for (size_t j = 0; j < h264_header->nalus_length; ++j) {
if (h264_header->nalus[j].type == H264::NaluType::kSps) {
has_h264_sps = true;
} else if (h264_header->nalus[j].type == H264::NaluType::kPps) {
has_h264_pps = true;
} else if (h264_header->nalus[j].type == H264::NaluType::kIdr) {
has_h264_idr = true;
}
}
if ((sps_pps_idr_is_h264_keyframe_ && has_h264_idr && has_h264_sps &&
has_h264_pps) ||
(!sps_pps_idr_is_h264_keyframe_ && has_h264_idr)) {
is_h264_keyframe = true;
}
}
/*.....全部检索...*/
if (tested_packets == size_)
break;
start_index = start_index > 0 ? start_index - 1 : size_ - 1;
// In the case of H264 we don't have a frame_begin bit (yes,
// |frame_begin| might be set to true but that is a lie). So instead
// we traverese backwards as long as we have a previous packet and
// the timestamp of that packet is the same as this one. This may cause
// the PacketBuffer to hand out incomplete frames.
// See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7106
/*对于H264数据包当frame_timestamp不一样,根据同一帧数据的RTP包的rtp时间戳相等的原则*/
if (is_h264 &&
(!sequence_buffer_[start_index].used ||
data_buffer_[start_index].timestamp != frame_timestamp)) {
break;
}
--start_seq_num;
}
//while(true)结束完后start_seq_num已经对应为seqNum的包号了
/*上面的检测是递减组包这里将packet_infos的包的顺序进行逆向调整
按照下文的分析调整完后packet_infos[0]对应seqNumb = 2680的包
*/
// Fix the order since the packet-finding loop traverses backwards.
std::reverse(packet_infos.begin(), packet_infos.end());
if (is_h264) {
// Now that we have decided whether to treat this frame as a key frame
// or delta frame in the frame buffer, we update the field that
// determines if the RtpFrameObject is a key frame or delta frame.
const size_t first_packet_index = start_seq_num % size_;
RTC_CHECK_LT(first_packet_index, size_);
if (is_h264_keyframe) {
data_buffer_[first_packet_index].video_header.frame_type =
VideoFrameType::kVideoFrameKey;
} else {
data_buffer_[first_packet_index].video_header.frame_type =
VideoFrameType::kVideoFrameDelta;
}
// With IPPP, if this is not a keyframe, make sure there are no gaps
// in the packet sequence numbers up until this point.
const uint8_t h264tid =
data_buffer_[start_index].video_header.frame_marking.temporal_id;
if (h264tid == kNoTemporalIdx && !is_h264_keyframe &&
missing_packets_.upper_bound(start_seq_num) !=
missing_packets_.begin()) {
uint16_t stop_index = (index + 1) % size_;
while (start_index != stop_index) {
sequence_buffer_[start_index].frame_created = false;
start_index = (start_index + 1) % size_;
}
return found_frames;
}
}
missing_packets_.erase(missing_packets_.begin(),
missing_packets_.upper_bound(seq_num));
found_frames.emplace_back(
new RtpFrameObject(this, start_seq_num, seq_num, frame_size,
max_nack_count, min_recv_time, max_recv_time,
RtpPacketInfos(std::move(packet_infos))));
ClearInterval(start_seq_num, seq_num);
}
++seq_num;
}
return found_frames;
}
- 先假设现在有一帧数据分成5个RTP包他们的seqNumber从2680~2684,假设seqNumber=2680的包是帧开始包,2684包为帧最后一个包
- 进入InsertPacket函数2680包对应的index为120经过赋值得到如下:
sequence_buffer_[120].frame_begin = true;
sequence_buffer_[120].frame_end = false;
sequence_buffer_[120].seq_num = 2680;
sequence_buffer_[120].continuous = false;
sequence_buffer_[120].frame_created = false;
sequence_buffer_[120].used = true;
- 紧接着调用FindFrames(2680)函数,该函数进入for循环首先调用PotentialNewFrame(2680)判断是否为true,由于当前的sequence_buffer_[120].frame_begin=true,所以会返回true,从而顺利进入for循环函数主体
- 根据for循环的实现对于seqNumber=2680的包FindFrames函数只做了一件事
sequence_buffer_[120].continuous = true;
- 然后跳出循环,接着InsertPacket函数返回等待第二个包插入,根据其算法截止到seqNumber=2683包插入完sequence_buffer_[i]的赋值情况如下:
sequence_buffer_[120].frame_begin = true;
sequence_buffer_[120~123].frame_end = false;
sequence_buffer_[120~123].seq_num = 2680~2683;
sequence_buffer_[120~123].continuous = true;
sequence_buffer_[120~123].frame_created = false;
sequence_buffer_[120~123].used = true;
-
其中seqNum为2681到2683之间的包插入的时候由于其前一个包的continuous为true所以PotentialNewFrame()函数会返回true从而会将sequence_buffer_[121~123].continuous的值设置成true
-
当seqNumber=2684的包被插入时此时sequence_buffer_[124].frame_end=true,因为为该帧当中的最后一个包
-
当FindFrames(2684)被调用时,首先将sequence_buffer_[124].continuous设置成true,然后由于sequence_buffer_[124].frame_end=true所以会顺利进入if (sequence_buffer_[index].frame_end)分之进行处理
-
到此再回过头分析FindFrames实现中的注释当组包完成后packet_infos容器已经组合完一帧数据
-
最后通过found_frames.emplace_back将以packet_infos容器为参数构造的RtpFrameObject对象插入到found_frames当中found_frames容器中的每一个元素代表一帧完整的RTP包的集合,而这些集合被存储在RtpPacketInfos::vector_type当中,它们之间的关系如下图:
video_coding::PacketBuffer_002.png -
真正的组包是发生在RtpFrameObject对象构造过程当中,在RtpFrameObject的构造函数中会通过调用其父类EncodedImage::SetEncodedData函数分配当前一帧数据的空间,然后调用PacketBuffer::GetBitstream函数将连续的seqNum的RTP包中的数据拷贝到所分配的空间当中
-
RtpFrameObject的派生关系如下:
RtpFrameObject_UML.png -
组包完毕后会通过PacketBuffer::ClearInterval清除PacketBuffer所管理的sequence_buffer_和data_buffer_,主要是清除其索引成未被引用以及释放data_buffer_[index]的空间,供后续RTP包使用
-
PacketBuffer::GetBitstream函数的代码如下:
bool PacketBuffer::GetBitstream(const RtpFrameObject& frame,
uint8_t* destination) {
rtc::CritScope lock(&crit_);
size_t index = frame.first_seq_num() % size_;
size_t end = (frame.last_seq_num() + 1) % size_;
uint16_t seq_num = frame.first_seq_num();
uint32_t timestamp = frame.Timestamp();
uint8_t* destination_end = destination + frame.size();
do {
// Check both seq_num and timestamp to handle the case when seq_num wraps
// around too quickly for high packet rates.
......
RTC_DCHECK_EQ(data_buffer_[index].seqNum, sequence_buffer_[index].seq_num);
size_t length = data_buffer_[index].sizeBytes;
......
const uint8_t* source = data_buffer_[index].dataPtr;
memcpy(destination, source, length);
destination += length;
index = (index + 1) % size_;
++seq_num;
} while (index != end);
return true;
}
- destination为在构造RtpFrameObject的时候通过其父亲类EncodedImage::SetEncodedData所分配的空间
- 通过循环从data_buffer_[120~124]的数据拷贝到destination空间
- 继续回到PacketBuffer::InsertPacket函数,当有组包完成时,found_frames容器中已经由数据,最后通过遍历found_frames容器对每帧RtpFrameObject通过assembled_frame_callback_->OnAssembledFrame(std::move(frame));将求回调到RtpVideoStreamReceiver类中,经由RtpVideoStreamReceiver::OnAssembledFrame函数对已组包完成的的数据进行操作(如寻找对应的解码器,然后将数据发送给解码器进行解码操作等.)