长安网站建设哪家好百度投诉中心24人工
P2P从宏观原理上其实就是:
- 收集本地Candidates
- 设置远程Candidates
- 连通性测试及排序
本文我们从Offer端的角度进行源码分析,学习webrtc是如何进行P2P连接的。版本m98
。
一、收集本地Candidates
-
examples/peerconnection中,
CreateOffer
以后,会调用PeerConnection::SetLocalDescription
,继而调用到SdpOfferAnswerHandler::SetLocalDescription
>SdpOfferAnswerHandler::DoSetLocalDescription
>JsepTransportController::MaybeStartGathering
>P2PTransportChannel::MaybeStartGathering
。
void P2PTransportChannel::MaybeStartGathering() {RTC_DCHECK_RUN_ON(network_thread_);if (ice_parameters_.ufrag.empty() || ice_parameters_.pwd.empty()) {RTC_LOG(LS_ERROR)<< "Cannot gather candidates because ICE parameters are empty"" ufrag: "<< ice_parameters_.ufrag << " pwd: " << ice_parameters_.pwd;return;}// Start gathering if we never started before, or if an ICE restart occurred.//正常第一次收集Candidates的话,allocator_sessions_应该是空的if (allocator_sessions_.empty() ||IceCredentialsChanged(allocator_sessions_.back()->ice_ufrag(),allocator_sessions_.back()->ice_pwd(),ice_parameters_.ufrag, ice_parameters_.pwd)) {if (gathering_state_ != kIceGatheringGathering) {gathering_state_ = kIceGatheringGathering; //gathering_state_设置为正在kIceGatheringGatheringSignalGatheringState(this);}if (!allocator_sessions_.empty()) { //更新ICE状态,第一次收集应该是空IceRestartState state;if (writable()) {state = IceRestartState::CONNECTED;} else if (IsGettingPorts()) {state = IceRestartState::CONNECTING;} else {state = IceRestartState::DISCONNECTED;}RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IceRestartState",static_cast<int>(state),static_cast<int>(IceRestartState::MAX_VALUE));}for (const auto& session : allocator_sessions_) {if (session->IsStopped()) {continue;}session->StopGettingPorts();}// Time for a new allocator.//从当前的Session中寻找对应的ice,如果是第一次收集,则pooled_session为空。std::unique_ptr<PortAllocatorSession> pooled_session =allocator_->TakePooledSession(transport_name(), component(),ice_parameters_.ufrag,ice_parameters_.pwd);if (pooled_session) {AddAllocatorSession(std::move(pooled_session));PortAllocatorSession* raw_pooled_session =allocator_sessions_.back().get();// Process the pooled session's existing candidates/ports, if they exist.OnCandidatesReady(raw_pooled_session,raw_pooled_session->ReadyCandidates());for (PortInterface* port : allocator_sessions_.back()->ReadyPorts()) {OnPortReady(raw_pooled_session, port);}if (allocator_sessions_.back()->CandidatesAllocationDone()) {OnCandidatesAllocationDone(raw_pooled_session);}} else {//创建Session并添加到allocator_sessions_AddAllocatorSession(allocator_->CreateSession(transport_name(), component(), ice_parameters_.ufrag,ice_parameters_.pwd));allocator_sessions_.back()->StartGettingPorts();}}
}
- 接下来做了一系列调用,直到真正的
DoAllocate
:BasicPortAllocatorSession::StartGettingPorts
>BasicPortAllocatorSession::GetPortConfigurations
>BasicPortAllocatorSession::ConfigReady
>BasicPortAllocatorSession::OnConfigReady
>BasicPortAllocatorSession::AllocatePorts
>BasicPortAllocatorSession::OnAllocate
>BasicPortAllocatorSession::DoAllocate
void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {RTC_DCHECK_RUN_ON(network_thread_);bool done_signal_needed = false;std::vector<rtc::Network*> networks = GetNetworks(); //获取到所有的网络设备if (networks.empty()) {RTC_LOG(LS_WARNING)<< "Machine has no networks; no ports will be allocated";done_signal_needed = true;} else {RTC_LOG(LS_INFO) << "Allocate ports on " << networks.size() << " networks";PortConfiguration* config =configs_.empty() ? nullptr : configs_.back().get();for (uint32_t i = 0; i < networks.size(); ++i) { //遍历所有网络设备进行端口分配uint32_t sequence_flags = flags();if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {// If all the ports are disabled we should just fire the allocation// done event and return.done_signal_needed = true;break;}if (!config || config->relays.empty()) {// ice config里没有relay服务sequence_flags |= PORTALLOCATOR_DISABLE_RELAY;}//跳过IPV6相关配置if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6) &&networks[i]->GetBestIP().family() == AF_INET6) {// Skip IPv6 networks unless the flag's been set.continue;}if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6_ON_WIFI) &&networks[i]->GetBestIP().family() == AF_INET6 &&networks[i]->type() == rtc::ADAPTER_TYPE_WIFI) {// Skip IPv6 Wi-Fi networks unless the flag's been set.continue;}if (disable_equivalent) {// Disable phases that would only create ports equivalent to// ones that we have already made.DisableEquivalentPhases(networks[i], config, &sequence_flags);if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {// New AllocationSequence would have nothing to do, so don't make it.continue;}}AllocationSequence* sequence =new AllocationSequence(this, networks[i], config, sequence_flags,[this, safety_flag = network_safety_.flag()] {if (safety_flag->alive())OnPortAllocationComplete();});sequence->Init(); //初始化AllocationSequencesequence->Start(); //开始分配端口sequences_.push_back(sequence);done_signal_needed = true;}}if (done_signal_needed) {network_thread_->PostTask(webrtc::ToQueuedTask(network_safety_, [this] { OnAllocationSequenceObjectsCreated(); }));}
}
- AllocationSequence::Start之后会循环执行AllocationSequence::Process,直到state() != kRunning,而在进行TCP端口分配以后state_ = kCompleted,也就是说如果不出错的情况下,AllocationSequence::Process会一直循环,直到TCP端口分配完成。
void AllocationSequence::Process(int epoch) {RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};if (epoch != epoch_)return;// Perform all of the phases in the current step.RTC_LOG(LS_INFO) << network_->ToString()<< ": Allocation Phase=" << PHASE_NAMES[phase_];switch (phase_) {case PHASE_UDP:CreateUDPPorts(); //分配UDP端口CreateStunPorts(); //分配Stun端口break;case PHASE_RELAY:CreateRelayPorts(); //分配Relay端口break;case PHASE_TCP:CreateTCPPorts();state_ = kCompleted; //state设置为完成,可以跳出Process循环了break;default:RTC_DCHECK_NOTREACHED();}if (state() == kRunning) { //在状态为kRunning时,一直循环执行Process++phase_; //执行完一种类型的端口分配后,执行另外一种,顺序为 PHASE_UDP > PHASE_RELAY > PHASE_TCPsession_->network_thread()->PostDelayedTask(webrtc::ToQueuedTask(safety_,[this, epoch = epoch_] { Process(epoch); }),session_->allocator()->step_delay());} else {// No allocation steps needed further if all phases in AllocationSequence// are completed. Cause further Process calls in the previous epoch to be// ignored.++epoch_;port_allocation_complete_callback_(); //分配完成,回调}
}
1、PHASE_UDP
在UDP阶段,会收集两种candidate:host
和srflx
。对应函数CreateUDPPorts
和CreateStunPorts
。
CreateUDPPorts
比较简单,就是从可用端口中选择一个端口创建udp socket,并通过AddAllocatedPort
函数添加到session中。
AddAllocatedPort
具体流程为:BasicPortAllocatorSession::AddAllocatedPort
>UDPPort::PrepareAddress
>UDPPort::OnLocalAddressReady
>Port::AddAddress
>Port::FinishAddingAddress
>SignalCandidateReady
srflx
使用的是STUN
协议,也就是通过NAT
穿透的方式传输数据。对应webrtc源码中,如果srflx
与host
端口复用的话,在UDPPort::OnLocalAddressReady
阶段就会进行srflx
收集,默认是复用的。srflx
的原理是:client向stun server
发送一包数据,stun server会把client对应的外网地址和端口返回给client,这样client就获取到了自己的外网地址和端口,以及与之对应的内网端口。- 发送流程:
UDPPort::OnLocalAddressReady
>UDPPort::MaybePrepareStunCandidate
>UDPPort::SendStunBindingRequests
>UDPPort::SendStunBindingRequest
>StunRequestManager::Send
>StunRequestManager::SendDelayed
>StunRequest::OnMessage
>UDPPort::OnSendPacket
>AsyncUDPSocket::SendTo
>PhysicalSocket::SendTo
- 接收流程:
PhysicalSocketServer::WaitSelect
>ProcessEvents
>SocketDispatcher::OnEvent
>SignalReadEvent
>AsyncUDPSocket::OnReadEvent
>SignalReadPacket
>AllocationSequence::OnReadPacket
>UDPPort::HandleIncomingPacket
>UDPPort::OnReadPacket
>StunRequestManager::CheckResponse
>StunBindingRequest::OnResponse
>UDPPort::OnStunBindingRequestSucceeded
>Port::AddAddress
>Port::FinishAddingAddress
>SignalCandidateReady
2、PHASE_RELAY
在RELAY
阶段,会收集relay
类型的candidate
,也就是在NAT
打洞不通的情况下,需要用中继的方式传输数据,使用的是TURN
协议,TURN
协议是STUN
协议的扩展,所以在流程上与STUN
类似。
relay
类型的candidate
,默认也是与udp端口复用的,TURN 协议的具体流程如下:
- 客户端发送 Allocate request 到 server,server 返回 401 未授权错误(带有 realm 和 nonce),客户端再发送带上认证信息的 Allocate request,server 返回成功分配的 relay address。分配成功后,客户端需要通过发送机制(Send Mechanism)或信道机制(Channels)在 server 上配置和其他 peer 的转发信息。此外 allocation 和 channel 都需要保活。
- 发送流程:
AllocationSequence::CreateRelayPorts
>AllocationSequence::CreateTurnPort
>BasicPortAllocatorSession::AddAllocatedPort
>TurnPort::PrepareAddress
>TurnPort::SendRequest
>StunRequestManager::Send
>StunRequestManager::Send
>StunRequestManager::SendDelayed
>StunRequest::OnMessage
>UDPPort::OnSendPacket
>AsyncUDPSocket::SendTo
>PhysicalSocket::SendTo
- 接收流程1:
PhysicalSocketServer::WaitSelect
>ProcessEvents
>SocketDispatcher::OnEvent
>SignalReadEvent
>AsyncUDPSocket::OnReadEvent
>SignalReadPacket
>AllocationSequence::OnReadPacket
>UDPPort::HandleIncomingPacket
>UDPPort::OnReadPacket
>StunRequestManager::CheckResponse
>TurnAllocateRequest::OnErrorResponse
然后发送认证信息 - 接收流程2:
PhysicalSocketServer::WaitSelect
>ProcessEvents
>SocketDispatcher::OnEvent
>SignalReadEvent
>AsyncUDPSocket::OnReadEvent
>SignalReadPacket
>AllocationSequence::OnReadPacket
>UDPPort::HandleIncomingPacket
>UDPPort::OnReadPacket
>StunRequestManager::CheckResponse
>TurnAllocateRequest::OnResponse
>TurnPort::OnAllocateSuccess
>Port::AddAddress
>Port::FinishAddingAddress
>SignalCandidateReady
3、PHASE_TCP
基本不使用
4、SignalCandidateReady
收集完成Candidate
会发出SignalCandidateReady
信号,进而触发BasicPortAllocatorSession::OnCandidateReady
。在BasicPortAllocatorSession::OnCandidateReady
中会发出两个重要的信号SignalPortReady
和 SignalCandidatesReady
。
- 其中
SignalPortReady
会触发P2PTransportChannel::OnPortReady
,将收集到的candidate
对应的端口保存下来,待远程candidate
收到以后就尝试ICE连接。 SignalCandidatesReady
会触发P2PTransportChannel::OnCandidatesReady
>JsepTransportController::OnTransportCandidateGathered_n
>PeerConnection::OnTransportControllerCandidatesGathered
>PeerConnection::OnIceCandidate
>Observer()->OnIceCandidate
这个observer
就是examples/peerconnection的Conductor
。按这个流程就可以把收集到的candidate
通知到上层应用了。
二、 设置远程Candidates
有两种方式设置远程Candidates
,一种是通过解析远程的sdp
获取到Candidate
,然后调用PeerConnection::AddRemoteCandidate
来设置,另一种是通过信令服务器接收到对端发送过来的Candidate
后,直接调用PeerConnection::AddIceCandidate
来设置。
- SDP解析设置Candidate的流程:
PeerConnection::SetRemoteDescription
>SdpOfferAnswerHandler::SetRemoteDescription
>SdpOfferAnswerHandler::DoSetRemoteDescription
>SdpOfferAnswerHandler::ApplyRemoteDescription
>SdpOfferAnswerHandler::UseCandidatesInSessionDescription
>SdpOfferAnswerHandler::UseCandidate
- 直接通过
PeerConnection::AddIceCandidate
设置Candidate
的流程:
PeerConnection::AddIceCandidate
>SdpOfferAnswerHandler::AddIceCandidate
>SdpOfferAnswerHandler::AddIceCandidateInternal
>SdpOfferAnswerHandler::UseCandidate
- 共同部分
SdpOfferAnswerHandler::UseCandidate
>PeerConnection::AddRemoteCandidate
>JsepTransportController::AddRemoteCandidates
>JsepTransport::AddRemoteCandidates
>P2PTransportChannel::AddRemoteCandidate
>P2PTransportChannel::FinishAddingRemoteCandidate
三、连通性测试及排序
在收集到本地Candidates及远程Candidates后,会调用P2PTransportChannel::FinishAddingRemoteCandidate
函数,在这个函数里主要做了两件事,一是创建连接(P2PTransportChannel::CreateConnections
),也就是创建每个本地candidate与远程candidate的连接;另一件事是对创建的连接进行排序和连通性测试(P2PTransportChannel::SortConnectionsAndUpdateState
)。
- 创建连接
P2PTransportChannel::CreateConnections
>P2PTransportChannel::CreateConnection
>UDPPort::CreateConnection
- 连通性测试和排序
void P2PTransportChannel::SortConnectionsAndUpdateState(IceControllerEvent reason_to_sort) {RTC_DCHECK_RUN_ON(network_thread_);// Make sure the connection states are up-to-date since this affects how they// will be sorted.UpdateConnectionStates(); //根据ping的结果更新connection的读写状态,即连接状态// Any changes after this point will require a re-sort.sort_dirty_ = false;// If necessary, switch to the new choice. Note that `top_connection` doesn't// have to be writable to become the selected connection although it will// have higher priority if it is writable.//MaybeSwitchSelectedConnection 会循环执行,对连接进行排序MaybeSwitchSelectedConnection(reason_to_sort, ice_controller_->SortAndSwitchConnection(reason_to_sort));// The controlled side can prune only if the selected connection has been// nominated because otherwise it may prune the connection that will be// selected by the controlling side.// TODO(honghaiz): This is not enough to prevent a connection from being// pruned too early because with aggressive nomination, the controlling side// will nominate every connection until it becomes writable.if (ice_role_ == ICEROLE_CONTROLLING ||(selected_connection_ && selected_connection_->nominated())) { //如果已经有了选中的连接,而且连接对应的网络设备已经是最优了,就把网络设备对应的其他连接清理掉PruneConnections();}// Check if all connections are timedout.bool all_connections_timedout = true;for (const Connection* conn : connections()) {//剩余的连接是否都已经超时了if (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) {all_connections_timedout = false;break;}}// Now update the writable state of the channel with the information we have// so far.if (all_connections_timedout) { HandleAllTimedOut(); //如果剩余的连接都已经超时了,就清理掉}// Update the state of this channel.UpdateState(); //更新channel state,通知上层是否可以连接等状态// Also possibly start pinging.// We could start pinging if:// * The first connection was created.// * ICE credentials were provided.// * A TCP connection became connected.MaybeStartPinging(); //连通性测试
}
1、连通性测试
-
发送流程
P2PTransportChannel::MaybeStartPinging
>P2PTransportChannel::CheckAndPing
>P2PTransportChannel::PingConnection
>Connection::Ping
>StunRequestManager::Send
>StunRequestManager::Send
>StunRequestManager::SendDelayed
>StunRequest::OnMessage
>UDPPort::OnSendPacket
>AsyncUDPSocket::SendTo
>PhysicalSocket::SendTo
-
接收流程
PhysicalSocketServer::WaitSelect
>ProcessEvents
>SocketDispatcher::OnEvent
>SignalReadEvent
>AsyncUDPSocket::OnReadEvent
>SignalReadPacket
>AllocationSequence::OnReadPacket
>UDPPort::HandleIncomingPacket
>UDPPort::OnReadPacket
>StunRequestManager::CheckResponse
>ConnectionRequest::OnResponse
>Connection::OnConnectionRequestResponse
>Connection::ReceivedPingResponse
-
在
Connection::ReceivedPingResponse
中会更新Connection的读写状态及rtt。供后续connection排序和选择使用。
2、排序和选择
2.1 排序
- 首先是通过
BasicIceController::CompareConnectionStates
函数进行排序:
如果a writable b不是则a排在前面,反之b排前面;
如果a b writable是一样的,则比较a b 的write_state,小的排前面;
如果a b 的write_state也一样,则比较receiving,逻辑上与write一样;
如果a b的receiving状态也一样,且a b都是STATE_WRITABLE,则比较connected,connected的排前面; - 如果
BasicIceController::CompareConnectionStates
比较不出结果,则通过BasicIceController::CompareConnectionCandidates
比较。 - 如果还是比较不出结果,则通过
rtt
来比较。
2.2选择
- 首先必须是可以发送的,才可能被选择;
- 如果当前没有被选择的连接,且新连接是可发送的,就选择新连接;
- 如果当前已经有了选择的连接,则通过network cost来比较,如果新连接不如已经选择的,则维持不变
- 否则通过排序原则进行比较,如果排序结果新连接不如已经选择的,则维持不变,如果新连接比已经选择的优先级高,则选择新连接。
- 如果排序的结果是新连接与已经选择的优先级相同,就比较
rtt
,如果新连接的rtt
小于已经连接的rtt
减去一个阈值
,则选择新连接,否则维持不变。
当ICE连通性和排序完成后,就可以进行正常的连接使用了。