using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using Newtonsoft.Json;
using Unity.WebRTC;
using UnityEngine;
using WebSocketSharp;

namespace Unity.RenderStreaming.Signaling
{
    /// <summary>
    /// <see cref="ISignaling"/> implementation that exchanges signaling messages over a WebSocket.
    /// </summary>
    /// <remarks>
    /// A background thread creates the socket and re-creates it after it closes; received messages
    /// are decoded in <c>WSProcessMessage</c> and the resulting events are posted to the
    /// synchronization context passed to the constructor.
    /// NOTE(review): the generic type arguments in this file (<c>HashSet&lt;WebSocketSignaling&gt;</c>,
    /// <c>Dictionary&lt;string, string&gt;</c>, <c>RoutedMessage&lt;T&gt;</c>, <c>FromJson&lt;T&gt;</c>) were
    /// reconstructed from how the values are used — confirm against the project's type declarations.
    /// </remarks>
    public class WebSocketSignaling : ISignaling
    {
        // NOTE(review): this static set holds a strong reference to every instance, so the
        // finalizer below (the only place that removes entries) cannot run while the entry is
        // still in the set — confirm whether instances are meant to be removed on Stop().
        private static readonly HashSet<WebSocketSignaling> instances = new();

        private readonly SynchronizationContext m_mainThreadContext;

        /// <summary>
        /// Maps a participant id (key) to the room connectionId (value).
        /// Used when the host sends to a specific participant, to turn the internal
        /// connectionId back into the connectionId the server expects.
        /// </summary>
        private readonly Dictionary<string, string> m_participantToRoom = new();

        // Guards m_participantToRoom: it is written by the WebSocket message handler and read by
        // the Send*/CloseConnection methods. The handler posts its events to the main-thread
        // context, so it presumably runs on a different thread than those callers.
        private readonly object m_participantLock = new();

        private readonly float m_timeout;
        private readonly AutoResetEvent m_wsCloseEvent;

        /// <summary>
        /// Room connectionId most recently passed to <see cref="OpenConnection"/>.
        /// </summary>
        private string m_roomConnectionId;

        private bool m_running;
        private Thread m_signalingThread;
        private WebSocket m_webSocket;

        /// <summary>
        /// Participant id taken from the server's "connect" response.
        /// </summary>
        public string participantId;

        /// <summary>
        /// Creates a signaling client for the URL contained in <paramref name="signalingSettings"/>.
        /// </summary>
        /// <param name="signalingSettings">Must be a <c>WebSocketSignalingSettings</c> instance.</param>
        /// <param name="mainThreadContext">Context on which all events of this object are raised.</param>
        /// <exception cref="ArgumentNullException"><paramref name="signalingSettings"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="signalingSettings"/> has the wrong type.</exception>
        public WebSocketSignaling(SignalingSettings signalingSettings, SynchronizationContext mainThreadContext)
        {
            if (signalingSettings == null)
            {
                throw new ArgumentNullException(nameof(signalingSettings));
            }

            if (!(signalingSettings is WebSocketSignalingSettings settings))
            {
                throw new ArgumentException("signalingSettings is not WebSocketSignalingSettings");
            }

            Url = settings.url;
            m_timeout = 5.0f;
            m_mainThreadContext = mainThreadContext;
            m_wsCloseEvent = new AutoResetEvent(false);

            if (instances.Any(x => x.Url == Url))
            {
                RenderStreaming.Logger.Log(LogType.Warning,
                    $"Other {nameof(WebSocketSignaling)} exists with same URL:{Url}. Signaling process may be in conflict.");
            }

            instances.Add(this);
        }

        /// <summary>
        /// True when the server's "connect" response assigned this client the "host" role.
        /// </summary>
        public bool isHost { get; private set; }

        /// <summary>
        /// WebSocket URL taken from the settings passed to the constructor.
        /// </summary>
        public string Url { get; }

        /// <summary>
        /// Starts the background thread that creates and re-creates the WebSocket.
        /// </summary>
        /// <exception cref="InvalidOperationException">The object is already started.</exception>
        public void Start()
        {
            if (m_running)
            {
                throw new InvalidOperationException("This object is already started.");
            }

            m_running = true;
            m_signalingThread = new Thread(WSManage);
            m_signalingThread.Start();
        }

