Compare commits

...

4 Commits

10 changed files with 747 additions and 101 deletions

View File

@@ -1,5 +1,41 @@
import * as Logger from "../utils/logger.js";
const RECORDING_SIGNAL_EVENTS = [
'recording-started',
'recording-peer-request',
'recording-stopped',
'recording-status',
'recording-answer',
'recording-candidate'
];
function parseOnMessageData(data) {
if (typeof data !== 'string') {
return data;
}
try {
return JSON.parse(data);
} catch(e) {
Logger.error(`Signaling: on-message, error: ${e}`);
return data;
}
}
function dispatchOnMessageEvent(target, data, participantId) {
const parsed = parseOnMessageData(data);
if (participantId && parsed && typeof parsed === 'object') {
parsed.participantId = participantId;
}
target.dispatchEvent(new CustomEvent('on-message', { detail: parsed }));
if (parsed && typeof parsed.type === 'string' && RECORDING_SIGNAL_EVENTS.indexOf(parsed.type) !== -1) {
const detail = parsed.data && typeof parsed.data === 'object'
? { type: parsed.type, ...parsed.data }
: parsed;
target.dispatchEvent(new CustomEvent(parsed.type, { detail }));
}
}
export class Signaling extends EventTarget {
constructor(interval = 1000, baseUrl = null) {
@@ -73,15 +109,7 @@ export class Signaling extends EventTarget {
this.dispatchEvent(new CustomEvent('candidate', { detail: msg }));
break;
case "on-message":
{
let parsed = msg.data;
if (typeof msg.data === 'string') {
try { parsed = JSON.parse(msg.data); } catch(e) {
Logger.error(`Signaling: on-message, error: ${e}`);
}
}
this.dispatchEvent(new CustomEvent('on-message', { detail: parsed }));
}
dispatchOnMessageEvent(this, msg.data, msg.participantId);
break;
default:
break;
@@ -201,18 +229,7 @@ export class WebSocketSignaling extends EventTarget {
this.dispatchEvent(new CustomEvent('candidate', { detail: { connectionId: msg.from, candidate: msg.data.candidate, sdpMLineIndex: msg.data.sdpMLineIndex, sdpMid: msg.data.sdpMid, participantId: msg.participantId } }));
break;
case "on-message":
{
let parsed = msg.data;
if (typeof msg.data === 'string') {
try { parsed = JSON.parse(msg.data); } catch(e) {
Logger.error(`Signaling: on-message, error: ${e}`);
}
}
if (msg.participantId) {
parsed.participantId = msg.participantId;
}
this.dispatchEvent(new CustomEvent('on-message', { detail: parsed }));
}
dispatchOnMessageEvent(this, msg.data, msg.participantId);
break;
case "participant-left":
this.dispatchEvent(new CustomEvent('participant-left', { detail: msg }));
@@ -346,20 +363,23 @@ export class WebSocketSignaling extends EventTarget {
}
sendRecordingOffer(payload) {
const sendJson = JSON.stringify({ type: 'recording-offer', data: payload });
Logger.log(sendJson);
this.websocket.send(sendJson);
this.sendMessage(payload.connectionId || '', {
type: 'recording-offer',
data: payload
});
}
sendRecordingCandidate(payload) {
const sendJson = JSON.stringify({ type: 'recording-candidate', data: payload });
Logger.log(sendJson);
this.websocket.send(sendJson);
this.sendMessage(payload.connectionId || '', {
type: 'recording-candidate',
data: payload
});
}
sendRecordingStatus(payload) {
const sendJson = JSON.stringify({ type: 'recording-status', data: payload });
Logger.log(sendJson);
this.websocket.send(sendJson);
this.sendMessage(payload.connectionId || '', {
type: 'recording-status',
data: payload
});
}
}

View File

@@ -43,6 +43,89 @@ function createWebSocketSignaling(port) {
return new WebSocketSignaling(1, `ws://localhost:${port}`);
}
describe('recording signaling message envelope', () => {
const OriginalWebSocket = window.WebSocket;
let sentMessages;
beforeEach(() => {
sentMessages = [];
window.WebSocket = class {
constructor() {
this.readyState = 1;
}
send(message) {
sentMessages.push(message);
}
close() {
if (this.onclose) {
this.onclose();
}
}
};
});
afterEach(() => {
window.WebSocket = OriginalWebSocket;
});
test('sends recording offer through on-message', () => {
const signaling = new WebSocketSignaling(1, 'ws://localhost:1234');
signaling.sendRecordingOffer({
recordingId: 'recording-1',
connectionId: 'room-1',
participantId: 'participant-1',
sdp: 'offer-sdp'
});
expect(sentMessages).toHaveLength(1);
const outer = JSON.parse(sentMessages[0]);
expect(outer.type).toBe('on-message');
expect(outer.data.connectionId).toBe('room-1');
expect(outer.data.message).toEqual({
type: 'recording-offer',
data: {
recordingId: 'recording-1',
connectionId: 'room-1',
participantId: 'participant-1',
sdp: 'offer-sdp'
}
});
});
test('dispatches wrapped recording messages as recording events', () => {
const signaling = new WebSocketSignaling(1, 'ws://localhost:1234');
let recordingAnswer;
signaling.addEventListener('recording-answer', (event) => {
recordingAnswer = event.detail;
});
signaling.websocket.onmessage({
data: JSON.stringify({
type: 'on-message',
from: 'room-1',
data: JSON.stringify({
type: 'recording-answer',
data: {
recordingId: 'recording-1',
connectionId: 'room-1',
sdp: 'answer-sdp'
}
})
})
});
expect(recordingAnswer).toEqual({
type: 'recording-answer',
recordingId: 'recording-1',
connectionId: 'room-1',
sdp: 'answer-sdp'
});
});
});
describe.each(signalingModes)('signaling test in public mode', ({ mode }) => {
let signaling1;
let signaling2;

View File

@@ -85,6 +85,8 @@ type RecordingBroadcastPayload = {
mediaMode?: string;
};
type RecordingClientMessageType = 'recording-offer' | 'recording-candidate' | 'recording-status';
interface StoredRoom {
roomId: string;
connectionId: string;
@@ -347,6 +349,74 @@ function sendToEntireGroup(connectionId: string, message: any): boolean {
return true;
}
function toTypedDataMessage(message: any): any {
if (!message || typeof message !== 'object' || typeof message.type !== 'string') {
return message;
}
const data: any = {};
Object.keys(message).forEach((key) => {
if (key !== 'type') {
data[key] = message[key];
}
});
return {
type: message.type,
data
};
}
function toOnMessageEnvelope(connectionId: string, message: any, participantId?: string): any {
const envelope: any = {
from: connectionId,
to: "",
type: "on-message",
data: JSON.stringify(toTypedDataMessage(message))
};
if (participantId) {
envelope.participantId = participantId;
}
return envelope;
}
function safeSendOnMessage(ws: WebSocket, connectionId: string, message: any, participantId?: string): boolean {
return safeSend(ws, toOnMessageEnvelope(connectionId, message, participantId));
}
function sendOnMessageToEntireGroup(connectionId: string, message: any): boolean {
return sendToEntireGroup(connectionId, toOnMessageEnvelope(connectionId, message));
}
function isRecordingClientMessage(message: any): boolean {
return message
&& typeof message === 'object'
&& (message.type === 'recording-offer'
|| message.type === 'recording-candidate'
|| message.type === 'recording-status');
}
function unwrapRecordingClientPayload(message: any): any {
const payload = message.data && typeof message.data === 'object' ? message.data : message;
if (!payload.connectionId && message.connectionId) {
payload.connectionId = message.connectionId;
}
if (!payload.participantId && message.participantId) {
payload.participantId = message.participantId;
}
return payload;
}
function getSdpMediaSections(sdp: string): string[] {
return sdp
.split(/\r?\n/)
.map((line) => line.trim())
.filter((line) => /^m=(audio|video)\s/i.test(line));
}
function hasRecordableSdpMedia(sdp: string): boolean {
return getSdpMediaSections(sdp).length > 0;
}
function getActiveRecordingSessions(connectionId: string): RecordingSession[] {
return listRecordingSessions(connectionId).filter((session) => session.status === 'recording');
}
@@ -802,7 +872,7 @@ function toRecordingBroadcastPayload(type: RecordingBroadcastPayload['type'], se
}
function broadcastRecordingStarted(session: RecordingSession): boolean {
return sendToEntireGroup(
return sendOnMessageToEntireGroup(
session.connectionId,
toRecordingBroadcastPayload('recording-started', session)
);
@@ -811,11 +881,11 @@ function broadcastRecordingStarted(session: RecordingSession): boolean {
function broadcastRecordingPeerRequest(session: RecordingSession): boolean {
const payload = toRecordingBroadcastPayload('recording-peer-request', session);
payload.mediaMode = 'webrtc-sendonly';
return sendToEntireGroup(session.connectionId, payload);
return sendOnMessageToEntireGroup(session.connectionId, payload);
}
function broadcastRecordingStopped(session: RecordingSession): boolean {
return sendToEntireGroup(
return sendOnMessageToEntireGroup(
session.connectionId,
toRecordingBroadcastPayload('recording-stopped', session)
);
@@ -824,8 +894,8 @@ function broadcastRecordingStopped(session: RecordingSession): boolean {
function sendActiveRecordingRequests(ws: WebSocket, connectionId: string): void {
const activeSessions = getActiveRecordingSessions(connectionId);
activeSessions.forEach((session) => {
safeSend(ws, toRecordingBroadcastPayload('recording-started', session));
safeSend(ws, {
safeSendOnMessage(ws, connectionId, toRecordingBroadcastPayload('recording-started', session));
safeSendOnMessage(ws, connectionId, {
...toRecordingBroadcastPayload('recording-peer-request', session),
mediaMode: 'webrtc-sendonly'
});
@@ -837,7 +907,24 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise<void> {
const connectionId = typeof message.connectionId === 'string' ? message.connectionId : '';
const sdp = typeof message.sdp === 'string' ? message.sdp : '';
if (!recordingId || !connectionId || !sdp) {
safeSend(ws, { type: 'recording-status', recordingId, connectionId, status: 'invalid-offer' });
safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, status: 'invalid-offer' });
return;
}
if (!hasRecordableSdpMedia(sdp)) {
const participantId = getParticipantId(ws) || message.participantId || 'unknown';
log(LogLevel.warn, 'Rejected recording offer without audio/video media sections:', {
recordingId,
connectionId,
participantId,
mediaSections: getSdpMediaSections(sdp)
});
safeSendOnMessage(ws, connectionId, {
type: 'recording-status',
recordingId,
connectionId,
status: 'no-media-offer',
participantId
});
return;
}
@@ -849,7 +936,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise<void> {
});
if (!offer) {
safeSend(ws, {
safeSendOnMessage(ws, connectionId, {
type: 'recording-status',
recordingId,
connectionId,
@@ -870,7 +957,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise<void> {
role,
onLocalCandidate: (candidate) => {
const json = typeof candidate.toJSON === 'function' ? candidate.toJSON() : candidate;
safeSend(ws, {
safeSendOnMessage(ws, connectionId, {
type: 'recording-candidate',
recordingId,
connectionId,
@@ -882,7 +969,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise<void> {
}
});
safeSend(ws, {
safeSendOnMessage(ws, connectionId, {
type: 'recording-answer',
recordingId,
connectionId,
@@ -891,7 +978,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise<void> {
});
} catch (error) {
log(LogLevel.error, 'Failed to accept recording offer:', error);
safeSend(ws, {
safeSendOnMessage(ws, connectionId, {
type: 'recording-status',
recordingId,
connectionId,
@@ -901,7 +988,7 @@ async function onRecordingOffer(ws: WebSocket, message: any): Promise<void> {
return;
}
safeSend(ws, {
safeSendOnMessage(ws, connectionId, {
type: 'recording-status',
recordingId,
connectionId,
@@ -928,7 +1015,7 @@ async function onRecordingCandidate(ws: WebSocket, message: any): Promise<void>
});
if (!candidate) {
safeSend(ws, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' });
safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' });
return;
}
@@ -942,7 +1029,7 @@ async function onRecordingCandidate(ws: WebSocket, message: any): Promise<void>
});
} catch (error) {
log(LogLevel.warn, 'Failed to add recording ICE candidate:', error);
safeSend(ws, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' });
safeSendOnMessage(ws, connectionId, { type: 'recording-status', recordingId, connectionId, status: 'candidate-rejected' });
}
}
@@ -1077,6 +1164,23 @@ function onMessage(ws: WebSocket, message: any): void {
}
chatMessage.participantId = senderParticipantId;
chatMessage.connectionId = connectionId;
if (isRecordingClientMessage(chatMessage)) {
const recordingPayload = unwrapRecordingClientPayload(chatMessage);
switch (chatMessage.type as RecordingClientMessageType) {
case 'recording-offer':
onRecordingOffer(ws, recordingPayload);
break;
case 'recording-candidate':
onRecordingCandidate(ws, recordingPayload);
break;
case 'recording-status':
log(LogLevel.log, 'Received recording status:', recordingPayload);
break;
default:
break;
}
return;
}
if (connectionGroup.has(connectionId)) {
const group = connectionGroup.get(connectionId);
if (group.host === ws) {

View File

@@ -53,7 +53,18 @@ type CompositionInputSets = {
audioInputs: ServerTrackRecordingFile[];
};
type VideoTimelineSegment = {
startMs: number;
endMs: number | null;
activeInputs: ServerTrackRecordingFile[];
};
const jobs: Map<string, RecordingCompositionJob> = new Map<string, RecordingCompositionJob>();
const COMPOSITION_OUTPUT_WIDTH = 2560;
const COMPOSITION_OUTPUT_HEIGHT = 1440;
const COMPOSITION_OUTPUT_FPS = 60;
const COMPOSITION_HOST_HEIGHT = 1080;
const COMPOSITION_VIDEO_BITRATE = '16000k';
function nowIso(): string {
return new Date().toISOString();
@@ -120,6 +131,100 @@ function orderVideoInputsForComposition(files: ServerTrackRecordingFile[]): Serv
];
}
function parseInputTimestamp(value: unknown): number | null {
if (typeof value !== 'string' || !value.trim()) {
return null;
}
const timestamp = Date.parse(value);
return isFinite(timestamp) ? timestamp : null;
}
function getInputStartMs(file: ServerTrackRecordingFile): number | null {
const metadata = file.metadata || {};
return parseInputTimestamp(metadata.recordingStartedAt)
|| parseInputTimestamp(file.recordingStartedAt)
|| parseInputTimestamp(metadata.startedAt)
|| parseInputTimestamp(file.uploadedAt)
|| parseInputTimestamp(metadata.uploadedAt);
}
function getInputEndMs(file: ServerTrackRecordingFile): number | null {
const metadata = file.metadata || {};
return parseInputTimestamp(metadata.recordingEndedAt)
|| parseInputTimestamp(file.recordingEndedAt)
|| parseInputTimestamp(metadata.endedAt)
|| parseInputTimestamp(metadata.updatedAt);
}
function getTimelineOriginMs(files: ServerTrackRecordingFile[]): number | null {
const starts = files
.map(getInputStartMs)
.filter((timestamp) => timestamp !== null) as number[];
if (starts.length === 0) {
return null;
}
return Math.min(...starts);
}
function getTimelineDurationSeconds(files: ServerTrackRecordingFile[], timelineOriginMs: number | null): number | null {
if (timelineOriginMs === null) {
return null;
}
const ends = files
.map(getInputEndMs)
.filter((timestamp) => timestamp !== null) as number[];
if (ends.length === 0) {
return null;
}
const durationSeconds = (Math.max(...ends) - timelineOriginMs) / 1000;
return durationSeconds > 0 ? durationSeconds : null;
}
function getTimelineEndMs(files: ServerTrackRecordingFile[]): number | null {
const ends = files
.map(getInputEndMs)
.filter((timestamp) => timestamp !== null) as number[];
if (ends.length === 0) {
return null;
}
return Math.max(...ends);
}
function getInputOffsetSeconds(file: ServerTrackRecordingFile, timelineOriginMs: number | null): number {
const startMs = getInputStartMs(file);
if (startMs === null || timelineOriginMs === null) {
return 0;
}
return Math.max(0, (startMs - timelineOriginMs) / 1000);
}
function formatSeconds(value: number): string {
if (value <= 0.001) {
return '0';
}
return value.toFixed(3).replace(/\.?0+$/, '');
}
function getDurationBoundVideoFilters(segmentDurationSeconds: number | null): string[] {
if (segmentDurationSeconds === null) {
return [];
}
const duration = formatSeconds(segmentDurationSeconds);
return [
`tpad=stop_mode=clone:stop_duration=${duration}`,
`trim=duration=${duration}`,
'setpts=PTS-STARTPTS'
];
}
function getBottomTileWidth(index: number, inputCount: number, outputWidth: number): number {
const sideCount = inputCount - 1;
if (sideCount <= 1) {
@@ -142,17 +247,94 @@ function createHostBottomLayout(inputCount: number, outputWidth: number, hostHei
return positions.join('|');
}
function sortActiveVideoInputs(files: ServerTrackRecordingFile[]): ServerTrackRecordingFile[] {
return orderVideoInputsForComposition(files).sort((a, b) => {
const aIsHost = isHostInput(a);
const bIsHost = isHostInput(b);
if (aIsHost !== bIsHost) {
return aIsHost ? -1 : 1;
}
return a.participantId.localeCompare(b.participantId);
});
}
function getVideoTimelineSegments(
files: ServerTrackRecordingFile[],
timelineOriginMs: number | null,
timelineEndMs: number | null
): VideoTimelineSegment[] {
if (timelineOriginMs === null || timelineEndMs === null || timelineEndMs <= timelineOriginMs) {
return [];
}
const pointsByMs: { [timestamp: string]: boolean } = {};
pointsByMs[String(timelineOriginMs)] = true;
pointsByMs[String(timelineEndMs)] = true;
files.forEach((file) => {
const startMs = getInputStartMs(file);
const endMs = getInputEndMs(file);
if (startMs !== null && startMs > timelineOriginMs && startMs < timelineEndMs) {
pointsByMs[String(startMs)] = true;
}
if (endMs !== null && endMs > timelineOriginMs && endMs < timelineEndMs) {
pointsByMs[String(endMs)] = true;
}
});
const points = Object.keys(pointsByMs)
.map((value) => Number(value))
.sort((a, b) => a - b);
const segments: VideoTimelineSegment[] = [];
for (let index = 0; index < points.length - 1; index += 1) {
const startMs = points[index];
const endMs = points[index + 1];
if (endMs <= startMs) {
continue;
}
const activeInputs = sortActiveVideoInputs(files.filter((file) => {
const fileStartMs = getInputStartMs(file);
const fileEndMs = getInputEndMs(file);
const inputStartMs = fileStartMs === null ? timelineOriginMs : fileStartMs;
const inputEndMs = fileEndMs === null ? timelineEndMs : fileEndMs;
return inputStartMs < endMs && inputEndMs > startMs;
}));
segments.push({ startMs, endMs, activeInputs });
}
return segments;
}
function getFallbackVideoTimelineSegment(
files: ServerTrackRecordingFile[],
timelineOriginMs: number | null
): VideoTimelineSegment {
return {
startMs: timelineOriginMs === null ? 0 : timelineOriginMs,
endMs: null,
activeInputs: sortActiveVideoInputs(files)
};
}
export function buildFfmpegCompositionArgs(input: {
videoInputs: ServerTrackRecordingFile[];
audioInputs: ServerTrackRecordingFile[];
outputPath: string;
format: string;
}): string[] {
const outputWidth = 1280;
const outputHeight = 720;
const hostHeight = 540;
const outputWidth = COMPOSITION_OUTPUT_WIDTH;
const outputHeight = COMPOSITION_OUTPUT_HEIGHT;
const outputFps = COMPOSITION_OUTPUT_FPS;
const hostHeight = COMPOSITION_HOST_HEIGHT;
const bottomHeight = outputHeight - hostHeight;
const videoInputs = orderVideoInputsForComposition(input.videoInputs);
const timelineOriginMs = getTimelineOriginMs(videoInputs.concat(input.audioInputs));
const timelineEndMs = getTimelineEndMs(videoInputs.concat(input.audioInputs));
const timelineDurationSeconds = getTimelineDurationSeconds(videoInputs.concat(input.audioInputs), timelineOriginMs);
const timelineVideoSegments = getVideoTimelineSegments(videoInputs, timelineOriginMs, timelineEndMs);
const videoSegments = timelineVideoSegments.length > 0
? timelineVideoSegments
: [getFallbackVideoTimelineSegment(videoInputs, timelineOriginMs)];
const args = ['-y'];
const orderedInputs = videoInputs.concat(input.audioInputs);
orderedInputs.forEach((file) => {
@@ -160,31 +342,89 @@ export function buildFfmpegCompositionArgs(input: {
});
const filters: string[] = [];
videoInputs.forEach((_file, index) => {
const width = videoInputs.length === 1
? outputWidth
: index === 0 ? outputWidth : getBottomTileWidth(index - 1, videoInputs.length, outputWidth);
const height = videoInputs.length === 1
? outputHeight
: index === 0 ? hostHeight : bottomHeight;
filters.push(`[${index}:v]scale=${width}:${height}:force_original_aspect_ratio=decrease,pad=${width}:${height}:(ow-iw)/2:(oh-ih)/2:black,setsar=1[v${index}]`);
const videoInputUseCounts = videoInputs.map((file) => videoSegments.filter((segment) => segment.activeInputs.indexOf(file) >= 0).length);
const videoInputUsePositions = videoInputs.map(() => 0);
videoInputUseCounts.forEach((useCount, inputIndex) => {
if (useCount <= 1) {
return;
}
const splitLabels = [];
for (let splitIndex = 0; splitIndex < useCount; splitIndex += 1) {
splitLabels.push(`[vin${inputIndex}_${splitIndex}]`);
}
filters.push(`[${inputIndex}:v]split=${useCount}${splitLabels.join('')}`);
});
if (videoInputs.length === 1) {
filters.push('[v0]fps=30,format=yuv420p[vout]');
videoSegments.forEach((segment, segmentIndex) => {
const segmentDurationSeconds = segment.endMs === null ? null : (segment.endMs - segment.startMs) / 1000;
if (segment.activeInputs.length === 0) {
if (segmentDurationSeconds === null) {
return;
}
filters.push(`color=color=black:size=${outputWidth}x${outputHeight}:rate=${outputFps}:duration=${formatSeconds(segmentDurationSeconds)},format=yuv420p[seg${segmentIndex}]`);
return;
}
segment.activeInputs.forEach((file, activeIndex) => {
const inputIndex = videoInputs.indexOf(file);
const inputLabel = videoInputUseCounts[inputIndex] > 1
? `vin${inputIndex}_${videoInputUsePositions[inputIndex]++}`
: `${inputIndex}:v`;
const fileStartMs = getInputStartMs(file);
const inputStartMs = fileStartMs === null ? segment.startMs : fileStartMs;
const trimStartSeconds = Math.max(0, (segment.startMs - inputStartMs) / 1000);
const width = segment.activeInputs.length === 1
? outputWidth
: activeIndex === 0 ? outputWidth : getBottomTileWidth(activeIndex - 1, segment.activeInputs.length, outputWidth);
const height = segment.activeInputs.length === 1
? outputHeight
: activeIndex === 0 ? hostHeight : bottomHeight;
const trimOptions = [`start=${formatSeconds(trimStartSeconds)}`];
if (segmentDurationSeconds !== null) {
trimOptions.push(`duration=${formatSeconds(segmentDurationSeconds)}`);
}
const videoFilters = [
`trim=${trimOptions.join(':')}`,
'setpts=PTS-STARTPTS',
...getDurationBoundVideoFilters(segmentDurationSeconds),
`scale=${width}:${height}:force_original_aspect_ratio=decrease`,
`pad=${width}:${height}:(ow-iw)/2:(oh-ih)/2:black`,
'setsar=1'
];
filters.push(`[${inputLabel}]${videoFilters.join(',')}[seg${segmentIndex}v${activeIndex}]`);
});
if (segment.activeInputs.length === 1) {
filters.push(`[seg${segmentIndex}v0]fps=${outputFps},format=yuv420p[seg${segmentIndex}]`);
return;
}
const segmentVideoLabels = segment.activeInputs.map((_file, activeIndex) => `[seg${segmentIndex}v${activeIndex}]`).join('');
filters.push(`${segmentVideoLabels}xstack=inputs=${segment.activeInputs.length}:layout=${createHostBottomLayout(segment.activeInputs.length, outputWidth, hostHeight)}:fill=black,fps=${outputFps},format=yuv420p[seg${segmentIndex}]`);
});
if (videoSegments.length === 1) {
filters.push('[seg0]null[vout]');
} else {
const videoLabels = videoInputs.map((_file, index) => `[v${index}]`).join('');
filters.push(`${videoLabels}xstack=inputs=${videoInputs.length}:layout=${createHostBottomLayout(videoInputs.length, outputWidth, hostHeight)}:fill=black,fps=30,format=yuv420p[vout]`);
const videoLabels = videoSegments.map((_segment, index) => `[seg${index}]`).join('');
filters.push(`${videoLabels}concat=n=${videoSegments.length}:v=1:a=0[vout]`);
}
if (input.audioInputs.length === 1) {
const audioInputIndex = videoInputs.length;
filters.push(`[${audioInputIndex}:a]aresample=async=1:first_pts=0[aout]`);
const offsetMs = Math.round(getInputOffsetSeconds(input.audioInputs[0], timelineOriginMs) * 1000);
const offsetFilter = offsetMs > 1 ? `,adelay=${offsetMs}:all=1` : '';
filters.push(`[${audioInputIndex}:a]aresample=async=1:first_pts=0${offsetFilter},asetpts=N/SR/TB[aout]`);
} else if (input.audioInputs.length > 1) {
const audioLabels = input.audioInputs
.map((_file, index) => `[${videoInputs.length + index}:a]`)
.join('');
filters.push(`${audioLabels}amix=inputs=${input.audioInputs.length}:duration=longest:dropout_transition=2[aout]`);
const audioLabels = input.audioInputs.map((file, index) => {
const audioInputIndex = videoInputs.length + index;
const offsetMs = Math.round(getInputOffsetSeconds(file, timelineOriginMs) * 1000);
const offsetFilter = offsetMs > 1 ? `,adelay=${offsetMs}:all=1` : '';
filters.push(`[${audioInputIndex}:a]aresample=async=1:first_pts=0${offsetFilter}[a${index}]`);
return `[a${index}]`;
}).join('');
filters.push(`${audioLabels}amix=inputs=${input.audioInputs.length}:duration=longest:dropout_transition=2,asetpts=N/SR/TB[aout]`);
}
args.push('-filter_complex', filters.join(';'), '-map', '[vout]');
@@ -193,18 +433,21 @@ export function buildFfmpegCompositionArgs(input: {
}
if (input.format === 'mp4') {
args.push('-c:v', 'libx264', '-preset', 'veryfast', '-pix_fmt', 'yuv420p');
args.push('-c:v', 'libx264', '-preset', 'veryfast', '-pix_fmt', 'yuv420p', '-b:v', COMPOSITION_VIDEO_BITRATE, '-r', String(outputFps));
if (input.audioInputs.length > 0) {
args.push('-c:a', 'aac');
}
} else {
args.push('-c:v', 'libvpx-vp9', '-deadline', 'realtime', '-cpu-used', '4');
args.push('-c:v', 'libvpx-vp9', '-deadline', 'good', '-cpu-used', '4', '-b:v', COMPOSITION_VIDEO_BITRATE, '-r', String(outputFps));
if (input.audioInputs.length > 0) {
args.push('-c:a', 'libopus');
}
}
args.push('-shortest', input.outputPath);
if (timelineDurationSeconds !== null) {
args.push('-t', formatSeconds(timelineDurationSeconds));
}
args.push(input.outputPath);
return args;
}
@@ -243,6 +486,16 @@ function toOutput(job: RecordingCompositionJob, target: ServerTrackRecordingTarg
};
}
function getActiveCompositionJob(input: StartCompositionInput): RecordingCompositionJob | null {
const recordingId = normalizeOption(input.recordingId, '');
const meetingId = normalizeOption(input.meetingId, '');
return Array.from(jobs.values()).find((job) => {
return job.recordingId === recordingId
&& job.meetingId === meetingId
&& (job.status === 'queued' || job.status === 'running');
}) || null;
}
async function runRecordingCompositionJob(job: RecordingCompositionJob): Promise<RecordingCompositionJob> {
const timestamp = nowIso();
job.status = 'running';
@@ -297,6 +550,11 @@ async function runRecordingCompositionJob(job: RecordingCompositionJob): Promise
}
export function startRecordingCompositionJob(input: StartCompositionInput): RecordingCompositionJob {
const activeJob = getActiveCompositionJob(input);
if (activeJob) {
return activeJob;
}
const timestamp = nowIso();
const inputSets = getInputSets(input);
const job: RecordingCompositionJob = {

View File

@@ -15,6 +15,8 @@ export type ServerTrackRecordingFile = ServerTrackRecordingTarget & {
trackId: string;
trackKind: string;
uploadedAt: string;
recordingStartedAt?: string;
recordingEndedAt?: string;
metadata: any;
};
@@ -143,6 +145,7 @@ export function writeServerTrackRecordingMetadata(input: WriteMetadataInput): vo
],
uploadedAt: now,
updatedAt: now,
recordingStartedAt: now,
recordingSource: 'server',
recordingId: input.recordingId,
participantId: input.participantId,
@@ -162,6 +165,7 @@ export function updateServerTrackRecordingMetadataSize(target: ServerTrackRecord
const metadata = JSON.parse(fs.readFileSync(target.metadataPath, 'utf8'));
metadata.size = fs.statSync(target.filePath).size;
metadata.updatedAt = new Date().toISOString();
metadata.recordingEndedAt = metadata.updatedAt;
fs.writeFileSync(target.metadataPath, JSON.stringify(metadata, null, 2));
}
@@ -208,7 +212,9 @@ export function listServerTrackRecordingFiles(input: {
participantId: metadata.participantId || '',
trackId: metadata.trackId || '',
trackKind: metadata.trackKind || '',
uploadedAt: metadata.uploadedAt || fs.statSync(filePath).birthtime.toISOString()
uploadedAt: metadata.uploadedAt || fs.statSync(filePath).birthtime.toISOString(),
recordingStartedAt: metadata.recordingStartedAt || metadata.uploadedAt,
recordingEndedAt: metadata.recordingEndedAt || metadata.updatedAt
};
})
.filter((file) => Boolean(file)) as ServerTrackRecordingFile[];

View File

@@ -20,6 +20,10 @@ const werift = require('werift');
const RTCPeerConnection = werift.RTCPeerConnection;
const weriftNonstandard = require('werift/nonstandard');
const MediaRecorder = weriftNonstandard.MediaRecorder;
const SERVER_RECORDING_WIDTH = 2560;
const SERVER_RECORDING_HEIGHT = 1440;
const SERVER_RECORDING_JITTER_BUFFER_LATENCY_MS = 1000;
const SERVER_RECORDING_JITTER_BUFFER_SIZE = 50000;
type RecordingPeerState = {
pc: WeriftPeerConnection;
@@ -146,9 +150,13 @@ function startTrackRecorder(input: {
const recorder = new MediaRecorder({
path: target.filePath,
tracks: [input.track],
width: 1280,
height: 720,
width: SERVER_RECORDING_WIDTH,
height: SERVER_RECORDING_HEIGHT,
disableLipSync: true,
jitterBuffer: {
latency: SERVER_RECORDING_JITTER_BUFFER_LATENCY_MS,
bufferSize: SERVER_RECORDING_JITTER_BUFFER_SIZE
},
defaultDuration: 24 * 60 * 60
});

View File

@@ -17,9 +17,6 @@ const VALID_MESSAGE_TYPES = new Set([
"host-userInfo",
"invite-call",
"on-message",
"recording-offer",
"recording-candidate",
"recording-status",
]);
function sendJson(ws: WebSocket, payload: unknown): void {
@@ -207,16 +204,6 @@ export default class WSSignaling {
if (msg.from) msg.data.connectionId = msg.from;
handler.onMessage(ws, msg.data);
break;
case 'recording-offer':
if (!hasData(msg)) return;
handler.onRecordingOffer(ws, msg.data);
break;
case 'recording-candidate':
if (!hasData(msg)) return;
handler.onRecordingCandidate(ws, msg.data);
break;
case 'recording-status':
break;
default:
break;
}

View File

@@ -1,7 +1,14 @@
import { buildFfmpegCompositionArgs } from '../src/recording/composer';
import { ServerTrackRecordingFile } from '../src/recording/storage';
function file(filename: string, trackKind: string, participantId: string, role = 'participant'): ServerTrackRecordingFile {
function file(
filename: string,
trackKind: string,
participantId: string,
role = 'participant',
recordingStartedAt = '2026-06-01T00:00:00.000Z',
recordingEndedAt = '2026-06-01T00:00:10.000Z'
): ServerTrackRecordingFile {
return {
meetingId: 'room-1',
directory: 'recordings/room-1',
@@ -12,8 +19,10 @@ function file(filename: string, trackKind: string, participantId: string, role =
participantId,
trackId: `${participantId}-${trackKind}`,
trackKind,
uploadedAt: '2026-06-01T00:00:00.000Z',
metadata: { role }
uploadedAt: recordingStartedAt,
recordingStartedAt,
recordingEndedAt,
metadata: { role, recordingStartedAt, recordingEndedAt, updatedAt: recordingEndedAt }
};
}
@@ -34,12 +43,15 @@ describe('recording composer', () => {
expect(args).toContain('-filter_complex');
expect(args.join(' ')).toContain('xstack=inputs=2');
expect(args.join(' ')).toContain('scale=1280:540');
expect(args.join(' ')).toContain('scale=1280:180');
expect(args.join(' ')).toContain('layout=0_0|0_540');
expect(args.join(' ')).toContain('scale=2560:1080');
expect(args.join(' ')).toContain('scale=2560:360');
expect(args.join(' ')).toContain('layout=0_0|0_1080');
expect(args.join(' ')).toContain('fps=60');
expect(args.join(' ')).toContain('amix=inputs=2');
expect(args).toContain('libvpx-vp9');
expect(args).toContain('libopus');
expect(args).toContain('16000k');
expect(args).not.toContain('-shortest');
expect(args[args.length - 1]).toBe('recordings/room-1/output.webm');
});
@@ -65,9 +77,9 @@ describe('recording composer', () => {
'-i',
'recordings/room-1/p2-video.webm'
]);
expect(filter).toContain('scale=1280:540');
expect(filter).toContain('scale=640:180');
expect(filter).toContain('layout=0_0|0_540|640_540');
expect(filter).toContain('scale=2560:1080');
expect(filter).toContain('scale=1280:360');
expect(filter).toContain('layout=0_0|0_1080|1280_1080');
});
test('builds mp4 encoder args', () => {
@@ -80,6 +92,111 @@ describe('recording composer', () => {
expect(args).toContain('libx264');
expect(args).toContain('-pix_fmt');
expect(args).toContain('16000k');
expect(args).toContain('60');
expect(args).not.toContain('libopus');
});
test('falls back to one video segment when input end timestamps are missing', () => {
const args = buildFfmpegCompositionArgs({
videoInputs: [
file('host-video.webm', 'video', 'host', 'host', '2026-06-01T00:00:00.000Z', ''),
file('p1-video.webm', 'video', 'p1', 'participant', '2026-06-01T00:00:02.500Z', '')
],
audioInputs: [],
outputPath: 'recordings/room-1/output.webm',
format: 'webm'
});
const filter = args[args.indexOf('-filter_complex') + 1];
expect(filter).toContain('xstack=inputs=2');
expect(filter).toContain('trim=start=0,setpts=PTS-STARTPTS');
expect(filter).not.toContain('concat=n=0');
expect(args).not.toContain('-t');
});
test('pads late participant tracks to keep the room timeline aligned', () => {
const args = buildFfmpegCompositionArgs({
videoInputs: [
file('host-video.webm', 'video', 'host', 'host', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:10.000Z'),
file('p1-video.webm', 'video', 'p1', 'participant', '2026-06-01T00:00:02.500Z', '2026-06-01T00:00:10.000Z')
],
audioInputs: [
file('host-audio.webm', 'audio', 'host', 'host', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:10.000Z'),
file('p1-audio.webm', 'audio', 'p1', 'participant', '2026-06-01T00:00:02.500Z', '2026-06-01T00:00:10.000Z')
],
outputPath: 'recordings/room-1/output.webm',
format: 'webm'
});
const filter = args[args.indexOf('-filter_complex') + 1];
expect(filter).toContain('[0:v]split=2[vin0_0][vin0_1]');
expect(filter).toContain('[vin0_0]trim=start=0:duration=2.5');
expect(filter).toContain('tpad=stop_mode=clone:stop_duration=2.5,trim=duration=2.5');
expect(filter).toContain('[vin0_1]trim=start=2.5:duration=7.5');
expect(filter).toContain('tpad=stop_mode=clone:stop_duration=7.5,trim=duration=7.5');
expect(filter).toContain('[1:v]trim=start=0:duration=7.5');
expect(filter).toContain('concat=n=2:v=1:a=0[vout]');
expect(filter).toContain('[2:a]aresample=async=1:first_pts=0[a0]');
expect(filter).toContain('[3:a]aresample=async=1:first_pts=0,adelay=2500:all=1[a1]');
expect(filter).toContain('[a0][a1]amix=inputs=2:duration=longest:dropout_transition=2,asetpts=N/SR/TB[aout]');
});
test('bounds each video segment to its timeline duration before composition', () => {
const args = buildFfmpegCompositionArgs({
videoInputs: [
file('host-video.webm', 'video', 'host', 'host', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:24.000Z')
],
audioInputs: [
file('host-audio.webm', 'audio', 'host', 'host', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:24.000Z')
],
outputPath: 'recordings/room-1/output.webm',
format: 'webm'
});
const filter = args[args.indexOf('-filter_complex') + 1];
expect(filter).toContain('trim=start=0:duration=24,setpts=PTS-STARTPTS,tpad=stop_mode=clone:stop_duration=24,trim=duration=24,setpts=PTS-STARTPTS');
expect(filter).toContain('[1:a]aresample=async=1:first_pts=0,asetpts=N/SR/TB[aout]');
});
test('changes the layout when participants join and leave without overlapping', () => {
const args = buildFfmpegCompositionArgs({
videoInputs: [
file('host-video.webm', 'video', 'host', 'host', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:12.000Z'),
file('p1-video.webm', 'video', 'p1', 'participant', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:05.000Z'),
file('p2-video.webm', 'video', 'p2', 'participant', '2026-06-01T00:00:05.000Z', '2026-06-01T00:00:12.000Z')
],
audioInputs: [],
outputPath: 'recordings/room-1/output.webm',
format: 'webm'
});
const filter = args[args.indexOf('-filter_complex') + 1];
expect(filter).toContain('xstack=inputs=2');
expect(filter).toContain('layout=0_0|0_1080');
expect(filter).toContain('[0:v]split=2[vin0_0][vin0_1]');
expect(filter).toContain('[vin0_0]trim=start=0:duration=5');
expect(filter).toContain('[1:v]trim=start=0:duration=5');
expect(filter).toContain('[vin0_1]trim=start=5:duration=7');
expect(filter).toContain('[2:v]trim=start=0:duration=7');
expect(filter).toContain('concat=n=2:v=1:a=0[vout]');
expect(filter).not.toContain('xstack=inputs=3');
});
test('keeps separate viewports for participants whose video intervals overlap', () => {
const args = buildFfmpegCompositionArgs({
videoInputs: [
file('host-video.webm', 'video', 'host', 'host', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:12.000Z'),
file('p1-video.webm', 'video', 'p1', 'participant', '2026-06-01T00:00:00.000Z', '2026-06-01T00:00:08.000Z'),
file('p2-video.webm', 'video', 'p2', 'participant', '2026-06-01T00:00:05.000Z', '2026-06-01T00:00:12.000Z')
],
audioInputs: [],
outputPath: 'recordings/room-1/output.webm',
format: 'webm'
});
const filter = args[args.indexOf('-filter_complex') + 1];
expect(filter).toContain('xstack=inputs=3');
expect(filter).toContain('layout=0_0|0_1080|1280_1080');
});
});

View File

@@ -64,7 +64,9 @@ describe('recording storage', () => {
userId: 'server-recorder',
recordingSource: 'server',
participantId: 'participant-1',
trackKind: 'video'
trackKind: 'video',
recordingStartedAt: expect.any(String),
recordingEndedAt: expect.any(String)
}));
const files = listServerTrackRecordingFiles({
meetingId: 'room_1',

View File

@@ -8,6 +8,20 @@ Date.now = jest.fn(() => 1482363367071);
const anyParticipantId = expect.any(String);
function recordingEnvelope(connectionId: string, data: any): any {
const innerData = { ...data };
delete innerData.type;
return {
from: connectionId,
to: "",
type: "on-message",
data: JSON.stringify({
type: data.type,
data: innerData
})
};
}
describe('websocket signaling test in public mode', () => {
let server: WS;
let client: WebSocket;
@@ -212,20 +226,17 @@ describe('websocket signaling test in private mode', () => {
};
expect(wsHandler.broadcastRecordingStarted(session)).toBe(true);
await expect(server).toReceiveMessage(expected);
await expect(server).toReceiveMessage(expected);
await expect(server).toReceiveMessage(recordingEnvelope(connectionId, expected));
await expect(server).toReceiveMessage(recordingEnvelope(connectionId, expected));
expect(wsHandler.broadcastRecordingPeerRequest(session)).toBe(true);
await expect(server).toReceiveMessage({
const peerRequest = {
...expected,
type: 'recording-peer-request',
mediaMode: 'webrtc-sendonly'
});
await expect(server).toReceiveMessage({
...expected,
type: 'recording-peer-request',
mediaMode: 'webrtc-sendonly'
});
};
await expect(server).toReceiveMessage(recordingEnvelope(connectionId, peerRequest));
await expect(server).toReceiveMessage(recordingEnvelope(connectionId, peerRequest));
});
test('send offer from session1', async () => {
@@ -281,3 +292,53 @@ describe('websocket signaling test in private mode', () => {
await wsHandler.remove(client);
});
});
describe('recording offer validation', () => {
let server: WS;
let client: WebSocket;
const connectionId = "recording-room";
beforeAll(async () => {
wsHandler.reset("private");
server = new WS("ws://localhost:1234", { jsonProtocol: true });
client = new WebSocket("ws://localhost:1234");
await server.connected;
await wsHandler.add(client);
await wsHandler.onConnect(client, connectionId);
await expect(server).toReceiveMessage({
type: "connect",
connectionId: connectionId,
polite: false,
role: "host",
participantId: anyParticipantId
});
});
afterAll(() => {
WS.clean();
});
test('rejects recording offers without audio or video media sections', async () => {
await wsHandler.onRecordingOffer(client, {
recordingId: 'recording-empty-offer',
connectionId,
sdp: [
'v=0',
'o=- 25268170 0 IN IP4 0.0.0.0',
's=-',
't=0 0',
'a=group:BUNDLE ',
'a=extmap-allow-mixed',
'a=msid-semantic:WMS *',
''
].join('\r\n')
});
await expect(server).toReceiveMessage(expect.objectContaining({
from: connectionId,
to: "",
type: "on-message",
data: expect.stringContaining('"status":"no-media-offer"')
}));
});
});