using System; using System.Collections.Generic; using System.Linq; using System.Security.Authentication; using System.Text; using System.Threading; using Unity.WebRTC; using UnityEngine; using WebSocketSharp; namespace Unity.RenderStreaming.Signaling { public class WebSocketSignaling : ISignaling { private static HashSet instances = new HashSet(); private readonly string m_url; private readonly float m_timeout; private readonly SynchronizationContext m_mainThreadContext; private bool m_running; private Thread m_signalingThread; private readonly AutoResetEvent m_wsCloseEvent; private WebSocket m_webSocket; public string participantId; public string Url { get { return m_url; } } /// /// 当前客户端是否为Host角色(由服务器在connect响应中分配) /// public bool isHost { get; private set; } /// /// Host连接时使用的房间connectionId /// private string m_roomConnectionId; /// /// 参与者连接ID到房间connectionId的映射 /// Key: participantId, Value: roomConnectionId /// 用于Host向特定Participant发送消息时,将内部connectionId还原为服务器格式 /// private Dictionary m_participantToRoom = new Dictionary(); public WebSocketSignaling(SignalingSettings signalingSettings, SynchronizationContext mainThreadContext) { if (signalingSettings == null) throw new ArgumentNullException(nameof(signalingSettings)); if (!(signalingSettings is WebSocketSignalingSettings settings)) throw new ArgumentException("signalingSettings is not WebSocketSignalingSettings"); m_url = settings.url; m_timeout = 5.0f; m_mainThreadContext = mainThreadContext; m_wsCloseEvent = new AutoResetEvent(false); if (instances.Any(x => x.Url == m_url)) { RenderStreaming.Logger.Log(LogType.Warning, $"Other {nameof(WebSocketSignaling)} exists with same URL:{m_url}. Signaling process may be in conflict."); } instances.Add(this); } ~WebSocketSignaling() { if (m_running) Stop(); instances.Remove(this); } public void Start() { if (m_running) throw new InvalidOperationException("This object is already started."); m_running = true; m_signalingThread = new Thread(WSManage); m_signalingThread.Start(); } public void Stop() { if (m_running) { m_running = false; m_webSocket?.Close(); if (m_signalingThread.ThreadState == ThreadState.WaitSleepJoin) { m_signalingThread.Abort(); } else { m_signalingThread.Join(1000); } m_signalingThread = null; } } public event OnStartHandler OnStart; public event OnConnectHandler OnCreateConnection; public event OnDisconnectHandler OnDestroyConnection; public event OnOfferHandler OnOffer; #pragma warning disable 0067 // this event is never used in this class public event OnAnswerHandler OnAnswer; #pragma warning restore 0067 public event OnIceCandidateHandler OnIceCandidate; public event OnParticipantJoinedHandler OnParticipantJoined; public event OnParticipantLeftHandler OnParticipantLeft; public event OnCallRequestHandler OnCallRequest; public event OnMessageHandler OnMessage; /// /// 判断一个connectionId是否为参与者连接ID(而非房间connectionId) /// private bool IsParticipantConnectionId(string connectionId) { return m_participantToRoom.ContainsKey(connectionId); } /// /// 获取参与者对应的房间connectionId /// private string GetRoomConnectionId(string participantConnectionId) { return m_participantToRoom.TryGetValue(participantConnectionId, out var roomId) ? roomId : participantConnectionId; } public void SendOffer(string connectionId, RTCSessionDescription offer) { DescData data = new DescData(); // data.connectionId必须使用房间connectionId(服务器用此查找连接组进行路由) // 而不是内部participantId(participantId仅通过routedMessage.participantId传递用于目标选择) data.connectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId; data.sdp = offer.sdp; data.dateTime = DateTime.Now; RoutedMessage routedMessage = new RoutedMessage(); // 如果connectionId是参与者ID,使用房间connectionId作为from,并设置目标participantId if (IsParticipantConnectionId(connectionId)) { routedMessage.from = GetRoomConnectionId(connectionId); routedMessage.participantId = connectionId; } else { routedMessage.from = connectionId; routedMessage.participantId = participantId; } routedMessage.data = data; routedMessage.type = "offer"; WSSend(routedMessage); } public void SendAnswer(string connectionId, RTCSessionDescription answer) { DescData data = new DescData(); // data.connectionId必须使用房间connectionId(服务器用此查找连接组进行路由) data.connectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId; data.sdp = answer.sdp; RoutedMessage routedMessage = new RoutedMessage(); // 如果connectionId是参与者ID,使用房间connectionId作为from,并设置目标participantId if (IsParticipantConnectionId(connectionId)) { routedMessage.from = GetRoomConnectionId(connectionId); routedMessage.participantId = connectionId; } else { routedMessage.from = connectionId; routedMessage.participantId = participantId; } routedMessage.data = data; routedMessage.type = "answer"; WSSend(routedMessage); } public void SendCandidate(string connectionId, RTCIceCandidate candidate) { CandidateData data = new CandidateData(); // data.connectionId必须使用房间connectionId(服务器用此查找连接组进行路由) data.connectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId; data.candidate = candidate.Candidate; data.sdpMLineIndex = candidate.SdpMLineIndex.GetValueOrDefault(0); data.sdpMid = candidate.SdpMid; RoutedMessage routedMessage = new RoutedMessage(); // 如果connectionId是参与者ID,使用房间connectionId作为from,并设置目标participantId if (IsParticipantConnectionId(connectionId)) { routedMessage.from = GetRoomConnectionId(connectionId); routedMessage.participantId = connectionId; } else { routedMessage.from = connectionId; } routedMessage.data = data; routedMessage.type = "candidate"; WSSend(routedMessage); } public void OpenConnection(string connectionId) { m_roomConnectionId = connectionId; this.WSSend($"{{\"type\":\"connect\", \"connectionId\":\"{connectionId}\"}}"); } public void CloseConnection(string connectionId) { // 如果关闭的是参与者连接,使用房间connectionId发送disconnect string actualConnectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId; m_participantToRoom.Remove(connectionId); this.WSSend($"{{\"type\":\"disconnect\", \"connectionId\":\"{actualConnectionId}\"}}"); } private void WSManage() { while (m_running) { WSCreate(); try { m_wsCloseEvent.WaitOne(); Thread.Sleep((int)(m_timeout * 1000)); } catch (ThreadAbortException) { // Thread.Abort() called from main thread. Ignore return; } } RenderStreaming.Logger.Log("Signaling: WS managing thread ended"); } private void WSCreate() { m_webSocket = new WebSocket(m_url); if (m_url.StartsWith("wss")) { m_webSocket.SslConfiguration.EnabledSslProtocols = SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12; } m_webSocket.OnOpen += WSConnected; m_webSocket.OnMessage += WSProcessMessage; m_webSocket.OnError += WSError; m_webSocket.OnClose += WSClosed; Monitor.Enter(m_webSocket); RenderStreaming.Logger.Log($"Signaling: Connecting WS {m_url}"); m_webSocket.ConnectAsync(); } private void WSProcessMessage(object sender, MessageEventArgs e) { var content = Encoding.UTF8.GetString(e.RawData); RenderStreaming.Logger.Log($"Signaling: Receiving message: {content}"); try { var routedMessage = JsonUtility.FromJson>(content); SignalingMessage msg; if (!string.IsNullOrEmpty(routedMessage.type)) { msg = routedMessage.data; } else { msg = JsonUtility.FromJson(content); } if (!string.IsNullOrEmpty(routedMessage.type)) { if (routedMessage.type == "connect") { msg = JsonUtility.FromJson(content); isHost = msg.role == "host"; participantId = msg.participantId; m_mainThreadContext.Post(d => OnCreateConnection?.Invoke(this, msg.connectionId, msg.polite), null); } else if (routedMessage.type == "disconnect") { msg = JsonUtility.FromJson(content); string disconnectConnectionId = msg.connectionId; // 如果断开的是参与者连接,用participantId查找内部connectionId if (!string.IsNullOrEmpty(routedMessage.participantId) && isHost) { disconnectConnectionId = routedMessage.participantId; m_participantToRoom.Remove(routedMessage.participantId); } m_mainThreadContext.Post(d => OnDestroyConnection?.Invoke(this, disconnectConnectionId), null); } else if (routedMessage.type == "offer") { DescData offer = new DescData(); // Host收到带participantId的offer时,使用participantId作为内部connectionId // 这样每个participant会创建独立的PeerConnection if (!string.IsNullOrEmpty(routedMessage.participantId) && isHost) { offer.connectionId = routedMessage.participantId; m_participantToRoom[routedMessage.participantId] = routedMessage.from ?? m_roomConnectionId; } else { offer.connectionId = routedMessage.from; } offer.sdp = msg.sdp; offer.polite = msg.polite; offer.participantId = routedMessage.participantId; m_mainThreadContext.Post(d => OnOffer?.Invoke(this, offer), null); } else if (routedMessage.type == "answer") { DescData answer = new DescData { // Host收到带participantId的answer时,使用participantId作为内部connectionId connectionId = !string.IsNullOrEmpty(routedMessage.participantId) && isHost ? routedMessage.participantId : routedMessage.from, sdp = msg.sdp, participantId = routedMessage.participantId }; m_mainThreadContext.Post(d => OnAnswer?.Invoke(this, answer), null); } else if (routedMessage.type == "candidate") { CandidateData candidate = new CandidateData { // Host收到带participantId的candidate时,使用participantId作为内部connectionId connectionId = !string.IsNullOrEmpty(routedMessage.participantId) && isHost ? routedMessage.participantId : routedMessage.@from, candidate = msg.candidate, sdpMLineIndex = msg.sdpMLineIndex, sdpMid = msg.sdpMid, participantId = routedMessage.participantId }; m_mainThreadContext.Post(d => OnIceCandidate?.Invoke(this, candidate), null); } else if (routedMessage.type == "participant-joined") { // Host收到新参与者加入通知 msg = JsonUtility.FromJson(content); ParticipantEventData participantData = new ParticipantEventData { connectionId = msg.connectionId ?? routedMessage.from, participantId = msg.participantId ?? routedMessage.participantId }; // 记录参与者映射 if (!string.IsNullOrEmpty(participantData.participantId)) { m_participantToRoom[participantData.participantId] = participantData.connectionId; } RenderStreaming.Logger.Log($"Signaling: Participant joined - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}"); m_mainThreadContext.Post(d => OnParticipantJoined?.Invoke(this, participantData), null); } else if (routedMessage.type == "participant-left") { // Host/Participant收到参与者离开通知 msg = JsonUtility.FromJson(content); ParticipantEventData participantData = new ParticipantEventData { connectionId = msg.connectionId ?? routedMessage.from, participantId = msg.participantId ?? routedMessage.participantId }; m_participantToRoom.Remove(participantData.participantId); RenderStreaming.Logger.Log($"Signaling: Participant left - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}"); m_mainThreadContext.Post(d => OnParticipantLeft?.Invoke(this, participantData), null); } else if (routedMessage.type == "call-request") { msg = JsonUtility.FromJson(content); CallRequestData callData = new CallRequestData { connectionId = routedMessage.from, data = msg.data }; m_mainThreadContext.Post(d => OnCallRequest?.Invoke(this, callData), null); } else if (routedMessage.type == "on-message") { msg = JsonUtility.FromJson(content); OnMessageData messageData = new OnMessageData { connectionId = routedMessage.from, participantId = routedMessage.participantId, message = msg.message }; m_mainThreadContext.Post(d => OnMessage?.Invoke(this, messageData), null); } else if (routedMessage.type == "error") { msg = JsonUtility.FromJson(content); RenderStreaming.Logger.Log(LogType.Error, msg.message); } } } catch (Exception ex) { RenderStreaming.Logger.Log(LogType.Error, "Signaling: Failed to parse message: " + ex); } } private void WSConnected(object sender, EventArgs e) { RenderStreaming.Logger.Log("Signaling: WS connected."); m_mainThreadContext.Post(d => OnStart?.Invoke(this), null); } private void WSError(object sender, ErrorEventArgs e) { RenderStreaming.Logger.Log(LogType.Error, $"Signaling: WS connection error: {e.Message}"); } private void WSClosed(object sender, CloseEventArgs e) { RenderStreaming.Logger.Log($"Signaling: WS connection closed, code: {e.Code}"); m_wsCloseEvent.Set(); m_webSocket = null; } private void WSSend(object data) { if (m_webSocket == null || m_webSocket.ReadyState != WebSocketState.Open) { RenderStreaming.Logger.Log(LogType.Error, "Signaling: WS is not connected. Unable to send message"); return; } if (data is string s) { RenderStreaming.Logger.Log("Signaling: Sending WS data: " + s); m_webSocket.Send(s); } else { string str = JsonUtility.ToJson(data); RenderStreaming.Logger.Log("Signaling: Sending WS data: " + str); m_webSocket.Send(str); } } } }