Add server: Express web UI + API for remote audio description generation with job queue, basic auth, resumable processing, subtitles, and muxing

This commit is contained in:
2026-05-13 16:23:43 +02:00
parent ce22dadd80
commit 7d1a0029bc
22 changed files with 1904 additions and 22 deletions

View File

@@ -0,0 +1,293 @@
import path from 'path';
import fs from 'fs';
import {
getAllJobs, getJob, createJob, updateJobStatus, saveCheckpoint,
saveJobOutputs, deleteJob as deleteJobFromDb, Job, OutputOptions
} from '../db/jobStore';
import { generateAudioDescriptionFromOptions } from '../../utils/processor';
import { generateSRT, generateVTT } from './subtitleGenerator';
import { muxAudioDescription } from './muxer';
import { getDefaultConfig, Config } from '../../config/config';
import { AudioSegment, BatchContext } from '../../interfaces';
import { getVideoDuration } from '../../utils/mediaUtils';
import { EventEmitter } from 'events';
interface ProgressData {
id: string;
status: string;
progress: number;
currentIndex: number;
totalUnits: number;
segments: AudioSegment[];
error: string | null;
output_audio: string | null;
output_subtitles_srt: string | null;
output_subtitles_vtt: string | null;
output_muxed: string | null;
}
export class JobManager {
private queue: string[] = [];
private processing = false;
private pausedJobs = new Set<string>();
private emitter = new EventEmitter();
private pollInterval: ReturnType<typeof setInterval> | null = null;
constructor() {
this.recoverStuckJobs();
this.emitter.setMaxListeners(100);
}
private recoverStuckJobs(): void {
const jobs = getAllJobs();
for (const job of jobs) {
if (job.status === 'processing') {
updateJobStatus(job.id, 'failed', 'Server restarted while job was in progress. Click Restart to resume from the last checkpoint.');
}
}
}
createJob(videoPath: string, configOverride: Partial<Config> = {}, outputOptions: Partial<OutputOptions> = {}): Job {
const baseConfig = getDefaultConfig();
const mergedConfig: Config = { ...baseConfig, ...configOverride };
const filename = path.basename(videoPath);
const opts: OutputOptions = {
audio: outputOptions.audio !== false,
subtitles: outputOptions.subtitles !== false,
muxed: outputOptions.muxed || false
};
return createJob(videoPath, filename, mergedConfig, opts);
}
async startJob(jobId: string): Promise<void> {
const job = getJob(jobId);
if (!job) throw new Error('Job not found');
if (job.status === 'processing') throw new Error('Job is already processing');
if (job.status === 'completed') throw new Error('Job is already completed');
updateJobStatus(jobId, 'queued');
this.queue.push(jobId);
this.processNext();
}
async pauseJob(jobId: string): Promise<void> {
const job = getJob(jobId);
if (!job) throw new Error('Job not found');
if (job.status !== 'processing') throw new Error('Only processing jobs can be paused');
this.pausedJobs.add(jobId);
updateJobStatus(jobId, 'paused');
this.emitProgress(jobId);
}
async restartJob(jobId: string): Promise<void> {
const job = getJob(jobId);
if (!job) throw new Error('Job not found');
if (job.status !== 'failed' && job.status !== 'paused' && job.status !== 'cancelled') {
throw new Error('Only failed, paused, or cancelled jobs can be restarted');
}
this.pausedJobs.delete(jobId);
updateJobStatus(jobId, 'queued');
this.queue.push(jobId);
this.processNext();
}
async cancelJob(jobId: string): Promise<void> {
const job = getJob(jobId);
if (!job) throw new Error('Job not found');
if (job.status === 'processing') {
this.pausedJobs.add(jobId);
}
updateJobStatus(jobId, 'cancelled');
this.emitProgress(jobId);
}
deleteJob(jobId: string): void {
const job = getJob(jobId);
if (!job) throw new Error('Job not found');
if (job.status === 'processing') throw new Error('Cannot delete a running job');
deleteJobFromDb(jobId);
}
listJobs(): Job[] {
return getAllJobs();
}
onJobProgress(jobId: string, callback: (data: ProgressData) => void): () => void {
this.emitter.on(`progress:${jobId}`, callback);
if (!this.pollInterval) {
this.pollInterval = setInterval(() => {
for (const id of this.emitter.eventNames()) {
const eventName = String(id);
if (eventName.startsWith('progress:')) {
const jId = eventName.replace('progress:', '');
this.emitProgress(jId);
}
}
}, 2000);
}
return () => {
this.emitter.off(`progress:${jobId}`, callback);
};
}
private emitProgress(jobId: string): void {
const job = getJob(jobId);
if (!job) return;
const data: ProgressData = {
id: job.id,
status: job.status,
progress: job.progress,
currentIndex: job.current_index,
totalUnits: job.total_units,
segments: JSON.parse(job.segments || '[]'),
error: job.error,
output_audio: job.output_audio,
output_subtitles_srt: job.output_subtitles_srt,
output_subtitles_vtt: job.output_subtitles_vtt,
output_muxed: job.output_muxed
};
this.emitter.emit(`progress:${jobId}`, data);
}
private async processNext(): Promise<void> {
if (this.processing) return;
while (this.queue.length > 0) {
this.processing = true;
const jobId = this.queue.shift()!;
const job = getJob(jobId);
if (!job || job.status !== 'queued') continue;
try {
await this.processJob(job);
} catch (err: any) {
console.error(`Job ${jobId} failed:`, err.message);
}
}
this.processing = false;
}
private async processJob(job: Job): Promise<void> {
updateJobStatus(job.id, 'processing');
this.emitProgress(job.id);
const config: Config = JSON.parse(job.config);
const outputOptions: OutputOptions = JSON.parse(job.output_options);
const existingSegments: AudioSegment[] = JSON.parse(job.segments || '[]');
const lastContext: BatchContext = JSON.parse(job.last_context || '{}');
const startIndex = existingSegments.length > 0 ? job.current_index : 0;
const startTimePosition = job.current_time_position || 0;
const videoDuration = getVideoDuration(job.video_path);
const totalUnits = config.batchTimeMode
? Math.floor(videoDuration / config.batchWindowDuration)
: Math.floor(videoDuration / config.captureIntervalSeconds);
saveCheckpoint(job.id, JSON.stringify(existingSegments), startIndex, totalUnits, startTimePosition, JSON.stringify(lastContext), 0);
this.emitProgress(job.id);
try {
const result = await generateAudioDescriptionFromOptions(
job.video_path,
config,
{
startIndex,
existingSegments,
lastContext,
currentTimePosition: startTimePosition,
onProgress: (info) => {
if (this.pausedJobs.has(job.id)) {
throw new Error('JOB_PAUSED');
}
const allSegments = existingSegments.length > 0 && info.index === startIndex
? [...existingSegments, info.segment]
: (() => {
const currentJob = getJob(job.id);
if (!currentJob) return [info.segment];
const segs = JSON.parse(currentJob.segments || '[]');
segs.push(info.segment);
return segs;
})();
const progress = totalUnits > 0 ? Math.min(((info.index + 1) / totalUnits) * 100, 99) : 50;
saveCheckpoint(
job.id,
JSON.stringify(allSegments),
info.index + 1,
totalUnits,
info.segment.startTime + info.segment.duration + (config.batchTimeMode ? 0.5 : 0.25),
JSON.stringify(lastContext),
progress
);
this.emitProgress(job.id);
}
}
);
// All segments from the result
const segments = result.segments || [];
// Combine audio segments into final audio (use the result's pre-combined file)
const outputAudio = result.audioDescriptionFile;
let outputSubtitlesSrt: string | null = null;
let outputSubtitlesVtt: string | null = null;
let outputMuxed: string | null = null;
const baseName = path.basename(job.video_path, path.extname(job.video_path));
const outputDir = config.outputDir;
if (outputOptions.subtitles && segments.length > 0) {
const srtPath = path.join(outputDir, `${baseName}_description.srt`);
const vttPath = path.join(outputDir, `${baseName}_description.vtt`);
fs.writeFileSync(srtPath, generateSRT(segments, videoDuration));
fs.writeFileSync(vttPath, generateVTT(segments, videoDuration));
outputSubtitlesSrt = srtPath;
outputSubtitlesVtt = vttPath;
}
if (outputOptions.muxed && fs.existsSync(outputAudio)) {
const muxedPath = path.join(outputDir, `${baseName}_described.mkv`);
muxAudioDescription(job.video_path, outputAudio, muxedPath);
outputMuxed = muxedPath;
}
saveJobOutputs(job.id, {
audio: outputAudio,
subtitlesSrt: outputSubtitlesSrt || undefined,
subtitlesVtt: outputSubtitlesVtt || undefined,
muxed: outputMuxed || undefined
});
saveCheckpoint(job.id, JSON.stringify(segments), totalUnits, totalUnits, 0, '{}', 100);
updateJobStatus(job.id, 'completed');
this.emitProgress(job.id);
} catch (err: any) {
if (err.message === 'JOB_PAUSED') {
updateJobStatus(job.id, 'paused');
this.emitProgress(job.id);
return;
}
const errorMsg = err.message || 'Unknown error';
updateJobStatus(job.id, 'failed', errorMsg);
this.emitProgress(job.id);
}
}
}