/** * WebSocket处理器 * 负责管理WebSocket连接和信令消息处理 */ import Offer from './offer'; import Answer from './answer'; import Candidate from './candidate'; /** * 是否为私有模式 */ let isPrivate: boolean; /** * 客户端连接映射 * 键: WebSocket实例 * 值: 该WebSocket的连接ID集合 */ const clients: Map> = new Map>(); /** * 连接组结构 * host: 主机WebSocket实例(第一个连接的客户端) * participants: 参与者WebSocket集合(后续连接的客户端) */ interface ConnectionGroup { host: WebSocket; participants: Set; } /** * 连接组映射 * 键: connectionId * 值: ConnectionGroup(1个host + 多个participants) */ const connectionGroup: Map = new Map(); /** * 获取或创建WebSocket会话的连接ID集合 * @param session WebSocket会话实例 * @returns 连接ID的Set集合 */ function getOrCreateConnectionIds(session: WebSocket): Set { let connectionIds = null; // 检查客户端是否已存在 if (!clients.has(session)) { // 如果不存在,创建新的连接ID集合 connectionIds = new Set(); // 将新的连接ID集合与客户端关联 clients.set(session, connectionIds); } // 获取客户端的连接ID集合 connectionIds = clients.get(session); // 返回连接ID集合 return connectionIds; } /** * 重置处理器状态 * @param mode 通信模式(public或private) */ function reset(mode: string): void { // 设置是否为私有模式 isPrivate = mode == "private"; } /** * 添加新的WebSocket连接 * @param ws WebSocket连接实例 */ function add(ws: WebSocket): void { // 为新连接创建空的连接ID集合 const id = new Set(); clients.set(ws, id); // 记录添加WebSocket连接的日志 console.log(`Add WebSocket: ${id}`); } /** * 判断WebSocket是否为指定连接组的host * @param ws WebSocket连接实例 * @param connectionId 连接ID * @returns 是否为host */ function isHost(ws: WebSocket, connectionId: string): boolean { const group = connectionGroup.get(connectionId); return group != null && group.host === ws; } /** * 向连接组中除发送者外的所有成员发送消息 * @param connectionId 连接ID * @param senderWs 发送者WebSocket实例 * @param message 要发送的消息对象 */ function broadcastToGroup(connectionId: string, senderWs: WebSocket, message: any): void { const group = connectionGroup.get(connectionId); if (!group) return; // 如果发送者是host,转发给所有participants if (senderWs === group.host) { group.participants.forEach(participantWs => { participantWs.send(JSON.stringify(message)); }); } else { // 如果发送者是participant,转发给host group.host.send(JSON.stringify(message)); } } /** * 移除WebSocket连接 * @param ws WebSocket连接实例 */ function remove(ws: WebSocket): void { // 获取连接的所有连接ID const connectionIds = clients.get(ws); if (!connectionIds) return; // 遍历所有连接ID connectionIds.forEach(connectionId => { const group = connectionGroup.get(connectionId); if (group) { if (group.host === ws) { // host断开连接,通知所有participants房间已关闭 group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId, reason: "host-left" })); }); // 删除整个连接组 connectionGroup.delete(connectionId); } else { // participant断开连接,从participants中移除并通知host group.participants.delete(ws); group.host.send(JSON.stringify({ type: "participant-left", connectionId: connectionId })); } } // 记录删除连接ID的日志 console.log(`Remove connectionId: ${connectionId}`); }); // 从客户端映射中删除 clients.delete(ws); } /** * 处理连接请求(1对多模式) * 第一个连接的客户端成为host,后续连接的客户端成为participants * @param ws WebSocket连接实例 * @param connectionId 连接ID */ function onConnect(ws: WebSocket, connectionId: string): void { let polite = true; // 处理私有模式 if (isPrivate) { if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); // 已有host,新连接作为participant加入 group.participants.add(ws); console.log(`Participant joined connectionId: ${connectionId}, total participants: ${group.participants.size}`); } else { // 第一个连接成为host connectionGroup.set(connectionId, { host: ws, participants: new Set() }); polite = false; console.log(`Host created connectionId: ${connectionId}`); } } // 获取或创建连接ID集合 const connectionIds = getOrCreateConnectionIds(ws); // 添加连接ID connectionIds.add(connectionId); // 发送连接成功消息(包含角色信息) const role = polite ? 'participant' : 'host'; ws.send(JSON.stringify({ type: "connect", connectionId: connectionId, polite: polite, role: role })); //启用心跳包 //AddHeartbeat(ws, connectionId); } /** * 处理断开连接请求(1对多模式) * @param ws WebSocket连接实例 * @param connectionId 连接ID */ function onDisconnect(ws: WebSocket, connectionId: string): void { // 获取连接的连接ID集合 const connectionIds = clients.get(ws); if (connectionIds) { // 从集合中删除连接ID connectionIds.delete(connectionId); } // 处理连接组 const group = connectionGroup.get(connectionId); if (group) { if (group.host === ws) { // host断开连接,通知所有participants房间已关闭,并删除连接组 group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId, reason: "host-left" })); }); connectionGroup.delete(connectionId); console.log(`Host disconnected, room ${connectionId} deleted, notified ${group.participants.size} participants`); } else { // participant断开连接,从组中移除并通知host(使用participant-left类型,host不会关闭房间) group.participants.delete(ws); group.host.send(JSON.stringify({ type: "participant-left", connectionId: connectionId })); console.log(`Participant left connectionId: ${connectionId}, remaining participants: ${group.participants.size}`); } } // 向当前连接发送断开连接消息 ws.send(JSON.stringify({ type: "disconnect", connectionId: connectionId })); //RemoveHeartbeat(ws); // 记录断开连接的日志 console.log(`Disconnect connectionId: ${connectionId}`); } /** * 处理offer信令(1对多模式) * host的offer转发给所有participants,participant的offer转发给host * @param ws WebSocket连接实例 * @param message 消息数据 */ function onOffer(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId as string; // 创建新的offer const newOffer = new Offer(message.sdp, Date.now(), false); // 处理私有模式 if (isPrivate) { if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); if (group.host === ws) { // host发送offer,转发给所有participants newOffer.polite = true; group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer })); }); } else { // participant发送offer,转发给host newOffer.polite = true; group.host.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer })); } } return; } // 公共模式:创建新的连接组(如果不存在) if (!connectionGroup.has(connectionId)) { connectionGroup.set(connectionId, { host: ws, participants: new Set() }); } // 向所有其他客户端广播offer clients.forEach((_v, k) => { if (k == ws) { return; } k.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer })); }); } /** * 处理answer信令(1对多模式) * participant的answer转发给host * @param ws WebSocket连接实例 * @param message 消息数据 */ function onAnswer(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId as string; // 获取或创建连接ID集合 const connectionIds = getOrCreateConnectionIds(ws); // 添加连接ID connectionIds.add(connectionId); // 创建新的answer const newAnswer = new Answer(message.sdp, Date.now()); // 检查连接组是否存在 if (!connectionGroup.has(connectionId)) { return; } const group = connectionGroup.get(connectionId); if (group.host === ws) { // host发送answer,转发给所有participants(通常host不发送answer) group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer })); }); } else { // participant发送answer,转发给host group.host.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer })); } } /** * 处理candidate信令(1对多模式) * host的candidate转发给所有participants,participant的candidate转发给host * @param ws WebSocket连接实例 * @param message 消息数据 */ function onCandidate(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId; // 创建新的candidate const candidate = new Candidate(message.candidate, message.sdpMLineIndex, message.sdpMid, Date.now()); // 处理私有模式 if (isPrivate) { if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); if (group.host === ws) { // host发送candidate,转发给所有participants group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate })); }); } else { // participant发送candidate,转发给host group.host.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate })); } } return; } } function onCallConnectionId(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId; const clientId = message.clientId; // 在1对多模式下,通知host有新的呼叫请求 if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); if (group.host !== ws) { // participant发起呼叫,通知host group.host.send(JSON.stringify({ from: connectionId, to: "", type: "call-request", data: connectionId })); } } else { // 兼容旧的广播方式 clients.forEach((_v, k) => { if (k === ws) { return; } if (_v == clientId) { k.send(JSON.stringify({ from: connectionId, to: "", type: "call-request", data: connectionId })); } }); } } /** * 处理广播消息请求(1对多模式) * @param ws WebSocket连接实例 * @param message 消息数据 */ function onBroadcast(ws: WebSocket, message: any): void { const broadcastMessage = message.message; const targetConnectionId = message.targetConnectionId; if (targetConnectionId) { // 向指定连接组广播 if (connectionGroup.has(targetConnectionId)) { const group = connectionGroup.get(targetConnectionId); // 向组内所有成员发送消息 group.host.send(JSON.stringify({ type: "broadcast", message: broadcastMessage, from: "server" })); group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ type: "broadcast", message: broadcastMessage, from: "server" })); }); } } else { // 全局广播:向所有客户端发送消息 clients.forEach((_v, k) => { k.send(JSON.stringify({ type: "broadcast", message: broadcastMessage, from: "server" })); }); } } function AddHeartbeat(ws: WebSocket, connectionId: string) { // 初始化心跳检测 (ws as any).lastActivity = Date.now(); // 设置心跳检测定时器,每30秒发送一次ping (ws as any).heartbeatTimer = setInterval(() => { const now = Date.now(); // 检查上次活动时间,如果超过60秒没有活动,关闭连接 if (now - (ws as any).lastActivity > 10000) { console.log('WebSocket connection timeout, closing...'); clearInterval((ws as any).heartbeatTimer); //ws.close(); onDisconnect(ws, connectionId); } else { // 发送ping消息 ws.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: { type: "ping"} })); console.log('WebSocket connection heartbeat, lastActivity: ', (ws as any).lastActivity); } }, 3000); } function RemoveHeartbeat(ws: WebSocket) { // 清除心跳检测定时器 if ((ws as any).heartbeatTimer) { clearInterval((ws as any).heartbeatTimer); } } /** * 处理获取所有连接ID的请求 * @param ws WebSocket连接实例 */ function onGetAllConnectionIds(): string[] { // 获取所有connectionId const connectionIds = Array.from(connectionGroup.keys()); return connectionIds; } /** * 处理chat-message信令(1对多模式) * host的消息转发给所有participants,participant的消息转发给host * @param ws WebSocket连接实例 * @param message 消息数据 */ function onMessage(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId; const chatMessage = message.message; if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); if (group.host === ws) { // host发送消息,转发给所有participants group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: chatMessage })); }); } else { // participant发送消息,转发给host group.host.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: chatMessage })); } } } /** * 导出WebSocket处理器函数 */ export { reset, add, remove, onConnect, onDisconnect, onOffer, onAnswer, onCandidate, onCallConnectionId, onBroadcast, onGetAllConnectionIds, AddHeartbeat, RemoveHeartbeat, onMessage, isHost, broadcastToGroup, connectionGroup };