        /// <summary>
        /// Closes the WebSocket and ends the background thread. Does nothing when not started.
        /// </summary>
        public void Stop()
        {
            if (!m_running)
            {
                return;
            }

            m_running = false;
            m_webSocket?.Close();

            // A thread blocked in WaitOne/Sleep is aborted; otherwise it gets one second to finish.
            if (m_signalingThread.ThreadState == ThreadState.WaitSleepJoin)
            {
                m_signalingThread.Abort();
            }
            else
            {
                m_signalingThread.Join(1000);
            }

            m_signalingThread = null;
        }

        public event OnStartHandler OnStart;
        public event OnConnectHandler OnCreateConnection;
        public event OnDisconnectHandler OnDestroyConnection;
        public event OnOfferHandler OnOffer;
        public event OnAnswerHandler OnAnswer;
        public event OnIceCandidateHandler OnIceCandidate;
        public event OnParticipantJoinedHandler OnParticipantJoined;
        public event OnParticipantLeftHandler OnParticipantLeft;
        public event OnCallRequestHandler OnCallRequest;
        public event OnMessageHandler OnMessage;

        /// <summary>
        /// Sends an SDP offer.
        /// </summary>
        /// <param name="connectionId">Room connectionId, or a known participant id.</param>
        /// <param name="offer">Session description whose <c>sdp</c> is sent.</param>
        public void SendOffer(string connectionId, RTCSessionDescription offer)
        {
            var toParticipant = TryGetRoomConnectionId(connectionId, out var roomConnectionId);

            // data.connectionId must carry the room connectionId (the server uses it to find the
            // connection group for routing), not the internal participant id; the participant id
            // travels only in routedMessage.participantId to select the target.
            var data = new DescData();
            data.connectionId = toParticipant ? roomConnectionId : connectionId;
            data.sdp = offer.sdp;
            data.dateTime = DateTime.Now;

            var routedMessage = new RoutedMessage<DescData>();
            routedMessage.from = data.connectionId;
            routedMessage.participantId = toParticipant ? connectionId : participantId;
            routedMessage.data = data;
            routedMessage.type = "offer";

            WSSend(routedMessage);
        }

        /// <summary>
        /// Sends an SDP answer.
        /// </summary>
        /// <param name="connectionId">Room connectionId, or a known participant id.</param>
        /// <param name="answer">Session description whose <c>sdp</c> is sent.</param>
        public void SendAnswer(string connectionId, RTCSessionDescription answer)
        {
            var toParticipant = TryGetRoomConnectionId(connectionId, out var roomConnectionId);

            // data.connectionId must carry the room connectionId (used by the server for routing).
            var data = new DescData();
            data.connectionId = toParticipant ? roomConnectionId : connectionId;
            data.sdp = answer.sdp;

            var routedMessage = new RoutedMessage<DescData>();
            routedMessage.from = data.connectionId;
            routedMessage.participantId = toParticipant ? connectionId : participantId;
            routedMessage.data = data;
            routedMessage.type = "answer";

            WSSend(routedMessage);
        }

        /// <summary>
        /// Sends an ICE candidate.
        /// </summary>
        /// <param name="connectionId">Room connectionId, or a known participant id.</param>
        /// <param name="candidate">Candidate to send; a missing <c>SdpMLineIndex</c> is sent as 0.</param>
        public void SendCandidate(string connectionId, RTCIceCandidate candidate)
        {
            var toParticipant = TryGetRoomConnectionId(connectionId, out var roomConnectionId);

            // data.connectionId must carry the room connectionId (used by the server for routing).
            var data = new CandidateData();
            data.connectionId = toParticipant ? roomConnectionId : connectionId;
            data.candidate = candidate.Candidate;
            data.sdpMLineIndex = candidate.SdpMLineIndex.GetValueOrDefault(0);
            data.sdpMid = candidate.SdpMid;

            var routedMessage = new RoutedMessage<CandidateData>();
            routedMessage.from = data.connectionId;
            if (toParticipant)
            {
                routedMessage.participantId = connectionId;
            }
            // NOTE(review): unlike SendOffer/SendAnswer, the non-participant case does not set
            // routedMessage.participantId to this client's participantId. Left as-is because it
            // changes the wire format — confirm against the server whether this is intentional.

            routedMessage.data = data;
            routedMessage.type = "candidate";

            WSSend(routedMessage);
        }

