diff --git a/WebApp/client/public/onebyone/store.js b/WebApp/client/public/onebyone/store.js index 6e193e4..10c7766 100644 --- a/WebApp/client/public/onebyone/store.js +++ b/WebApp/client/public/onebyone/store.js @@ -326,15 +326,15 @@ class CallStateManager { { googTypingNoiseDetection: true } // 启用打字噪声检测 ] }; - this.renderstreaming = new RenderStreaming(signaling, config); // 创建WebRTC连接管理实例 -this.renderstreaming.onNewPeer = (connectionId) => { - console.log(`New peer created for ${connectionId}, adding local tracks`); + this.renderstreaming = new RenderStreaming(signaling, config); + this.renderstreaming.onNewPeer = (participantId) => { + console.log(`New peer created for ${participantId}, adding local tracks`); if (this.state.localStream) { const tracks = this.state.localStream.getTracks(); for (const track of tracks) { - this.renderstreaming.addTransceiver(track, { direction: 'sendonly' }); + this.renderstreaming.addTransceiver(track, { direction: 'sendonly' }, participantId); } - this.setCodecPreferences(); + this.setCodecPreferences(participantId); } }; // 连接建立回调 @@ -375,44 +375,39 @@ this.renderstreaming.onNewPeer = (connectionId) => { }; // participant离开回调(host收到,房间仍然存在) - this.renderstreaming.onParticipantLeft = (connectionId) => { - console.log(`Participant left: ${connectionId}, room still active`); - // 更新远程用户状态,但不关闭房间 + this.renderstreaming.onParticipantLeft = (participantId) => { + console.log(`Participant left: ${participantId}, room still active`); this.updateRemoteUserStatus('offline'); this.updateRemoteUserNetworkQuality('no_signal'); showNotification('对方已离开通话', 'warning'); // 清理该 participant 的远端流 - if (this.state.remoteStreams[connectionId]) { - this.state.remoteStreams[connectionId].getTracks().forEach(track => track.stop()); - delete this.state.remoteStreams[connectionId]; + if (this.state.remoteStreams[participantId]) { + this.state.remoteStreams[participantId].getTracks().forEach(track => track.stop()); + delete this.state.remoteStreams[participantId]; } - // 同时清理单路远端流(兼容) if (this.state.remoteStream) { this.state.remoteStream.getTracks().forEach(track => track.stop()); this.state.remoteStream = null; } - // 通知UI更新 - this.notify({ type: 'PARTICIPANT_LEFT', connectionId: connectionId }); + // 通知UI更新,用participantId作为connectionId传给renderer + this.notify({ type: 'PARTICIPANT_LEFT', connectionId: participantId }); }; // 轨道事件回调 this.renderstreaming.onTrackEvent = (data) => { const direction = data.transceiver.direction; if (direction == "sendrecv" || direction == "recvonly") { - // 获取当前连接的远端流 - const trackConnectionId = this.connectionId; - // Host端: 每个participant有独立的远端流 - // Participant端: 只有一个host的远端流 + // 使用participantId区分不同participant的流 + const trackParticipantId = data.participantId || this.connectionId; const isHost = this.role === 'host'; - // 获取或创建对应的远端流 let targetStream = null; if (isHost) { - // Host端: 按 connectionId 管理多路远端流 - if (!this.state.remoteStreams[trackConnectionId]) { - this.state.remoteStreams[trackConnectionId] = new MediaStream(); + // Host端: 按 participantId 管理多路远端流 + if (!this.state.remoteStreams[trackParticipantId]) { + this.state.remoteStreams[trackParticipantId] = new MediaStream(); } - targetStream = this.state.remoteStreams[trackConnectionId]; + targetStream = this.state.remoteStreams[trackParticipantId]; } else { // Participant端: 使用单一远端流 if (this.state.remoteStream == null) { @@ -423,22 +418,19 @@ this.renderstreaming.onNewPeer = (connectionId) => { // 检查是否已经有相同类型的轨道 const existingTracks = targetStream.getTracks().filter(track => track.kind === data.track.kind); - - // 移除旧的轨道 existingTracks.forEach(track => { targetStream.removeTrack(track); console.log('Removed old track:', track.kind); }); - // 添加新的轨道 targetStream.addTrack(data.track); - console.log('Added new track:', data.track.kind, 'to stream:', trackConnectionId); + console.log('Added new track:', data.track.kind, 'for participant:', trackParticipantId); // 通知UI远程流已更新 this.notify({ type: 'REMOTE_STREAM_OBTAINED', stream: targetStream, - connectionId: trackConnectionId, + connectionId: trackParticipantId, isHost: isHost }); console.log('Notified UI about remote stream update'); @@ -585,29 +577,24 @@ this.renderstreaming.onNewPeer = (connectionId) => { /** * 设置编解码器偏好 */ - setCodecPreferences() { - /** @type {RTCRtpCodecCapability[] | null} */ + setCodecPreferences(participantId) { let selectedCodecs = null; - - // 获取视频编解码器能力 const { codecs } = RTCRtpSender.getCapabilities('video'); if (codecs && codecs.length > 0) { - // 优先选择H.264编解码器 const h264Codec = codecs.find(c => c.mimeType === 'video/H264'); if (h264Codec) { selectedCodecs = [h264Codec]; } } + if (selectedCodecs == null) return; - if (selectedCodecs == null) { - return; - } - - // 获取视频收发器并设置编解码器偏好 if (this.renderstreaming) { - const transceivers = this.renderstreaming.getTransceivers().filter(t => t.receiver.track.kind == "video"); + const transceivers = this.renderstreaming.getTransceivers(participantId); if (transceivers && transceivers.length > 0) { - transceivers.forEach(t => t.setCodecPreferences(selectedCodecs)); + const videoTransceivers = transceivers.filter(t => t.receiver.track.kind == "video"); + if (videoTransceivers && videoTransceivers.length > 0) { + videoTransceivers.forEach(t => t.setCodecPreferences(selectedCodecs)); + } } } } diff --git a/WebApp/client/src/renderstreaming.js b/WebApp/client/src/renderstreaming.js index 4968730..52cbef1 100644 --- a/WebApp/client/src/renderstreaming.js +++ b/WebApp/client/src/renderstreaming.js @@ -5,26 +5,26 @@ function uuid4() { var temp_url = URL.createObjectURL(new Blob()); var uuid = temp_url.toString(); URL.revokeObjectURL(temp_url); - return uuid.split(/[:/]/g).pop().toLowerCase(); // remove prefixes + return uuid.split(/[:/]/g).pop().toLowerCase(); } export class RenderStreaming { - /** - * @param signaling signaling class - * @param {RTCConfiguration} config - */ constructor(signaling, config) { - this._peer = null; + this._peer = null; // participant端:单一peer + this._peers = new Map(); // host端:多peer Map (participantId → Peer) this._connectionId = null; - this.onConnect = function (connectionId) { Logger.log(`Connect peer on ${connectionId}.`); }; + this._participantId = null; // 自己的participantId + this._isHost = false; + this.onConnect = function (connectionId, data) { Logger.log(`Connect peer on ${connectionId}.`); }; this.onDisconnect = function (connectionId) { Logger.log(`Disconnect peer on ${connectionId}.`); }; this.onGotOffer = function (connectionId) { Logger.log(`On got Offer on ${connectionId}.`); }; this.onGotAnswer = function (connectionId) { Logger.log(`On got Answer on ${connectionId}.`); }; this.onTrackEvent = function (data) { Logger.log(`OnTrack event peer with data:${data}`); }; this.onAddChannel = function (data) { Logger.log(`onAddChannel event peer with data:${data}`); }; this.onMessage = function (data) { Logger.log(`On message: ${data}`); }; - this.onParticipantLeft = function (connectionId) { Logger.log(`Participant left on ${connectionId}.`); }; - this.onNewPeer = function (connectionId) { Logger.log(`New peer created for ${connectionId}.`); }; + this.onParticipantLeft = function (participantId) { Logger.log(`Participant left: ${participantId}.`); }; + this.onParticipantJoined = function (participantId) { Logger.log(`Participant joined: ${participantId}.`); }; + this.onNewPeer = function (participantId) { Logger.log(`New peer created for ${participantId}.`); }; this._config = config; this._signaling = signaling; this._signaling.addEventListener('connect', this._onConnect.bind(this)); @@ -34,12 +34,21 @@ export class RenderStreaming { this._signaling.addEventListener('candidate', this._onIceCandidate.bind(this)); this._signaling.addEventListener('on-message', this._onMessage.bind(this)); this._signaling.addEventListener('participant-left', this._onParticipantLeft.bind(this)); + this._signaling.addEventListener('participant-joined', this._onParticipantJoined.bind(this)); } async _onConnect(e) { const data = e.detail; if (this._connectionId == data.connectionId) { - this._preparePeerConnection(this._connectionId, data.polite); + this._participantId = data.participantId; + this._isHost = data.role === 'host'; + + if (!this._isHost) { + // participant端:立即创建单一peer并开始协商 + this._preparePeerConnection(this._connectionId, data.polite, null); + } + // host端:不在connect时创建peer,等participant加入后再创建 + this.onConnect(data.connectionId, data); } } @@ -52,50 +61,91 @@ export class RenderStreaming { this._peer.close(); this._peer = null; } + // 关闭所有host端peers + this._peers.forEach((peer, participantId) => { + peer.close(); + }); + this._peers.clear(); } } async _onOffer(e) { const offer = e.detail; - // 如果已有Peer但ICE连接已断开,需要重建Peer - if (this._peer && this._peer.pc && this._peer.pc.iceConnectionState === 'disconnected') { - Logger.log('ICE disconnected, resetting PeerConnection for new offer'); - this._peer.close(); - this._peer = null; - } - if (!this._peer) { - this._preparePeerConnection(offer.connectionId, offer.polite); - } - const desc = new RTCSessionDescription({ sdp: offer.sdp, type: "offer" }); - try { - await this._peer.onGotDescription(offer.connectionId, desc); - } catch (error) { - Logger.warn(`Error happen on GotDescription that description.\n Message: ${error}\n RTCSdpType:${desc.type}\n sdp:${desc.sdp}`); - return; + const participantId = offer.participantId; + + if (this._isHost) { + // host端:为该participant创建或复用peer + let peer = this._peers.get(participantId); + if (!peer || (peer.pc && peer.pc.iceConnectionState === 'disconnected')) { + if (peer) peer.close(); + peer = this._preparePeerConnection(this._connectionId, offer.polite, participantId); + } + const desc = new RTCSessionDescription({ sdp: offer.sdp, type: "offer" }); + try { + await peer.onGotDescription(this._connectionId, desc); + } catch (error) { + Logger.warn(`Error on GotDescription for participant ${participantId}: ${error}`); + } + } else { + // participant端:使用单一peer + if (this._peer && this._peer.pc && this._peer.pc.iceConnectionState === 'disconnected') { + this._peer.close(); + this._peer = null; + } + if (!this._peer) { + this._preparePeerConnection(offer.connectionId, offer.polite, null); + } + const desc = new RTCSessionDescription({ sdp: offer.sdp, type: "offer" }); + try { + await this._peer.onGotDescription(offer.connectionId, desc); + } catch (error) { + Logger.warn(`Error on GotDescription: ${error}`); + } } } async _onAnswer(e) { const answer = e.detail; + const participantId = answer.participantId; const desc = new RTCSessionDescription({ sdp: answer.sdp, type: "answer" }); - if (this._peer) { + + if (this._isHost && participantId) { + // host端:路由到对应participant的peer + const peer = this._peers.get(participantId); + if (peer) { + try { + await peer.onGotDescription(this._connectionId, desc); + } catch (error) { + Logger.warn(`Error on GotDescription answer for ${participantId}: ${error}`); + } + } + } else if (this._peer) { + // participant端 try { await this._peer.onGotDescription(answer.connectionId, desc); } catch (error) { - Logger.warn(`Error happen on GotDescription that description.\n Message: ${error}\n RTCSdpType:${desc.type}\n sdp:${desc.sdp}`); - return; + Logger.warn(`Error on GotDescription answer: ${error}`); } } } async _onIceCandidate(e) { const candidate = e.detail; + const participantId = candidate.participantId; const iceCandidate = new RTCIceCandidate({ candidate: candidate.candidate, sdpMid: candidate.sdpMid, sdpMLineIndex: candidate.sdpMLineIndex }); - if (this._peer) { + + if (this._isHost && participantId) { + // host端:路由到对应participant的peer + const peer = this._peers.get(participantId); + if (peer) { + await peer.onGotCandidate(this._connectionId, iceCandidate); + } + } else if (this._peer) { + // participant端 await this._peer.onGotCandidate(candidate.connectionId, iceCandidate); } } - // 在 RenderStreaming 类中添加 + async _onMessage(e) { const data = e.detail; this.onMessage(data); @@ -103,13 +153,32 @@ export class RenderStreaming { async _onParticipantLeft(e) { const data = e.detail; - Logger.log(`Participant left: ${data.connectionId}`); - this.onParticipantLeft(data.connectionId); + const participantId = data.participantId; + Logger.log(`Participant left: ${participantId}`); + + // 关闭该participant的peer + if (this._peers.has(participantId)) { + const peer = this._peers.get(participantId); + peer.close(); + this._peers.delete(participantId); + } + + this.onParticipantLeft(participantId); } - /** - * if not set argument, a generated uuid is used. - * @param {string | null} connectionId - */ + + async _onParticipantJoined(e) { + const data = e.detail; + const participantId = data.participantId; + Logger.log(`Participant joined: ${participantId}`); + + // host端:为新participant创建peer + if (this._isHost && !this._peers.has(participantId)) { + this._preparePeerConnection(this._connectionId, false, participantId); + } + + this.onParticipantJoined(participantId); + } + async createConnection(connectionId) { this._connectionId = connectionId ? connectionId : uuid4(); await this._signaling.createConnection(this._connectionId); @@ -118,90 +187,106 @@ export class RenderStreaming { async deleteConnection() { await this._signaling.deleteConnection(this._connectionId); } - // 在 RenderStreaming 类中添加 - _preparePeerConnection(connectionId, polite) { - if (this._peer) { - Logger.log('Close current PeerConnection'); - this._peer.close(); - this._peer = null; + _preparePeerConnection(connectionId, polite, participantId) { + // host端多peer模式:participantId标识目标participant + // participant端单peer模式:participantId为null + + const peer = new Peer(connectionId, polite, this._config); + + // 保存peer + if (participantId) { + if (this._peers.has(participantId)) { + const oldPeer = this._peers.get(participantId); + oldPeer.close(); + } + this._peers.set(participantId, peer); + } else { + if (this._peer) { + this._peer.close(); + } + this._peer = peer; } - // Create peerConnection with proxy server and set up handlers - this._peer = new Peer(connectionId, polite, this._config); - // this._peer.addEventListener('disconnect', () => { - // this.onDisconnect(`Receive disconnect message from peer. connectionId:${connectionId}`); - // }); - this._peer.addEventListener('trackevent', (e) => { + // 事件处理:附加participantId用于路由 + peer.addEventListener('trackevent', (e) => { const data = e.detail; + data.participantId = participantId; this.onTrackEvent(data); }); - this._peer.addEventListener('adddatachannel', (e) => { + + peer.addEventListener('adddatachannel', (e) => { const data = e.detail; this.onAddChannel(data); }); - this._peer.addEventListener('ongotoffer', (e) => { + + peer.addEventListener('ongotoffer', (e) => { const id = e.detail.connectionId; this.onGotOffer(id); }); - this._peer.addEventListener('ongotanswer', (e) => { + + peer.addEventListener('ongotanswer', (e) => { const id = e.detail.connectionId; this.onGotAnswer(id); }); - this._peer.addEventListener('sendoffer', (e) => { + + peer.addEventListener('sendoffer', (e) => { const offer = e.detail; - this._signaling.sendOffer(offer.connectionId, offer.sdp); + this._signaling.sendOffer(offer.connectionId, offer.sdp, participantId); }); - this._peer.addEventListener('sendanswer', (e) => { + + peer.addEventListener('sendanswer', (e) => { const answer = e.detail; - this._signaling.sendAnswer(answer.connectionId, answer.sdp); + this._signaling.sendAnswer(answer.connectionId, answer.sdp, participantId); }); - this._peer.addEventListener('sendcandidate', (e) => { + + peer.addEventListener('sendcandidate', (e) => { const candidate = e.detail; - this._signaling.sendCandidate(candidate.connectionId, candidate.candidate, candidate.sdpMid, candidate.sdpMLineIndex); + this._signaling.sendCandidate(candidate.connectionId, candidate.candidate, candidate.sdpMid, candidate.sdpMLineIndex, participantId); }); - this.onNewPeer(connectionId); - return this._peer; + + this.onNewPeer(participantId || connectionId); + return peer; } - /** - * @returns {Promise | null} - */ - async getStats() { - return await this._peer.getStats(this._connectionId); + async getStats(participantId) { + if (this._isHost && participantId) { + const peer = this._peers.get(participantId); + return peer ? await peer.getStats(this._connectionId) : null; + } + return this._peer ? await this._peer.getStats(this._connectionId) : null; } - /** - * @param {string} label - * @returns {RTCDataChannel | null} - */ - createDataChannel(label) { - return this._peer.createDataChannel(this._connectionId, label); + createDataChannel(label, participantId) { + if (this._isHost && participantId) { + const peer = this._peers.get(participantId); + return peer ? peer.createDataChannel(this._connectionId, label) : null; + } + return this._peer ? this._peer.createDataChannel(this._connectionId, label) : null; } - /** - * @param {MediaStreamTrack} track - * @returns {RTCRtpSender | null} - */ - addTrack(track) { - return this._peer.addTrack(this._connectionId, track); + addTrack(track, participantId) { + if (this._isHost && participantId) { + const peer = this._peers.get(participantId); + return peer ? peer.addTrack(this._connectionId, track) : null; + } + return this._peer ? this._peer.addTrack(this._connectionId, track) : null; } - /** - * @param {MediaStreamTrack | string} trackOrKind - * @param {RTCRtpTransceiverInit | null} init - * @returns {RTCRtpTransceiver | null} - */ - addTransceiver(trackOrKind, init) { - return this._peer.addTransceiver(this._connectionId, trackOrKind, init); + addTransceiver(trackOrKind, init, participantId) { + if (this._isHost && participantId) { + const peer = this._peers.get(participantId); + return peer ? peer.addTransceiver(this._connectionId, trackOrKind, init) : null; + } + return this._peer ? this._peer.addTransceiver(this._connectionId, trackOrKind, init) : null; } - - /** - * @returns {RTCRtpTransceiver[] | null} - */ - getTransceivers() { - return this._peer.getTransceivers(this._connectionId); + getTransceivers(participantId) { + if (this._isHost && participantId) { + const peer = this._peers.get(participantId); + return peer ? peer.getTransceivers(this._connectionId) : null; + } + return this._peer ? this._peer.getTransceivers(this._connectionId) : null; } sendMessage(message) { @@ -209,6 +294,7 @@ export class RenderStreaming { this._signaling.sendMessage(this._connectionId, message); } } + async start() { await this._signaling.start(); } @@ -218,10 +304,14 @@ export class RenderStreaming { this._peer.close(); this._peer = null; } + this._peers.forEach((peer) => { + peer.close(); + }); + this._peers.clear(); if (this._signaling) { await this._signaling.stop(); this._signaling = null; } } -} +} \ No newline at end of file diff --git a/WebApp/client/src/signaling.js b/WebApp/client/src/signaling.js index 9a5ed22..5127b30 100644 --- a/WebApp/client/src/signaling.js +++ b/WebApp/client/src/signaling.js @@ -182,13 +182,13 @@ export class WebSocketSignaling extends EventTarget { this.dispatchEvent(new CustomEvent('disconnect', { detail: msg })); break; case "offer": - this.dispatchEvent(new CustomEvent('offer', { detail: { connectionId: msg.from, sdp: msg.data.sdp, polite: msg.data.polite } })); + this.dispatchEvent(new CustomEvent('offer', { detail: { connectionId: msg.from, sdp: msg.data.sdp, polite: msg.data.polite, participantId: msg.participantId } })); break; case "answer": - this.dispatchEvent(new CustomEvent('answer', { detail: { connectionId: msg.from, sdp: msg.data.sdp } })); + this.dispatchEvent(new CustomEvent('answer', { detail: { connectionId: msg.from, sdp: msg.data.sdp, participantId: msg.participantId } })); break; case "candidate": - this.dispatchEvent(new CustomEvent('candidate', { detail: { connectionId: msg.from, candidate: msg.data.candidate, sdpMLineIndex: msg.data.sdpMLineIndex, sdpMid: msg.data.sdpMid } })); + this.dispatchEvent(new CustomEvent('candidate', { detail: { connectionId: msg.from, candidate: msg.data.candidate, sdpMLineIndex: msg.data.sdpMLineIndex, sdpMid: msg.data.sdpMid, participantId: msg.participantId } })); break; case "on-message": this.dispatchEvent(new CustomEvent('on-message', { detail: msg.data })); @@ -196,8 +196,10 @@ export class WebSocketSignaling extends EventTarget { case "participant-left": this.dispatchEvent(new CustomEvent('participant-left', { detail: msg })); break; + case "participant-joined": + this.dispatchEvent(new CustomEvent('participant-joined', { detail: msg })); + break; case "broadcast": - // 处理服务器广播的消息 this.dispatchEvent(new CustomEvent('on-message', { detail: msg.message })); break; default: @@ -231,28 +233,28 @@ export class WebSocketSignaling extends EventTarget { this.websocket.send(sendJson); } - sendOffer(connectionId, sdp) { + sendOffer(connectionId, sdp, participantId) { const data = { 'sdp': sdp, 'connectionId': connectionId }; - const sendJson = JSON.stringify({ type: "offer", from: connectionId, data: data }); + const sendJson = JSON.stringify({ type: "offer", from: connectionId, data: data, participantId: participantId || '' }); Logger.log(sendJson); this.websocket.send(sendJson); } - sendAnswer(connectionId, sdp) { + sendAnswer(connectionId, sdp, participantId) { const data = { 'sdp': sdp, 'connectionId': connectionId }; - const sendJson = JSON.stringify({ type: "answer", from: connectionId, data: data }); + const sendJson = JSON.stringify({ type: "answer", from: connectionId, data: data, participantId: participantId || '' }); Logger.log(sendJson); this.websocket.send(sendJson); } - sendCandidate(connectionId, candidate, sdpMLineIndex, sdpMid) { + sendCandidate(connectionId, candidate, sdpMLineIndex, sdpMid, participantId) { const data = { 'candidate': candidate, 'sdpMLineIndex': sdpMLineIndex, 'sdpMid': sdpMid, 'connectionId': connectionId }; - const sendJson = JSON.stringify({ type: "candidate", from: connectionId, data: data }); + const sendJson = JSON.stringify({ type: "candidate", from: connectionId, data: data, participantId: participantId || '' }); Logger.log(sendJson); this.websocket.send(sendJson); } diff --git a/WebApp/src/class/websockethandler.ts b/WebApp/src/class/websockethandler.ts index 64e18a7..e126dba 100644 --- a/WebApp/src/class/websockethandler.ts +++ b/WebApp/src/class/websockethandler.ts @@ -112,32 +112,26 @@ function broadcastToGroup(connectionId: string, senderWs: WebSocket, message: an * @param ws WebSocket连接实例 */ function remove(ws: WebSocket): void { - // 获取连接的所有连接ID const connectionIds = clients.get(ws); if (!connectionIds) return; - // 遍历所有连接ID connectionIds.forEach(connectionId => { const group = connectionGroup.get(connectionId); if (group) { if (group.host === ws) { - // host断开连接,通知所有participants房间已关闭 group.participants.forEach(participantWs => { participantWs.send(JSON.stringify({ type: "disconnect", connectionId: connectionId, reason: "host-left" })); }); - // 删除整个连接组 connectionGroup.delete(connectionId); } else { - // participant断开连接,从participants中移除并通知host group.participants.delete(ws); - group.host.send(JSON.stringify({ type: "participant-left", connectionId: connectionId })); + // 包含participantId,让host能识别是哪个participant离开 + group.host.send(JSON.stringify({ type: "participant-left", connectionId: connectionId, participantId: (ws as any).participantId })); } } - // 记录删除连接ID的日志 console.log(`Remove connectionId: ${connectionId}`); }); - // 从客户端映射中删除 clients.delete(ws); } @@ -149,30 +143,27 @@ function remove(ws: WebSocket): void { */ function onConnect(ws: WebSocket, connectionId: string): void { let polite = true; - // 处理私有模式 + // 为每个WebSocket生成唯一的participantId + const participantId = (ws as any).participantId = (ws as any).participantId || `p_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; + if (isPrivate) { if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); - // 已有host,新连接作为participant加入 group.participants.add(ws); - console.log(`Participant joined connectionId: ${connectionId}, total participants: ${group.participants.size}`); + console.log(`Participant ${participantId} joined connectionId: ${connectionId}, total participants: ${group.participants.size}`); + // 通知host有新participant加入 + group.host.send(JSON.stringify({ type: "participant-joined", connectionId: connectionId, participantId: participantId })); } else { - // 第一个连接成为host connectionGroup.set(connectionId, { host: ws, participants: new Set() }); polite = false; console.log(`Host created connectionId: ${connectionId}`); } } - // 获取或创建连接ID集合 const connectionIds = getOrCreateConnectionIds(ws); - // 添加连接ID connectionIds.add(connectionId); - // 发送连接成功消息(包含角色信息) const role = polite ? 'participant' : 'host'; - ws.send(JSON.stringify({ type: "connect", connectionId: connectionId, polite: polite, role: role })); - //启用心跳包 - //AddHeartbeat(ws, connectionId); + ws.send(JSON.stringify({ type: "connect", connectionId: connectionId, polite: polite, role: role, participantId: participantId })); } /** @@ -220,25 +211,35 @@ function onDisconnect(ws: WebSocket, connectionId: string): void { * @param message 消息数据 */ function onOffer(ws: WebSocket, message: any): void { - // 获取连接ID const connectionId = message.connectionId as string; - // 创建新的offer const newOffer = new Offer(message.sdp, Date.now(), false); - // 处理私有模式 if (isPrivate) { if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); + const senderParticipantId = (ws as any).participantId; + const targetParticipantId = message.participantId; if (group.host === ws) { - // host发送offer,转发给所有participants + // host发送offer给特定participant(多peer模式下按participantId路由) newOffer.polite = true; - group.participants.forEach(participantWs => { - participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer })); - }); + if (targetParticipantId) { + // 路由到指定participant + group.participants.forEach(participantWs => { + if ((participantWs as any).participantId === targetParticipantId) { + participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer, participantId: targetParticipantId })); + } + }); + } else { + // 兼容:无目标时广播给所有participants + group.participants.forEach(participantWs => { + const pid = (participantWs as any).participantId; + participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer, participantId: pid })); + }); + } } else { - // participant发送offer,转发给host + // participant发送offer给host,携带该participant的participantId newOffer.polite = true; - group.host.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer })); + group.host.send(JSON.stringify({ from: connectionId, to: "", type: "offer", data: newOffer, participantId: senderParticipantId })); } } return; @@ -264,30 +265,37 @@ function onOffer(ws: WebSocket, message: any): void { * @param message 消息数据 */ function onAnswer(ws: WebSocket, message: any): void { - // 获取连接ID const connectionId = message.connectionId as string; - // 获取或创建连接ID集合 const connectionIds = getOrCreateConnectionIds(ws); - // 添加连接ID connectionIds.add(connectionId); - // 创建新的answer const newAnswer = new Answer(message.sdp, Date.now()); - // 检查连接组是否存在 if (!connectionGroup.has(connectionId)) { return; } const group = connectionGroup.get(connectionId); + const senderParticipantId = (ws as any).participantId; + // 从answer消息中获取目标participantId(host回复时指定) + const targetParticipantId = message.participantId; if (group.host === ws) { - // host发送answer,转发给所有participants(通常host不发送answer) - group.participants.forEach(participantWs => { - participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer })); - }); + // host发送answer给特定participant + if (targetParticipantId) { + group.participants.forEach(participantWs => { + if ((participantWs as any).participantId === targetParticipantId) { + participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer, participantId: targetParticipantId })); + } + }); + } else { + // 兼容:没有targetParticipantId时广播给所有participants + group.participants.forEach(participantWs => { + participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer })); + }); + } } else { - // participant发送answer,转发给host - group.host.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer })); + // participant发送answer给host,携带自己的participantId + group.host.send(JSON.stringify({ from: connectionId, to: "", type: "answer", data: newAnswer, participantId: senderParticipantId })); } } @@ -298,150 +306,163 @@ function onAnswer(ws: WebSocket, message: any): void { * @param message 消息数据 */ function onCandidate(ws: WebSocket, message: any): void { - // 获取连接ID const connectionId = message.connectionId; - // 创建新的candidate const candidate = new Candidate(message.candidate, message.sdpMLineIndex, message.sdpMid, Date.now()); + const senderParticipantId = (ws as any).participantId; + const targetParticipantId = message.participantId; - // 处理私有模式 if (isPrivate) { if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); if (group.host === ws) { - // host发送candidate,转发给所有participants - group.participants.forEach(participantWs => { - participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate })); - }); + // host发送candidate给特定participant + if (targetParticipantId) { + group.participants.forEach(participantWs => { + if ((participantWs as any).participantId === targetParticipantId) { + participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate, participantId: targetParticipantId })); + } + }); + } else { + group.participants.forEach(participantWs => { + participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate })); + }); + } } else { - // participant发送candidate,转发给host - group.host.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate })); + // participant发送candidate给host,携带自己的participantId + group.host.send(JSON.stringify({ from: connectionId, to: "", type: "candidate", data: candidate, participantId: senderParticipantId })); } } return; } } - function onCallConnectionId(ws: WebSocket, message: any): void { - // 获取连接ID - const connectionId = message.connectionId; - const clientId = message.clientId; - // 在1对多模式下,通知host有新的呼叫请求 - if (connectionGroup.has(connectionId)) { - const group = connectionGroup.get(connectionId); - if (group.host !== ws) { - // participant发起呼叫,通知host - group.host.send(JSON.stringify({ from: connectionId, to: "", type: "call-request", data: connectionId })); + +function onCallConnectionId(ws: WebSocket, message: any): void { + // 获取连接ID + const connectionId = message.connectionId; + const clientId = message.clientId; + // 在1对多模式下,通知host有新的呼叫请求 + if (connectionGroup.has(connectionId)) { + const group = connectionGroup.get(connectionId); + if (group.host !== ws) { + // participant发起呼叫,通知host + group.host.send(JSON.stringify({ from: connectionId, to: "", type: "call-request", data: connectionId })); + } + } else { + // 兼容旧的广播方式 + clients.forEach((_v, k) => { + if (k === ws) { + return; } - } else { - // 兼容旧的广播方式 - clients.forEach((_v, k) => { - if (k === ws) { - return; - } - if (_v == clientId) { - k.send(JSON.stringify({ from: connectionId, to: "", type: "call-request", data: connectionId })); - } + if (_v == clientId) { + k.send(JSON.stringify({ from: connectionId, to: "", type: "call-request", data: connectionId })); + } + }); + } +} + + +/** + * 处理广播消息请求(1对多模式) + * @param ws WebSocket连接实例 + * @param message 消息数据 + */ +function onBroadcast(ws: WebSocket, message: any): void { + const broadcastMessage = message.message; + const targetConnectionId = message.targetConnectionId; + + if (targetConnectionId) { + // 向指定连接组广播 + if (connectionGroup.has(targetConnectionId)) { + const group = connectionGroup.get(targetConnectionId); + // 向组内所有成员发送消息 + group.host.send(JSON.stringify({ + type: "broadcast", + message: broadcastMessage, + from: "server" + })); + group.participants.forEach(participantWs => { + participantWs.send(JSON.stringify({ + type: "broadcast", + message: broadcastMessage, + from: "server" + })); }); } + } else { + // 全局广播:向所有客户端发送消息 + clients.forEach((_v, k) => { + k.send(JSON.stringify({ + type: "broadcast", + message: broadcastMessage, + from: "server" + })); + }); } +} +function AddHeartbeat(ws: WebSocket, connectionId: string) { + // 初始化心跳检测 + (ws as any).lastActivity = Date.now(); - /** - * 处理广播消息请求(1对多模式) - * @param ws WebSocket连接实例 - * @param message 消息数据 - */ - function onBroadcast(ws: WebSocket, message: any): void { - const broadcastMessage = message.message; - const targetConnectionId = message.targetConnectionId; + // 设置心跳检测定时器,每30秒发送一次ping + (ws as any).heartbeatTimer = setInterval(() => { + const now = Date.now(); + // 检查上次活动时间,如果超过60秒没有活动,关闭连接 + if (now - (ws as any).lastActivity > 10000) { + console.log('WebSocket connection timeout, closing...'); + clearInterval((ws as any).heartbeatTimer); + //ws.close(); + onDisconnect(ws, connectionId); + } else { + // 发送ping消息 + ws.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: { type: "ping"} })); + console.log('WebSocket connection heartbeat, lastActivity: ', (ws as any).lastActivity); + } + }, 3000); +} - if (targetConnectionId) { - // 向指定连接组广播 - if (connectionGroup.has(targetConnectionId)) { - const group = connectionGroup.get(targetConnectionId); - // 向组内所有成员发送消息 - group.host.send(JSON.stringify({ - type: "broadcast", - message: broadcastMessage, - from: "server" - })); - group.participants.forEach(participantWs => { - participantWs.send(JSON.stringify({ - type: "broadcast", - message: broadcastMessage, - from: "server" - })); - }); - } - } else { - // 全局广播:向所有客户端发送消息 - clients.forEach((_v, k) => { - k.send(JSON.stringify({ - type: "broadcast", - message: broadcastMessage, - from: "server" - })); - }); - } - } - function AddHeartbeat(ws: WebSocket, connectionId: string) { - // 初始化心跳检测 - (ws as any).lastActivity = Date.now(); +function RemoveHeartbeat(ws: WebSocket) { + // 清除心跳检测定时器 + if ((ws as any).heartbeatTimer) { + clearInterval((ws as any).heartbeatTimer); + } +} - // 设置心跳检测定时器,每30秒发送一次ping - (ws as any).heartbeatTimer = setInterval(() => { - const now = Date.now(); - // 检查上次活动时间,如果超过60秒没有活动,关闭连接 - if (now - (ws as any).lastActivity > 10000) { - console.log('WebSocket connection timeout, closing...'); - clearInterval((ws as any).heartbeatTimer); - //ws.close(); - onDisconnect(ws, connectionId); - } else { - // 发送ping消息 - ws.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: { type: "ping"} })); - console.log('WebSocket connection heartbeat, lastActivity: ', (ws as any).lastActivity); - } - }, 3000); +/** + * 处理获取所有连接ID的请求 + * @param ws WebSocket连接实例 + */ +function onGetAllConnectionIds(): string[] { + // 获取所有connectionId + const connectionIds = Array.from(connectionGroup.keys()); + return connectionIds; +} + +/** + * 处理chat-message信令(1对多模式) + * host的消息转发给所有participants,participant的消息转发给host + * @param ws WebSocket连接实例 + * @param message 消息数据 + */ +function onMessage(ws: WebSocket, message: any): void { + // 获取连接ID + const connectionId = message.connectionId; + const chatMessage = message.message; + if (connectionGroup.has(connectionId)) { + const group = connectionGroup.get(connectionId); + if (group.host === ws) { + // host发送消息,转发给所有participants + group.participants.forEach(participantWs => { + participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: chatMessage })); + }); + } else { + // participant发送消息,转发给host + group.host.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: chatMessage })); } - function RemoveHeartbeat(ws: WebSocket) { - // 清除心跳检测定时器 - if ((ws as any).heartbeatTimer) { - clearInterval((ws as any).heartbeatTimer); - } - } - /** - * 处理获取所有连接ID的请求 - * @param ws WebSocket连接实例 - */ - function onGetAllConnectionIds(): string[] { - // 获取所有connectionId - const connectionIds = Array.from(connectionGroup.keys()); - return connectionIds; - } - /** - * 处理chat-message信令(1对多模式) - * host的消息转发给所有participants,participant的消息转发给host - * @param ws WebSocket连接实例 - * @param message 消息数据 - */ - function onMessage(ws: WebSocket, message: any): void { - // 获取连接ID - const connectionId = message.connectionId; - const chatMessage = message.message; - if (connectionGroup.has(connectionId)) { - const group = connectionGroup.get(connectionId); - if (group.host === ws) { - // host发送消息,转发给所有participants - group.participants.forEach(participantWs => { - participantWs.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: chatMessage })); - }); - } else { - // participant发送消息,转发给host - group.host.send(JSON.stringify({ from: connectionId, to: "", type: "on-message", data: chatMessage })); - } - } - } - /** - * 导出WebSocket处理器函数 - */ - export { reset, add, remove, onConnect, onDisconnect, onOffer, onAnswer, onCandidate, onCallConnectionId, onBroadcast, onGetAllConnectionIds, AddHeartbeat, RemoveHeartbeat, onMessage, isHost, broadcastToGroup, connectionGroup }; + } +} + +/** + * 导出WebSocket处理器函数 + */ +export { reset, add, remove, onConnect, onDisconnect, onOffer, onAnswer, onCandidate, onCallConnectionId, onBroadcast, onGetAllConnectionIds, AddHeartbeat, RemoveHeartbeat, onMessage, isHost, broadcastToGroup, connectionGroup };