This commit is contained in:
zhangzheng
2026-02-28 18:17:08 +08:00
parent 1bb1fee5cc
commit 50566b700c
7 changed files with 658 additions and 135 deletions

View File

@@ -1,104 +1,191 @@
/**
* WebSocket处理器
* 负责管理WebSocket连接和信令消息处理
*/
import Offer from './offer';
import Answer from './answer';
import Candidate from './candidate';
/**
* 是否为私有模式
*/
let isPrivate: boolean;
// [{sessonId:[connectionId,...]}]
/**
* 客户端连接映射
* 键: WebSocket实例
* 值: 该WebSocket的连接ID集合
*/
const clients: Map<WebSocket, Set<string>> = new Map<WebSocket, Set<string>>();
// [{connectionId:[sessionId1, sessionId2]}]
/**
* 连接对映射
* 键: connectionId
* 值: [WebSocket实例1, WebSocket实例2]
*/
const connectionPair: Map<string, [WebSocket, WebSocket]> = new Map<string, [WebSocket, WebSocket]>();
/**
* 获取或创建WebSocket会话的连接ID集合
* @param session WebSocket会话实例
* @returns 连接ID的Set集合
*/
function getOrCreateConnectionIds(session: WebSocket): Set<string> {
let connectionIds = null;
// 检查客户端是否已存在
if (!clients.has(session)) {
// 如果不存在创建新的连接ID集合
connectionIds = new Set<string>();
// 将新的连接ID集合与客户端关联
clients.set(session, connectionIds);
}
// 获取客户端的连接ID集合
connectionIds = clients.get(session);
// 返回连接ID集合
return connectionIds;
}
/**
* 重置处理器状态
* @param mode 通信模式public或private
*/
function reset(mode: string): void {
// 设置是否为私有模式
isPrivate = mode == "private";
}
/**
* 添加新的WebSocket连接
* @param ws WebSocket连接实例
*/
function add(ws: WebSocket): void {
// 为新连接创建空的连接ID集合
clients.set(ws, new Set<string>());
// 记录添加WebSocket连接的日志
console.log(`Add WebSocket: ${ws}`);
}
/**
* 移除WebSocket连接
* @param ws WebSocket连接实例
*/
function remove(ws: WebSocket): void {
// 获取连接的所有连接ID
const connectionIds = clients.get(ws);
// 遍历所有连接ID
connectionIds.forEach(connectionId => {
// 获取连接对
const pair = connectionPair.get(connectionId);
if (pair) {
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 向另一个连接发送断开连接消息
otherSessionWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId }));
}
}
// 从连接对映射中删除
connectionPair.delete(connectionId);
// 记录删除连接ID的日志
console.log(`Remove connectionId: ${connectionId}`);
});
// 从客户端映射中删除
clients.delete(ws);
}
/**
* 处理连接请求
* @param ws WebSocket连接实例
* @param connectionId 连接ID
*/
function onConnect(ws: WebSocket, connectionId: string): void {
let polite = true;
// 处理私有模式
if (isPrivate) {
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
if (pair[0] != null && pair[1] != null) {
// 连接ID已被使用
ws.send(JSON.stringify({ type: "error", message: `${connectionId}: This connection id is already used.` }));
return;
} else if (pair[0] != null) {
// 找到配对连接
connectionPair.set(connectionId, [pair[0], ws]);
}
} else {
// 创建新的连接对
connectionPair.set(connectionId, [ws, null]);
polite = false;
}
}
// 获取或创建连接ID集合
const connectionIds = getOrCreateConnectionIds(ws);
// 添加连接ID
connectionIds.add(connectionId);
// 发送连接成功消息
ws.send(JSON.stringify({ type: "connect", connectionId: connectionId, polite: polite }));
}
/**
* 处理断开连接请求
* @param ws WebSocket连接实例
* @param connectionId 连接ID
*/
function onDisconnect(ws: WebSocket, connectionId: string): void {
// 获取连接的连接ID集合
const connectionIds = clients.get(ws);
// 从集合中删除连接ID
connectionIds.delete(connectionId);
// 处理连接对
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 向另一个连接发送断开连接消息
otherSessionWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId }));
}
}
// 从连接对映射中删除
connectionPair.delete(connectionId);
// 向当前连接发送断开连接消息
ws.send(JSON.stringify({ type: "disconnect", connectionId: connectionId }));
}
/**
* 处理offer信令
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onOffer(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId as string;
// 创建新的offer
const newOffer = new Offer(message.sdp, Date.now(), false);
// 处理私有模式
if (isPrivate) {
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 设置为polite模式
newOffer.polite = true;
// 发送offer消息
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer }));
}
}
return;
}
// 公共模式:创建新的连接对
connectionPair.set(connectionId, [ws, null]);
// 向所有其他客户端广播offer
clients.forEach((_v, k) => {
if (k == ws) {
return;
@@ -107,41 +194,66 @@ function onOffer(ws: WebSocket, message: any): void {
});
}
/**
* 处理answer信令
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onAnswer(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId as string;
// 获取或创建连接ID集合
const connectionIds = getOrCreateConnectionIds(ws);
// 添加连接ID
connectionIds.add(connectionId);
// 创建新的answer
const newAnswer = new Answer(message.sdp, Date.now());
// 检查连接对是否存在
if (!connectionPair.has(connectionId)) {
return;
}
// 获取连接对
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
// 公共模式:更新连接对
if (!isPrivate) {
connectionPair.set(connectionId, [otherSessionWs, ws]);
}
// 发送answer消息
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer }));
}
/**
* 处理candidate信令
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onCandidate(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId;
// 创建新的candidate
const candidate = new Candidate(message.candidate, message.sdpMLineIndex, message.sdpMid, Date.now());
// 处理私有模式
if (isPrivate) {
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 发送candidate消息
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate }));
}
}
return;
}
// 公共模式向所有其他客户端广播candidate
clients.forEach((_v, k) => {
if (k === ws) {
return;
@@ -150,4 +262,102 @@ function onCandidate(ws: WebSocket, message: any): void {
});
}
export { reset, add, remove, onConnect, onDisconnect, onOffer, onAnswer, onCandidate };
/**
* 处理获取连接信息请求
* @param ws WebSocket连接实例
*/
function onGetConnections(ws: WebSocket): void {
// 收集所有connectionId
const allConnectionIds = Array.from(connectionPair.keys());
// 收集所有WebSocket连接信息
const allWebSockets = Array.from(clients.entries()).map(([ws, connectionIds]) => {
return {
connectionIds: Array.from(connectionIds),
// 注意这里不能直接序列化WebSocket对象只能返回连接数量或其他信息
connected: true
};
});
// 发送连接信息给请求的客户端
ws.send(JSON.stringify({
type: "connections",
connectionIds: allConnectionIds,
websocketCount: clients.size
}));
}
/**
* 处理广播消息请求
* @param ws WebSocket连接实例
* @param message 消息数据
*/
/**
* 处理广播消息请求
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onBroadcast(ws: WebSocket, message: any): void {
const broadcastMessage = message.message;
const targetConnectionId = message.targetConnectionId;
if (targetConnectionId) {
// 向指定连接广播
if (connectionPair.has(targetConnectionId)) {
const pair = connectionPair.get(targetConnectionId);
// 向连接对中的两个WebSocket实例发送消息
if (pair[0]) {
pair[0].send(JSON.stringify({
type: "broadcast",
message: broadcastMessage,
from: "server"
}));
}
if (pair[1]) {
pair[1].send(JSON.stringify({
type: "broadcast",
message: broadcastMessage,
from: "server"
}));
}
}
} else {
// 全局广播:向所有客户端发送消息
clients.forEach((_v, k) => {
k.send(JSON.stringify({
type: "broadcast",
message: broadcastMessage,
from: "server"
}));
});
}
}
function AddHeartbeat(ws: WebSocket){
// 初始化心跳检测
(ws as any).lastActivity = Date.now();
// 设置心跳检测定时器每30秒发送一次ping
(ws as any).heartbeatTimer = setInterval(() => {
const now = Date.now();
// 检查上次活动时间如果超过60秒没有活动关闭连接
if (now - (ws as any).lastActivity > 10000) {
console.log('WebSocket connection timeout, closing...');
clearInterval((ws as any).heartbeatTimer);
//ws.close();
} else {
// 发送ping消息
ws.send(JSON.stringify({ type: "ping" }));
console.log('WebSocket connection heartbeat, lastActivity: ', (ws as any).lastActivity);
}
}, 3000);
}
function RemoveHeartbeat(ws: WebSocket){
// 清除心跳检测定时器
if ((ws as any).heartbeatTimer) {
clearInterval((ws as any).heartbeatTimer);
}
}
/**
* 导出WebSocket处理器函数
*/
export { reset, add, remove, onConnect, onDisconnect, onOffer, onAnswer, onCandidate, onGetConnections, onBroadcast, AddHeartbeat, RemoveHeartbeat };