        /// <summary>
        /// Remembers <paramref name="connectionId"/> as the room connectionId and sends a "connect" request.
        /// </summary>
        /// <param name="connectionId">Room connectionId to connect to.</param>
        public void OpenConnection(string connectionId)
        {
            m_roomConnectionId = connectionId;
            WSSend($"{{\"type\":\"connect\", \"connectionId\":\"{connectionId}\"}}");
        }

        /// <summary>
        /// Sends a "disconnect" request and forgets the participant mapping for
        /// <paramref name="connectionId"/>, if there is one.
        /// </summary>
        /// <param name="connectionId">Room connectionId, or a known participant id.</param>
        public void CloseConnection(string connectionId)
        {
            // When a participant connection is closed, the disconnect is sent with the room connectionId.
            var actualConnectionId = TryGetRoomConnectionId(connectionId, out var roomConnectionId)
                ? roomConnectionId
                : connectionId;

            ForgetParticipant(connectionId);

            WSSend($"{{\"type\":\"disconnect\", \"connectionId\":\"{actualConnectionId}\"}}");
        }

        ~WebSocketSignaling()
        {
            if (m_running)
            {
                Stop();
            }

            instances.Remove(this);
        }

        /// <summary>
        /// Looks up the room connectionId registered for a participant id.
        /// </summary>
        /// <param name="connectionId">Candidate participant id; may be null or empty.</param>
        /// <param name="roomConnectionId">The mapped room connectionId when found; otherwise null.</param>
        /// <returns>True when <paramref name="connectionId"/> is a known participant id.</returns>
        private bool TryGetRoomConnectionId(string connectionId, out string roomConnectionId)
        {
            roomConnectionId = null;
            if (string.IsNullOrEmpty(connectionId))
            {
                return false;
            }

            lock (m_participantLock)
            {
                return m_participantToRoom.TryGetValue(connectionId, out roomConnectionId);
            }
        }

        /// <summary>
        /// Records (or overwrites) the room connectionId for a participant id.
        /// </summary>
        private void RememberParticipant(string id, string roomConnectionId)
        {
            lock (m_participantLock)
            {
                m_participantToRoom[id] = roomConnectionId;
            }
        }

        /// <summary>
        /// Removes the mapping for a participant id. A null or empty id is ignored, because
        /// Dictionary.Remove throws ArgumentNullException for a null key.
        /// </summary>
        private void ForgetParticipant(string id)
        {
            if (string.IsNullOrEmpty(id))
            {
                return;
            }

            lock (m_participantLock)
            {
                m_participantToRoom.Remove(id);
            }
        }

        /// <summary>
        /// Sends a custom, already-serialized message over the WebSocket unchanged.
        /// </summary>
        /// <param name="message">Text to send. A null value is logged as a warning and not sent.</param>
        public void SendMessage(string message)
        {
            if (message == null)
            {
                RenderStreaming.Logger.Log(LogType.Warning, "Signaling: Cannot send null message");
                return;
            }

            WSSend(message);
        }

        /// <summary>
        /// Body of the signaling thread: creates a socket, waits until it closes, sleeps for
        /// the timeout, then repeats while the object is running.
        /// </summary>
        private void WSManage()
        {
            while (m_running)
            {
                WSCreate();

                try
                {
                    m_wsCloseEvent.WaitOne();
                    Thread.Sleep((int)(m_timeout * 1000));
                }
                catch (ThreadAbortException)
                {
                    // Thread.Abort() called from main thread. Ignore
                    return;
                }
            }

            RenderStreaming.Logger.Log("Signaling: WS managing thread ended");
        }

