Files
webRtc/WebApp/src/class/websockethandler.ts
2026-03-12 12:10:40 +08:00

387 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* WebSocket处理器
* 负责管理WebSocket连接和信令消息处理
*/
import Offer from './offer';
import Answer from './answer';
import Candidate from './candidate';
/**
* 是否为私有模式
*/
let isPrivate: boolean;
/**
* 客户端连接映射
* 键: WebSocket实例
* 值: 该WebSocket的连接ID集合
*/
const clients: Map<WebSocket, Set<string>> = new Map<WebSocket, Set<string>>();
/**
* 连接对映射
* 键: connectionId
* 值: [WebSocket实例1, WebSocket实例2]
*/
const connectionPair: Map<string, [WebSocket, WebSocket]> = new Map<string, [WebSocket, WebSocket]>();
/**
* 获取或创建WebSocket会话的连接ID集合
* @param session WebSocket会话实例
* @returns 连接ID的Set集合
*/
function getOrCreateConnectionIds(session: WebSocket): Set<string> {
let connectionIds = null;
// 检查客户端是否已存在
if (!clients.has(session)) {
// 如果不存在创建新的连接ID集合
connectionIds = new Set<string>();
// 将新的连接ID集合与客户端关联
clients.set(session, connectionIds);
}
// 获取客户端的连接ID集合
connectionIds = clients.get(session);
// 返回连接ID集合
return connectionIds;
}
/**
* 重置处理器状态
* @param mode 通信模式public或private
*/
function reset(mode: string): void {
// 设置是否为私有模式
isPrivate = mode == "private";
}
/**
* 添加新的WebSocket连接
* @param ws WebSocket连接实例
*/
function add(ws: WebSocket): void {
// 为新连接创建空的连接ID集合
const id = new Set<string>();
clients.set(ws, id);
// 记录添加WebSocket连接的日志
console.log(`Add WebSocket: ${id}`);
}
/**
* 移除WebSocket连接
* @param ws WebSocket连接实例
*/
function remove(ws: WebSocket): void {
// 获取连接的所有连接ID
const connectionIds = clients.get(ws);
// 遍历所有连接ID
connectionIds.forEach(connectionId => {
// 获取连接对
const pair = connectionPair.get(connectionId);
if (pair) {
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 向另一个连接发送断开连接消息
otherSessionWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId }));
}
}
// 从连接对映射中删除
connectionPair.delete(connectionId);
// 记录删除连接ID的日志
console.log(`Remove connectionId: ${connectionId}`);
});
// 从客户端映射中删除
clients.delete(ws);
}
/**
* 处理连接请求
* @param ws WebSocket连接实例
* @param connectionId 连接ID
*/
function onConnect(ws: WebSocket, connectionId: string): void {
let polite = true;
// 处理私有模式
if (isPrivate) {
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
if (pair[0] != null && pair[1] != null) {
// 连接ID已被使用
ws.send(JSON.stringify({ type: "error", message: `${connectionId}: This connection id is already used.` }));
return;
} else if (pair[0] != null) {
// 找到配对连接
connectionPair.set(connectionId, [pair[0], ws]);
}
} else {
// 创建新的连接对
connectionPair.set(connectionId, [ws, null]);
polite = false;
}
}
// 获取或创建连接ID集合
const connectionIds = getOrCreateConnectionIds(ws);
// 添加连接ID
connectionIds.add(connectionId);
// 发送连接成功消息
ws.send(JSON.stringify({ type: "connect", connectionId: connectionId, polite: polite }));
//启用心跳包
//AddHeartbeat(ws, connectionId);
}
/**
* 处理断开连接请求
* @param ws WebSocket连接实例
* @param connectionId 连接ID
*/
function onDisconnect(ws: WebSocket, connectionId: string): void {
// 获取连接的连接ID集合
const connectionIds = clients.get(ws);
// 从集合中删除连接ID
connectionIds.delete(connectionId);
// 处理连接对
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 向另一个连接发送断开连接消息
otherSessionWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId }));
}
}
// 从连接对映射中删除
connectionPair.delete(connectionId);
// 向当前连接发送断开连接消息
ws.send(JSON.stringify({ type: "disconnect", connectionId: connectionId }));
//RemoveHeartbeat(ws);
// 记录断开连接的日志
console.log(`Disconnect connectionId: ${connectionId}`);
}
/**
* 处理offer信令
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onOffer(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId as string;
// 创建新的offer
const newOffer = new Offer(message.sdp, Date.now(), false);
// 处理私有模式
if (isPrivate) {
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 设置为polite模式
newOffer.polite = true;
// 发送offer消息
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer }));
}
}
return;
}
// 公共模式:创建新的连接对
connectionPair.set(connectionId, [ws, null]);
// 向所有其他客户端广播offer
clients.forEach((_v, k) => {
if (k == ws) {
return;
}
k.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer }));
});
}
/**
* 处理answer信令
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onAnswer(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId as string;
// 获取或创建连接ID集合
const connectionIds = getOrCreateConnectionIds(ws);
// 添加连接ID
connectionIds.add(connectionId);
// 创建新的answer
const newAnswer = new Answer(message.sdp, Date.now());
// 检查连接对是否存在
if (!connectionPair.has(connectionId)) {
return;
}
// 获取连接对
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
// 公共模式:更新连接对
if (!isPrivate) {
connectionPair.set(connectionId, [otherSessionWs, ws]);
}
// 发送answer消息
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer }));
}
/**
* 处理candidate信令
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onCandidate(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId;
// 创建新的candidate
const candidate = new Candidate(message.candidate, message.sdpMLineIndex, message.sdpMid, Date.now());
// 处理私有模式
if (isPrivate) {
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 发送candidate消息
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate }));
}
}
return;
}
}
function onCallConnectionId(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId;
const clientId = message.clientId;
clients.forEach((_v, k) => {
if (k === ws) {
return;
}
if (_v == clientId) {
k.send(JSON.stringify({ from: connectionId, to: "", type: "call-request", data: connectionId }));
}
});
}
/**
* 处理广播消息请求
* @param ws WebSocket连接实例
* @param message 消息数据
*/
/**
* 处理广播消息请求
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onBroadcast(ws: WebSocket, message: any): void {
const broadcastMessage = message.message;
const targetConnectionId = message.targetConnectionId;
if (targetConnectionId) {
// 向指定连接广播
if (connectionPair.has(targetConnectionId)) {
const pair = connectionPair.get(targetConnectionId);
// 向连接对中的两个WebSocket实例发送消息
if (pair[0]) {
pair[0].send(JSON.stringify({
type: "broadcast",
message: broadcastMessage,
from: "server"
}));
}
if (pair[1]) {
pair[1].send(JSON.stringify({
type: "broadcast",
message: broadcastMessage,
from: "server"
}));
}
}
} else {
// 全局广播:向所有客户端发送消息
clients.forEach((_v, k) => {
k.send(JSON.stringify({
type: "broadcast",
message: broadcastMessage,
from: "server"
}));
});
}
}
function AddHeartbeat(ws: WebSocket, connectionId: string) {
// 初始化心跳检测
(ws as any).lastActivity = Date.now();
// 设置心跳检测定时器每30秒发送一次ping
(ws as any).heartbeatTimer = setInterval(() => {
const now = Date.now();
// 检查上次活动时间如果超过60秒没有活动关闭连接
if (now - (ws as any).lastActivity > 10000) {
console.log('WebSocket connection timeout, closing...');
clearInterval((ws as any).heartbeatTimer);
//ws.close();
onDisconnect(ws, connectionId);
} else {
// 发送ping消息
ws.send(JSON.stringify({ type: "ping" }));
console.log('WebSocket connection heartbeat, lastActivity: ', (ws as any).lastActivity);
}
}, 3000);
}
function RemoveHeartbeat(ws: WebSocket) {
// 清除心跳检测定时器
if ((ws as any).heartbeatTimer) {
clearInterval((ws as any).heartbeatTimer);
}
}
/**
* 处理获取所有连接ID的请求
* @param ws WebSocket连接实例
*/
function onGetAllConnectionIds(): string[] {
// 获取所有connectionId
const connectionIds = Array.from(connectionPair.keys());
// 发送连接ID列表给客户端
// ws.send(JSON.stringify({
// type: "connection-ids",
// connectionIds: connectionIds
// }));
return connectionIds;
}
/**
* 处理chat-message信令
* @param ws WebSocket连接实例
* @param message 消息数据
*/
function onMessage(ws: WebSocket, message: any): void {
// 获取连接ID
const connectionId = message.connectionId;
const chatMessage = message.message;
if (connectionPair.has(connectionId)) {
const pair = connectionPair.get(connectionId);
// 找到另一个WebSocket实例
const otherSessionWs = pair[0] == ws ? pair[1] : pair[0];
if (otherSessionWs) {
// 发送chat-message消息
otherSessionWs.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: chatMessage }));
}
}
}
/**
* 导出WebSocket处理器函数
*/
export { reset, add, remove, onConnect, onDisconnect, onOffer, onAnswer, onCandidate, onCallConnectionId, onBroadcast, onGetAllConnectionIds, AddHeartbeat, RemoveHeartbeat ,onMessage};