import path from 'path';
import fs from 'fs';
import {
  getAllJobs,
  getJob,
  createJob,
  updateJobStatus,
  saveCheckpoint,
  saveJobOutputs,
  deleteJob as deleteJobFromDb,
  Job,
  OutputOptions
} from '../db/jobStore';
import { generateAudioDescriptionFromOptions } from '../../utils/processor';
import { generateSRT, generateVTT } from './subtitleGenerator';
import { muxAudioDescription } from './muxer';
import { getDefaultConfig, Config } from '../../config/config';
import { AudioSegment, BatchContext } from '../../interfaces';
import { getVideoDuration, cleanupTempFiles } from '../../utils/mediaUtils';
import { EventEmitter } from 'events';

/** Prefix of every per-job progress event name on the internal emitter. */
const PROGRESS_EVENT_PREFIX = 'progress:';

/** Fallback temp directory used when the stored config has no `tempDir`. */
const DEFAULT_TEMP_DIR = './desc/tmp/';

/** Sentinel error message thrown from the progress callback to stop a running job. */
const JOB_PAUSED_SIGNAL = 'JOB_PAUSED';

/**
 * Extracts a printable message from an arbitrary thrown value.
 *
 * @returns The `message` of an Error (or error-like object), the value itself
 *          when a string was thrown, otherwise an empty string.
 */
function errorMessage(err: unknown): string {
  if (err instanceof Error) return err.message;
  if (typeof err === 'string') return err;
  if (typeof err === 'object' && err !== null && 'message' in err) {
    const candidate = (err as { message: unknown }).message;
    return typeof candidate === 'string' ? candidate : '';
  }
  return '';
}

/** Returns the per-job temp directory: `<baseTempDir>/<jobId>`. */
function jobTempDir(baseTempDir: string, jobId: string): string {
  return path.join(baseTempDir, jobId);
}

/**
 * Best-effort removal of a job's temp directory.
 *
 * Never throws: any failure is logged with `console.warn` and swallowed so
 * that cleanup cannot break the caller's flow.
 */
function safeCleanupJobTmp(dir: string): void {
  try {
    if (!fs.existsSync(dir)) return;
    cleanupTempFiles(dir);
    fs.rmSync(dir, { recursive: true, force: true });
  } catch (err: unknown) {
    console.warn(`Failed to clean up tmp dir ${dir}:`, errorMessage(err));
  }
}

/** Snapshot of a job's state as delivered to progress listeners. */
interface ProgressData {
  id: string;
  status: string;
  progress: number;
  currentIndex: number;
  totalUnits: number;
  segments: AudioSegment[];
  error: string | null;
  output_audio: string | null;
  output_subtitles_srt: string | null;
  output_subtitles_vtt: string | null;
  output_muxed: string | null;
}

/**
 * In-memory scheduler for audio-description jobs.
 *
 * Jobs are persisted through the job store; this class keeps only the run
 * queue, the set of "stop requested" flags, and the progress event emitter.
 * Jobs are processed strictly one at a time, in the order they were queued.
 */
export class JobManager {
  private queue: string[] = [];
  private processing = false;
  /** Ids of running jobs that were asked to stop (by pause OR cancel). */
  private pausedJobs = new Set<string>();
  private emitter = new EventEmitter();
  private pollInterval: ReturnType<typeof setInterval> | null = null;

  constructor() {
    this.recoverStuckJobs();
    this.emitter.setMaxListeners(100);
  }

  /**
   * Marks every job still recorded as 'processing' as failed. Called once at
   * construction: the run queue lives in memory, so such a job cannot still
   * be running in this instance.
   */
  private recoverStuckJobs(): void {
    const jobs = getAllJobs();
    for (const job of jobs) {
      if (job.status === 'processing') {
        updateJobStatus(
          job.id,
          'failed',
          'Server restarted while job was in progress. Click Restart to resume from the last checkpoint.'
        );
      }
    }
  }

