WebRTCwebrtc数据流分析

WebRTC PacedSender 原理分析(一)

2020-05-16  本文已影响0人  JeffreyLau

PacedSender 的族普关系

paced_class_uml_001.png

PacedSender 入队操作

#modules/pacing/paced_sender.cc
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  rtc::CritScope cs(&critsect_);
  pacing_controller_.EnqueuePacket(std::move(packet));
}
#modules/pacing/pacing_controller.cc
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
      << "SetPacingRate must be called before InsertPacket.";

  Timestamp now = CurrentTime();
  prober_.OnIncomingPacket(packet->payload_size());

  if (packet->capture_time_ms() < 0) {
    packet->set_capture_time_ms(now.ms());
  }

  RTC_CHECK(packet->packet_type());
  int priority = GetPriorityForType(*packet->packet_type());
  packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}

RoundRobinPacketQueue原理分析

RoundRobinPacketQueue_01.png
#modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(int priority,
                                 Timestamp enqueue_time,
                                 uint64_t enqueue_order,
                                 std::unique_ptr<RtpPacketToSend> packet) {
  uint32_t ssrc = packet->Ssrc();
  uint16_t sequence_number = packet->SequenceNumber();
  int64_t capture_time_ms = packet->capture_time_ms();
  DataSize size =
      DataSize::bytes(send_side_bwe_with_overhead_
                          ? packet->size()
                          : packet->payload_size() + packet->padding_size());
  auto type = packet->packet_type();
  RTC_DCHECK(type.has_value());

  rtp_packets_.push_front(std::move(packet));
  Push(QueuedPacket(
      priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
      size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
      enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
}

QueuedPacket数据结构的实现

RoundRobinPacketQueue_QueuedPacket.png
#modules/pacing/round_robin_packet_queue.cc
std::unique_ptr<RtpPacketToSend>
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
  return packet_it_ ? std::move(**packet_it_) : nullptr;
}

Stream数据结构的实现

RoundRobinPacketQueue_Stream.png
##modules/pacing/round_robin_packet_queue.cc
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
  auto stream_info_it = streams_.find(packet.ssrc());
  if (stream_info_it == streams_.end()) {
    stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
    stream_info_it->second.priority_it = stream_priorities_.end();
    stream_info_it->second.ssrc = packet.ssrc();
  }

  Stream* stream = &stream_info_it->second;

  if (stream->priority_it == stream_priorities_.end()) {
    // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
  } else if (packet.priority() < stream->priority_it->first.priority) {
    // If the priority of this SSRC increased, remove the outdated StreamPrioKey
    // and insert a new one with the new priority. Note that |priority_| uses
    // lower ordinal for higher priority.
    stream_priorities_.erase(stream->priority_it);
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
  }
  RTC_CHECK(stream->priority_it != stream_priorities_.end());

  // In order to figure out how much time a packet has spent in the queue while
  // not in a paused state, we subtract the total amount of time the queue has
  // been paused so far, and when the packet is popped we subtract the total
  // amount of time the queue has been paused at that moment. This way we
  // subtract the total amount of time the packet has spent in the queue while
  // in a paused state.
  UpdateQueueTime(packet.enqueue_time());
  packet.SubtractPauseTime(pause_time_sum_);

  size_packets_ += 1;
  size_ += packet.size();

  stream->packet_queue.push(packet);
}
RoundRobinPacketQueue_Stream_2.png

PacedSender 出队操作

#modules/pacing/paced_sender.cc
int64_t PacedSender::TimeUntilNextProcess() {
  rtc::CritScope cs(&critsect_);

  // When paused we wake up every 500 ms to send a padding packet to ensure
  // we won't get stuck in the paused state due to no feedback being received.
  TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
  if (pacing_controller_.IsPaused()) {
    return std::max(PacingController::kPausedProcessInterval - elapsed_time,
                    TimeDelta::Zero())
        .ms();
  }

  auto next_probe = pacing_controller_.TimeUntilNextProbe();
  if (next_probe) {
    return next_probe->ms();
  }

  const TimeDelta min_packet_limit = TimeDelta::ms(5);
  return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}
void PacedSender::Process() {
  rtc::CritScope cs(&critsect_);
  pacing_controller_.ProcessPackets();
}
void PacingController::ProcessPackets() {
  Timestamp now = CurrentTime();
  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
  ....
  if (paused_)
    return;

  if (elapsed_time > TimeDelta::Zero()) {
    DataRate target_rate = pacing_bitrate_;
    DataSize queue_size_data = packet_queue_.Size();
    if (queue_size_data > DataSize::Zero()) {
      // Assuming equal size packets and input/output rate, the average packet
      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
      // time constraint shall be met. Determine bitrate needed for that.
      packet_queue_.UpdateQueueTime(CurrentTime());
      if (drain_large_queues_) {
        TimeDelta avg_time_left =
            std::max(TimeDelta::ms(1),
                     queue_time_limit - packet_queue_.AverageQueueTime());
        DataRate min_rate_needed = queue_size_data / avg_time_left;
        if (min_rate_needed > target_rate) {
          target_rate = min_rate_needed;
          RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
                              << target_rate.kbps();
        }
      }
    }

    media_budget_.set_target_rate_kbps(target_rate.kbps());
    UpdateBudgetWithElapsedTime(elapsed_time);
  }

  bool is_probing = prober_.IsProbing();
  PacedPacketInfo pacing_info;
  absl::optional<DataSize> recommended_probe_size;
  if (is_probing) {
    pacing_info = prober_.CurrentCluster();
    recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
  }

  DataSize data_sent = DataSize::Zero();
  // The paused state is checked in the loop since it leaves the critical
  // section allowing the paused state to be changed from other code.
  while (!paused_) {
    auto* packet = GetPendingPacket(pacing_info);
    if (packet == nullptr) {
      // No packet available to send, check if we should send padding.
      DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
      if (padding_to_add > DataSize::Zero()) {
        std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
            packet_sender_->GeneratePadding(padding_to_add);
        if (padding_packets.empty()) {
          // No padding packets were generated, quite send loop.
          break;
        }
        for (auto& packet : padding_packets) {
          EnqueuePacket(std::move(packet));
        }
        // Continue loop to send the padding that was just added.
        continue;
      }

      // Can't fetch new packet and no padding to send, exit send loop.
      break;
    }

    std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
    RTC_DCHECK(rtp_packet);
    packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);

    data_sent += packet->size();
    // Send succeeded, remove it from the queue.
    OnPacketSent(packet);
    if (recommended_probe_size && data_sent > *recommended_probe_size)
      break;
  }

  if (is_probing) {
    probing_send_failure_ = data_sent == DataSize::Zero();
    if (!probing_send_failure_) {
      prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
    }
  }
}
RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
    const PacedPacketInfo& pacing_info) {
  if (packet_queue_.Empty()) {
    return nullptr;
  }

  // Since we need to release the lock in order to send, we first pop the
  // element from the priority queue but keep it in storage, so that we can
  // reinsert it if send fails.
  RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
  bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
  bool apply_pacing = !audio_packet || pace_audio_;
  if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
                                       pacing_info.probe_cluster_id ==
                                           PacedPacketInfo::kNotAProbe))) {
    packet_queue_.CancelPop();
    return nullptr;
  }
  return packet;
}
上一篇下一篇

猜你喜欢

热点阅读