Files
webRtc/Packages/com.unity.renderstreaming@3.1.0-exp.9/Runtime/Scripts/Signaling/WebSocketSignaling.cs
2026-05-12 23:04:08 +08:00

455 lines
20 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using Unity.WebRTC;
using UnityEngine;
using WebSocketSharp;
namespace Unity.RenderStreaming.Signaling
{
public class WebSocketSignaling : ISignaling
{
// Every constructed instance is added here; the constructor uses the set to warn
// when another instance already targets the same URL.
private static HashSet<WebSocketSignaling> instances = new HashSet<WebSocketSignaling>();
// Signaling server URL, copied from WebSocketSignalingSettings.url in the constructor.
private readonly string m_url;
// Seconds WSManage sleeps after the close event before creating the next socket
// (set to 5.0f in the constructor).
private readonly float m_timeout;
// SynchronizationContext that all events of this class are posted to.
private readonly SynchronizationContext m_mainThreadContext;
// Set by Start(), cleared by Stop(); WSManage keeps looping while it is true.
private bool m_running;
// Thread that runs WSManage.
private Thread m_signalingThread;
// Signaled by WSClosed; WSManage blocks on it until the current socket closes.
private readonly AutoResetEvent m_wsCloseEvent;
// Socket created by WSCreate; reset to null by WSClosed.
private WebSocket m_webSocket;
/// <summary>
/// Participant id of this client, taken from the "connect" message received from the server.
/// </summary>
public string participantId;
/// <summary>
/// Signaling server URL this instance was constructed with.
/// </summary>
public string Url { get { return m_url; } }
/// <summary>
/// Whether this client has the host role; assigned by the server in the "connect" response.
/// </summary>
public bool isHost { get; private set; }
/// <summary>
/// Room connectionId used when the host connects (recorded by OpenConnection).
/// </summary>
private string m_roomConnectionId;
/// <summary>
/// Maps a participant connection id to its room connectionId.
/// Key: participantId, Value: roomConnectionId.
/// Used when the host sends a message to a specific participant, to turn the
/// internal connectionId back into the form the server expects.
/// </summary>
private Dictionary<string, string> m_participantToRoom = new Dictionary<string, string>();
/// <summary>
/// Creates a signaling client for the URL carried by <paramref name="signalingSettings"/>.
/// </summary>
/// <param name="signalingSettings">Must be a WebSocketSignalingSettings instance.</param>
/// <param name="mainThreadContext">Context that events are posted to.</param>
/// <exception cref="ArgumentNullException"><paramref name="signalingSettings"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="signalingSettings"/> is not a WebSocketSignalingSettings.</exception>
public WebSocketSignaling(SignalingSettings signalingSettings, SynchronizationContext mainThreadContext)
{
    if (signalingSettings == null)
    {
        throw new ArgumentNullException(nameof(signalingSettings));
    }

    var settings = signalingSettings as WebSocketSignalingSettings;
    if (ReferenceEquals(settings, null))
    {
        throw new ArgumentException("signalingSettings is not WebSocketSignalingSettings");
    }

    m_url = settings.url;
    m_timeout = 5.0f;
    m_mainThreadContext = mainThreadContext;
    m_wsCloseEvent = new AutoResetEvent(false);

    // Warn (but do not fail) when another instance already uses the same URL.
    bool urlAlreadyUsed = false;
    foreach (var other in instances)
    {
        if (other.Url == m_url)
        {
            urlAlreadyUsed = true;
            break;
        }
    }

    if (urlAlreadyUsed)
    {
        RenderStreaming.Logger.Log(LogType.Warning, $"Other {nameof(WebSocketSignaling)} exists with same URL:{m_url}. Signaling process may be in conflict.");
    }

    instances.Add(this);
}
/// <summary>
/// Finalizer: stops the signaling thread if it is still running and removes this
/// instance from the static instance set.
/// </summary>
// NOTE(review): the static set holds a reference to every instance added by the
// constructor, which presumably keeps the instance reachable until it is removed —
// confirm whether this finalizer can run before process/domain teardown.
~WebSocketSignaling()
{
    if (m_running)
    {
        Stop();
    }

    instances.Remove(this);
}
/// <summary>
/// Marks the client as running and launches the thread that executes WSManage.
/// </summary>
/// <exception cref="InvalidOperationException">The client is already running.</exception>
public void Start()
{
    if (m_running)
    {
        throw new InvalidOperationException("This object is already started.");
    }

    m_running = true;

    var worker = new Thread(WSManage);
    m_signalingThread = worker;
    worker.Start();
}
/// <summary>
/// Stops the client: clears the running flag, closes the current socket (if any)
/// and ends the signaling thread. Does nothing when the client is not running.
/// </summary>
public void Stop()
{
    if (!m_running)
    {
        return;
    }

    m_running = false;
    m_webSocket?.Close();

    // A thread blocked in a wait/sleep is aborted; otherwise give it up to one
    // second to finish on its own.
    bool threadIsBlocked = m_signalingThread.ThreadState == ThreadState.WaitSleepJoin;
    if (threadIsBlocked)
    {
        m_signalingThread.Abort();
    }
    else
    {
        m_signalingThread.Join(1000);
    }

    m_signalingThread = null;
}
/// <summary>Posted from WSConnected when the WebSocket connection opens.</summary>
public event OnStartHandler OnStart;
/// <summary>Posted when a "connect" message is received.</summary>
public event OnConnectHandler OnCreateConnection;
/// <summary>Posted when a "disconnect" message is received.</summary>
public event OnDisconnectHandler OnDestroyConnection;
/// <summary>Posted when an "offer" message is received.</summary>
public event OnOfferHandler OnOffer;
#pragma warning disable 0067
// NOTE: this event IS raised by WSProcessMessage when an "answer" message is received;
// the CS0067 ("event is never used") suppression around it is left in place unchanged.
/// <summary>Posted when an "answer" message is received.</summary>
public event OnAnswerHandler OnAnswer;
#pragma warning restore 0067
/// <summary>Posted when a "candidate" message is received.</summary>
public event OnIceCandidateHandler OnIceCandidate;
/// <summary>Posted when a "participant-joined" message is received.</summary>
public event OnParticipantJoinedHandler OnParticipantJoined;
/// <summary>Posted when a "participant-left" message is received.</summary>
public event OnParticipantLeftHandler OnParticipantLeft;
/// <summary>Posted when a "call-request" message is received.</summary>
public event OnCallRequestHandler OnCallRequest;
/// <summary>Posted when an "on-message" message is received.</summary>
public event OnMessageHandler OnMessage;
/// <summary>
/// Tells whether <paramref name="connectionId"/> is a known participant connection id
/// (a key of the participant-to-room map) rather than a room connectionId.
/// </summary>
private bool IsParticipantConnectionId(string connectionId) =>
    m_participantToRoom.ContainsKey(connectionId);
/// <summary>
/// Returns the room connectionId recorded for a participant, or the given id itself
/// when no mapping exists.
/// </summary>
private string GetRoomConnectionId(string participantConnectionId)
{
    string mappedRoomId;
    if (m_participantToRoom.TryGetValue(participantConnectionId, out mappedRoomId))
    {
        return mappedRoomId;
    }

    return participantConnectionId;
}
/// <summary>
/// Sends an SDP offer as a routed "offer" message.
/// </summary>
/// <param name="connectionId">Room connectionId, or a participant connection id known to this instance.</param>
/// <param name="offer">Session description whose sdp is sent.</param>
public void SendOffer(string connectionId, RTCSessionDescription offer)
{
    // data.connectionId must be the room connectionId: the server uses it to find the
    // connection group for routing. The participant id travels only in
    // routedMessage.participantId, where it selects the target.
    bool targetsParticipant = IsParticipantConnectionId(connectionId);
    string roomId = targetsParticipant ? GetRoomConnectionId(connectionId) : connectionId;

    var payload = new DescData
    {
        connectionId = roomId,
        sdp = offer.sdp,
        dateTime = DateTime.Now
    };

    var envelope = new RoutedMessage<DescData>();
    envelope.from = roomId;
    // Participant target: address it by its id. Otherwise: stamp this client's own id.
    envelope.participantId = targetsParticipant ? connectionId : this.participantId;
    envelope.data = payload;
    envelope.type = "offer";

    WSSend(envelope);
}
/// <summary>
/// Sends an SDP answer as a routed "answer" message.
/// </summary>
/// <param name="connectionId">Room connectionId, or a participant connection id known to this instance.</param>
/// <param name="answer">Session description whose sdp is sent.</param>
public void SendAnswer(string connectionId, RTCSessionDescription answer)
{
    // data.connectionId must be the room connectionId: the server uses it to find the
    // connection group for routing.
    bool targetsParticipant = IsParticipantConnectionId(connectionId);
    string roomId = targetsParticipant ? GetRoomConnectionId(connectionId) : connectionId;

    var payload = new DescData
    {
        connectionId = roomId,
        sdp = answer.sdp
    };

    var envelope = new RoutedMessage<DescData>();
    envelope.from = roomId;
    // Participant target: address it by its id. Otherwise: stamp this client's own id.
    envelope.participantId = targetsParticipant ? connectionId : this.participantId;
    envelope.data = payload;
    envelope.type = "answer";

    WSSend(envelope);
}
/// <summary>
/// Sends an ICE candidate as a routed "candidate" message.
/// </summary>
/// <param name="connectionId">Room connectionId, or a participant connection id known to this instance.</param>
/// <param name="candidate">Candidate to send; a missing SdpMLineIndex is sent as 0.</param>
public void SendCandidate(string connectionId, RTCIceCandidate candidate)
{
    CandidateData data = new CandidateData();
    // data.connectionId must be the room connectionId: the server uses it to find the
    // connection group for routing.
    data.connectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId;
    data.candidate = candidate.Candidate;
    data.sdpMLineIndex = candidate.SdpMLineIndex.GetValueOrDefault(0);
    data.sdpMid = candidate.SdpMid;
    RoutedMessage<CandidateData> routedMessage = new RoutedMessage<CandidateData>();
    // If connectionId is a participant id, use the room connectionId as "from" and
    // set the participant as the target.
    if (IsParticipantConnectionId(connectionId))
    {
        routedMessage.from = GetRoomConnectionId(connectionId);
        routedMessage.participantId = connectionId;
    }
    else
    {
        routedMessage.from = connectionId;
        // Stamp this client's own participant id, exactly as SendOffer and SendAnswer do.
        // Without it the receiving host sees an empty participantId on the candidate and
        // keys it by the room connectionId instead of the participant's connection.
        // NOTE(review): relies on the server treating candidate.participantId the same
        // way it treats offer/answer.participantId — confirm against the server.
        routedMessage.participantId = participantId;
    }
    routedMessage.data = data;
    routedMessage.type = "candidate";
    WSSend(routedMessage);
}
/// <summary>
/// Records <paramref name="connectionId"/> as the room connectionId and sends a
/// "connect" request for it.
/// </summary>
public void OpenConnection(string connectionId)
{
    m_roomConnectionId = connectionId;

    string connectRequest = $"{{\"type\":\"connect\", \"connectionId\":\"{connectionId}\"}}";
    WSSend(connectRequest);
}
/// <summary>
/// Sends a "disconnect" request and forgets any participant mapping for
/// <paramref name="connectionId"/>.
/// </summary>
public void CloseConnection(string connectionId)
{
    // When a participant connection is closed, the disconnect is sent with the
    // room connectionId it maps to.
    string actualConnectionId = connectionId;
    if (IsParticipantConnectionId(connectionId))
    {
        actualConnectionId = GetRoomConnectionId(connectionId);
    }

    m_participantToRoom.Remove(connectionId);

    string disconnectRequest = $"{{\"type\":\"disconnect\", \"connectionId\":\"{actualConnectionId}\"}}";
    WSSend(disconnectRequest);
}
/// <summary>
/// Body of the signaling thread: creates a socket, waits for it to close, sleeps for
/// the timeout, and repeats for as long as the client is running.
/// </summary>
private void WSManage()
{
    int reconnectDelayMs = (int)(m_timeout * 1000);

    while (m_running)
    {
        WSCreate();

        try
        {
            // Block until WSClosed signals, then pause before the next attempt.
            m_wsCloseEvent.WaitOne();
            Thread.Sleep(reconnectDelayMs);
        }
        catch (ThreadAbortException)
        {
            // Stop() aborted this thread while it was waiting; leave quietly.
            return;
        }
    }

    RenderStreaming.Logger.Log("Signaling: WS managing thread ended");
}
/// <summary>
/// Creates a new WebSocket for the configured URL, wires the event handlers and
/// starts connecting asynchronously.
/// </summary>
private void WSCreate()
{
    // Work on a local reference: WSClosed resets the m_webSocket field to null from a
    // socket callback, so re-reading the field while configuring could hit null.
    var socket = new WebSocket(m_url);
    m_webSocket = socket;

    // Ordinal comparison: the scheme check must not depend on the current culture.
    if (m_url.StartsWith("wss", StringComparison.Ordinal))
    {
        socket.SslConfiguration.EnabledSslProtocols =
            SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12;
    }

    socket.OnOpen += WSConnected;
    socket.OnMessage += WSProcessMessage;
    socket.OnError += WSError;
    socket.OnClose += WSClosed;

    // NOTE(review): this monitor is acquired and never released anywhere in this class.
    // Kept as-is to preserve behavior — confirm whether it is needed at all.
    Monitor.Enter(socket);

    RenderStreaming.Logger.Log($"Signaling: Connecting WS {m_url}");
    socket.ConnectAsync();
}
/// <summary>
/// Handles one incoming WebSocket message: decodes the raw bytes as UTF-8 JSON, reads
/// the routed message type and posts the matching event to the synchronization context.
/// </summary>
/// <remarks>
/// When this client is the host and the message carries a participantId, that id is used
/// as the internal connectionId so that each participant is handled as its own connection.
/// Messages with an empty or unrecognized type are ignored. Any exception thrown while
/// handling a message is logged and not rethrown.
/// </remarks>
// NOTE(review): m_participantToRoom is modified here and also read/modified by the
// Send*/CloseConnection methods; if this handler runs on a different thread than those
// callers, the access is unsynchronized — confirm.
private void WSProcessMessage(object sender, MessageEventArgs e)
{
    var content = Encoding.UTF8.GetString(e.RawData);
    RenderStreaming.Logger.Log($"Signaling: Receiving message: {content}");
    try
    {
        var routedMessage = JsonUtility.FromJson<RoutedMessage<SignalingMessage>>(content);
        if (string.IsNullOrEmpty(routedMessage.type))
        {
            // Nothing to dispatch for an untyped message.
            return;
        }

        // Payload of the routed envelope; used by offer/answer/candidate.
        SignalingMessage msg = routedMessage.data;
        // True when the host received a message that identifies a participant.
        bool fromParticipant = !string.IsNullOrEmpty(routedMessage.participantId) && isHost;

        switch (routedMessage.type)
        {
            case "connect":
            {
                var flat = JsonUtility.FromJson<SignalingMessage>(content);
                isHost = flat.role == "host";
                participantId = flat.participantId;
                m_mainThreadContext.Post(d => OnCreateConnection?.Invoke(this, flat.connectionId, flat.polite), null);
                break;
            }
            case "disconnect":
            {
                var flat = JsonUtility.FromJson<SignalingMessage>(content);
                string disconnectConnectionId = flat.connectionId;
                // A participant disconnect is reported under the participant's id,
                // which is the internal connectionId on the host.
                if (fromParticipant)
                {
                    disconnectConnectionId = routedMessage.participantId;
                    m_participantToRoom.Remove(routedMessage.participantId);
                }
                m_mainThreadContext.Post(d => OnDestroyConnection?.Invoke(this, disconnectConnectionId), null);
                break;
            }
            case "offer":
            {
                DescData offer = new DescData();
                // Host receiving an offer with a participantId: use the participantId as
                // the internal connectionId so each participant gets its own PeerConnection.
                if (fromParticipant)
                {
                    offer.connectionId = routedMessage.participantId;
                    m_participantToRoom[routedMessage.participantId] = routedMessage.from ?? m_roomConnectionId;
                }
                else
                {
                    offer.connectionId = routedMessage.from;
                }
                offer.sdp = msg.sdp;
                offer.polite = msg.polite;
                offer.participantId = routedMessage.participantId;
                m_mainThreadContext.Post(d => OnOffer?.Invoke(this, offer), null);
                break;
            }
            case "answer":
            {
                DescData answer = new DescData
                {
                    // Host receiving an answer with a participantId: key it by participantId.
                    connectionId = fromParticipant ? routedMessage.participantId : routedMessage.from,
                    sdp = msg.sdp,
                    participantId = routedMessage.participantId
                };
                m_mainThreadContext.Post(d => OnAnswer?.Invoke(this, answer), null);
                break;
            }
            case "candidate":
            {
                CandidateData candidate = new CandidateData
                {
                    // Host receiving a candidate with a participantId: key it by participantId.
                    connectionId = fromParticipant ? routedMessage.participantId : routedMessage.from,
                    candidate = msg.candidate,
                    sdpMLineIndex = msg.sdpMLineIndex,
                    sdpMid = msg.sdpMid,
                    participantId = routedMessage.participantId
                };
                m_mainThreadContext.Post(d => OnIceCandidate?.Invoke(this, candidate), null);
                break;
            }
            case "participant-joined":
            {
                // Host is notified that a new participant joined.
                var flat = JsonUtility.FromJson<SignalingMessage>(content);
                ParticipantEventData participantData = new ParticipantEventData
                {
                    connectionId = flat.connectionId ?? routedMessage.from,
                    participantId = flat.participantId ?? routedMessage.participantId
                };
                // Remember which room this participant belongs to.
                if (!string.IsNullOrEmpty(participantData.participantId))
                {
                    m_participantToRoom[participantData.participantId] = participantData.connectionId;
                }
                RenderStreaming.Logger.Log($"Signaling: Participant joined - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
                m_mainThreadContext.Post(d => OnParticipantJoined?.Invoke(this, participantData), null);
                break;
            }
            case "participant-left":
            {
                // Host/participant is notified that a participant left.
                var flat = JsonUtility.FromJson<SignalingMessage>(content);
                ParticipantEventData participantData = new ParticipantEventData
                {
                    connectionId = flat.connectionId ?? routedMessage.from,
                    participantId = flat.participantId ?? routedMessage.participantId
                };
                // Dictionary.Remove throws on a null key; without this guard a message
                // lacking a participantId would be reported as a parse failure and
                // OnParticipantLeft would never be raised.
                if (!string.IsNullOrEmpty(participantData.participantId))
                {
                    m_participantToRoom.Remove(participantData.participantId);
                }
                RenderStreaming.Logger.Log($"Signaling: Participant left - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
                m_mainThreadContext.Post(d => OnParticipantLeft?.Invoke(this, participantData), null);
                break;
            }
            case "call-request":
            {
                var flat = JsonUtility.FromJson<SignalingMessage>(content);
                CallRequestData callData = new CallRequestData
                {
                    connectionId = routedMessage.from,
                    data = flat.data
                };
                m_mainThreadContext.Post(d => OnCallRequest?.Invoke(this, callData), null);
                break;
            }
            case "on-message":
            {
                var flat = JsonUtility.FromJson<SignalingMessage>(content);
                OnMessageData messageData = new OnMessageData
                {
                    connectionId = routedMessage.from,
                    participantId = routedMessage.participantId,
                    message = flat.data
                };
                m_mainThreadContext.Post(d => OnMessage?.Invoke(this, messageData), null);
                break;
            }
            case "error":
            {
                var flat = JsonUtility.FromJson<SignalingMessage>(content);
                RenderStreaming.Logger.Log(LogType.Error, flat.message);
                break;
            }
        }
    }
    catch (Exception ex)
    {
        RenderStreaming.Logger.Log(LogType.Error, "Signaling: Failed to parse message: " + ex);
    }
}
/// <summary>
/// Socket "open" handler: logs the connection and posts OnStart to the
/// synchronization context.
/// </summary>
private void WSConnected(object sender, EventArgs e)
{
    RenderStreaming.Logger.Log("Signaling: WS connected.");

    SendOrPostCallback raiseOnStart = d => OnStart?.Invoke(this);
    m_mainThreadContext.Post(raiseOnStart, null);
}
/// <summary>
/// Socket "error" handler: logs the error message and takes no other action.
/// </summary>
private void WSError(object sender, ErrorEventArgs e) =>
    RenderStreaming.Logger.Log(LogType.Error, $"Signaling: WS connection error: {e.Message}");
/// <summary>
/// Socket "close" handler: logs the close code, signals the close event that
/// WSManage waits on, and clears the socket reference.
/// </summary>
private void WSClosed(object sender, CloseEventArgs e)
{
    var closeCode = e.Code;
    RenderStreaming.Logger.Log($"Signaling: WS connection closed, code: {closeCode}");

    // Wake the managing thread first, then drop the socket reference.
    m_wsCloseEvent.Set();
    m_webSocket = null;
}
/// <summary>
/// Sends <paramref name="data"/> over the current socket. A string is sent verbatim;
/// any other object is serialized with JsonUtility.ToJson first. When no socket is
/// open, an error is logged and nothing is sent.
/// </summary>
/// <param name="data">A ready-made JSON string, or an object to serialize.</param>
private void WSSend(object data)
{
    // Snapshot the field once: WSClosed sets m_webSocket to null from a socket callback,
    // so re-reading the field after the check could dereference null.
    var socket = m_webSocket;
    if (socket == null || socket.ReadyState != WebSocketState.Open)
    {
        RenderStreaming.Logger.Log(LogType.Error, "Signaling: WS is not connected. Unable to send message");
        return;
    }

    string payload = data as string ?? JsonUtility.ToJson(data);
    RenderStreaming.Logger.Log("Signaling: Sending WS data: " + payload);
    socket.Send(payload);
}
}
}