diff --git a/client/src/core/signaling.js b/client/src/core/signaling.js index eb3fe34..69df4b6 100644 --- a/client/src/core/signaling.js +++ b/client/src/core/signaling.js @@ -1,5 +1,41 @@ import * as Logger from "../utils/logger.js"; +const RECORDING_SIGNAL_EVENTS = [ + 'recording-started', + 'recording-peer-request', + 'recording-stopped', + 'recording-status', + 'recording-answer', + 'recording-candidate' +]; + +function parseOnMessageData(data) { + if (typeof data !== 'string') { + return data; + } + + try { + return JSON.parse(data); + } catch(e) { + Logger.error(`Signaling: on-message, error: ${e}`); + return data; + } +} + +function dispatchOnMessageEvent(target, data, participantId) { + const parsed = parseOnMessageData(data); + if (participantId && parsed && typeof parsed === 'object') { + parsed.participantId = participantId; + } + target.dispatchEvent(new CustomEvent('on-message', { detail: parsed })); + if (parsed && typeof parsed.type === 'string' && RECORDING_SIGNAL_EVENTS.indexOf(parsed.type) !== -1) { + const detail = parsed.data && typeof parsed.data === 'object' + ? { type: parsed.type, ...parsed.data } + : parsed; + target.dispatchEvent(new CustomEvent(parsed.type, { detail })); + } +} + export class Signaling extends EventTarget { constructor(interval = 1000, baseUrl = null) { @@ -73,15 +109,7 @@ export class Signaling extends EventTarget { this.dispatchEvent(new CustomEvent('candidate', { detail: msg })); break; case "on-message": - { - let parsed = msg.data; - if (typeof msg.data === 'string') { - try { parsed = JSON.parse(msg.data); } catch(e) { - Logger.error(`Signaling: on-message, error: ${e}`); - } - } - this.dispatchEvent(new CustomEvent('on-message', { detail: parsed })); - } + dispatchOnMessageEvent(this, msg.data, msg.participantId); break; default: break; @@ -201,18 +229,7 @@ export class WebSocketSignaling extends EventTarget { this.dispatchEvent(new CustomEvent('candidate', { detail: { connectionId: msg.from, candidate: msg.data.candidate, sdpMLineIndex: msg.data.sdpMLineIndex, sdpMid: msg.data.sdpMid, participantId: msg.participantId } })); break; case "on-message": - { - let parsed = msg.data; - if (typeof msg.data === 'string') { - try { parsed = JSON.parse(msg.data); } catch(e) { - Logger.error(`Signaling: on-message, error: ${e}`); - } - } - if (msg.participantId) { - parsed.participantId = msg.participantId; - } - this.dispatchEvent(new CustomEvent('on-message', { detail: parsed })); - } + dispatchOnMessageEvent(this, msg.data, msg.participantId); break; case "participant-left": this.dispatchEvent(new CustomEvent('participant-left', { detail: msg })); @@ -346,20 +363,23 @@ export class WebSocketSignaling extends EventTarget { } sendRecordingOffer(payload) { - const sendJson = JSON.stringify({ type: 'recording-offer', data: payload }); - Logger.log(sendJson); - this.websocket.send(sendJson); + this.sendMessage(payload.connectionId || '', { + type: 'recording-offer', + data: payload + }); } sendRecordingCandidate(payload) { - const sendJson = JSON.stringify({ type: 'recording-candidate', data: payload }); - Logger.log(sendJson); - this.websocket.send(sendJson); + this.sendMessage(payload.connectionId || '', { + type: 'recording-candidate', + data: payload + }); } sendRecordingStatus(payload) { - const sendJson = JSON.stringify({ type: 'recording-status', data: payload }); - Logger.log(sendJson); - this.websocket.send(sendJson); + this.sendMessage(payload.connectionId || '', { + type: 'recording-status', + data: payload + }); } } diff --git a/client/test/unit/signaling.test.js b/client/test/unit/signaling.test.js index efb776e..c8a0a8a 100644 --- a/client/test/unit/signaling.test.js +++ b/client/test/unit/signaling.test.js @@ -43,6 +43,89 @@ function createWebSocketSignaling(port) { return new WebSocketSignaling(1, `ws://localhost:${port}`); } +describe('recording signaling message envelope', () => { + const OriginalWebSocket = window.WebSocket; + let sentMessages; + + beforeEach(() => { + sentMessages = []; + window.WebSocket = class { + constructor() { + this.readyState = 1; + } + + send(message) { + sentMessages.push(message); + } + + close() { + if (this.onclose) { + this.onclose(); + } + } + }; + }); + + afterEach(() => { + window.WebSocket = OriginalWebSocket; + }); + + test('sends recording offer through on-message', () => { + const signaling = new WebSocketSignaling(1, 'ws://localhost:1234'); + + signaling.sendRecordingOffer({ + recordingId: 'recording-1', + connectionId: 'room-1', + participantId: 'participant-1', + sdp: 'offer-sdp' + }); + + expect(sentMessages).toHaveLength(1); + const outer = JSON.parse(sentMessages[0]); + expect(outer.type).toBe('on-message'); + expect(outer.data.connectionId).toBe('room-1'); + expect(outer.data.message).toEqual({ + type: 'recording-offer', + data: { + recordingId: 'recording-1', + connectionId: 'room-1', + participantId: 'participant-1', + sdp: 'offer-sdp' + } + }); + }); + + test('dispatches wrapped recording messages as recording events', () => { + const signaling = new WebSocketSignaling(1, 'ws://localhost:1234'); + let recordingAnswer; + signaling.addEventListener('recording-answer', (event) => { + recordingAnswer = event.detail; + }); + + signaling.websocket.onmessage({ + data: JSON.stringify({ + type: 'on-message', + from: 'room-1', + data: JSON.stringify({ + type: 'recording-answer', + data: { + recordingId: 'recording-1', + connectionId: 'room-1', + sdp: 'answer-sdp' + } + }) + }) + }); + + expect(recordingAnswer).toEqual({ + type: 'recording-answer', + recordingId: 'recording-1', + connectionId: 'room-1', + sdp: 'answer-sdp' + }); + }); +}); + describe.each(signalingModes)('signaling test in public mode', ({ mode }) => { let signaling1; let signaling2; diff --git a/src/class/websockethandler.ts b/src/class/websockethandler.ts index b21d5ab..c824c22 100644 --- a/src/class/websockethandler.ts +++ b/src/class/websockethandler.ts @@ -85,6 +85,8 @@ type RecordingBroadcastPayload = { mediaMode?: string; }; +type RecordingClientMessageType = 'recording-offer' | 'recording-candidate' | 'recording-status'; + interface StoredRoom { roomId: string; connectionId: string; @@ -347,6 +349,63 @@ function sendToEntireGroup(connectionId: string, message: any): boolean { return true; } +function toTypedDataMessage(message: any): any { + if (!message || typeof message !== 'object' || typeof message.type !== 'string') { + return message; + } + + const data: any = {}; + Object.keys(message).forEach((key) => { + if (key !== 'type') { + data[key] = message[key]; + } + }); + return { + type: message.type, + data + }; +} + +function toOnMessageEnvelope(connectionId: string, message: any, participantId?: string): any { + const envelope: any = { + from: connectionId, + to: "", + type: "on-message", + data: JSON.stringify(toTypedDataMessage(message)) + }; + if (participantId) { + envelope.participantId = participantId; + } + return envelope; +} + +function safeSendOnMessage(ws: WebSocket, connectionId: string, message: any, participantId?: string): boolean { + return safeSend(ws, toOnMessageEnvelope(connectionId, message, participantId)); +} + +function sendOnMessageToEntireGroup(connectionId: string, message: any): boolean { + return sendToEntireGroup(connectionId, toOnMessageEnvelope(connectionId, message)); +} + +function isRecordingClientMessage(message: any): boolean { + return message + && typeof message === 'object' + && (message.type === 'recording-offer' + || message.type === 'recording-candidate' + || message.type === 'recording-status'); +} + +function unwrapRecordingClientPayload(message: any): any { + const payload = message.data && typeof message.data === 'object' ? message.data : message; + if (!payload.connectionId && message.connectionId) { + payload.connectionId = message.connectionId; + } + if (!payload.participantId && message.participantId) { + payload.participantId = message.participantId; + } + return payload; +} + function getActiveRecordingSessions(connectionId: string): RecordingSession[] { return listRecordingSessions(connectionId).filter((session) => session.status === 'recording'); } @@ -802,7 +861,7 @@ function toRecordingBroadcastPayload(type: RecordingBroadcastPayload['type'], se } function broadcastRecordingStarted(session: RecordingSession): boolean { - return sendToEntireGroup( + return sendOnMessageToEntireGroup( session.connectionId, toRecordingBroadcastPayload('recording-started', session) ); @@ -811,11 +870,11 @@ function broadcastRecordingStarted(session: RecordingSession): boolean { function broadcastRecordingPeerRequest(session: RecordingSession): boolean { const payload = toRecordingBroadcastPayload('recording-peer-request', session); payload.mediaMode = 'webrtc-sendonly'; - return sendToEntireGroup(session.connectionId, payload); + return sendOnMessageToEntireGroup(session.connectionId, payload); } function broadcastRecordingStopped(session: RecordingSession): boolean { - return sendToEntireGroup( + return sendOnMessageToEntireGroup( session.connectionId, toRecordingBroadcastPayload('recording-stopped', session) ); @@ -824,8 +883,8 @@ function broadcastRecordingStopped(session: RecordingSession): boolean { function sendActiveRecordingRequests(ws: WebSocket, connectionId: string): void { const activeSessions = getActiveRecordingSessions(connectionId); activeSessions.forEach((session) => { - safeSend(ws, toRecordingBroadcastPayload('recording-started', session)); - safeSend(ws, { + safeSendOnMessage(ws, connectionId, toRecordingBroadcastPayload('recording-started', session)); + safeSendOnMessage(ws, connectionId, { ...toRecordingBroadcastPayload('recording-peer-request', session), mediaMode: 'webrtc-sendonly' }); @@ -837,7 +896,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise { const connectionId = typeof message.connectionId === 'string' ? message.connectionId : ''; const sdp = typeof message.sdp === 'string' ? message.sdp : ''; if (!recordingId || !connectionId || !sdp) { - safeSend(ws, { type: 'recording-status', recordingId, connectionId, status: 'invalid-offer' }); + safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, status: 'invalid-offer' }); return; } @@ -849,7 +908,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise { }); if (!offer) { - safeSend(ws, { + safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, @@ -870,7 +929,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise { role, onLocalCandidate: (candidate) => { const json = typeof candidate.toJSON === 'function' ? candidate.toJSON() : candidate; - safeSend(ws, { + safeSendOnMessage(ws, connectionId, { type: 'recording-candidate', recordingId, connectionId, @@ -882,7 +941,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise { } }); - safeSend(ws, { + safeSendOnMessage(ws, connectionId, { type: 'recording-answer', recordingId, connectionId, @@ -891,7 +950,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise { }); } catch (error) { log(LogLevel.error, 'Failed to accept recording offer:', error); - safeSend(ws, { + safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, @@ -901,7 +960,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise { return; } - safeSend(ws, { + safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, @@ -928,7 +987,7 @@ async function onRecordingCandidate(ws: WebSocket, message: any): Promise }); if (!candidate) { - safeSend(ws, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' }); + safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' }); return; } @@ -942,7 +1001,7 @@ async function onRecordingCandidate(ws: WebSocket, message: any): Promise }); } catch (error) { log(LogLevel.warn, 'Failed to add recording ICE candidate:', error); - safeSend(ws, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' }); + safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' }); } } @@ -1077,6 +1136,23 @@ function onMessage(ws: WebSocket, message: any): void { } chatMessage.participantId = senderParticipantId; chatMessage.connectionId = connectionId; + if (isRecordingClientMessage(chatMessage)) { + const recordingPayload = unwrapRecordingClientPayload(chatMessage); + switch (chatMessage.type as RecordingClientMessageType) { + case 'recording-offer': + onRecordingOffer(ws, recordingPayload); + break; + case 'recording-candidate': + onRecordingCandidate(ws, recordingPayload); + break; + case 'recording-status': + log(LogLevel.log, 'Received recording status:', recordingPayload); + break; + default: + break; + } + return; + } if (connectionGroup.has(connectionId)) { const group = connectionGroup.get(connectionId); if (group.host === ws) { diff --git a/src/websocket.ts b/src/websocket.ts index 3f44396..757f34b 100644 --- a/src/websocket.ts +++ b/src/websocket.ts @@ -17,9 +17,6 @@ const VALID_MESSAGE_TYPES = new Set([ "host-userInfo", "invite-call", "on-message", - "recording-offer", - "recording-candidate", - "recording-status", ]); function sendJson(ws: WebSocket, payload: unknown): void { @@ -207,16 +204,6 @@ export default class WSSignaling { if (msg.from) msg.data.connectionId = msg.from; handler.onMessage(ws, msg.data); break; - case 'recording-offer': - if (!hasData(msg)) return; - handler.onRecordingOffer(ws, msg.data); - break; - case 'recording-candidate': - if (!hasData(msg)) return; - handler.onRecordingCandidate(ws, msg.data); - break; - case 'recording-status': - break; default: break; } diff --git a/test/websockethandler.test.ts b/test/websockethandler.test.ts index 52f2f47..d9bfe89 100644 --- a/test/websockethandler.test.ts +++ b/test/websockethandler.test.ts @@ -8,6 +8,20 @@ Date.now = jest.fn(() => 1482363367071); const anyParticipantId = expect.any(String); +function recordingEnvelope(connectionId: string, data: any): any { + const innerData = { ...data }; + delete innerData.type; + return { + from: connectionId, + to: "", + type: "on-message", + data: JSON.stringify({ + type: data.type, + data: innerData + }) + }; +} + describe('websocket signaling test in public mode', () => { let server: WS; let client: WebSocket; @@ -212,20 +226,17 @@ describe('websocket signaling test in private mode', () => { }; expect(wsHandler.broadcastRecordingStarted(session)).toBe(true); - await expect(server).toReceiveMessage(expected); - await expect(server).toReceiveMessage(expected); + await expect(server).toReceiveMessage(recordingEnvelope(connectionId, expected)); + await expect(server).toReceiveMessage(recordingEnvelope(connectionId, expected)); expect(wsHandler.broadcastRecordingPeerRequest(session)).toBe(true); - await expect(server).toReceiveMessage({ + const peerRequest = { ...expected, type: 'recording-peer-request', mediaMode: 'webrtc-sendonly' - }); - await expect(server).toReceiveMessage({ - ...expected, - type: 'recording-peer-request', - mediaMode: 'webrtc-sendonly' - }); + }; + await expect(server).toReceiveMessage(recordingEnvelope(connectionId, peerRequest)); + await expect(server).toReceiveMessage(recordingEnvelope(connectionId, peerRequest)); }); test('send offer from session1', async () => {