WebRTC动态码率-基于transport wide cc的延

2020-08-29  本文已影响0人  JeffreyLau

1)前言

2)工作流程一

a=extmap:5 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01
Transport_wide_cc动态码率估计原理_01_00.png
// Excerpt of PacketRouter::SendPacket. While holding modules_crit_, the
// visible part stamps the outgoing packet with a freshly allocated
// transport-wide sequence number, but only when the packet has reserved room
// for the TransportSequenceNumber header extension. The remainder of the
// function is elided in this excerpt.
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
                              const PacedPacketInfo& cluster_info) {
  rtc::CritScope cs(&modules_crit_);
  // With the new pacer code path, transport sequence numbers are only set here,
  // on the pacer thread. Therefore we don't need atomics/synchronization.
  if (packet->IsExtensionReserved<TransportSequenceNumber>()) {
    packet->SetExtension<TransportSequenceNumber>(AllocateSequenceNumber());
  }
  ......
}
// Excerpt of RTPSender::TrySendPacket. The visible part checks whether the
// packet carries a TransportSequenceNumber extension; if so, it copies that id
// into the send options, flags the packet as included in both feedback and
// allocation, and registers it via AddPacketToTransportFeedback(). Code before
// and after that step is elided in this excerpt; the visible path ends by
// returning true.
bool RTPSender::TrySendPacket(RtpPacketToSend* packet,
                              const PacedPacketInfo& pacing_info) {
  RTC_DCHECK(packet);
  .....
  // Downstream code actually uses this flag to distinguish between media and
  // everything else.
  if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
    options.packet_id = *packet_id;
    options.included_in_feedback = true;
    options.included_in_allocation = true;
    // Hand the packet's metadata to the transport feedback observer so it can
    // later be matched against feedback for this sequence number.
    AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
  }
  ......
  return true;
}
void RTPSender::AddPacketToTransportFeedback(
    uint16_t packet_id,
    const RtpPacketToSend& packet,
    const PacedPacketInfo& pacing_info) {
  if (transport_feedback_observer_) {
    size_t packet_size = packet.payload_size() + packet.padding_size();
    if (send_side_bwe_with_overhead_) {
      packet_size = packet.size();
    }
    RtpPacketSendInfo packet_info;
    packet_info.ssrc = SSRC();
    packet_info.transport_sequence_number = packet_id;
    packet_info.has_rtp_sequence_number = true;
    packet_info.rtp_sequence_number = packet.SequenceNumber();
    packet_info.length = packet_size;
    packet_info.pacing_info = pacing_info;
    transport_feedback_observer_->OnAddPacket(packet_info);
  }
}
// Forwards a to-be-sent packet's metadata to the transport feedback adapter.
// The per-packet transport overhead is passed along only when
// send_side_bwe_with_overhead_ is enabled (0 otherwise), and the current clock
// time in milliseconds is passed as the creation timestamp.
void RtpTransportControllerSend::OnAddPacket(
    const RtpPacketSendInfo& packet_info) {
  if (send_side_bwe_with_overhead_) {
    transport_feedback_adapter_.AddPacket(
        packet_info, transport_overhead_bytes_per_packet_.load(),
        Timestamp::ms(clock_->TimeInMilliseconds()));
  } else {
    transport_feedback_adapter_.AddPacket(
        packet_info, 0, Timestamp::ms(clock_->TimeInMilliseconds()));
  }
}
// Negative timestamp value; presumably the "not yet set" sentinel for
// millisecond timestamps such as a packet's send time (TODO confirm against
// PacketFeedback's initial values).
const int64_t kNoTimestamp = -1;
// Send-history window length in milliseconds: 60000 ms = 60 seconds.
const int64_t kSendTimeHistoryWindowMs = 60000;
// Records a packet that is about to be sent in the send history so that it can
// later be matched against send notifications and feedback.
//
// packet_info    - metadata of the outgoing packet (transport sequence number,
//                  length, pacing info and, optionally, SSRC / RTP sequence
//                  number).
// overhead_bytes - extra bytes added to packet_info.length for the recorded
//                  size.
// creation_time  - time at which this history entry is created.
//
// The tail of the function is elided in this excerpt.
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
                                         size_t overhead_bytes,
                                         Timestamp creation_time) {
  {
    rtc::CritScope cs(&lock_);
    PacketFeedback packet(creation_time.ms(),
                          packet_info.transport_sequence_number,
                          packet_info.length + overhead_bytes, local_net_id_,
                          remote_net_id_, packet_info.pacing_info);
    if (packet_info.has_rtp_sequence_number) {
      packet.ssrc = packet_info.ssrc;
      packet.rtp_sequence_number = packet_info.rtp_sequence_number;
    }
    // Unwrap the 16-bit transport sequence number into a 64-bit one, which is
    // used as the history key below.
    packet.long_sequence_number =
        seq_num_unwrapper_.Unwrap(packet.sequence_number);
    // A history entry lives for as long as the difference between the current
    // creation time and the entry's own creation time does not exceed
    // packet_age_limit_ms_. With the 60000 ms window this means the history
    // keeps packet info for the last 60 seconds; entries older than that are
    // evicted here.
    while (!history_.empty() &&
           creation_time.ms() - history_.begin()->second.creation_time_ms >
               packet_age_limit_ms_) {  // Per the article, packet_age_limit_ms_ defaults to kSendTimeHistoryWindowMs (60000 ms = 60 s).
      // TODO(sprang): Warn if erasing (too many) old items?
      // Evicted packets no longer count towards the in-flight byte total.
      RemoveInFlightPacketBytes(history_.begin()->second);
      history_.erase(history_.begin());
    }
    history_.insert(std::make_pair(packet.long_sequence_number, packet));
  }
  ....
}  

