blob: ec5d26d62e2f16e6a27e4539f438daae86572985 [file]
// Copyright 2026 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "cast/streaming/impl/receiver_impl.h"
#include <algorithm>
#include <utility>
#include "cast/streaming/impl/receiver_packet_router.h"
#include "cast/streaming/public/constants.h"
#include "cast/streaming/public/session_config.h"
#include "platform/base/span.h"
#include "platform/base/trivial_clock_traits.h"
#include "util/chrono_helpers.h"
#include "util/osp_logging.h"
#include "util/std_util.h"
#include "util/trace_logging.h"
namespace openscreen::cast {
using clock_operators::operator<<;
// Conveniences for ensuring logging output includes the SSRC of the Receiver,
// to help distinguish one out of multiple instances in a Cast Streaming
// session.
//
#define SSRC() "[SSRC:" << config().receiver_ssrc << "] "
#define RECEIVER_DLOG(level) OSP_DLOG_##level << SSRC()
#define RECEIVER_LOG(level) OSP_LOG_##level << SSRC()
#define RECEIVER_VLOG OSP_VLOG << SSRC()
#define RECEIVER_DVLOG OSP_DVLOG << SSRC()
ReceiverImpl::ReceiverImpl(Environment& environment,
ReceiverPacketRouter& packet_router,
SessionConfig config)
: now_(environment.now_function()),
packet_router_(packet_router),
config_(config),
rtcp_session_(config.sender_ssrc, config.receiver_ssrc, now_()),
rtcp_parser_(rtcp_session_),
rtcp_builder_(std::make_unique<CompoundRtcpBuilder>(rtcp_session_)),
stats_tracker_(config.rtp_timebase),
rtp_parser_(config.sender_ssrc),
rtp_timebase_(config.rtp_timebase),
crypto_(config.aes_secret_key, config.aes_iv_mask),
is_pli_enabled_(config.is_pli_enabled),
rtcp_alarm_(environment.now_function(), environment.task_runner()),
smoothed_clock_offset_(ClockDriftSmoother::kDefaultTimeConstant),
consumption_alarm_(environment.now_function(),
environment.task_runner()) {
OSP_CHECK_EQ(checkpoint_frame(), FrameId::leader());
rtcp_buffer_.assign(environment.GetMaxPacketSize(), 0);
OSP_CHECK_GT(rtcp_buffer_.size(), 0);
rtcp_builder_->SetPlayoutDelay(config.target_playout_delay);
playout_delay_changes_.emplace_back(FrameId::leader(),
config.target_playout_delay);
packet_router_.RegisterPacketConsumer(rtcp_session_.sender_ssrc(), this);
}
ReceiverImpl::~ReceiverImpl() {
packet_router_.DeregisterPacketConsumer(rtcp_session_.sender_ssrc());
}
const SessionConfig& ReceiverImpl::config() const {
return config_;
}
void ReceiverImpl::SetConsumer(Consumer* consumer) {
consumer_ = consumer;
ScheduleFrameReadyCheck();
}
void ReceiverImpl::SetPlayerProcessingTime(Clock::duration needed_time) {
RECEIVER_DVLOG << __func__ << ": setting processing time to " << needed_time;
player_processing_time_ = std::max(Clock::duration::zero(), needed_time);
}
Error ReceiverImpl::ReportPlayoutEvent(FrameId frame_id,
RtpTimeTicks rtp_timestamp,
Clock::time_point playout_time) {
if (!config_.are_receiver_event_logs_enabled) {
return Error(Error::Code::kOperationInvalid,
"receiver event logs are disabled. reports are ignored.");
}
if (frame_id <= latest_frame_expected_ - kMaxUnackedFrames) {
return Error(Error::Code::kParameterOutOfRange, "frame is too old.");
}
const PendingFrame& entry = GetQueueEntry(frame_id);
OSP_CHECK(entry.estimated_capture_time);
const Clock::duration playout_delay =
std::max(Clock::duration(), playout_time - *entry.estimated_capture_time);
if (config_.are_receiver_event_logs_enabled) {
AddEventToPendingLogs(
rtp_timestamp,
RtcpReceiverEventLogMessage{
.type = StatisticsEvent::Type::kFramePlayedOut,
.timestamp = playout_time,
.delay = std::chrono::duration_cast<std::chrono::milliseconds>(
playout_delay)});
}
TRACE_FLOW_END_WITH_TIME(TraceCategory::kReceiver, "Frame.PlayedOut",
frame_id, playout_time);
return Error::None();
}
void ReceiverImpl::RequestKeyFrame() {
// If we don't have picture loss indication enabled, we should not request
// any key frames.
if (!is_pli_enabled_) {
RECEIVER_LOG(WARN) << "Should not request any key frames when picture loss "
"indication is not enabled";
return;
}
if (!last_key_frame_received_.is_null() &&
last_frame_consumed_ >= last_key_frame_received_ &&
!rtcp_builder_->is_picture_loss_indicator_set()) {
rtcp_builder_->SetPictureLossIndicator(true);
SendRtcp();
}
}
std::optional<size_t> ReceiverImpl::AdvanceToNextFrame() {
TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
const FrameId immediate_next_frame = last_frame_consumed_ + 1;
// Scan the queue for the next frame that should be consumed. Typically, this
// is the very next frame; but if it is incomplete and already late for
// playout, consider skipping-ahead.
for (FrameId f = immediate_next_frame; f <= latest_frame_expected_; ++f) {
PendingFrame& entry = GetQueueEntry(f);
if (entry.collector.is_complete()) {
const EncodedFrame& metadata = entry.collector.PeekFrameMetadata();
const bool is_next_frame = f == immediate_next_frame;
const bool is_independent =
metadata.dependency != EncodedFrame::Dependency::kDependent;
const bool is_ready = is_next_frame || is_independent;
if (is_ready) {
// Found a frame after skipping past some frames. Drop the ones being
// skipped, advancing `last_frame_consumed_` before returning.
// TODO(crbug.com/472513637): we may not always want to drop all
// dependent frames just because we have a complete independent frame.
if (!is_next_frame) {
DropAllFramesBefore(f);
}
TRACE_FLOW_STEP(TraceCategory::kReceiver, "Frame.Ready", f);
return static_cast<int>(entry.collector.GetFramePayloadSize());
}
}
// Do not consider skipping past this frame if its estimated capture time is
// unknown. The implication here is that, if `estimated_capture_time` is
// set, the Receiver also knows whether any target playout delay changes
// were communicated from the Sender in the frame's first RTP packet.
if (!entry.estimated_capture_time) {
break;
}
// If this incomplete frame is not yet late for playout, simply wait for the
// rest of its packets to come in. However, do schedule a check to
// re-examine things at the time it should be processed.
const auto process_time = *entry.estimated_capture_time +
ResolveTargetPlayoutDelay(f) -
player_processing_time_;
if (process_time > now_()) {
ScheduleFrameReadyCheck(process_time);
break;
}
}
return std::nullopt;
}
EncodedFrame ReceiverImpl::ConsumeNextFrame(ByteBuffer buffer) {
TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
// Assumption: The required call to AdvanceToNextFrame() ensures that
// `last_frame_consumed_` is set to one before the frame to be consumed here.
const FrameId frame_id = last_frame_consumed_ + 1;
OSP_CHECK_LE(frame_id, checkpoint_frame());
TRACE_FLOW_STEP(TraceCategory::kReceiver, "Frame.Consumed", frame_id);
// Decrypt the frame, populating the given output `frame`.
PendingFrame& entry = GetQueueEntry(frame_id);
OSP_CHECK(entry.collector.is_complete());
OSP_CHECK(entry.estimated_capture_time);
const EncodedFrame& metadata = entry.collector.PeekFrameMetadata();
// `buffer` will contain the decrypted frame contents.
crypto_.Decrypt(metadata.frame_id, entry.collector.GetPayloadChunks(),
buffer);
EncodedFrame frame;
metadata.CopyMetadataTo(&frame);
frame.data = buffer;
frame.reference_time = *entry.estimated_capture_time +
ResolveTargetPlayoutDelay(frame_id) -
player_processing_time_;
RECEIVER_VLOG << "ConsumeNextFrame → " << frame.frame_id << ": "
<< frame.data.size() << " payload bytes, RTP Timestamp "
<< frame.rtp_timestamp.ToTimeSinceOrigin<microseconds>(
rtp_timebase_)
<< ", to play-out " << (frame.reference_time - now_())
<< " from now.";
// Reset the collector to free up memory, and leave the estimated_capture_time
// for this entry, as it may still be used if the consumer decides to report
// the playout event.
entry.collector.Reset();
last_frame_consumed_ = frame_id;
// Ensure the Consumer is notified if there are already more frames ready for
// consumption, and it hasn't explicitly called AdvanceToNextFrame() to check
// for itself.
ScheduleFrameReadyCheck();
return frame;
}
void ReceiverImpl::OnReceivedRtpPacket(Clock::time_point arrival_time,
std::vector<uint8_t> packet) {
const std::optional<RtpPacketParser::ParseResult> part =
rtp_parser_.Parse(packet);
if (!part) {
RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
<< " bytes as an RTP packet failed.";
return;
}
stats_tracker_.OnReceivedValidRtpPacket(part->sequence_number,
part->rtp_timestamp, arrival_time);
// Ignore packets for frames the Receiver is no longer interested in.
if (part->frame_id <= checkpoint_frame()) {
RECEIVER_VLOG << "ignoring packet for frame " << part->frame_id
<< " as it has been consumed or dropped already.";
return;
}
// Extend the range of frames known to this Receiver, within the capacity of
// this Receiver's queue. Prepare the FrameCollectors to receive any
// newly-discovered frames.
if (part->frame_id > latest_frame_expected_) {
const FrameId max_allowed_frame_id =
last_frame_consumed_ + kMaxUnackedFrames;
if (part->frame_id > max_allowed_frame_id) {
RECEIVER_VLOG << "ignoring packet for unknown frame " << part->frame_id;
return;
}
do {
++latest_frame_expected_;
PendingFrame& entry = GetQueueEntry(latest_frame_expected_);
// The collector was already reset, so just reset the capture time.
entry.estimated_capture_time.reset();
entry.collector.set_frame_id(latest_frame_expected_);
} while (latest_frame_expected_ < part->frame_id);
}
// Start-up edge case: Blatantly drop the first packet of all frames until the
// Receiver has processed at least one Sender Report containing the necessary
// clock-drift and lip-sync information (see OnReceivedRtcpPacket()). This is
// an inescapable data dependency. Note that this special case should almost
// never trigger, since a well-behaving Sender will send the first Sender
// Report RTCP packet before any of the RTP packets.
if (!last_sender_report_ && part->packet_id == FramePacketId{0}) {
RECEIVER_LOG(WARN) << "Dropping packet 0 of frame " << part->frame_id
<< " because it arrived before the first Sender Report.";
// Note: The Sender will have to re-transmit this dropped packet after the
// Sender Report to allow the Receiver to move forward.
return;
}
PendingFrame& pending_frame = GetQueueEntry(part->frame_id);
FrameCollector& collector = pending_frame.collector;
if (collector.is_complete()) {
// An extra, redundant `packet` was received. Do nothing since the frame was
// already complete.
RECEIVER_VLOG << "ignoring redundant packet for frame " << part->frame_id;
return;
}
if (!collector.CollectRtpPacket(*part, &packet)) {
RECEIVER_LOG(WARN) << "bad data in parsed packet for frame "
<< part->frame_id;
return; // Bad data in the parsed packet. Ignore it.
}
// The first packet in a frame contains timing information critical for
// computing this frame's (and all future frames') playout time. Process that,
// but only once.
if (part->packet_id == FramePacketId{0} &&
!pending_frame.estimated_capture_time) {
pending_frame.rtp_timestamp = part->rtp_timestamp;
// Estimate the original capture time of this frame (at the Sender), in
// terms of the Receiver's clock: First, start with a reference time point
// from the Sender's clock (the one from the last Sender Report). Then,
// translate it into the equivalent reference time point in terms of the
// Receiver's clock by applying the measured offset between the two clocks.
// Finally, apply the RTP timestamp difference between the Sender Report and
// this frame to determine what the original capture time of this frame was.
const auto smoothed_offset = smoothed_clock_offset_.Current();
if (!smoothed_offset) {
return;
}
pending_frame.estimated_capture_time =
last_sender_report_->reference_time + *smoothed_offset +
(part->rtp_timestamp - last_sender_report_->rtp_timestamp)
.ToDuration<Clock::duration>(rtp_timebase_);
// If a target playout delay change was included in this packet, record it.
if (part->new_playout_delay > milliseconds::zero()) {
RecordNewTargetPlayoutDelay(part->frame_id, part->new_playout_delay);
}
// Now that the estimated capture time is known, other frames may have just
// become ready, per the frame-skipping logic in AdvanceToNextFrame().
ScheduleFrameReadyCheck();
}
if (config_.are_receiver_event_logs_enabled) {
AddEventToPendingLogs(part->rtp_timestamp,
RtcpReceiverEventLogMessage{
.type = StatisticsEvent::Type::kPacketReceived,
.timestamp = arrival_time,
.packet_id = part->packet_id});
}
if (!collector.is_complete()) {
return; // Wait for the rest of the packets to come in.
}
TRACE_FLOW_STEP(TraceCategory::kReceiver, "Frame.Complete", part->frame_id);
const EncodedFrame& metadata = collector.PeekFrameMetadata();
// Whenever a key frame has been received, the decoder has what it needs to
// recover. In this case, clear the PLI condition.
if (metadata.dependency == EncryptedFrame::Dependency::kKeyFrame) {
rtcp_builder_->SetPictureLossIndicator(false);
last_key_frame_received_ = part->frame_id;
}
// If this just-completed frame is the one right after the checkpoint frame,
// advance the checkpoint forward.
if (part->frame_id == (checkpoint_frame() + 1)) {
// Make sure we provide a FrameAckSent event to the sender later.
pending_frame_acks_.push_back(part->rtp_timestamp);
AdvanceCheckpoint(part->frame_id);
}
// Since a frame has become complete, schedule a check to see whether this or
// any other frames have become ready for consumption.
ScheduleFrameReadyCheck();
}
void ReceiverImpl::OnReceivedRtcpPacket(Clock::time_point arrival_time,
std::span<const uint8_t> packet) {
TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
std::optional<SenderReportParser::SenderReportWithId> parsed_report =
rtcp_parser_.Parse(packet);
if (!parsed_report) {
TRACE_SCOPED(TraceCategory::kReceiver, "ReceivedInvalidRtcpReport");
RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
<< " bytes as an RTCP packet failed.";
return;
}
TRACE_DEFAULT_SCOPED1(TraceCategory::kReceiver, "packet_id",
parsed_report->report_id);
last_sender_report_ = std::move(parsed_report);
last_sender_report_arrival_time_ = arrival_time;
// Measure the offset between the Sender's clock and the Receiver's Clock.
// This will be used to translate reference timestamps from the Sender into
// timestamps that represent the exact same moment in time at the Receiver.
//
// Note: Due to design limitations in the Cast Streaming spec, the Receiver
// has no way to compute how long it took the Sender Report to travel over the
// network. The calculation here just ignores that, and so the
// `measured_offset` below will be larger than the true value by that amount.
// This will have the effect of a later-than-configured playout delay.
// TODO(crbug.com/496703606): determine network delay by using round trip
// timestamp estimations.
const Clock::duration measured_offset =
arrival_time - last_sender_report_->reference_time;
smoothed_clock_offset_.Update(arrival_time, measured_offset);
RtcpReportBlock report;
report.ssrc = rtcp_session_.sender_ssrc();
stats_tracker_.PopulateNextReport(&report);
report.last_status_report_id = last_sender_report_->report_id;
report.SetDelaySinceLastReport(now_() - last_sender_report_arrival_time_);
rtcp_builder_->IncludeReceiverReportInNextPacket(report);
SendRtcp();
}
void ReceiverImpl::SendRtcp() {
// Collect ACK/NACK feedback for all active frames in the queue.
std::vector<PacketNack> packet_nacks;
std::vector<FrameId> frame_acks;
for (FrameId f = checkpoint_frame() + 1; f <= latest_frame_expected_; ++f) {
const PendingFrame& entry = GetQueueEntry(f);
if (entry.collector.is_complete()) {
frame_acks.push_back(f);
if (config_.are_receiver_event_logs_enabled) {
if (entry.rtp_timestamp) {
pending_frame_acks_.push_back(entry.rtp_timestamp.value());
}
}
} else {
entry.collector.GetMissingPackets(&packet_nacks);
}
}
// Fire off events for frames that were implicitly ACKed.
if (config_.are_receiver_event_logs_enabled) {
for (auto rtp_timestamp : pending_frame_acks_) {
AddEventToPendingLogs(rtp_timestamp,
RtcpReceiverEventLogMessage{
.type = StatisticsEvent::Type::kFrameAckSent,
.timestamp = now_(),
});
}
pending_frame_acks_.clear();
rtcp_builder_->IncludeReceiverLogsInNextPacket(std::move(pending_logs_));
pending_logs_.clear();
}
// Build and send a compound RTCP packet.
rtcp_builder_->IncludeFeedbackInNextPacket(std::move(packet_nacks),
std::move(frame_acks));
last_rtcp_send_time_ = now_();
packet_router_.SendRtcpPacket(
rtcp_builder_->BuildPacket(last_rtcp_send_time_, rtcp_buffer_));
// Schedule the automatic sending of another RTCP packet, if this method is
// not called within some bounded amount of time. While incomplete frames
// exist in the queue, send RTCP packets (with ACK/NACK feedback) frequently.
// When there are no incomplete frames, use a longer "keepalive" interval.
const Clock::duration interval =
(packet_nacks.empty() ? kRtcpReportInterval : kNackFeedbackInterval);
rtcp_alarm_.Schedule([this] { SendRtcp(); }, last_rtcp_send_time_ + interval);
}
const ReceiverImpl::PendingFrame& ReceiverImpl::GetQueueEntry(
FrameId frame_id) const {
return const_cast<ReceiverImpl*>(this)->GetQueueEntry(frame_id);
}
ReceiverImpl::PendingFrame& ReceiverImpl::GetQueueEntry(FrameId frame_id) {
return pending_frames_[(frame_id - FrameId::first()) %
pending_frames_.size()];
}
void ReceiverImpl::RecordNewTargetPlayoutDelay(FrameId as_of_frame,
milliseconds delay) {
OSP_CHECK_GT(as_of_frame, checkpoint_frame());
// Prune-out entries from `playout_delay_changes_` that are no longer needed.
// At least one entry must always be kept (i.e., there must always be a
// "current" setting).
const FrameId next_frame = last_frame_consumed_ - kMaxUnackedFrames + 1;
const auto keep_one_before_it = std::find_if(
std::next(playout_delay_changes_.begin()), playout_delay_changes_.end(),
[&](const auto& entry) { return entry.first > next_frame; });
playout_delay_changes_.erase(playout_delay_changes_.begin(),
std::prev(keep_one_before_it));
// Insert the delay change entry, maintaining the ascending ordering of the
// vector.
const auto insert_it = std::find_if(
playout_delay_changes_.begin(), playout_delay_changes_.end(),
[&](const auto& entry) { return entry.first > as_of_frame; });
playout_delay_changes_.emplace(insert_it, as_of_frame, delay);
OSP_DCHECK(AreElementsSortedAndUnique(playout_delay_changes_));
}
milliseconds ReceiverImpl::ResolveTargetPlayoutDelay(FrameId frame_id) const {
const FrameId first_possible =
last_frame_consumed_ > FrameId::first() + kMaxUnackedFrames
? last_frame_consumed_ - kMaxUnackedFrames
: FrameId::first();
OSP_CHECK_GE(frame_id, first_possible);
#if OSP_DCHECK_IS_ON()
// Extra precaution: Ensure all possible playout delay changes are known. In
// other words, every unconsumed frame in the queue, up to (and including)
// `frame_id`, must have an assigned estimated_capture_time.
for (FrameId f = first_possible; f <= frame_id; ++f) {
OSP_CHECK(GetQueueEntry(f).estimated_capture_time)
<< " don't know whether there was a playout delay change for frame "
<< f;
}
#endif
const auto it = std::find_if(
playout_delay_changes_.crbegin(), playout_delay_changes_.crend(),
[&](const auto& entry) { return entry.first <= frame_id; });
OSP_CHECK(it != playout_delay_changes_.crend());
return it->second;
}
void ReceiverImpl::AdvanceCheckpoint(FrameId new_checkpoint) {
TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
OSP_CHECK_GT(new_checkpoint, checkpoint_frame());
OSP_CHECK_LE(new_checkpoint, latest_frame_expected_);
while (new_checkpoint < latest_frame_expected_) {
const FrameId next = new_checkpoint + 1;
if (!GetQueueEntry(next).collector.is_complete()) {
break;
}
new_checkpoint = next;
}
set_checkpoint_frame(new_checkpoint);
rtcp_builder_->SetPlayoutDelay(ResolveTargetPlayoutDelay(new_checkpoint));
SendRtcp();
}
void ReceiverImpl::DropAllFramesBefore(FrameId first_kept_frame) {
// The following CHECKs are verifying that this method is only being called
// because one or more incomplete frames are being skipped-over.
const FrameId first_to_drop = last_frame_consumed_ + 1;
OSP_CHECK_GT(first_kept_frame, first_to_drop);
OSP_CHECK_GT(first_kept_frame, checkpoint_frame());
OSP_CHECK_LE(first_kept_frame, latest_frame_expected_);
// Reset each of the frames being dropped, pretending that they were consumed.
for (FrameId f = first_to_drop; f < first_kept_frame; ++f) {
PendingFrame& entry = GetQueueEntry(f);
// Pedantic sanity-check: Ensure the "target playout delay change" data
// dependency was satisfied. See comments in AdvanceToNextFrame().
OSP_CHECK(entry.estimated_capture_time);
entry.collector.Reset();
}
last_frame_consumed_ = first_kept_frame - 1;
RECEIVER_LOG(INFO) << "Artificially advancing checkpoint after skipping.";
AdvanceCheckpoint(first_kept_frame);
}
void ReceiverImpl::ScheduleFrameReadyCheck(Clock::time_point when) {
consumption_alarm_.Schedule(
[this] {
if (consumer_) {
const std::optional<size_t> next_size = AdvanceToNextFrame();
if (next_size.has_value()) {
consumer_->OnFramesReady(*next_size);
}
}
},
when);
}
void ReceiverImpl::AddEventToPendingLogs(
RtpTimeTicks rtp_timestamp,
RtcpReceiverEventLogMessage event_log) {
OSP_CHECK(config_.are_receiver_event_logs_enabled);
// Find or create a frame log for this RTP timestamp.
auto it = std::find_if(
pending_logs_.begin(), pending_logs_.end(),
[&](const auto& log) { return log.rtp_timestamp == rtp_timestamp; });
if (it == pending_logs_.end()) {
pending_logs_.push_back({rtp_timestamp, {}});
it = pending_logs_.end() - 1;
}
it->messages.push_back(event_log);
}
} // namespace openscreen::cast