        /// <summary>
        /// Creates a new WebSocket for <see cref="Url"/>, wires the handlers and starts connecting.
        /// </summary>
        private void WSCreate()
        {
            m_webSocket = new WebSocket(Url);

            // Ordinal comparison: the URL scheme is an identifier, not linguistic text.
            if (Url.StartsWith("wss", StringComparison.Ordinal))
            {
                m_webSocket.SslConfiguration.EnabledSslProtocols =
                    SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12;
            }

            m_webSocket.OnOpen += WSConnected;
            m_webSocket.OnMessage += WSProcessMessage;
            m_webSocket.OnError += WSError;
            m_webSocket.OnClose += WSClosed;

            // NOTE(review): no matching Monitor.Exit is visible in this file — confirm intent.
            Monitor.Enter(m_webSocket);

            RenderStreaming.Logger.Log($"Signaling: Connecting WS {Url}");
            m_webSocket.ConnectAsync();
        }

        /// <summary>
        /// Decodes a received message and raises the matching event on the main-thread context.
        /// Messages without a type, or with an unrecognized type, are ignored. Any exception is
        /// logged as an error and not rethrown.
        /// </summary>
        private void WSProcessMessage(object sender, MessageEventArgs e)
        {
            var content = Encoding.UTF8.GetString(e.RawData);
            RenderStreaming.Logger.Log($"Signaling: Receiving message: {content}");

            try
            {
                var routedMessage = JsonUtility.FromJson<RoutedMessage<SignalingMessage>>(content);
                if (string.IsNullOrEmpty(routedMessage.type))
                {
                    return;
                }

                // The host keys each participant's connection by participantId, so that every
                // participant gets its own PeerConnection.
                var fromParticipant = isHost && !string.IsNullOrEmpty(routedMessage.participantId);
                var msg = routedMessage.data;

                switch (routedMessage.type)
                {
                    case "connect":
                    {
                        var connect = JsonUtility.FromJson<SignalingMessage>(content);
                        isHost = connect.role == "host";
                        participantId = connect.participantId;
                        m_mainThreadContext.Post(
                            d => OnCreateConnection?.Invoke(this, connect.connectionId, connect.polite), null);
                        break;
                    }
                    case "disconnect":
                    {
                        var disconnect = JsonUtility.FromJson<SignalingMessage>(content);
                        var disconnectConnectionId = disconnect.connectionId;

                        // When a participant disconnected, report its participantId as the connectionId.
                        if (fromParticipant)
                        {
                            disconnectConnectionId = routedMessage.participantId;
                            ForgetParticipant(routedMessage.participantId);
                        }

                        m_mainThreadContext.Post(
                            d => OnDestroyConnection?.Invoke(this, disconnectConnectionId), null);
                        break;
                    }
                    case "offer":
                    {
                        var offer = new DescData();
                        if (fromParticipant)
                        {
                            offer.connectionId = routedMessage.participantId;
                            RememberParticipant(routedMessage.participantId,
                                routedMessage.from ?? m_roomConnectionId);
                        }
                        else
                        {
                            offer.connectionId = routedMessage.from;
                        }

                        offer.sdp = msg.sdp;
                        offer.polite = msg.polite;
                        offer.participantId = routedMessage.participantId;
                        m_mainThreadContext.Post(d => OnOffer?.Invoke(this, offer), null);
                        break;
                    }
                    case "answer":
                    {
                        var answer = new DescData
                        {
                            connectionId = fromParticipant ? routedMessage.participantId : routedMessage.from,
                            sdp = msg.sdp,
                            participantId = routedMessage.participantId
                        };
                        m_mainThreadContext.Post(d => OnAnswer?.Invoke(this, answer), null);
                        break;
                    }
                    case "candidate":
                    {
                        var candidate = new CandidateData
                        {
                            connectionId = fromParticipant ? routedMessage.participantId : routedMessage.from,
                            candidate = msg.candidate,
                            sdpMLineIndex = msg.sdpMLineIndex,
                            sdpMid = msg.sdpMid,
                            participantId = routedMessage.participantId
                        };
                        m_mainThreadContext.Post(d => OnIceCandidate?.Invoke(this, candidate), null);
                        break;
                    }
                    case "participant-joined":
                    {
                        // Notification that a new participant joined.
                        var joined = JsonUtility.FromJson<SignalingMessage>(content);
                        var participantData = new ParticipantEventData
                        {
                            connectionId = joined.connectionId ?? routedMessage.from,
                            participantId = joined.participantId ?? routedMessage.participantId
                        };

                        // Record the participant -> room mapping.
                        if (!string.IsNullOrEmpty(participantData.participantId))
                        {
                            RememberParticipant(participantData.participantId, participantData.connectionId);
                        }

                        RenderStreaming.Logger.Log(
                            $"Signaling: Participant joined - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
                        m_mainThreadContext.Post(d => OnParticipantJoined?.Invoke(this, participantData), null);
                        break;
                    }
                    case "participant-left":
                    {
                        // Notification that a participant left.
                        var left = JsonUtility.FromJson<SignalingMessage>(content);
                        var participantData = new ParticipantEventData
                        {
                            connectionId = left.connectionId ?? routedMessage.from,
                            participantId = left.participantId ?? routedMessage.participantId
                        };

                        // ForgetParticipant tolerates a missing id; a direct Dictionary.Remove(null)
                        // would throw and the event below would never be raised.
                        ForgetParticipant(participantData.participantId);

                        RenderStreaming.Logger.Log(
                            $"Signaling: Participant left - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
                        m_mainThreadContext.Post(d => OnParticipantLeft?.Invoke(this, participantData), null);
                        break;
                    }
                    case "call-request":
                    {
                        var request = JsonUtility.FromJson<SignalingMessage>(content);
                        var callData = new CallRequestData
                        {
                            connectionId = routedMessage.from,
                            data = request.data
                        };
                        m_mainThreadContext.Post(d => OnCallRequest?.Invoke(this, callData), null);
                        break;
                    }
                    case "on-message":
                    {
                        // NOTE(review): the reviewed text shows no type argument on this call, yet
                        // `.data` is read from the result, so the generic argument was presumably
                        // lost before review. Restore it from the repository version — TODO confirm.
                        var message = JsonConvert.DeserializeObject(content);
                        var messageData = new OnMessageData
                        {
                            connectionId = routedMessage.from,
                            participantId = routedMessage.participantId,
                            message = message.data
                        };
                        m_mainThreadContext.Post(d => OnMessage?.Invoke(this, messageData), null);
                        break;
                    }
                    case "error":
                    {
                        var error = JsonUtility.FromJson<SignalingMessage>(content);
                        RenderStreaming.Logger.Log(LogType.Error, error.message);
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                RenderStreaming.Logger.Log(LogType.Error, "Signaling: Failed to parse message: " + ex);
            }
        }

        /// <summary>
        /// Socket opened: logs and raises <see cref="OnStart"/> on the main-thread context.
        /// </summary>
        private void WSConnected(object sender, EventArgs e)
        {
            RenderStreaming.Logger.Log("Signaling: WS connected.");
            m_mainThreadContext.Post(d => OnStart?.Invoke(this), null);
        }

        /// <summary>
        /// Socket error: logs the error message.
        /// </summary>
        private void WSError(object sender, ErrorEventArgs e)
        {
            RenderStreaming.Logger.Log(LogType.Error, $"Signaling: WS connection error: {e.Message}");
        }

        /// <summary>
        /// Socket closed: logs the close code, wakes the managing thread and drops the socket reference.
        /// </summary>
        private void WSClosed(object sender, CloseEventArgs e)
        {
            RenderStreaming.Logger.Log($"Signaling: WS connection closed, code: {e.Code}");
            m_wsCloseEvent.Set();
            m_webSocket = null;
        }

        /// <summary>
        /// Sends <paramref name="data"/> over the socket: a string is sent unchanged, anything
        /// else is serialized with <c>JsonUtility.ToJson</c>. Logs an error and sends nothing
        /// when the socket is missing or not open.
        /// </summary>
        private void WSSend(object data)
        {
            // Read the field once: WSClosed sets m_webSocket to null, which could otherwise
            // happen between the state check and the Send call.
            var socket = m_webSocket;
            if (socket == null || socket.ReadyState != WebSocketState.Open)
            {
                RenderStreaming.Logger.Log(LogType.Error, "Signaling: WS is not connected. Unable to send message");
                return;
            }

            var payload = data as string ?? JsonUtility.ToJson(data);
            RenderStreaming.Logger.Log("Signaling: Sending WS data: " + payload);
            socket.Send(payload);
        }
    }
}