3)工作流程二

// Handles the notification that a packet has left the socket.
//
// The feedback adapter converts the notification into a SentPacket message
// when applicable. That message is then delivered to the network controller on
// task_queue_, and any resulting update is posted. Regardless of whether a
// message was produced, the pacer is refreshed with the adapter's current
// outstanding (in-flight) data.
void RtpTransportControllerSend::OnSentPacket(
    const rtc::SentPacket& sent_packet) {
  absl::optional<SentPacket> tracked_packet =
      transport_feedback_adapter_.ProcessSentPacket(sent_packet);

  if (tracked_packet.has_value()) {
    task_queue_.PostTask([this, tracked_packet]() {
      RTC_DCHECK_RUN_ON(&task_queue_);
      // The controller may not exist; skip the update in that case.
      if (!controller_) {
        return;
      }
      PostUpdates(controller_->OnSentPacket(*tracked_packet));
    });
  }

  pacer()->UpdateOutstandingData(
      transport_feedback_adapter_.GetOutstandingData());
}

3.1)发送反馈SentPacket包的封装

// Processes a "packet sent" notification from the socket layer.
//
// Packets covered by transport feedback (flagged included_in_feedback, or
// carrying a packet id other than -1) are looked up in the send history. A
// matching entry gets its send time recorded and absorbs any pending
// untracked bytes. On the first send of that entry, its bytes are added to
// the in-flight total and a SentPacket message is returned.
//
// Packets that are not covered by feedback but are included in allocation
// only accumulate into pending_untracked_size_, which is attributed to the
// next tracked packet.
//
// Returns absl::nullopt when no SentPacket message is produced: untracked
// packet, unknown sequence number, or an entry that already had a send time.
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
    const rtc::SentPacket& sent_packet) {
  rtc::CritScope cs(&lock_);

  // TODO(srte): Only use one way to indicate that packet feedback is used.
  const bool uses_feedback =
      sent_packet.info.included_in_feedback || sent_packet.packet_id != -1;

  if (!uses_feedback) {
    if (sent_packet.info.included_in_allocation) {
      if (sent_packet.send_time_ms < last_send_time_ms_) {
        RTC_LOG(LS_WARNING)
            << "ignoring untracked data for out of order packet.";
      }
      pending_untracked_size_ += sent_packet.info.packet_size_bytes;
      last_untracked_send_time_ms_ =
          std::max(last_untracked_send_time_ms_, sent_packet.send_time_ms);
    }
    return absl::nullopt;
  }

  int64_t long_seq_num = seq_num_unwrapper_.Unwrap(sent_packet.packet_id);
  auto entry_it = history_.find(long_seq_num);
  if (entry_it == history_.end()) {
    return absl::nullopt;
  }

  auto& entry = entry_it->second;
  // A non-negative send time means this entry was already reported as sent.
  const bool already_sent = entry.send_time_ms >= 0;
  entry.send_time_ms = sent_packet.send_time_ms;
  last_send_time_ms_ = std::max(last_send_time_ms_, sent_packet.send_time_ms);

  // TODO(srte): Don't do this on retransmit.
  if (pending_untracked_size_ > 0) {
    if (sent_packet.send_time_ms < last_untracked_send_time_ms_) {
      RTC_LOG(LS_WARNING)
          << "appending acknowledged data for out of order packet. (Diff: "
          << last_untracked_send_time_ms_ - sent_packet.send_time_ms
          << " ms.)";
    }
    entry.unacknowledged_data += pending_untracked_size_;
    pending_untracked_size_ = 0;
  }

  if (already_sent) {
    return absl::nullopt;
  }

  // First send of this packet: count it as in flight before reading the
  // outstanding data below, then describe it to the network controller.
  AddInFlightPacketBytes(entry);
  SentPacket msg;
  msg.size = DataSize::bytes(entry.payload_size);
  msg.send_time = Timestamp::ms(entry.send_time_ms);
  msg.sequence_number = entry.long_sequence_number;
  msg.prior_unacked_data = DataSize::bytes(entry.unacknowledged_data);
  msg.data_in_flight = GetOutstandingData();
  return msg;
}
// Key type for the per-network in-flight byte counters: a pair of 16-bit
// network ids.
// NOTE(review): despite the alias name, the lookups in this file build the key
// as {local_net_id, remote_net_id} (local first) — confirm the intended order.
using RemoteAndLocalNetworkId = std::pair<uint16_t, uint16_t>;
// Number of bytes currently counted as in flight, per network-id pair.
std::map<RemoteAndLocalNetworkId, size_t> in_flight_bytes_;
// Adds the packet's payload_size to the in-flight byte counter of the
// {local_net_id, remote_net_id} pair the packet belongs to, creating the
// counter when that pair has no entry yet. The beginning of the function is
// elided in this excerpt.
void TransportFeedbackAdapter::AddInFlightPacketBytes(
    const PacketFeedback& packet) {
  ....
  auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
  if (it != in_flight_bytes_.end()) {
    it->second += packet.payload_size;
  } else {
    // First in-flight packet for this network pair: start a new counter.
    in_flight_bytes_[{packet.local_net_id, packet.remote_net_id}] =
        packet.payload_size;
  }
}
// Subtracts the packet's payload_size from the in-flight byte counter of its
// {local_net_id, remote_net_id} pair. Does nothing in the visible part when
// the pair has no counter. The beginning of the function is elided in this
// excerpt.
void TransportFeedbackAdapter::RemoveInFlightPacketBytes(
    const PacketFeedback& packet) {
  ....
  auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
  if (it != in_flight_bytes_.end()) {
    it->second -= packet.payload_size;
    // Drop the counter once it reaches exactly zero.
    if (it->second == 0)
      in_flight_bytes_.erase(it);
  }
}
// Returns the number of bytes currently counted as in flight for the active
// {local_net_id_, remote_net_id_} pair, or zero when that pair has no counter.
// Takes lock_ for the duration of the lookup.
DataSize TransportFeedbackAdapter::GetOutstandingData() const {
  rtc::CritScope cs(&lock_);
  const auto counter = in_flight_bytes_.find({local_net_id_, remote_net_id_});
  if (counter == in_flight_bytes_.end()) {
    return DataSize::Zero();
  }
  return DataSize::bytes(counter->second);
}