  /**
   * Creates and persists a new job for `videoPath`.
   *
   * @param videoPath      Path of the source video.
   * @param configOverride Values layered over the default config.
   * @param outputOptions  Which outputs to produce; `audio` and `subtitles`
   *                       default to on, `muxed` defaults to off.
   * @returns The job record returned by the job store.
   */
  createJob(
    videoPath: string,
    configOverride: Partial<Config> = {},
    outputOptions: Partial<OutputOptions> = {}
  ): Job {
    const baseConfig = getDefaultConfig();

    // Drop empty/undefined/null values so blank form fields don't clobber the
    // baked-in defaults (a blank prompt textarea must NOT overwrite the real
    // prompt with "").
    const cleanedOverride: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(configOverride)) {
      if (value === '' || value === null || value === undefined) continue;
      cleanedOverride[key] = value;
    }

    const mergedConfig: Config = { ...baseConfig, ...(cleanedOverride as Partial<Config>) };
    const filename = path.basename(videoPath);
    const opts: OutputOptions = {
      audio: outputOptions.audio !== false,
      subtitles: outputOptions.subtitles !== false,
      muxed: outputOptions.muxed || false
    };

    return createJob(videoPath, filename, mergedConfig, opts);
  }

  /**
   * Queues a job for processing.
   *
   * @throws Error if the job does not exist, is already processing, or is
   *         already completed.
   */
  async startJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status === 'processing') throw new Error('Job is already processing');
    if (job.status === 'completed') throw new Error('Job is already completed');

    updateJobStatus(jobId, 'queued');
    this.queue.push(jobId);
    // Fire-and-forget: processNext handles its own per-job errors.
    void this.processNext();
  }

  /**
   * Requests a running job to pause. The status flips to 'paused' right away;
   * the worker itself stops at its next progress callback.
   *
   * @throws Error if the job does not exist or is not currently processing.
   */
  async pauseJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status !== 'processing') throw new Error('Only processing jobs can be paused');

    this.pausedJobs.add(jobId);
    updateJobStatus(jobId, 'paused');
    this.emitProgress(jobId);
  }

  /**
   * Re-queues a failed, paused, or cancelled job.
   *
   * @throws Error if the job does not exist or is in any other state.
   */
  async restartJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status !== 'failed' && job.status !== 'paused' && job.status !== 'cancelled') {
      throw new Error('Only failed, paused, or cancelled jobs can be restarted');
    }

    this.pausedJobs.delete(jobId);
    updateJobStatus(jobId, 'queued');
    this.queue.push(jobId);
    void this.processNext();
  }

  /**
   * Cancels a job. A running job is additionally flagged so the worker stops
   * at its next progress callback.
   *
   * @throws Error if the job does not exist.
   */
  async cancelJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');

    if (job.status === 'processing') {
      // Same stop flag as pause; processJob tells the two apart by re-reading
      // the persisted status.
      this.pausedJobs.add(jobId);
    }
    updateJobStatus(jobId, 'cancelled');
    this.emitProgress(jobId);
  }

  /**
   * Deletes a job record and, best-effort, its temp directory.
   *
   * @throws Error if the job does not exist or is currently processing.
   */
  deleteJob(jobId: string): void {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status === 'processing') throw new Error('Cannot delete a running job');

    try {
      const config: Config = JSON.parse(job.config);
      // job.config may contain either the base tempDir (older jobs) or the
      // per-job tempDir (newer jobs). Use it as-is when it already ends in the
      // job id; otherwise compute the per-job dir from the stored base.
      const stored = config.tempDir || DEFAULT_TEMP_DIR;
      const candidate = path.basename(stored) === jobId ? stored : jobTempDir(stored, jobId);
      safeCleanupJobTmp(candidate);
    } catch {
      // ignore: cleanup is best-effort and must not block deletion
    }

    deleteJobFromDb(jobId);
  }

  /** Returns every job known to the job store. */
  listJobs(): Job[] {
    return getAllJobs();
  }

  /**
   * Subscribes to progress snapshots for one job.
   *
   * Besides event-driven updates, a shared 2-second timer re-emits the
   * current state of every job that has a listener. The timer starts with the
   * first subscription and stops once the last listener unsubscribes.
   *
   * @returns A function that removes this subscription.
   */
  onJobProgress(jobId: string, callback: (data: ProgressData) => void): () => void {
    const eventName = `${PROGRESS_EVENT_PREFIX}${jobId}`;
    this.emitter.on(eventName, callback);

    if (!this.pollInterval) {
      this.pollInterval = setInterval(() => {
        for (const registered of this.emitter.eventNames()) {
          const name = String(registered);
          if (name.startsWith(PROGRESS_EVENT_PREFIX)) {
            this.emitProgress(name.slice(PROGRESS_EVENT_PREFIX.length));
          }
        }
      }, 2000);
    }

    return () => {
      this.emitter.off(eventName, callback);
      this.stopPollingIfIdle();
    };
  }

  /** Stops the shared poll timer when no progress listener is registered any more. */
  private stopPollingIfIdle(): void {
    if (!this.pollInterval) return;
    const hasProgressListeners = this.emitter
      .eventNames()
      .some((name) => String(name).startsWith(PROGRESS_EVENT_PREFIX));
    if (hasProgressListeners) return;

    clearInterval(this.pollInterval);
    this.pollInterval = null;
  }

  /** Reads the job from the store and emits its snapshot; no-op if the job is gone. */
  private emitProgress(jobId: string): void {
    const job = getJob(jobId);
    if (!job) return;

    const data: ProgressData = {
      id: job.id,
      status: job.status,
      progress: job.progress,
      currentIndex: job.current_index,
      totalUnits: job.total_units,
      segments: JSON.parse(job.segments || '[]'),
      error: job.error,
      output_audio: job.output_audio,
      output_subtitles_srt: job.output_subtitles_srt,
      output_subtitles_vtt: job.output_subtitles_vtt,
      output_muxed: job.output_muxed
    };
    this.emitter.emit(`${PROGRESS_EVENT_PREFIX}${jobId}`, data);
  }

  /**
   * Drains the queue one job at a time. Re-entrant calls return immediately
   * while a drain is in progress. Entries whose job is missing or no longer
   * 'queued' are skipped.
   */
  private async processNext(): Promise<void> {
    if (this.processing) return;
    this.processing = true;

    try {
      while (this.queue.length > 0) {
        const jobId = this.queue.shift();
        if (jobId === undefined) break;

        try {
          const job = getJob(jobId);
          if (!job || job.status !== 'queued') continue;
          await this.processJob(job);
        } catch (err: unknown) {
          console.error(`Job ${jobId} failed:`, errorMessage(err));
        }
      }
    } finally {
      // Always release the latch, otherwise no job could ever run again.
      this.processing = false;
    }
  }

  /**
   * Runs one job end to end: generates the description, writes the requested
   * subtitle/muxed outputs, and records the result.
   *
   * Outcome handling:
   * - success: status 'completed', temp dir removed;
   * - stop requested (pause/cancel): temp dir kept, status 'paused' unless
   *   the job was cancelled in the meantime;
   * - any other error (including setup errors): status 'failed' with the
   *   error message, temp dir removed.
   */
  private async processJob(job: Job): Promise<void> {
    updateJobStatus(job.id, 'processing');
    this.emitProgress(job.id);

    // Known only once setup got far enough to decide on a directory; the
    // failure path cleans it up if set.
    let tempDir: string | null = null;

    try {
      const config: Config = JSON.parse(job.config);
      const outputOptions: OutputOptions = JSON.parse(job.output_options);

      // Isolate this job's intermediates so concurrent jobs (or future resumes)
      // don't collide on filenames like frame_00001.jpg / segment_3_std.wav.
      // The pipeline already reads config.tempDir, so just override it here.
      const baseTempDir = config.tempDir || DEFAULT_TEMP_DIR;
      const workDir =
        path.basename(baseTempDir) === job.id ? baseTempDir : jobTempDir(baseTempDir, job.id);
      config.tempDir = workDir;
      tempDir = workDir;
      fs.mkdirSync(workDir, { recursive: true });

      const existingSegments: AudioSegment[] = JSON.parse(job.segments || '[]');
      const lastContext: BatchContext = JSON.parse(job.last_context || '{}');
      const startIndex = existingSegments.length > 0 ? job.current_index : 0;
      const startTimePosition = job.current_time_position || 0;

      const videoDuration = getVideoDuration(job.video_path);
      const totalUnits = config.batchTimeMode
        ? Math.floor(videoDuration / config.batchWindowDuration)
        : Math.floor(videoDuration / config.captureIntervalSeconds);

      saveCheckpoint(
        job.id,
        JSON.stringify(existingSegments),
        startIndex,
        totalUnits,
        startTimePosition,
        JSON.stringify(lastContext),
        0
      );
      this.emitProgress(job.id);

      const result = await generateAudioDescriptionFromOptions(job.video_path, config, {
        startIndex,
        existingSegments,
        lastContext,
        currentTimePosition: startTimePosition,
        onProgress: (info) => {
          // Cooperative stop point for pause/cancel.
          if (this.pausedJobs.has(job.id)) {
            throw new Error(JOB_PAUSED_SIGNAL);
          }

          // The first callback of a resumed run appends to the segments we
          // started with; later callbacks append to what the store holds.
          let allSegments: AudioSegment[];
          if (existingSegments.length > 0 && info.index === startIndex) {
            allSegments = [...existingSegments, info.segment];
          } else {
            const currentJob = getJob(job.id);
            const stored: AudioSegment[] = currentJob
              ? JSON.parse(currentJob.segments || '[]')
              : [];
            allSegments = [...stored, info.segment];
          }

          // Capped at 99 so that 100 is only ever written on completion.
          const progress =
            totalUnits > 0 ? Math.min(((info.index + 1) / totalUnits) * 100, 99) : 50;

          saveCheckpoint(
            job.id,
            JSON.stringify(allSegments),
            info.index + 1,
            totalUnits,
            info.segment.startTime + info.segment.duration + (config.batchTimeMode ? 0.5 : 0.25),
            JSON.stringify(lastContext),
            progress
          );
          this.emitProgress(job.id);
        }
      });

      // All segments from the result
      const segments = result.segments || [];
      // Use the result's pre-combined audio file as the final audio output.
      const outputAudio = result.audioDescriptionFile;

      let outputSubtitlesSrt: string | null = null;
      let outputSubtitlesVtt: string | null = null;
      let outputMuxed: string | null = null;

      const baseName = path.basename(job.video_path, path.extname(job.video_path));
      const outputDir = config.outputDir;

      if (outputOptions.subtitles && segments.length > 0) {
        const srtPath = path.join(outputDir, `${baseName}_description.srt`);
        const vttPath = path.join(outputDir, `${baseName}_description.vtt`);
        fs.writeFileSync(srtPath, generateSRT(segments, videoDuration));
        fs.writeFileSync(vttPath, generateVTT(segments, videoDuration));
        outputSubtitlesSrt = srtPath;
        outputSubtitlesVtt = vttPath;
      }

      if (outputOptions.muxed && fs.existsSync(outputAudio)) {
        const muxedPath = path.join(outputDir, `${baseName}_described.mkv`);
        muxAudioDescription(job.video_path, outputAudio, muxedPath);
        outputMuxed = muxedPath;
      }

      saveJobOutputs(job.id, {
        audio: outputAudio,
        subtitlesSrt: outputSubtitlesSrt || undefined,
        subtitlesVtt: outputSubtitlesVtt || undefined,
        muxed: outputMuxed || undefined
      });

      saveCheckpoint(job.id, JSON.stringify(segments), totalUnits, totalUnits, 0, '{}', 100);
      updateJobStatus(job.id, 'completed');
      this.emitProgress(job.id);
      safeCleanupJobTmp(workDir);
    } catch (err: unknown) {
      const message = errorMessage(err);

      if (message === JOB_PAUSED_SIGNAL) {
        // Keep the temp dir intact — restart will resume into the same dir.
        // cancelJob raises the same stop flag as pauseJob, so re-read the
        // persisted status and don't turn a cancellation into 'paused'.
        const current = getJob(job.id);
        const wasCancelled = !!current && current.status === 'cancelled';
        if (!wasCancelled) {
          updateJobStatus(job.id, 'paused');
        }
        this.emitProgress(job.id);
        return;
      }

      updateJobStatus(job.id, 'failed', message || 'Unknown error');
      this.emitProgress(job.id);
      // NOTE(review): failed jobs can be restarted from their checkpoint, yet
      // their intermediates are removed here — confirm the pipeline can
      // resume without them.
      if (tempDir !== null) {
        safeCleanupJobTmp(tempDir);
      }
    } finally {
      // The stop flag only matters while the job is running; clear it so a
      // late pause/cancel request cannot linger after the run has ended.
      this.pausedJobs.delete(job.id);
    }
  }
}