| // Copyright 2019 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "osp/public/presentation/presentation_controller.h" |
| |
| #include <algorithm> |
| #include <sstream> |
| #include <type_traits> |
| |
| #include "osp/impl/presentation/presentation_utils.h" |
| #include "osp/impl/presentation/url_availability_requester.h" |
| #include "osp/msgs/osp_messages.h" |
| #include "osp/public/message_demuxer.h" |
| #include "osp/public/network_service_manager.h" |
| #include "osp/public/request_response_handler.h" |
| #include "util/osp_logging.h" |
| #include "util/std_util.h" |
| |
| namespace openscreen::osp { |
| |
| #define DECLARE_MSG_REQUEST_RESPONSE(base_name) \ |
| using RequestMsgType = msgs::Presentation##base_name##Request; \ |
| using ResponseMsgType = msgs::Presentation##base_name##Response; \ |
| \ |
| static constexpr MessageEncodingFunction<RequestMsgType> kEncoder = \ |
| &msgs::EncodePresentation##base_name##Request; \ |
| static constexpr MessageDecodingFunction<ResponseMsgType> kDecoder = \ |
| &msgs::DecodePresentation##base_name##Response; \ |
| static constexpr msgs::Type kResponseType = \ |
| msgs::Type::kPresentation##base_name##Response |
| |
| struct StartRequest { |
| DECLARE_MSG_REQUEST_RESPONSE(Start); |
| |
| msgs::PresentationStartRequest request; |
| RequestDelegate* delegate; |
| Connection::Delegate* presentation_connection_delegate; |
| }; |
| |
| struct ConnectionOpenRequest { |
| DECLARE_MSG_REQUEST_RESPONSE(ConnectionOpen); |
| |
| msgs::PresentationConnectionOpenRequest request; |
| RequestDelegate* delegate; |
| Connection::Delegate* presentation_connection_delegate; |
| std::unique_ptr<Connection> connection; |
| }; |
| |
| struct TerminationRequest { |
| DECLARE_MSG_REQUEST_RESPONSE(Termination); |
| |
| msgs::PresentationTerminationRequest request; |
| }; |
| |
| class Controller::MessageGroupStreams final |
| : public ProtocolConnection::Observer, |
| public RequestResponseHandler<StartRequest>::Delegate, |
| public RequestResponseHandler<ConnectionOpenRequest>::Delegate, |
| public RequestResponseHandler<TerminationRequest>::Delegate { |
| public: |
| MessageGroupStreams(Controller* controller, |
| std::string_view instance_name, |
| uint64_t instance_id); |
| MessageGroupStreams(const MessageGroupStreams&) = delete; |
| MessageGroupStreams& operator=(const MessageGroupStreams&) = delete; |
| MessageGroupStreams(MessageGroupStreams&&) noexcept = delete; |
| MessageGroupStreams& operator=(MessageGroupStreams&&) noexcept = delete; |
| ~MessageGroupStreams(); |
| |
| uint64_t SendStartRequest(StartRequest request); |
| void CancelStartRequest(uint64_t request_id); |
| void OnMatchedResponse(StartRequest* request, |
| msgs::PresentationStartResponse* response, |
| uint64_t instance_id) override; |
| void OnError(StartRequest* request, const Error& error) override; |
| |
| uint64_t SendConnectionOpenRequest(ConnectionOpenRequest request); |
| void CancelConnectionOpenRequest(uint64_t request_id); |
| void OnMatchedResponse(ConnectionOpenRequest* request, |
| msgs::PresentationConnectionOpenResponse* response, |
| uint64_t instance_id) override; |
| void OnError(ConnectionOpenRequest* request, const Error& error) override; |
| |
| void SendTerminationRequest(TerminationRequest request); |
| void OnMatchedResponse(TerminationRequest* request, |
| msgs::PresentationTerminationResponse* response, |
| uint64_t instance_id) override; |
| void OnError(TerminationRequest* request, const Error& error) override; |
| |
| // ProtocolConnection::Observer overrides. |
| void OnConnectionClosed(const ProtocolConnection& connection) override; |
| |
| private: |
| uint64_t GetNextInternalRequestId() { return ++next_internal_request_id_; } |
| |
| void CreateProtocolConnection(bool is_initiation); |
| |
| Controller* const controller_; |
| const std::string instance_name_; |
| const uint64_t instance_id_; |
| uint64_t next_internal_request_id_ = 1; |
| |
| std::unique_ptr<ProtocolConnection> initiation_protocol_connection_; |
| RequestResponseHandler<StartRequest> initiation_handler_; |
| RequestResponseHandler<TerminationRequest> termination_handler_; |
| |
| std::unique_ptr<ProtocolConnection> connection_protocol_connection_; |
| RequestResponseHandler<ConnectionOpenRequest> connection_open_handler_; |
| }; |
| |
| Controller::MessageGroupStreams::MessageGroupStreams( |
| Controller* controller, |
| std::string_view instance_name, |
| uint64_t instance_id) |
| : controller_(controller), |
| instance_name_(instance_name), |
| instance_id_(instance_id), |
| initiation_handler_(*this), |
| termination_handler_(*this), |
| connection_open_handler_(*this) {} |
| |
| Controller::MessageGroupStreams::~MessageGroupStreams() { |
| // Both are used to avoid triggering `OnConnectionClosed` during the |
| // destruction process, which may cause error that delete one instance twice. |
| if (initiation_protocol_connection_) { |
| initiation_protocol_connection_->SetObserver(nullptr); |
| } |
| |
| if (connection_protocol_connection_) { |
| connection_protocol_connection_->SetObserver(nullptr); |
| } |
| } |
| |
| uint64_t Controller::MessageGroupStreams::SendStartRequest( |
| StartRequest request) { |
| if (!initiation_protocol_connection_) { |
| CreateProtocolConnection(/*is_initiation=*/true); |
| } |
| |
| uint64_t request_id = GetNextInternalRequestId(); |
| initiation_handler_.WriteMessage(request_id, std::move(request)); |
| return request_id; |
| } |
| |
| void Controller::MessageGroupStreams::CancelStartRequest(uint64_t request_id) { |
| // TODO(btolsch): Instead, mark the `request_id` for immediate termination if |
| // we get a successful response. |
| initiation_handler_.CancelMessage(request_id); |
| } |
| |
| void Controller::MessageGroupStreams::OnMatchedResponse( |
| StartRequest* request, |
| msgs::PresentationStartResponse* response, |
| uint64_t instance_id) { |
| if (response->result != msgs::PresentationStartResponse_result::kSuccess) { |
| std::stringstream ss; |
| ss << "presentation-start-response for " << request->request.url |
| << " failed: " << static_cast<int>(response->result); |
| Error error(Error::Code::kUnknownStartError, ss.str()); |
| OSP_LOG_INFO << error.message(); |
| request->delegate->OnError(error); |
| return; |
| } |
| |
| OSP_LOG_INFO << "presentation started for " << request->request.url; |
| Controller::ControlledPresentation& presentation = |
| controller_->presentations_by_id_[request->request.presentation_id]; |
| presentation.instance_name = instance_name_; |
| presentation.url = request->request.url; |
| auto connection = std::make_unique<Connection>( |
| Connection::PresentationInfo{request->request.presentation_id, |
| request->request.url}, |
| request->presentation_connection_delegate, controller_); |
| controller_->OpenConnection( |
| response->connection_id, instance_id, instance_name_, request->delegate, |
| std::move(connection), CreateClientProtocolConnection(instance_id)); |
| } |
| |
| void Controller::MessageGroupStreams::OnError(StartRequest* request, |
| const Error& error) { |
| request->delegate->OnError(error); |
| } |
| |
| uint64_t Controller::MessageGroupStreams::SendConnectionOpenRequest( |
| ConnectionOpenRequest request) { |
| if (!connection_protocol_connection_) { |
| CreateProtocolConnection(/*is_initiation=*/false); |
| } |
| |
| uint64_t request_id = GetNextInternalRequestId(); |
| connection_open_handler_.WriteMessage(request_id, std::move(request)); |
| return request_id; |
| } |
| |
| void Controller::MessageGroupStreams::CancelConnectionOpenRequest( |
| uint64_t request_id) { |
| connection_open_handler_.CancelMessage(request_id); |
| } |
| |
| void Controller::MessageGroupStreams::OnMatchedResponse( |
| ConnectionOpenRequest* request, |
| msgs::PresentationConnectionOpenResponse* response, |
| uint64_t instance_id) { |
| if (response->result != |
| msgs::PresentationConnectionOpenResponse_result::kSuccess) { |
| std::stringstream ss; |
| ss << "presentation-connection-open-response for " << request->request.url |
| << " failed: " << static_cast<int>(response->result); |
| Error error(Error::Code::kUnknownStartError, ss.str()); |
| OSP_LOG_INFO << error.message(); |
| request->delegate->OnError(error); |
| return; |
| } |
| |
| OSP_LOG_INFO << "presentation connection opened to " |
| << request->request.presentation_id; |
| if (request->presentation_connection_delegate) { |
| request->connection = std::make_unique<Connection>( |
| Connection::PresentationInfo{request->request.presentation_id, |
| request->request.url}, |
| request->presentation_connection_delegate, controller_); |
| } |
| std::unique_ptr<ProtocolConnection> protocol_connection = |
| CreateClientProtocolConnection(instance_id); |
| request->connection->OnConnected(response->connection_id, instance_id, |
| std::move(protocol_connection)); |
| controller_->AddConnection(request->connection.get()); |
| request->delegate->OnConnection(std::move(request->connection)); |
| } |
| |
| void Controller::MessageGroupStreams::OnError(ConnectionOpenRequest* request, |
| const Error& error) { |
| request->delegate->OnError(error); |
| } |
| |
| void Controller::MessageGroupStreams::SendTerminationRequest( |
| TerminationRequest request) { |
| if (!initiation_protocol_connection_) { |
| CreateProtocolConnection(/*is_initiation=*/true); |
| } |
| |
| termination_handler_.WriteMessage(std::move(request)); |
| } |
| |
| void Controller::MessageGroupStreams::OnMatchedResponse( |
| TerminationRequest* request, |
| msgs::PresentationTerminationResponse* response, |
| uint64_t instance_id) { |
| OSP_VLOG << "got presentation-termination-response for " |
| << request->request.presentation_id << " with result " |
| << static_cast<int>(response->result); |
| controller_->TerminatePresentationById(request->request.presentation_id); |
| } |
| |
| void Controller::MessageGroupStreams::OnError(TerminationRequest* request, |
| const Error& error) {} |
| |
| void Controller::MessageGroupStreams::OnConnectionClosed( |
| const ProtocolConnection& connection) { |
| if (initiation_protocol_connection_ && |
| initiation_protocol_connection_.get() == &connection) { |
| initiation_handler_.Reset(); |
| termination_handler_.Reset(); |
| initiation_protocol_connection_.reset(); |
| return; |
| } |
| |
| if (connection_protocol_connection_ && |
| connection_protocol_connection_.get() == &connection) { |
| connection_open_handler_.Reset(); |
| connection_protocol_connection_.reset(); |
| } |
| } |
| |
| void Controller::MessageGroupStreams::CreateProtocolConnection( |
| bool is_initiation) { |
| if (is_initiation) { |
| initiation_protocol_connection_ = |
| CreateClientProtocolConnection(instance_id_); |
| if (initiation_protocol_connection_) { |
| initiation_protocol_connection_->SetObserver(this); |
| initiation_handler_.SetConnection(initiation_protocol_connection_.get()); |
| termination_handler_.SetConnection(initiation_protocol_connection_.get()); |
| } else { |
| OSP_LOG_WARN << "There is no valid underlying connection."; |
| } |
| } else { |
| connection_protocol_connection_ = |
| CreateClientProtocolConnection(instance_id_); |
| if (connection_protocol_connection_) { |
| connection_protocol_connection_->SetObserver(this); |
| connection_open_handler_.SetConnection( |
| connection_protocol_connection_.get()); |
| } else { |
| OSP_LOG_WARN << "There is no valid underlying connection."; |
| } |
| } |
| } |
| |
| class Controller::TerminationListener final |
| : public MessageDemuxer::MessageCallback { |
| public: |
| TerminationListener(Controller* controller, |
| const std::string& presentation_id, |
| uint64_t instance_id); |
| TerminationListener(const TerminationListener&) = delete; |
| TerminationListener& operator=(const TerminationListener&) = delete; |
| TerminationListener(TerminationListener&&) noexcept = delete; |
| TerminationListener& operator=(TerminationListener&&) noexcept = delete; |
| ~TerminationListener() override; |
| |
| // MessageDemuxer::MessageCallback overrides. |
| ErrorOr<size_t> OnStreamMessage(uint64_t instance_id, |
| uint64_t connection_id, |
| msgs::Type message_type, |
| const uint8_t* buffer, |
| size_t buffer_size, |
| Clock::time_point now) override; |
| |
| private: |
| Controller* const controller_; |
| std::string presentation_id_; |
| MessageDemuxer::MessageWatch event_watch_; |
| }; |
| |
| Controller::TerminationListener::TerminationListener( |
| Controller* controller, |
| const std::string& presentation_id, |
| uint64_t instance_id) |
| : controller_(controller), presentation_id_(presentation_id) { |
| event_watch_ = GetClientDemuxer().WatchMessageType( |
| instance_id, msgs::Type::kPresentationTerminationEvent, this); |
| } |
| |
| Controller::TerminationListener::~TerminationListener() = default; |
| |
| ErrorOr<size_t> Controller::TerminationListener::OnStreamMessage( |
| uint64_t instance_id, |
| uint64_t connection_id, |
| msgs::Type message_type, |
| const uint8_t* buffer, |
| size_t buffer_size, |
| Clock::time_point now) { |
| OSP_CHECK_EQ(static_cast<int>(msgs::Type::kPresentationTerminationEvent), |
| static_cast<int>(message_type)); |
| msgs::PresentationTerminationEvent event; |
| const msgs::CborResult result = |
| msgs::DecodePresentationTerminationEvent(buffer, buffer_size, event); |
| if (result < 0) { |
| if (result == msgs::kParserEOF) { |
| return Error::Code::kCborIncompleteMessage; |
| } |
| |
| OSP_LOG_WARN << "decode presentation-termination-event error: " << result; |
| return Error::Code::kCborParsing; |
| } else if (event.presentation_id != presentation_id_) { |
| OSP_LOG_WARN << "got presentation-termination-event for wrong id: " |
| << presentation_id_ << " vs. " << event.presentation_id; |
| return result; |
| } |
| |
| OSP_LOG_INFO << "termination event"; |
| auto presentation_entry = |
| controller_->presentations_by_id_.find(event.presentation_id); |
| if (presentation_entry != controller_->presentations_by_id_.end()) { |
| for (auto* connection : presentation_entry->second.connections) { |
| connection->OnTerminated(); |
| } |
| controller_->presentations_by_id_.erase(presentation_entry); |
| } |
| controller_->termination_listener_by_id_.erase(event.presentation_id); |
| return result; |
| } |
| |
| RequestDelegate::RequestDelegate() = default; |
| RequestDelegate::~RequestDelegate() = default; |
| |
| ReceiverObserver::ReceiverObserver() = default; |
| ReceiverObserver::~ReceiverObserver() = default; |
| |
| Controller::ReceiverWatch::ReceiverWatch() = default; |
| Controller::ReceiverWatch::ReceiverWatch(Controller* controller, |
| const std::vector<std::string>& urls, |
| ReceiverObserver* observer) |
| : urls_(urls), observer_(observer), controller_(controller) {} |
| |
| Controller::ReceiverWatch::ReceiverWatch( |
| Controller::ReceiverWatch&& other) noexcept { |
| // Although all fields are POD, this does not use the default implementation. |
| // See `operator=` for details. |
| *this = std::move(other); |
| } |
| |
| Controller::ReceiverWatch& Controller::ReceiverWatch::operator=( |
| Controller::ReceiverWatch&& other) noexcept { |
| // Although all fields are POD, this does not use the default `operator=` |
| // implementation because it is important that we should stop current watch |
| // for `observer_` before taking values from `other` and making `other` |
| // invalid. |
| StopWatching(); |
| urls_ = std::move(other.urls_); |
| observer_ = other.observer_; |
| controller_ = other.controller_; |
| other.observer_ = nullptr; |
| other.observer_ = nullptr; |
| return *this; |
| } |
| |
| Controller::ReceiverWatch::~ReceiverWatch() { |
| StopWatching(); |
| } |
| |
| void Controller::ReceiverWatch::Reset() { |
| StopWatching(); |
| urls_.clear(); |
| controller_ = nullptr; |
| } |
| |
| void Controller::ReceiverWatch::StopWatching() { |
| if (observer_) { |
| controller_->CancelReceiverWatch(urls_, observer_); |
| } |
| observer_ = nullptr; |
| } |
| |
| Controller::ConnectRequest::ConnectRequest() = default; |
| |
| Controller::ConnectRequest::ConnectRequest(Controller* controller, |
| const std::string& instance_name, |
| bool is_reconnect, |
| uint64_t request_id) |
| : instance_name_(instance_name), |
| is_reconnect_(is_reconnect), |
| request_id_(request_id), |
| controller_(controller) {} |
| |
| Controller::ConnectRequest::ConnectRequest(ConnectRequest&& other) noexcept { |
| // Although all fields are POD, this does not use the default implementation. |
| // See `operator=` for details. |
| *this = std::move(other); |
| } |
| |
| Controller::ConnectRequest& Controller::ConnectRequest::operator=( |
| ConnectRequest&& other) noexcept { |
| // Although all fields are POD, this does not use the default `operator=` |
| // implementation because it is important that we should cancel current |
| // request for `controller_` before taking values from `other` and making |
| // `other` invalid. |
| CancelRequest(); |
| instance_name_ = std::move(other.instance_name_); |
| is_reconnect_ = other.is_reconnect_; |
| request_id_ = other.request_id_; |
| controller_ = other.controller_; |
| other.request_id_ = 0; |
| other.is_reconnect_ = false; |
| other.controller_ = nullptr; |
| return *this; |
| } |
| |
| Controller::ConnectRequest::~ConnectRequest() { |
| CancelRequest(); |
| } |
| |
| void Controller::ConnectRequest::Reset() { |
| CancelRequest(); |
| instance_name_.clear(); |
| is_reconnect_ = false; |
| controller_ = nullptr; |
| } |
| |
| void Controller::ConnectRequest::CancelRequest() { |
| if (request_id_) { |
| controller_->CancelConnectRequest(instance_name_, is_reconnect_, |
| request_id_); |
| } |
| request_id_ = 0; |
| } |
| |
| Controller::Controller(ClockNowFunctionPtr now_function) { |
| availability_requester_ = |
| std::make_unique<UrlAvailabilityRequester>(now_function); |
| connection_manager_ = std::make_unique<ConnectionManager>(GetClientDemuxer()); |
| const std::vector<ServiceInfo>& receivers = |
| NetworkServiceManager::Get()->GetServiceListener()->GetReceivers(); |
| for (const auto& info : receivers) { |
| availability_requester_->AddReceiver(info); |
| } |
| |
| NetworkServiceManager::Get()->GetServiceListener()->AddObserver(*this); |
| } |
| |
| Controller::~Controller() { |
| NetworkServiceManager::Get()->GetServiceListener()->RemoveObserver(*this); |
| } |
| |
| Error Controller::CloseConnection(Connection* connection, |
| Connection::CloseReason reason) { |
| auto presentation_entry = |
| presentations_by_id_.find(connection->presentation_info().id); |
| if (presentation_entry == presentations_by_id_.end()) { |
| std::stringstream ss; |
| ss << "no presentation found when trying to close connection " |
| << connection->presentation_info().id << ":" |
| << connection->connection_id(); |
| return Error(Error::Code::kNoPresentationFound, ss.str()); |
| } |
| |
| std::unique_ptr<ProtocolConnection> protocol_connection = |
| CreateClientProtocolConnection(connection->instance_id()); |
| if (!protocol_connection) { |
| return Error::Code::kNoActiveConnection; |
| } |
| |
| msgs::PresentationConnectionCloseEvent event = { |
| .connection_id = connection->connection_id(), |
| .reason = ConvertCloseEventReason(reason), |
| .connection_count = connection_manager_->ConnectionCount(), |
| .has_error_message = false}; |
| return protocol_connection->WriteMessage( |
| event, msgs::EncodePresentationConnectionCloseEvent); |
| } |
| |
| Error Controller::OnPresentationTerminated(const std::string& presentation_id, |
| TerminationSource source, |
| TerminationReason reason) { |
| auto presentation_entry = presentations_by_id_.find(presentation_id); |
| if (presentation_entry == presentations_by_id_.end()) { |
| return Error::Code::kNoPresentationFound; |
| } |
| |
| ControlledPresentation& presentation = presentation_entry->second; |
| for (auto* connection : presentation.connections) { |
| connection->OnTerminated(); |
| } |
| |
| TerminationRequest request = { |
| .request = {.presentation_id = presentation_id, |
| .reason = msgs::PresentationTerminationReason::kUserRequest}}; |
| group_streams_by_instance_name_[presentation.instance_name] |
| ->SendTerminationRequest(std::move(request)); |
| presentations_by_id_.erase(presentation_entry); |
| termination_listener_by_id_.erase(presentation_id); |
| return Error::None(); |
| } |
| |
| void Controller::OnConnectionDestroyed(Connection* connection) { |
| auto presentation_entry = |
| presentations_by_id_.find(connection->presentation_info().id); |
| if (presentation_entry == presentations_by_id_.end()) { |
| return; |
| } |
| |
| std::vector<Connection*>& connections = |
| presentation_entry->second.connections; |
| connections.erase( |
| std::remove(connections.begin(), connections.end(), connection), |
| connections.end()); |
| |
| connection_manager_->RemoveConnection(connection); |
| } |
| |
| void Controller::BuildConnection(std::string_view instance_name) { |
| std::string name(instance_name); |
| auto requset_entry = connect_requests_by_instance_name_.find(name); |
| if (requset_entry != connect_requests_by_instance_name_.end()) { |
| OSP_LOG_WARN << "There is alreay a request in progress for connecting to " |
| << instance_name; |
| return; |
| } |
| |
| auto result = connect_requests_by_instance_name_.insert( |
| {std::move(name), openscreen::osp::ConnectRequest()}); |
| NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect( |
| instance_name, result.first->second, this); |
| } |
| |
| Controller::ReceiverWatch Controller::RegisterReceiverWatch( |
| const std::vector<std::string>& urls, |
| ReceiverObserver* observer) { |
| availability_requester_->AddObserver(urls, observer); |
| return ReceiverWatch(this, urls, observer); |
| } |
| |
| Controller::ConnectRequest Controller::StartPresentation( |
| const std::string& url, |
| const std::string& instance_name, |
| RequestDelegate* delegate, |
| Connection::Delegate* conn_delegate) { |
| StartRequest request = { |
| .request = {.presentation_id = MakePresentationId(url, instance_name), |
| .url = url}, |
| .delegate = delegate, |
| .presentation_connection_delegate = conn_delegate}; |
| uint64_t request_id = |
| group_streams_by_instance_name_[instance_name]->SendStartRequest( |
| std::move(request)); |
| return ConnectRequest(this, instance_name, /*is_reconnect=*/false, |
| request_id); |
| } |
| |
| Controller::ConnectRequest Controller::ReconnectPresentation( |
| const std::vector<std::string>& urls, |
| const std::string& presentation_id, |
| const std::string& instance_name, |
| RequestDelegate* delegate, |
| Connection::Delegate* conn_delegate) { |
| auto presentation_entry = presentations_by_id_.find(presentation_id); |
| if (presentation_entry == presentations_by_id_.end()) { |
| delegate->OnError(Error::Code::kNoPresentationFound); |
| return ConnectRequest(); |
| } |
| |
| if (!Contains(urls, presentation_entry->second.url)) { |
| delegate->OnError(Error::Code::kNoPresentationFound); |
| return ConnectRequest(); |
| } |
| |
| ConnectionOpenRequest request = { |
| .request = {.presentation_id = presentation_id, |
| .url = presentation_entry->second.url}, |
| .delegate = delegate, |
| .presentation_connection_delegate = conn_delegate, |
| .connection = nullptr}; |
| uint64_t request_id = |
| group_streams_by_instance_name_[instance_name]->SendConnectionOpenRequest( |
| std::move(request)); |
| return ConnectRequest(this, instance_name, /*is_reconnect=*/true, request_id); |
| } |
| |
| Controller::ConnectRequest Controller::ReconnectConnection( |
| std::unique_ptr<Connection> connection, |
| RequestDelegate* delegate) { |
| if (connection->state() != Connection::State::kClosed) { |
| delegate->OnError(Error::Code::kInvalidConnectionState); |
| return ConnectRequest(); |
| } |
| |
| const Connection::PresentationInfo& info = connection->presentation_info(); |
| auto presentation_entry = presentations_by_id_.find(info.id); |
| if (presentation_entry == presentations_by_id_.end() || |
| presentation_entry->second.url != info.url) { |
| OSP_LOG_ERROR << "missing ControlledPresentation for non-terminated " |
| "connection with info (" |
| << info.id << ", " << info.url << ")"; |
| delegate->OnError(Error::Code::kNoPresentationFound); |
| return ConnectRequest(); |
| } |
| |
| OSP_CHECK(connection_manager_->GetConnection(connection->connection_id())) |
| << "valid connection for reconnect is unknown to the " |
| "connection manager"; |
| connection_manager_->RemoveConnection(connection.get()); |
| connection->OnConnecting(); |
| ConnectionOpenRequest request = { |
| .request = {.presentation_id = info.id, .url = info.url}, |
| .delegate = delegate, |
| .presentation_connection_delegate = nullptr, |
| .connection = std::move(connection)}; |
| const std::string& instance_name = presentation_entry->second.instance_name; |
| uint64_t request_id = |
| group_streams_by_instance_name_[instance_name]->SendConnectionOpenRequest( |
| std::move(request)); |
| return ConnectRequest(this, instance_name, /*is_reconnect=*/true, request_id); |
| } |
| |
| std::string Controller::GetServiceIdForPresentationId( |
| const std::string& presentation_id) const { |
| auto presentation_entry = presentations_by_id_.find(presentation_id); |
| if (presentation_entry == presentations_by_id_.end()) { |
| return ""; |
| } |
| return presentation_entry->second.instance_name; |
| } |
| |
| ProtocolConnection* Controller::GetConnectionRequestGroupStream( |
| const std::string& instance_name) { |
| OSP_UNIMPLEMENTED(); |
| return nullptr; |
| } |
| |
| void Controller::OnStarted() {} |
| void Controller::OnStopped() {} |
| void Controller::OnSuspended() {} |
| void Controller::OnSearching() {} |
| |
| void Controller::OnReceiverAdded(const ServiceInfo& info) {} |
| |
| void Controller::OnReceiverChanged(const ServiceInfo& info) { |
| availability_requester_->ChangeReceiver(info); |
| } |
| |
| void Controller::OnReceiverRemoved(const ServiceInfo& info) { |
| group_streams_by_instance_name_.erase(info.instance_name); |
| availability_requester_->RemoveReceiver(info); |
| } |
| |
| void Controller::OnAllReceiversRemoved() { |
| group_streams_by_instance_name_.clear(); |
| availability_requester_->RemoveAllReceivers(); |
| } |
| |
| void Controller::OnError(const Error&) {} |
| |
| void Controller::OnConnectSucceed(uint64_t request_id, |
| std::string_view instance_name, |
| uint64_t instance_id) { |
| auto request_entry = |
| connect_requests_by_instance_name_.find(std::string(instance_name)); |
| if (request_entry == connect_requests_by_instance_name_.end()) { |
| return; |
| } |
| |
| OSP_CHECK_EQ(request_id, request_entry->second.request_id()); |
| request_entry->second.MarkComplete(); |
| connect_requests_by_instance_name_.erase(request_entry); |
| |
| group_streams_by_instance_name_.emplace( |
| instance_name, |
| std::make_unique<MessageGroupStreams>(this, instance_name, instance_id)); |
| availability_requester_->CreateReceiverRequester(instance_name, instance_id); |
| OSP_LOG_INFO << "Controller succeed to build the underlying connection to: " |
| << instance_name; |
| } |
| |
| void Controller::OnConnectFailed(uint64_t request_id, |
| std::string_view instance_name) { |
| auto request_entry = |
| connect_requests_by_instance_name_.find(std::string(instance_name)); |
| if (request_entry == connect_requests_by_instance_name_.end()) { |
| return; |
| } |
| |
| request_entry->second.MarkComplete(); |
| connect_requests_by_instance_name_.erase(request_entry); |
| OSP_LOG_WARN << "Controller failed to build the underlying connection to: " |
| << instance_name; |
| } |
| |
| // static |
| std::string Controller::MakePresentationId(const std::string& url, |
| const std::string& instance_name) { |
| // TODO(btolsch): This is just a placeholder for the demo. It should |
| // eventually become a GUID/unguessable token routine. |
| std::string safe_id = instance_name; |
| for (auto& c : safe_id) { |
| if (c < ' ' || c > '~') { |
| c = '.'; |
| } |
| } |
| return safe_id + ":" + url; |
| } |
| |
| void Controller::AddConnection(Connection* connection) { |
| connection_manager_->AddConnection(connection); |
| } |
| |
| void Controller::OpenConnection( |
| uint64_t connection_id, |
| uint64_t instance_id, |
| const std::string& instance_name, |
| RequestDelegate* request_delegate, |
| std::unique_ptr<Connection>&& connection, |
| std::unique_ptr<ProtocolConnection>&& protocol_connection) { |
| connection->OnConnected(connection_id, instance_id, |
| std::move(protocol_connection)); |
| const std::string& presentation_id = connection->presentation_info().id; |
| auto presentation_entry = presentations_by_id_.find(presentation_id); |
| if (presentation_entry == presentations_by_id_.end()) { |
| auto emplace_entry = presentations_by_id_.emplace( |
| presentation_id, |
| ControlledPresentation{ |
| instance_name, connection->presentation_info().url, {}}); |
| presentation_entry = emplace_entry.first; |
| } |
| ControlledPresentation& presentation = presentation_entry->second; |
| presentation.connections.push_back(connection.get()); |
| AddConnection(connection.get()); |
| |
| auto terminate_entry = termination_listener_by_id_.find(presentation_id); |
| if (terminate_entry == termination_listener_by_id_.end()) { |
| termination_listener_by_id_.emplace( |
| presentation_id, std::make_unique<TerminationListener>( |
| this, presentation_id, instance_id)); |
| } |
| request_delegate->OnConnection(std::move(connection)); |
| } |
| |
| void Controller::TerminatePresentationById(const std::string& presentation_id) { |
| auto presentation_entry = presentations_by_id_.find(presentation_id); |
| if (presentation_entry != presentations_by_id_.end()) { |
| for (auto* connection : presentation_entry->second.connections) { |
| connection->OnTerminated(); |
| } |
| presentations_by_id_.erase(presentation_entry); |
| } |
| } |
| |
| void Controller::CancelReceiverWatch(const std::vector<std::string>& urls, |
| ReceiverObserver* observer) { |
| availability_requester_->RemoveObserverUrls(urls, observer); |
| } |
| |
| void Controller::CancelConnectRequest(const std::string& instance_name, |
| bool is_reconnect, |
| uint64_t request_id) { |
| auto group_streams_entry = |
| group_streams_by_instance_name_.find(instance_name); |
| if (group_streams_entry == group_streams_by_instance_name_.end()) { |
| return; |
| } |
| |
| if (is_reconnect) { |
| group_streams_entry->second->CancelConnectionOpenRequest(request_id); |
| } else { |
| group_streams_entry->second->CancelStartRequest(request_id); |
| } |
| } |
| |
| } // namespace openscreen::osp |