WebRTC PacedSender 原理分析(一)
2020-05-16 本文已影响0人
JeffreyLau
PacedSender 的族普关系
paced_class_uml_001.png-
PacedSender继承Module类,实现其Process和TimeUntilNextProcess方法,其中TimeUntilNextProcess的实现便是相隔多少时间Process函数会被paced_thread回调一次
-
PacedSender类依赖PacingController类事实上,PacedSender把大部分工作都交给了PacingController
和PacketRouter
PacedSender 入队操作
#modules/pacing/paced_sender.cc
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
rtc::CritScope cs(&critsect_);
pacing_controller_.EnqueuePacket(std::move(packet));
}
- 通过RTPSenderVideo::SendVideoPacket将rtp包通过回调EnqueuePacket将rtp包存入PacedSender所管理的队列当中
- PacedSender::EnqueuePacket把工作交给PacingController
#modules/pacing/pacing_controller.cc
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
Timestamp now = CurrentTime();
prober_.OnIncomingPacket(packet->payload_size());
if (packet->capture_time_ms() < 0) {
packet->set_capture_time_ms(now.ms());
}
RTC_CHECK(packet->packet_type());
int priority = GetPriorityForType(*packet->packet_type());
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
- 如果capture_time_ms小于0,在这里为期初始化时间
- 获取优先级,kAudio(0),kRetransmission(1),kVideo(2),kPadding(3)
- 最终根据优先级将packet放入packet_queue_队列
RoundRobinPacketQueue原理分析
RoundRobinPacketQueue_01.png- RoundRobinPacketQueue队列的核心实现是内部管理4个数据结构
- streams_容器用来管理以ssrc为key,以Stream对象为value的容器,依次可以看出,对于不同ssrc的流都会被该容器所管理
- rtp_packets_列表用来托管真正的rtp流对应std::unique_ptr<RtpPacketToSend>,所有的的发送真实的rtp流都会存到这里,后续发送到网络通过从该列表中获得发送
- enqueue_times_集合用来记录每次rtp流入队列的时间
- 在每个数据包插入到队列的时候会创建一个QueuedPacket,同时会根据QueuedPacket的优先级创建一个StreamPrioKey对象,并且会以此对象为key,该包的ssrc值为value,将其插入到stream_priorities_集合
#modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet) {
uint32_t ssrc = packet->Ssrc();
uint16_t sequence_number = packet->SequenceNumber();
int64_t capture_time_ms = packet->capture_time_ms();
DataSize size =
DataSize::bytes(send_side_bwe_with_overhead_
? packet->size()
: packet->payload_size() + packet->padding_size());
auto type = packet->packet_type();
RTC_DCHECK(type.has_value());
rtp_packets_.push_front(std::move(packet));
Push(QueuedPacket(
priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
}
- 首先得到ssrc,sequence_number,capture_time_ms,size(rtp包的大小)
- 将RtpPacketToSend包通过rtp_packets_.push_front存入rtp_packets_列表
- 以各参数创建QueuedPacket,由此可见每个RtpPacketToSend对应一个QueuedPacket,但是它并不正在存放RtpPacketToSend数据,只是记录了其szie,ssrc,sequence_number,以及rtp_packets_.begin()迭代器头,因为每次将RtpPacketToSend插入到rtp_packets_列表都是从头部插入,这里相当于得到其索引,便于后续发送到网络使用
QueuedPacket数据结构的实现
RoundRobinPacketQueue_QueuedPacket.png- QueuedPacket重要的成员变量就是packet_it_,它就是真实rtp包的索引所在
- QueuedPacket提供了如下函数用于获取当前QueuedPacket对应的RtpPacketToSend包
#modules/pacing/round_robin_packet_queue.cc
std::unique_ptr<RtpPacketToSend>
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
return packet_it_ ? std::move(**packet_it_) : nullptr;
}
- 与上面分析对应通过std::move(**packet_it_)返回
Stream数据结构的实现
RoundRobinPacketQueue_Stream.png- 以下结合代码来分析该数据结构
##modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
auto stream_info_it = streams_.find(packet.ssrc());
if (stream_info_it == streams_.end()) {
stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
stream_info_it->second.priority_it = stream_priorities_.end();
stream_info_it->second.ssrc = packet.ssrc();
}
Stream* stream = &stream_info_it->second;
if (stream->priority_it == stream_priorities_.end()) {
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
} else if (packet.priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that |priority_| uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
// In order to figure out how much time a packet has spent in the queue while
// not in a paused state, we subtract the total amount of time the queue has
// been paused so far, and when the packet is popped we subtract the total
// amount of time the queue has been paused at that moment. This way we
// subtract the total amount of time the packet has spent in the queue while
// in a paused state.
UpdateQueueTime(packet.enqueue_time());
packet.SubtractPauseTime(pause_time_sum_);
size_packets_ += 1;
size_ += packet.size();
stream->packet_queue.push(packet);
}
- 根据QueuedPacket的ssrc来查询streams_集合中是否有该ssrc对应的Stream对象,如果没有则根据该ssrc实例化一个Stream对象并以ssrc为key将其插入到streams_集合
- 在插入后Stream的成员变量priority_it是指向stream_priorities_.end的
- 下面的处理如果Stream的成员变量priority_it是指向stream_priorities_.end则为当前的QueuedPacket包通过stream_priorities__.emplace 以StreamPrioKey对象为key,以ssrc为value插入到stream_priorities_集合当中并放回当前迭代器赋值给Stream的成员变量priority_it
- 假设同一路stream也就是同一个ssrc,在插入的时候,本次的priority小于上一次的priority(越小优先级越高?),那么首先需要将原来stream_priorities_管理的擦除,然后在重新创建StreamPrioKey插入到stream_priorities_
- 最后通过stream->packet_queue.push(packet)将QueuedPacket插入到Stream管理的packet_queue集合当中
- 经过以上分析大致可得出如下关系
- 每一个RtpPacketToSend包对应一个QueuedPacket对象
- 每一路ssrc对应的stream对应一个Stream,而每一个Stream对象管理着入队的多个QueuedPacket
PacedSender 出队操作
- PacedSender 出队操作是一个十分复杂的过程,涉及到动态码率估计,webrtc经过bwe发送端码率估计评测出新码率后会将码率作用到paced模块,让PacedSender按照新的码率进行数据发送,本文为便于分析不考虑码率估计进行分析假设码率已知
- PacedSender 出队操作要从PacedSender派生Module模块谈起,经paced_thread_处理,检测PacedSender重载的TimeUntilNextProcess函数判断下一次回调PacedSender::Process函数
- webrtc初始化创建PacedSender过程会通过SetPacingRates设置初始化码率
#modules/pacing/paced_sender.cc
int64_t PacedSender::TimeUntilNextProcess() {
rtc::CritScope cs(&critsect_);
// When paused we wake up every 500 ms to send a padding packet to ensure
// we won't get stuck in the paused state due to no feedback being received.
TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
if (pacing_controller_.IsPaused()) {
return std::max(PacingController::kPausedProcessInterval - elapsed_time,
TimeDelta::Zero())
.ms();
}
auto next_probe = pacing_controller_.TimeUntilNextProbe();
if (next_probe) {
return next_probe->ms();
}
const TimeDelta min_packet_limit = TimeDelta::ms(5);
return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}
- 首先通过pacing_controller_.TimeElapsedSinceLastProcess()得到已经流逝的时间,也就是当前时间和上一次处理时间相减
- 假设next_probe为-1或nullptr也就是不做码率探测
- 默认最小发包间隔是5ms,这里将min_packet_limit - elapsed_time和0取最大值,超过5ms则立即执行
void PacedSender::Process() {
rtc::CritScope cs(&critsect_);
pacing_controller_.ProcessPackets();
}
- PacedSender将真正的处理交给PacingController
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
....
if (paused_)
return;
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packet_queue_.Size();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_.UpdateQueueTime(CurrentTime());
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::ms(1),
queue_time_limit - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
<< target_rate.kbps();
}
}
}
media_budget_.set_target_rate_kbps(target_rate.kbps());
UpdateBudgetWithElapsedTime(elapsed_time);
}
bool is_probing = prober_.IsProbing();
PacedPacketInfo pacing_info;
absl::optional<DataSize> recommended_probe_size;
if (is_probing) {
pacing_info = prober_.CurrentCluster();
recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
}
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
auto* packet = GetPendingPacket(pacing_info);
if (packet == nullptr) {
// No packet available to send, check if we should send padding.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
}
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
RTC_DCHECK(rtp_packet);
packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
data_sent += packet->size();
// Send succeeded, remove it from the queue.
OnPacketSent(packet);
if (recommended_probe_size && data_sent > *recommended_probe_size)
break;
}
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
}
}
}
- 获取流逝的时间病更新上一次处理的时间为当前时间,流逝时间不得大于2s,如果大于2s则elapsed_time为2s
- 在drain_large_queues_支持的情况下(一次处理可以发送多个数据包?),根据时间差来计算本次发送的最小码率,如果当前的码率比实际发送的最小码率要小则通过media_budget_.set_target_rate_kbps(target_rate.kbps())设置码率
- 如果正在进行码率探测,则获取本次码率探测得出的本次推荐发送的数(推荐发送多少数据)
- 进入while循环通过GetPendingPacket()从RoundRobinPacketQueue中获取QueuedPacket包,然后通过packet->ReleasePacket()得到RtpPacketToSend,最后通过packet_sender_->SendRtpPacket进行发送
- GetPendingPacket如果在网络拥塞并且码率探测其未进入探测的的情况下会返回空,并且会将从RoundRobinPacketQueue弹出的QueuedPacket重新插入到队列当中,同时跳出循环
- 如果RoundRobinPacketQueue为空GetPendingPacket获取不到数据while循环会跳出
- 如果成功发送后recommended_probe_size的值大于0并且实际发送值已经大于或等于recommended_probe_size也会跳出循环结束本次process
RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
const PacedPacketInfo& pacing_info) {
if (packet_queue_.Empty()) {
return nullptr;
}
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
bool apply_pacing = !audio_packet || pace_audio_;
if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
pacing_info.probe_cluster_id ==
PacedPacketInfo::kNotAProbe))) {
packet_queue_.CancelPop();
return nullptr;
}
return packet;
}
- packet_queue_.BeginPop()弹出QueuedPacket
- packet_queue_.CancelPop()重新将QueuedPacket加入到队列
- BeginPop的原理是首先通过GetHighestPriorityStream遍历stream_priorities_获取优先发送的流对应的ssrc
- 其次通过对应的ssrc查找streams_集合得到Stream,然后通过Stream得到依次要发送的QueuedPacket