/** * WebSocket处理器 * 负责管理WebSocket连接和信令消息处理 */ import Offer from './offer'; import Answer from './answer'; import Candidate from './candidate'; /** * 是否为私有模式 */ let isPrivate: boolean; /** * 客户端连接映射 * 键: WebSocket实例 * 值: 该WebSocket的连接ID集合 */ const clients: Map> = new Map>(); /** * 连接对映射 * 键: connectionId * 值: [WebSocket实例1, WebSocket实例2] */ const connectionPair: Map = new Map(); /** * 获取或创建WebSocket会话的连接ID集合 * @param session WebSocket会话实例 * @returns 连接ID的Set集合 */ function getOrCreateConnectionIds(session: WebSocket): Set { let connectionIds = null; // 检查客户端是否已存在 if (!clients.has(session)) { // 如果不存在,创建新的连接ID集合 connectionIds = new Set(); // 将新的连接ID集合与客户端关联 clients.set(session, connectionIds); } // 获取客户端的连接ID集合 connectionIds = clients.get(session); // 返回连接ID集合 return connectionIds; } /** * 重置处理器状态 * @param mode 通信模式(public或private) */ function reset(mode: string): void { // 设置是否为私有模式 isPrivate = mode == "private"; } /** * 添加新的WebSocket连接 * @param ws WebSocket连接实例 */ function add(ws: WebSocket): void { // 为新连接创建空的连接ID集合 clients.set(ws, new Set()); // 记录添加WebSocket连接的日志 console.log(`Add WebSocket: ${ws}`); } /** * 移除WebSocket连接 * @param ws WebSocket连接实例 */ function remove(ws: WebSocket): void { // 获取连接的所有连接ID const connectionIds = clients.get(ws); // 遍历所有连接ID connectionIds.forEach(connectionId => { // 获取连接对 const pair = connectionPair.get(connectionId); if (pair) { // 找到另一个WebSocket实例 const otherSessionWs = pair[0] == ws ? pair[1] : pair[0]; if (otherSessionWs) { // 向另一个连接发送断开连接消息 otherSessionWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId })); } } // 从连接对映射中删除 connectionPair.delete(connectionId); // 记录删除连接ID的日志 console.log(`Remove connectionId: ${connectionId}`); }); // 从客户端映射中删除 clients.delete(ws); } /** * 处理连接请求 * @param ws WebSocket连接实例 * @param connectionId 连接ID */ function onConnect(ws: WebSocket, connectionId: string): void { let polite = true; // 处理私有模式 if (isPrivate) { if (connectionPair.has(connectionId)) { const pair = connectionPair.get(connectionId); if (pair[0] != null && pair[1] != null) { // 连接ID已被使用 ws.send(JSON.stringify({ type: "error", message: `${connectionId}: This connection id is already used.` })); return; } else if (pair[0] != null) { // 找到配对连接 connectionPair.set(connectionId, [pair[0], ws]); } } else { // 创建新的连接对 connectionPair.set(connectionId, [ws, null]); polite = false; } } // 获取或创建连接ID集合 const connectionIds = getOrCreateConnectionIds(ws); // 添加连接ID connectionIds.add(connectionId); // 发送连接成功消息 ws.send(JSON.stringify({ type: "connect", connectionId: connectionId, polite: polite })); } /** * 处理断开连接请求 * @param ws WebSocket连接实例 * @param connectionId 连接ID */ function onDisconnect(ws: WebSocket, connectionId: string): void { // 获取连接的连接ID集合 const connectionIds = clients.get(ws); // 从集合中删除连接ID connectionIds.delete(connectionId); // 处理连接对 if (connectionPair.has(connectionId)) { const pair = connectionPair.get(connectionId); // 找到另一个WebSocket实例 const otherSessionWs = pair[0] == ws ? pair[1] : pair[0]; if (otherSessionWs) { // 向另一个连接发送断开连接消息 otherSessionWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId })); } } // 从连接对映射中删除 connectionPair.delete(connectionId); // 向当前连接发送断开连接消息 ws.send(JSON.stringify({ type: "disconnect", connectionId: connectionId })); } /** * 处理offer信令 * @param ws WebSocket连接实例 * @param message 消息数据 */ function onOffer(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId as string; // 创建新的offer const newOffer = new Offer(message.sdp, Date.now(), false); // 处理私有模式 if (isPrivate) { if (connectionPair.has(connectionId)) { const pair = connectionPair.get(connectionId); // 找到另一个WebSocket实例 const otherSessionWs = pair[0] == ws ? pair[1] : pair[0]; if (otherSessionWs) { // 设置为polite模式 newOffer.polite = true; // 发送offer消息 otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer })); } } return; } // 公共模式:创建新的连接对 connectionPair.set(connectionId, [ws, null]); // 向所有其他客户端广播offer clients.forEach((_v, k) => { if (k == ws) { return; } k.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer })); }); } /** * 处理answer信令 * @param ws WebSocket连接实例 * @param message 消息数据 */ function onAnswer(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId as string; // 获取或创建连接ID集合 const connectionIds = getOrCreateConnectionIds(ws); // 添加连接ID connectionIds.add(connectionId); // 创建新的answer const newAnswer = new Answer(message.sdp, Date.now()); // 检查连接对是否存在 if (!connectionPair.has(connectionId)) { return; } // 获取连接对 const pair = connectionPair.get(connectionId); // 找到另一个WebSocket实例 const otherSessionWs = pair[0] == ws ? pair[1] : pair[0]; // 公共模式:更新连接对 if (!isPrivate) { connectionPair.set(connectionId, [otherSessionWs, ws]); } // 发送answer消息 otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer })); } /** * 处理candidate信令 * @param ws WebSocket连接实例 * @param message 消息数据 */ function onCandidate(ws: WebSocket, message: any): void { // 获取连接ID const connectionId = message.connectionId; // 创建新的candidate const candidate = new Candidate(message.candidate, message.sdpMLineIndex, message.sdpMid, Date.now()); // 处理私有模式 if (isPrivate) { if (connectionPair.has(connectionId)) { const pair = connectionPair.get(connectionId); // 找到另一个WebSocket实例 const otherSessionWs = pair[0] == ws ? pair[1] : pair[0]; if (otherSessionWs) { // 发送candidate消息 otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate })); } } return; } // 公共模式:向所有其他客户端广播candidate clients.forEach((_v, k) => { if (k === ws) { return; } k.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate })); }); } /** * 处理获取连接信息请求 * @param ws WebSocket连接实例 */ function onGetConnections(ws: WebSocket): void { // 收集所有connectionId const allConnectionIds = Array.from(connectionPair.keys()); // 收集所有WebSocket连接信息 const allWebSockets = Array.from(clients.entries()).map(([ws, connectionIds]) => { return { connectionIds: Array.from(connectionIds), // 注意:这里不能直接序列化WebSocket对象,只能返回连接数量或其他信息 connected: true }; }); // 发送连接信息给请求的客户端 ws.send(JSON.stringify({ type: "connections", connectionIds: allConnectionIds, websocketCount: clients.size })); } /** * 处理广播消息请求 * @param ws WebSocket连接实例 * @param message 消息数据 */ /** * 处理广播消息请求 * @param ws WebSocket连接实例 * @param message 消息数据 */ function onBroadcast(ws: WebSocket, message: any): void { const broadcastMessage = message.message; const targetConnectionId = message.targetConnectionId; if (targetConnectionId) { // 向指定连接广播 if (connectionPair.has(targetConnectionId)) { const pair = connectionPair.get(targetConnectionId); // 向连接对中的两个WebSocket实例发送消息 if (pair[0]) { pair[0].send(JSON.stringify({ type: "broadcast", message: broadcastMessage, from: "server" })); } if (pair[1]) { pair[1].send(JSON.stringify({ type: "broadcast", message: broadcastMessage, from: "server" })); } } } else { // 全局广播:向所有客户端发送消息 clients.forEach((_v, k) => { k.send(JSON.stringify({ type: "broadcast", message: broadcastMessage, from: "server" })); }); } } function AddHeartbeat(ws: WebSocket){ // 初始化心跳检测 (ws as any).lastActivity = Date.now(); // 设置心跳检测定时器,每30秒发送一次ping (ws as any).heartbeatTimer = setInterval(() => { const now = Date.now(); // 检查上次活动时间,如果超过60秒没有活动,关闭连接 if (now - (ws as any).lastActivity > 10000) { console.log('WebSocket connection timeout, closing...'); clearInterval((ws as any).heartbeatTimer); ws.close(); } else { // 发送ping消息 ws.send(JSON.stringify({ type: "ping" })); console.log('WebSocket connection heartbeat, lastActivity: ', (ws as any).lastActivity); } }, 3000); } function RemoveHeartbeat(ws: WebSocket){ // 清除心跳检测定时器 if ((ws as any).heartbeatTimer) { clearInterval((ws as any).heartbeatTimer); } } /** * 导出WebSocket处理器函数 */ export { reset, add, remove, onConnect, onDisconnect, onOffer, onAnswer, onCandidate, onGetConnections, onBroadcast, AddHeartbeat, RemoveHeartbeat };