Files
2026-06-03 22:05:03 +08:00

506 lines
21 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using Newtonsoft.Json;
using Unity.WebRTC;
using UnityEngine;
using WebSocketSharp;
namespace Unity.RenderStreaming.Signaling
{
public class WebSocketSignaling : ISignaling
{
private static readonly HashSet<WebSocketSignaling> instances = new();
private readonly SynchronizationContext m_mainThreadContext;
/// <summary>
/// 参与者连接ID到房间connectionId的映射
/// Key: participantId, Value: roomConnectionId
/// 用于Host向特定Participant发送消息时将内部connectionId还原为服务器格式
/// </summary>
private readonly Dictionary<string, string> m_participantToRoom = new();
private readonly float m_timeout;
private readonly AutoResetEvent m_wsCloseEvent;
/// <summary>
/// Host连接时使用的房间connectionId
/// </summary>
private string m_roomConnectionId;
private bool m_running;
private Thread m_signalingThread;
private WebSocket m_webSocket;
public string participantId;
public WebSocketSignaling(SignalingSettings signalingSettings, SynchronizationContext mainThreadContext)
{
if (signalingSettings == null)
throw new ArgumentNullException(nameof(signalingSettings));
if (!(signalingSettings is WebSocketSignalingSettings settings))
throw new ArgumentException("signalingSettings is not WebSocketSignalingSettings");
Url = settings.url;
m_timeout = 5.0f;
m_mainThreadContext = mainThreadContext;
m_wsCloseEvent = new AutoResetEvent(false);
if (instances.Any(x => x.Url == Url))
RenderStreaming.Logger.Log(LogType.Warning,
$"Other {nameof(WebSocketSignaling)} exists with same URL:{Url}. Signaling process may be in conflict.");
instances.Add(this);
}
/// <summary>
/// 当前客户端是否为Host角色由服务器在connect响应中分配
/// </summary>
public bool isHost { get; private set; }
public string Url { get; }
public void Start()
{
if (m_running)
throw new InvalidOperationException("This object is already started.");
m_running = true;
m_signalingThread = new Thread(WSManage);
m_signalingThread.Start();
}
public void Stop()
{
if (m_running)
{
m_running = false;
m_webSocket?.Close();
if (m_signalingThread.ThreadState == ThreadState.WaitSleepJoin)
m_signalingThread.Abort();
else
m_signalingThread.Join(1000);
m_signalingThread = null;
}
}
public event OnStartHandler OnStart;
public event OnConnectHandler OnCreateConnection;
public event OnDisconnectHandler OnDestroyConnection;
public event OnOfferHandler OnOffer;
#pragma warning disable 0067
// this event is never used in this class
public event OnAnswerHandler OnAnswer;
#pragma warning restore 0067
public event OnIceCandidateHandler OnIceCandidate;
public event OnParticipantJoinedHandler OnParticipantJoined;
public event OnParticipantLeftHandler OnParticipantLeft;
public event OnCallRequestHandler OnCallRequest;
public event OnMessageHandler OnMessage;
public void SendOffer(string connectionId, RTCSessionDescription offer)
{
var data = new DescData();
// data.connectionId必须使用房间connectionId服务器用此查找连接组进行路由
// 而不是内部participantIdparticipantId仅通过routedMessage.participantId传递用于目标选择
data.connectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
data.sdp = offer.sdp;
data.dateTime = DateTime.Now;
var routedMessage = new RoutedMessage<DescData>();
// 如果connectionId是参与者ID使用房间connectionId作为from并设置目标participantId
if (IsParticipantConnectionId(connectionId))
{
routedMessage.from = GetRoomConnectionId(connectionId);
routedMessage.participantId = connectionId;
}
else
{
routedMessage.from = connectionId;
routedMessage.participantId = participantId;
}
routedMessage.data = data;
routedMessage.type = "offer";
WSSend(routedMessage);
}
public void SendAnswer(string connectionId, RTCSessionDescription answer)
{
var data = new DescData();
// data.connectionId必须使用房间connectionId服务器用此查找连接组进行路由
data.connectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
data.sdp = answer.sdp;
var routedMessage = new RoutedMessage<DescData>();
// 如果connectionId是参与者ID使用房间connectionId作为from并设置目标participantId
if (IsParticipantConnectionId(connectionId))
{
routedMessage.from = GetRoomConnectionId(connectionId);
routedMessage.participantId = connectionId;
}
else
{
routedMessage.from = connectionId;
routedMessage.participantId = participantId;
}
routedMessage.data = data;
routedMessage.type = "answer";
WSSend(routedMessage);
}
public void SendCandidate(string connectionId, RTCIceCandidate candidate)
{
var data = new CandidateData();
// data.connectionId必须使用房间connectionId服务器用此查找连接组进行路由
data.connectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
data.candidate = candidate.Candidate;
data.sdpMLineIndex = candidate.SdpMLineIndex.GetValueOrDefault(0);
data.sdpMid = candidate.SdpMid;
var routedMessage = new RoutedMessage<CandidateData>();
// 如果connectionId是参与者ID使用房间connectionId作为from并设置目标participantId
if (IsParticipantConnectionId(connectionId))
{
routedMessage.from = GetRoomConnectionId(connectionId);
routedMessage.participantId = connectionId;
}
else
{
routedMessage.from = connectionId;
}
routedMessage.data = data;
routedMessage.type = "candidate";
WSSend(routedMessage);
}
public void OpenConnection(string connectionId)
{
m_roomConnectionId = connectionId;
WSSend($"{{\"type\":\"connect\", \"connectionId\":\"{connectionId}\"}}");
}
public void CloseConnection(string connectionId)
{
// 如果关闭的是参与者连接使用房间connectionId发送disconnect
var actualConnectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
m_participantToRoom.Remove(connectionId);
WSSend($"{{\"type\":\"disconnect\", \"connectionId\":\"{actualConnectionId}\"}}");
}
~WebSocketSignaling()
{
if (m_running)
Stop();
instances.Remove(this);
}
/// <summary>
/// 判断一个connectionId是否为参与者连接ID而非房间connectionId
/// </summary>
private bool IsParticipantConnectionId(string connectionId)
{
if (!string.IsNullOrEmpty(connectionId) && m_participantToRoom != null && m_participantToRoom.Count > 0)
return m_participantToRoom.ContainsKey(connectionId);
return false;
}
/// <summary>
/// 获取参与者对应的房间connectionId
/// </summary>
private string GetRoomConnectionId(string participantConnectionId)
{
return m_participantToRoom.TryGetValue(participantConnectionId, out var roomId)
? roomId
: participantConnectionId;
}
// /// <summary>
// /// 发送自定义消息
// /// </summary>
// /// <param name="connectionId">目标连接ID</param>
// /// <param name="message">消息对象或字符串</param>
// public void SendMessage(string connectionId, object message)
// {
// if (message == null)
// {
// RenderStreaming.Logger.Log(LogType.Warning, "Signaling: Cannot send null message");
// return;
// }
//
// var actualConnectionId = IsParticipantConnectionId(connectionId)
// ? GetRoomConnectionId(connectionId)
// : connectionId;
//
// var routedMessage = new RoutedMessage<object>();
// routedMessage.from = actualConnectionId;
// routedMessage.participantId = IsParticipantConnectionId(connectionId) ? connectionId : participantId;
// routedMessage.data = message;
// routedMessage.type = "message";
//
// WSSend(routedMessage);
// }
/// <summary>
/// 发送自定义消息
/// </summary>
/// <param name="connectionId">目标连接ID</param>
/// <param name="message">消息对象或字符串</param>
public void SendMessage(string message)
{
if (message == null)
{
RenderStreaming.Logger.Log(LogType.Warning, "Signaling: Cannot send null message");
return;
}
WSSend(message);
}
private void WSManage()
{
while (m_running)
{
WSCreate();
try
{
m_wsCloseEvent.WaitOne();
Thread.Sleep((int)(m_timeout * 1000));
}
catch (ThreadAbortException)
{
// Thread.Abort() called from main thread. Ignore
return;
}
}
RenderStreaming.Logger.Log("Signaling: WS managing thread ended");
}
private void WSCreate()
{
m_webSocket = new WebSocket(Url);
if (Url.StartsWith("wss"))
m_webSocket.SslConfiguration.EnabledSslProtocols =
SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12;
m_webSocket.OnOpen += WSConnected;
m_webSocket.OnMessage += WSProcessMessage;
m_webSocket.OnError += WSError;
m_webSocket.OnClose += WSClosed;
Monitor.Enter(m_webSocket);
RenderStreaming.Logger.Log($"Signaling: Connecting WS {Url}");
m_webSocket.ConnectAsync();
}
private void WSProcessMessage(object sender, MessageEventArgs e)
{
var content = Encoding.UTF8.GetString(e.RawData);
RenderStreaming.Logger.Log($"Signaling: Receiving message: {content}");
try
{
var routedMessage = JsonUtility.FromJson<RoutedMessage<SignalingMessage>>(content);
SignalingMessage msg;
if (!string.IsNullOrEmpty(routedMessage.type))
msg = routedMessage.data;
else
msg = JsonUtility.FromJson<SignalingMessage>(content);
if (!string.IsNullOrEmpty(routedMessage.type))
{
if (routedMessage.type == "connect")
{
msg = JsonUtility.FromJson<SignalingMessage>(content);
isHost = msg.role == "host";
participantId = msg.participantId;
m_mainThreadContext.Post(d => OnCreateConnection?.Invoke(this, msg.connectionId, msg.polite),
null);
}
else if (routedMessage.type == "disconnect")
{
msg = JsonUtility.FromJson<SignalingMessage>(content);
var disconnectConnectionId = msg.connectionId;
// 如果断开的是参与者连接用participantId查找内部connectionId
if (!string.IsNullOrEmpty(routedMessage.participantId) && isHost)
{
disconnectConnectionId = routedMessage.participantId;
m_participantToRoom.Remove(routedMessage.participantId);
}
m_mainThreadContext.Post(d => OnDestroyConnection?.Invoke(this, disconnectConnectionId), null);
}
else if (routedMessage.type == "offer")
{
var offer = new DescData();
// Host收到带participantId的offer时使用participantId作为内部connectionId
// 这样每个participant会创建独立的PeerConnection
if (!string.IsNullOrEmpty(routedMessage.participantId) && isHost)
{
offer.connectionId = routedMessage.participantId;
m_participantToRoom[routedMessage.participantId] = routedMessage.from ?? m_roomConnectionId;
}
else
{
offer.connectionId = routedMessage.from;
}
offer.sdp = msg.sdp;
offer.polite = msg.polite;
offer.participantId = routedMessage.participantId;
m_mainThreadContext.Post(d => OnOffer?.Invoke(this, offer), null);
}
else if (routedMessage.type == "answer")
{
var answer = new DescData
{
// Host收到带participantId的answer时使用participantId作为内部connectionId
connectionId = !string.IsNullOrEmpty(routedMessage.participantId) && isHost
? routedMessage.participantId
: routedMessage.from,
sdp = msg.sdp,
participantId = routedMessage.participantId
};
m_mainThreadContext.Post(d => OnAnswer?.Invoke(this, answer), null);
}
else if (routedMessage.type == "candidate")
{
var candidate = new CandidateData
{
// Host收到带participantId的candidate时使用participantId作为内部connectionId
connectionId = !string.IsNullOrEmpty(routedMessage.participantId) && isHost
? routedMessage.participantId
: routedMessage.from,
candidate = msg.candidate,
sdpMLineIndex = msg.sdpMLineIndex,
sdpMid = msg.sdpMid,
participantId = routedMessage.participantId
};
m_mainThreadContext.Post(d => OnIceCandidate?.Invoke(this, candidate), null);
}
else if (routedMessage.type == "participant-joined")
{
// Host收到新参与者加入通知
msg = JsonUtility.FromJson<SignalingMessage>(content);
var participantData = new ParticipantEventData
{
connectionId = msg.connectionId ?? routedMessage.from,
participantId = msg.participantId ?? routedMessage.participantId
};
// 记录参与者映射
if (!string.IsNullOrEmpty(participantData.participantId))
m_participantToRoom[participantData.participantId] = participantData.connectionId;
RenderStreaming.Logger.Log(
$"Signaling: Participant joined - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
m_mainThreadContext.Post(d => OnParticipantJoined?.Invoke(this, participantData), null);
}
else if (routedMessage.type == "participant-left")
{
// Host/Participant收到参与者离开通知
msg = JsonUtility.FromJson<SignalingMessage>(content);
var participantData = new ParticipantEventData
{
connectionId = msg.connectionId ?? routedMessage.from,
participantId = msg.participantId ?? routedMessage.participantId
};
m_participantToRoom.Remove(participantData.participantId);
RenderStreaming.Logger.Log(
$"Signaling: Participant left - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
m_mainThreadContext.Post(d => OnParticipantLeft?.Invoke(this, participantData), null);
}
else if (routedMessage.type == "call-request")
{
msg = JsonUtility.FromJson<SignalingMessage>(content);
var callData = new CallRequestData
{
connectionId = routedMessage.from,
data = msg.data
};
m_mainThreadContext.Post(d => OnCallRequest?.Invoke(this, callData), null);
}
else if (routedMessage.type == "on-message")
{
var message = JsonConvert.DeserializeObject<SignalingMessage>(content);
var messageData = new OnMessageData
{
connectionId = routedMessage.from,
participantId = routedMessage.participantId,
message = message.data
};
m_mainThreadContext.Post(d => OnMessage?.Invoke(this, messageData), null);
}
else if (routedMessage.type == "error")
{
msg = JsonUtility.FromJson<SignalingMessage>(content);
RenderStreaming.Logger.Log(LogType.Error, msg.message);
}
}
}
catch (Exception ex)
{
RenderStreaming.Logger.Log(LogType.Error, "Signaling: Failed to parse message: " + ex);
}
}
private void WSConnected(object sender, EventArgs e)
{
RenderStreaming.Logger.Log("Signaling: WS connected.");
m_mainThreadContext.Post(d => OnStart?.Invoke(this), null);
}
private void WSError(object sender, ErrorEventArgs e)
{
RenderStreaming.Logger.Log(LogType.Error, $"Signaling: WS connection error: {e.Message}");
}
private void WSClosed(object sender, CloseEventArgs e)
{
RenderStreaming.Logger.Log($"Signaling: WS connection closed, code: {e.Code}");
m_wsCloseEvent.Set();
m_webSocket = null;
}
private void WSSend(object data)
{
if (m_webSocket == null || m_webSocket.ReadyState != WebSocketState.Open)
{
RenderStreaming.Logger.Log(LogType.Error, "Signaling: WS is not connected. Unable to send message");
return;
}
if (data is string s)
{
RenderStreaming.Logger.Log("Signaling: Sending WS data: " + s);
m_webSocket.Send(s);
}
else
{
var str = JsonUtility.ToJson(data);
RenderStreaming.Logger.Log("Signaling: Sending WS data: " + str);
m_webSocket.Send(str);
}
}
}
}