3.2)发送反馈SentPacket包作用到GoogCcNetworkController模块

// Feeds a sent-packet notification into the congestion controller.
//
// The packet's size and send time go to the ALR detector, and the resulting
// ALR state is pushed to the acknowledged bitrate estimator. The bandwidth
// estimator is always notified of the packet; on the very first packet it is
// also given an initial propagation RTT of zero. When a congestion window
// pushback controller exists, it receives the current in-flight byte count
// and a network-changed update may be triggered.
//
// Returns the (possibly empty) update produced by that trigger, or a
// default-constructed update when no pushback controller is configured.
NetworkControlUpdate GoogCcNetworkController::OnSentPacket(
    SentPacket sent_packet) {
  alr_detector_->OnBytesSent(sent_packet.size.bytes(),
                             sent_packet.send_time.ms());
  const bool in_alr =
      alr_detector_->GetApplicationLimitedRegionStartTime().has_value();
  acknowledged_bitrate_estimator_->SetAlr(in_alr);

  if (!first_packet_sent_) {
    first_packet_sent_ = true;
    // Initialize feedback time to send time to allow estimation of RTT until
    // first feedback is received.
    bandwidth_estimation_->UpdatePropagationRtt(sent_packet.send_time,
                                                TimeDelta::Zero());
  }
  bandwidth_estimation_->OnSentPacket(sent_packet);

  if (!congestion_window_pushback_controller_) {
    return NetworkControlUpdate();
  }

  congestion_window_pushback_controller_->UpdateOutstandingData(
      sent_packet.data_in_flight.bytes());
  NetworkControlUpdate result;
  MaybeTriggerOnNetworkChanged(&result, sent_packet.send_time);
  return result;
}

3.3)当前发送总字节数作用到pacer模块

// Forwards the current outstanding (in-flight) data size to the pacing
// controller while holding critsect_.
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
  rtc::CritScope cs(&critsect_);
  pacing_controller_.UpdateOutstandingData(outstanding_data);
}
// Stores the latest outstanding (in-flight) data size; Congested() compares
// this value against the congestion window.
void PacingController::UpdateOutstandingData(DataSize outstanding_data) {
  outstanding_data_ = outstanding_data;
}
// Reports whether the pacer should consider the network congested: true only
// when a finite congestion window is set and the outstanding data has reached
// or exceeded it.
bool PacingController::Congested() const {
  // Without a finite congestion window there is nothing to compare against.
  if (!congestion_window_size_.IsFinite()) {
    return false;
  }
  // Per the article: congestion_window_size_ is the congestion window upper
  // bound produced by GoogCcNetworkController's dynamic bitrate estimation,
  // and it is configured into the pacer by
  // RtpTransportControllerSend::PostUpdates().
  return outstanding_data_ >= congestion_window_size_;
}
Transport_wide_cc动态码率估计原理_01_04.png

4)总结

上一篇 下一篇

猜你喜欢

热点阅读