View File

@@ -6,53 +6,109 @@ export default class WSSignaling {
server: Server;
wss: websocket.Server;
/**
* 构造函数初始化WebSocket信令服务器
* @param server HTTP服务器实例
* @param mode 通信模式public或private
*/
constructor(server: Server, mode: string) {
// 保存服务器实例
this.server = server;
// 创建WebSocket服务器
this.wss = new websocket.Server({ server });
// 重置处理器,设置通信模式
handler.reset(mode);
/**
* 监听WebSocket连接事件
* @param ws WebSocket连接实例
*/
this.wss.on('connection', (ws: WebSocket) => {
// 添加新的WebSocket连接到处理器
handler.add(ws);
handler.AddHeartbeat(ws);
/**
* 监听连接关闭事件
*/
ws.onclose = (): void => {
// 从处理器中移除关闭的连接
handler.remove(ws);
handler.RemoveHeartbeat(ws);
};
/**
* 监听消息事件
* @param event 消息事件对象
*/
ws.onmessage = (event: MessageEvent): void => {
// 消息类型说明:
// 1. connect, disconnect 消息格式:
// { type: "connect", connectionId: "连接ID" }
// { type: "disconnect", connectionId: "连接ID" }
// 2. offer, answer, candidate 消息格式:
// {
// type: "offer",
// data: {
// from: "发送方连接ID",
// to: "接收方连接ID",
// data: "信令数据"
// }
// }
// 3. broadcast 消息格式:
// {
// type: "broadcast",
// message: "广播消息内容",
// targetConnectionId: "目标连接ID可选"
// }
// type: connect, disconnect JSON Schema
// connectionId: connect or disconnect connectionId
// type: offer, answer, candidate JSON Schema
// from: from connection id
// to: to connection id
// data: any message data structure
// 解析消息数据
const msg = JSON.parse(event.data);
// 检查消息是否有效
if (!msg || !this) {
return;
}
// 打印接收到的消息
console.log(msg);
// 根据消息类型处理
switch (msg.type) {
case "connect":
// 处理连接请求
handler.onConnect(ws, msg.connectionId);
break;
case "disconnect":
// 处理断开连接请求
handler.onDisconnect(ws, msg.connectionId);
break;
case "offer":
// 处理offer信令
handler.onOffer(ws, msg.data);
break;
case "answer":
// 处理answer信令
handler.onAnswer(ws, msg.data);
break;
case "candidate":
// 处理candidate信令
handler.onCandidate(ws, msg.data);
break;
case "ping":
// 处理心跳请求回复pong
ws.send(JSON.stringify({ type: "pong" }));
break;
case "pong":
// 处理心跳响应,更新最后活动时间
(ws as any).lastActivity = Date.now();
break;
case "broadcast":
handler.onBroadcast(ws, msg.data);
break;
case "onGetConnections":
handler.onGetConnections(ws);
break;
default:
// 忽略未知消息类型
break;
}
};