增加自定义不是房间的消息发送

This commit is contained in:
2026-05-18 23:31:04 +08:00
parent ce9b1e85e9
commit 3daebf56ab
29 changed files with 2375 additions and 356 deletions

View File

@@ -12,34 +12,29 @@ namespace Unity.RenderStreaming.Signaling
{
public class WebSocketSignaling : ISignaling
{
private static HashSet<WebSocketSignaling> instances = new HashSet<WebSocketSignaling>();
private readonly string m_url;
private readonly float m_timeout;
private static readonly HashSet<WebSocketSignaling> instances = new();
private readonly SynchronizationContext m_mainThreadContext;
private bool m_running;
private Thread m_signalingThread;
private readonly AutoResetEvent m_wsCloseEvent;
private WebSocket m_webSocket;
public string participantId;
public string Url { get { return m_url; } }
/// <summary>
/// 当前客户端是否为Host角色由服务器在connect响应中分配
/// 参与者连接ID到房间connectionId的映射
/// Key: participantId, Value: roomConnectionId
/// 用于Host向特定Participant发送消息时将内部connectionId还原为服务器格式
/// </summary>
public bool isHost { get; private set; }
private readonly Dictionary<string, string> m_participantToRoom = new();
private readonly float m_timeout;
private readonly AutoResetEvent m_wsCloseEvent;
/// <summary>
/// Host连接时使用的房间connectionId
/// Host连接时使用的房间connectionId
/// </summary>
private string m_roomConnectionId;
/// <summary>
/// 参与者连接ID到房间connectionId的映射
/// Key: participantId, Value: roomConnectionId
/// 用于Host向特定Participant发送消息时将内部connectionId还原为服务器格式
/// </summary>
private Dictionary<string, string> m_participantToRoom = new Dictionary<string, string>();
private bool m_running;
private Thread m_signalingThread;
private WebSocket m_webSocket;
public string participantId;
public WebSocketSignaling(SignalingSettings signalingSettings, SynchronizationContext mainThreadContext)
{
@@ -47,26 +42,24 @@ namespace Unity.RenderStreaming.Signaling
throw new ArgumentNullException(nameof(signalingSettings));
if (!(signalingSettings is WebSocketSignalingSettings settings))
throw new ArgumentException("signalingSettings is not WebSocketSignalingSettings");
m_url = settings.url;
Url = settings.url;
m_timeout = 5.0f;
m_mainThreadContext = mainThreadContext;
m_wsCloseEvent = new AutoResetEvent(false);
if (instances.Any(x => x.Url == m_url))
{
RenderStreaming.Logger.Log(LogType.Warning, $"Other {nameof(WebSocketSignaling)} exists with same URL:{m_url}. Signaling process may be in conflict.");
}
if (instances.Any(x => x.Url == Url))
RenderStreaming.Logger.Log(LogType.Warning,
$"Other {nameof(WebSocketSignaling)} exists with same URL:{Url}. Signaling process may be in conflict.");
instances.Add(this);
}
~WebSocketSignaling()
{
if (m_running)
Stop();
/// <summary>
/// 当前客户端是否为Host角色由服务器在connect响应中分配
/// </summary>
public bool isHost { get; private set; }
instances.Remove(this);
}
public string Url { get; }
public void Start()
{
@@ -86,13 +79,9 @@ namespace Unity.RenderStreaming.Signaling
m_webSocket?.Close();
if (m_signalingThread.ThreadState == ThreadState.WaitSleepJoin)
{
m_signalingThread.Abort();
}
else
{
m_signalingThread.Join(1000);
}
m_signalingThread = null;
}
}
@@ -111,32 +100,18 @@ namespace Unity.RenderStreaming.Signaling
public event OnCallRequestHandler OnCallRequest;
public event OnMessageHandler OnMessage;
/// <summary>
/// 判断一个connectionId是否为参与者连接ID而非房间connectionId
/// </summary>
private bool IsParticipantConnectionId(string connectionId)
{
return m_participantToRoom.ContainsKey(connectionId);
}
/// <summary>
/// 获取参与者对应的房间connectionId
/// </summary>
private string GetRoomConnectionId(string participantConnectionId)
{
return m_participantToRoom.TryGetValue(participantConnectionId, out var roomId) ? roomId : participantConnectionId;
}
public void SendOffer(string connectionId, RTCSessionDescription offer)
{
DescData data = new DescData();
var data = new DescData();
// data.connectionId必须使用房间connectionId服务器用此查找连接组进行路由
// 而不是内部participantIdparticipantId仅通过routedMessage.participantId传递用于目标选择
data.connectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId;
data.connectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
data.sdp = offer.sdp;
data.dateTime = DateTime.Now;
RoutedMessage<DescData> routedMessage = new RoutedMessage<DescData>();
var routedMessage = new RoutedMessage<DescData>();
// 如果connectionId是参与者ID使用房间connectionId作为from并设置目标participantId
if (IsParticipantConnectionId(connectionId))
{
@@ -148,6 +123,7 @@ namespace Unity.RenderStreaming.Signaling
routedMessage.from = connectionId;
routedMessage.participantId = participantId;
}
routedMessage.data = data;
routedMessage.type = "offer";
WSSend(routedMessage);
@@ -155,12 +131,14 @@ namespace Unity.RenderStreaming.Signaling
public void SendAnswer(string connectionId, RTCSessionDescription answer)
{
DescData data = new DescData();
var data = new DescData();
// data.connectionId必须使用房间connectionId服务器用此查找连接组进行路由
data.connectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId;
data.connectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
data.sdp = answer.sdp;
RoutedMessage<DescData> routedMessage = new RoutedMessage<DescData>();
var routedMessage = new RoutedMessage<DescData>();
// 如果connectionId是参与者ID使用房间connectionId作为from并设置目标participantId
if (IsParticipantConnectionId(connectionId))
{
@@ -172,6 +150,7 @@ namespace Unity.RenderStreaming.Signaling
routedMessage.from = connectionId;
routedMessage.participantId = participantId;
}
routedMessage.data = data;
routedMessage.type = "answer";
WSSend(routedMessage);
@@ -179,14 +158,16 @@ namespace Unity.RenderStreaming.Signaling
public void SendCandidate(string connectionId, RTCIceCandidate candidate)
{
CandidateData data = new CandidateData();
var data = new CandidateData();
// data.connectionId必须使用房间connectionId服务器用此查找连接组进行路由
data.connectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId;
data.connectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
data.candidate = candidate.Candidate;
data.sdpMLineIndex = candidate.SdpMLineIndex.GetValueOrDefault(0);
data.sdpMid = candidate.SdpMid;
RoutedMessage<CandidateData> routedMessage = new RoutedMessage<CandidateData>();
var routedMessage = new RoutedMessage<CandidateData>();
// 如果connectionId是参与者ID使用房间connectionId作为from并设置目标participantId
if (IsParticipantConnectionId(connectionId))
{
@@ -197,6 +178,7 @@ namespace Unity.RenderStreaming.Signaling
{
routedMessage.from = connectionId;
}
routedMessage.data = data;
routedMessage.type = "candidate";
@@ -206,15 +188,85 @@ namespace Unity.RenderStreaming.Signaling
public void OpenConnection(string connectionId)
{
m_roomConnectionId = connectionId;
this.WSSend($"{{\"type\":\"connect\", \"connectionId\":\"{connectionId}\"}}");
WSSend($"{{\"type\":\"connect\", \"connectionId\":\"{connectionId}\"}}");
}
public void CloseConnection(string connectionId)
{
// 如果关闭的是参与者连接使用房间connectionId发送disconnect
string actualConnectionId = IsParticipantConnectionId(connectionId) ? GetRoomConnectionId(connectionId) : connectionId;
var actualConnectionId = IsParticipantConnectionId(connectionId)
? GetRoomConnectionId(connectionId)
: connectionId;
m_participantToRoom.Remove(connectionId);
this.WSSend($"{{\"type\":\"disconnect\", \"connectionId\":\"{actualConnectionId}\"}}");
WSSend($"{{\"type\":\"disconnect\", \"connectionId\":\"{actualConnectionId}\"}}");
}
~WebSocketSignaling()
{
if (m_running)
Stop();
instances.Remove(this);
}
/// <summary>
/// 判断一个connectionId是否为参与者连接ID而非房间connectionId
/// </summary>
private bool IsParticipantConnectionId(string connectionId)
{
return m_participantToRoom.ContainsKey(connectionId);
}
/// <summary>
/// 获取参与者对应的房间connectionId
/// </summary>
private string GetRoomConnectionId(string participantConnectionId)
{
return m_participantToRoom.TryGetValue(participantConnectionId, out var roomId)
? roomId
: participantConnectionId;
}
// /// <summary>
// /// 发送自定义消息
// /// </summary>
// /// <param name="connectionId">目标连接ID</param>
// /// <param name="message">消息对象或字符串</param>
// public void SendMessage(string connectionId, object message)
// {
// if (message == null)
// {
// RenderStreaming.Logger.Log(LogType.Warning, "Signaling: Cannot send null message");
// return;
// }
//
// var actualConnectionId = IsParticipantConnectionId(connectionId)
// ? GetRoomConnectionId(connectionId)
// : connectionId;
//
// var routedMessage = new RoutedMessage<object>();
// routedMessage.from = actualConnectionId;
// routedMessage.participantId = IsParticipantConnectionId(connectionId) ? connectionId : participantId;
// routedMessage.data = message;
// routedMessage.type = "message";
//
// WSSend(routedMessage);
// }
/// <summary>
/// 发送自定义消息
/// </summary>
/// <param name="connectionId">目标连接ID</param>
/// <param name="message">消息对象或字符串</param>
public void SendMessage(string message)
{
if (message == null)
{
RenderStreaming.Logger.Log(LogType.Warning, "Signaling: Cannot send null message");
return;
}
WSSend(message);
}
private void WSManage()
@@ -241,12 +293,10 @@ namespace Unity.RenderStreaming.Signaling
private void WSCreate()
{
m_webSocket = new WebSocket(m_url);
if (m_url.StartsWith("wss"))
{
m_webSocket = new WebSocket(Url);
if (Url.StartsWith("wss"))
m_webSocket.SslConfiguration.EnabledSslProtocols =
SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12;
}
m_webSocket.OnOpen += WSConnected;
m_webSocket.OnMessage += WSProcessMessage;
@@ -255,7 +305,7 @@ namespace Unity.RenderStreaming.Signaling
Monitor.Enter(m_webSocket);
RenderStreaming.Logger.Log($"Signaling: Connecting WS {m_url}");
RenderStreaming.Logger.Log($"Signaling: Connecting WS {Url}");
m_webSocket.ConnectAsync();
}
@@ -270,13 +320,9 @@ namespace Unity.RenderStreaming.Signaling
SignalingMessage msg;
if (!string.IsNullOrEmpty(routedMessage.type))
{
msg = routedMessage.data;
}
else
{
msg = JsonUtility.FromJson<SignalingMessage>(content);
}
if (!string.IsNullOrEmpty(routedMessage.type))
{
@@ -285,23 +331,25 @@ namespace Unity.RenderStreaming.Signaling
msg = JsonUtility.FromJson<SignalingMessage>(content);
isHost = msg.role == "host";
participantId = msg.participantId;
m_mainThreadContext.Post(d => OnCreateConnection?.Invoke(this, msg.connectionId, msg.polite), null);
m_mainThreadContext.Post(d => OnCreateConnection?.Invoke(this, msg.connectionId, msg.polite),
null);
}
else if (routedMessage.type == "disconnect")
{
msg = JsonUtility.FromJson<SignalingMessage>(content);
string disconnectConnectionId = msg.connectionId;
var disconnectConnectionId = msg.connectionId;
// 如果断开的是参与者连接用participantId查找内部connectionId
if (!string.IsNullOrEmpty(routedMessage.participantId) && isHost)
{
disconnectConnectionId = routedMessage.participantId;
m_participantToRoom.Remove(routedMessage.participantId);
}
m_mainThreadContext.Post(d => OnDestroyConnection?.Invoke(this, disconnectConnectionId), null);
}
else if (routedMessage.type == "offer")
{
DescData offer = new DescData();
var offer = new DescData();
// Host收到带participantId的offer时使用participantId作为内部connectionId
// 这样每个participant会创建独立的PeerConnection
if (!string.IsNullOrEmpty(routedMessage.participantId) && isHost)
@@ -313,6 +361,7 @@ namespace Unity.RenderStreaming.Signaling
{
offer.connectionId = routedMessage.from;
}
offer.sdp = msg.sdp;
offer.polite = msg.polite;
offer.participantId = routedMessage.participantId;
@@ -320,7 +369,7 @@ namespace Unity.RenderStreaming.Signaling
}
else if (routedMessage.type == "answer")
{
DescData answer = new DescData
var answer = new DescData
{
// Host收到带participantId的answer时使用participantId作为内部connectionId
connectionId = !string.IsNullOrEmpty(routedMessage.participantId) && isHost
@@ -333,12 +382,12 @@ namespace Unity.RenderStreaming.Signaling
}
else if (routedMessage.type == "candidate")
{
CandidateData candidate = new CandidateData
var candidate = new CandidateData
{
// Host收到带participantId的candidate时使用participantId作为内部connectionId
connectionId = !string.IsNullOrEmpty(routedMessage.participantId) && isHost
? routedMessage.participantId
: routedMessage.@from,
: routedMessage.from,
candidate = msg.candidate,
sdpMLineIndex = msg.sdpMLineIndex,
sdpMid = msg.sdpMid,
@@ -350,36 +399,36 @@ namespace Unity.RenderStreaming.Signaling
{
// Host收到新参与者加入通知
msg = JsonUtility.FromJson<SignalingMessage>(content);
ParticipantEventData participantData = new ParticipantEventData
var participantData = new ParticipantEventData
{
connectionId = msg.connectionId ?? routedMessage.from,
participantId = msg.participantId ?? routedMessage.participantId
};
// 记录参与者映射
if (!string.IsNullOrEmpty(participantData.participantId))
{
m_participantToRoom[participantData.participantId] = participantData.connectionId;
}
RenderStreaming.Logger.Log($"Signaling: Participant joined - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
RenderStreaming.Logger.Log(
$"Signaling: Participant joined - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
m_mainThreadContext.Post(d => OnParticipantJoined?.Invoke(this, participantData), null);
}
else if (routedMessage.type == "participant-left")
{
// Host/Participant收到参与者离开通知
msg = JsonUtility.FromJson<SignalingMessage>(content);
ParticipantEventData participantData = new ParticipantEventData
var participantData = new ParticipantEventData
{
connectionId = msg.connectionId ?? routedMessage.from,
participantId = msg.participantId ?? routedMessage.participantId
};
m_participantToRoom.Remove(participantData.participantId);
RenderStreaming.Logger.Log($"Signaling: Participant left - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
RenderStreaming.Logger.Log(
$"Signaling: Participant left - connectionId: {participantData.connectionId}, participantId: {participantData.participantId}");
m_mainThreadContext.Post(d => OnParticipantLeft?.Invoke(this, participantData), null);
}
else if (routedMessage.type == "call-request")
{
msg = JsonUtility.FromJson<SignalingMessage>(content);
CallRequestData callData = new CallRequestData
var callData = new CallRequestData
{
connectionId = routedMessage.from,
data = msg.data
@@ -389,12 +438,12 @@ namespace Unity.RenderStreaming.Signaling
else if (routedMessage.type == "on-message")
{
var message = JsonUtility.FromJson<SignalingMessage>(content);
OnMessageData messageData = new OnMessageData
{
connectionId = routedMessage.from,
participantId = routedMessage.participantId,
message = message.data
};
var messageData = new OnMessageData
{
connectionId = routedMessage.from,
participantId = routedMessage.participantId,
message = message.data
};
m_mainThreadContext.Post(d => OnMessage?.Invoke(this, messageData), null);
}
else if (routedMessage.type == "error")
@@ -445,10 +494,10 @@ namespace Unity.RenderStreaming.Signaling
}
else
{
string str = JsonUtility.ToJson(data);
var str = JsonUtility.ToJson(data);
RenderStreaming.Logger.Log("Signaling: Sending WS data: " + str);
m_webSocket.Send(str);
}
}
}
}
}