2026-05-13 16:23:43 +02:00
|
|
|
import path from 'path';
|
|
|
|
|
import fs from 'fs';
|
2026-05-15 03:53:43 +02:00
|
|
|
import {
|
|
|
|
|
getAllJobs, getJob, createJob, updateJobStatus, saveCheckpoint,
|
|
|
|
|
saveJobOutputs, deleteJob as deleteJobFromDb, Job, OutputOptions
|
2026-05-13 16:23:43 +02:00
|
|
|
} from '../db/jobStore';
|
|
|
|
|
import { generateAudioDescriptionFromOptions } from '../../utils/processor';
|
|
|
|
|
import { generateSRT, generateVTT } from './subtitleGenerator';
|
2026-05-15 04:10:06 +02:00
|
|
|
import { muxAudioDescription, muxMixedAudioDescription } from './muxer';
|
2026-05-13 16:23:43 +02:00
|
|
|
import { getDefaultConfig, Config } from '../../config/config';
|
|
|
|
|
import { AudioSegment, BatchContext } from '../../interfaces';
|
2026-05-15 03:53:43 +02:00
|
|
|
import { getVideoDuration, cleanupTempFiles } from '../../utils/mediaUtils';
|
2026-05-13 16:23:43 +02:00
|
|
|
import { EventEmitter } from 'events';
|
|
|
|
|
|
2026-05-15 03:53:43 +02:00
|
|
|
/**
 * Builds the path of the scratch directory reserved for a single job.
 *
 * @param baseTempDir - Directory under which per-job scratch directories live.
 * @param jobId - Identifier of the job; used verbatim as the final path segment.
 * @returns `baseTempDir` joined with `jobId` via `path.join`.
 */
function jobTempDir(baseTempDir: string, jobId: string): string {
  const perJobDir = path.join(baseTempDir, jobId);
  return perJobDir;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Best-effort removal of a job's scratch directory.
 *
 * Does nothing when `dir` does not exist. Otherwise runs `cleanupTempFiles`
 * on it and then removes the directory recursively. Any failure is logged as
 * a warning and swallowed: this function never throws.
 *
 * @param dir - Path of the scratch directory to remove.
 */
function safeCleanupJobTmp(dir: string): void {
  try {
    if (!fs.existsSync(dir)) return;
    cleanupTempFiles(dir);
    fs.rmSync(dir, { recursive: true, force: true });
  } catch (err: unknown) {
    // Narrow before reading `.message`: a thrown null/undefined would otherwise
    // raise a TypeError inside this handler and escape the "safe" wrapper, and
    // a thrown string would be logged as `undefined`.
    const reason = err instanceof Error ? err.message : String(err);
    console.warn(`Failed to clean up tmp dir ${dir}:`, reason);
  }
}
|
|
|
|
|
|
2026-05-13 16:23:43 +02:00
|
|
|
/**
 * Snapshot of a job's state as delivered to progress listeners.
 * Every field is copied from the stored job row at the moment of emission.
 */
interface ProgressData {
  /** Identifier of the job this snapshot describes. */
  id: string;
  /** Current status string of the job (e.g. 'queued', 'processing', 'paused'). */
  status: string;
  /** Completion percentage as stored on the job row. */
  progress: number;
  /** Value of the job row's `current_index` checkpoint field. */
  currentIndex: number;
  /** Value of the job row's `total_units` field. */
  totalUnits: number;
  /** Segments recorded so far, parsed from the job row's JSON `segments` column. */
  segments: AudioSegment[];
  /** Error message stored on the job, or null when there is none. */
  error: string | null;

  // Output artefact fields, copied as-is from the job row; null when not set.
  output_audio: string | null;
  output_subtitles_srt: string | null;
  output_subtitles_vtt: string | null;
  output_muxed: string | null;
}
|
|
|
|
|
|
|
|
|
|
/**
 * In-process job queue for audio-description jobs.
 *
 * Jobs are persisted through the job store; this class only keeps the
 * in-memory FIFO queue, the set of abort requests for the running job, and
 * the progress event emitter. Jobs are processed strictly one at a time.
 */
export class JobManager {
  /** Ids waiting to be processed, in FIFO order. */
  private queue: string[] = [];
  /** True while processNext() is draining the queue; prevents a second drain loop. */
  private processing = false;
  /**
   * Ids of running jobs that should abort at their next progress callback.
   * Used for both pause and cancel requests.
   */
  private pausedJobs = new Set<string>();
  /** Carries `progress:<jobId>` events to subscribers. */
  private emitter = new EventEmitter();
  /** Handle of the 2 s polling timer; null while nobody is subscribed. */
  private pollInterval: ReturnType<typeof setInterval> | null = null;

  constructor() {
    this.recoverStuckJobs();
    this.emitter.setMaxListeners(100);
  }

  /**
   * Marks every job still stored as 'processing' as failed. A fresh manager
   * has nothing running, so such rows are leftovers from a previous process.
   */
  private recoverStuckJobs(): void {
    const jobs = getAllJobs();
    for (const job of jobs) {
      if (job.status === 'processing') {
        updateJobStatus(job.id, 'failed', 'Server restarted while job was in progress. Click Restart to resume from the last checkpoint.');
      }
    }
  }

  /**
   * Creates and persists a new job for `videoPath`.
   *
   * @param videoPath - Path of the source video.
   * @param configOverride - Per-job config values layered over the defaults.
   *   Entries that are `''`, `null` or `undefined` are ignored.
   * @param outputOptions - Requested outputs. `audio` and `subtitles` default
   *   to on, `muxed` to off, and `muxMode` to 'separate' unless it is 'mixed'.
   * @returns The job row returned by the job store.
   */
  createJob(videoPath: string, configOverride: Partial<Config> = {}, outputOptions: Partial<OutputOptions> = {}): Job {
    const baseConfig = getDefaultConfig();

    // Drop empty/undefined/null values so blank form fields don't clobber the
    // baked-in defaults (a blank prompt textarea must NOT overwrite the real
    // prompt with "").
    const cleanedOverride: Record<string, unknown> = {};
    for (const [k, v] of Object.entries(configOverride)) {
      if (v === '' || v === null || v === undefined) continue;
      cleanedOverride[k] = v;
    }

    const mergedConfig: Config = { ...baseConfig, ...(cleanedOverride as Partial<Config>) };

    const filename = path.basename(videoPath);
    const opts: OutputOptions = {
      audio: outputOptions.audio !== false,
      subtitles: outputOptions.subtitles !== false,
      muxed: outputOptions.muxed || false,
      muxMode: outputOptions.muxMode === 'mixed' ? 'mixed' : 'separate'
    };

    return createJob(videoPath, filename, mergedConfig, opts);
  }

  /**
   * Queues a job for processing.
   *
   * @throws Error if the job does not exist, is already processing, or is
   *   already completed.
   */
  async startJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status === 'processing') throw new Error('Job is already processing');
    if (job.status === 'completed') throw new Error('Job is already completed');

    this.enqueue(jobId);
  }

  /**
   * Requests that a running job stop at its next progress callback and marks
   * it 'paused' immediately.
   *
   * @throws Error if the job does not exist or is not currently processing.
   */
  async pauseJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status !== 'processing') throw new Error('Only processing jobs can be paused');

    this.pausedJobs.add(jobId);
    updateJobStatus(jobId, 'paused');
    this.emitProgress(jobId);
  }

  /**
   * Re-queues a failed, paused, or cancelled job.
   *
   * @throws Error if the job does not exist or is in any other status.
   */
  async restartJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status !== 'failed' && job.status !== 'paused' && job.status !== 'cancelled') {
      throw new Error('Only failed, paused, or cancelled jobs can be restarted');
    }

    this.enqueue(jobId);
  }

  /**
   * Marks a job 'cancelled'. If it is currently processing, the running
   * pipeline is asked to stop at its next progress callback.
   *
   * @throws Error if the job does not exist.
   */
  async cancelJob(jobId: string): Promise<void> {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');

    if (job.status === 'processing') {
      this.pausedJobs.add(jobId);
    }
    // A cancelled job must not linger in the in-memory queue.
    this.removeFromQueue(jobId);
    updateJobStatus(jobId, 'cancelled');
    this.emitProgress(jobId);
  }

  /**
   * Deletes a job and, best-effort, its scratch directory.
   *
   * @throws Error if the job does not exist or is currently processing.
   */
  deleteJob(jobId: string): void {
    const job = getJob(jobId);
    if (!job) throw new Error('Job not found');
    if (job.status === 'processing') throw new Error('Cannot delete a running job');

    try {
      const config: Config = JSON.parse(job.config);
      // job.config may contain either the base tempDir (older jobs) or the
      // per-job tempDir (newer jobs). Use the stored path as-is when it already
      // ends in the job id; otherwise compute the per-job dir from the stored base.
      const stored = config.tempDir || './desc/tmp/';
      const candidate = path.basename(stored) === jobId ? stored : jobTempDir(stored, jobId);
      safeCleanupJobTmp(candidate);
    } catch {
      // ignore: cleanup is best-effort and must not block deletion
    }

    // Drop any in-memory traces so a deleted id cannot be picked up later.
    this.removeFromQueue(jobId);
    this.pausedJobs.delete(jobId);
    deleteJobFromDb(jobId);
  }

  /** Returns every job known to the job store. */
  listJobs(): Job[] {
    return getAllJobs();
  }

  /**
   * Subscribes to progress snapshots for one job.
   *
   * While at least one subscription exists, a timer re-emits the current
   * state of every subscribed job every 2 seconds.
   *
   * @param jobId - Job to observe.
   * @param callback - Receives a snapshot on every emission.
   * @returns A function that removes this subscription.
   */
  onJobProgress(jobId: string, callback: (data: ProgressData) => void): () => void {
    const eventName = `progress:${jobId}`;
    this.emitter.on(eventName, callback);

    if (!this.pollInterval) {
      this.pollInterval = setInterval(() => {
        for (const name of this.subscribedProgressEvents()) {
          this.emitProgress(name.slice('progress:'.length));
        }
      }, 2000);
    }

    return () => {
      this.emitter.off(eventName, callback);
      // Stop polling once the last subscriber is gone; otherwise the timer
      // runs forever and keeps the process alive.
      if (this.pollInterval && this.subscribedProgressEvents().length === 0) {
        clearInterval(this.pollInterval);
        this.pollInterval = null;
      }
    };
  }

  /** Names of the `progress:*` events that currently have listeners. */
  private subscribedProgressEvents(): string[] {
    return this.emitter
      .eventNames()
      .map((name) => String(name))
      .filter((name) => name.startsWith('progress:'));
  }

  /** Clears any stale abort flag, marks the job 'queued', and kicks the worker. */
  private enqueue(jobId: string): void {
    // A leftover flag would abort the job at its first progress callback.
    this.pausedJobs.delete(jobId);
    updateJobStatus(jobId, 'queued');
    if (!this.queue.includes(jobId)) {
      this.queue.push(jobId);
    }
    void this.processNext();
  }

  /** Removes every occurrence of `jobId` from the pending queue. */
  private removeFromQueue(jobId: string): void {
    this.queue = this.queue.filter((id) => id !== jobId);
  }

  /** Reads the job from the store and emits its snapshot; no-op if the job is gone. */
  private emitProgress(jobId: string): void {
    const job = getJob(jobId);
    if (!job) return;

    // This also runs from the polling timer, where an exception would be
    // uncaught, so malformed segment JSON degrades to an empty list.
    let segments: AudioSegment[] = [];
    try {
      segments = JSON.parse(job.segments || '[]');
    } catch {
      segments = [];
    }

    const data: ProgressData = {
      id: job.id,
      status: job.status,
      progress: job.progress,
      currentIndex: job.current_index,
      totalUnits: job.total_units,
      segments,
      error: job.error,
      output_audio: job.output_audio,
      output_subtitles_srt: job.output_subtitles_srt,
      output_subtitles_vtt: job.output_subtitles_vtt,
      output_muxed: job.output_muxed
    };
    this.emitter.emit(`progress:${jobId}`, data);
  }

  /**
   * Drains the queue one job at a time. Returns immediately if a drain is
   * already running. Entries whose job is missing or no longer 'queued' are
   * skipped.
   */
  private async processNext(): Promise<void> {
    if (this.processing) return;

    this.processing = true;
    try {
      while (this.queue.length > 0) {
        const jobId = this.queue.shift();
        if (jobId === undefined) break;

        try {
          const job = getJob(jobId);
          if (!job || job.status !== 'queued') continue;
          await this.processJob(job);
        } catch (err: unknown) {
          const reason = err instanceof Error ? err.message : String(err);
          console.error(`Job ${jobId} failed:`, reason);
        }
      }
    } finally {
      // Always release the flag, or a single unexpected throw would stall the queue.
      this.processing = false;
    }
  }

  /**
   * Runs the pipeline for one job, resuming from its stored checkpoint,
   * writing a checkpoint after every progress callback, and producing the
   * requested outputs on success.
   *
   * Outcomes: 'completed' on success; 'paused' (or 'cancelled', when a cancel
   * was requested) if the run is aborted from the progress callback; 'failed'
   * with the error message for any other error, including setup errors.
   */
  private async processJob(job: Job): Promise<void> {
    updateJobStatus(job.id, 'processing');
    this.emitProgress(job.id);

    // Tracked outside the try so the failure path knows whether there is
    // anything to clean up.
    let tempDir: string | null = null;

    try {
      const config: Config = JSON.parse(job.config);
      const outputOptions: OutputOptions = JSON.parse(job.output_options);

      // Isolate this job's intermediates so concurrent jobs (or future resumes)
      // don't collide on filenames like frame_00001.jpg / segment_3_std.wav.
      // The pipeline already reads config.tempDir, so just override it here.
      const baseTempDir = config.tempDir || './desc/tmp/';
      config.tempDir = path.basename(baseTempDir) === job.id
        ? baseTempDir
        : jobTempDir(baseTempDir, job.id);
      fs.mkdirSync(config.tempDir, { recursive: true });
      tempDir = config.tempDir;

      const existingSegments: AudioSegment[] = JSON.parse(job.segments || '[]');
      const lastContext: BatchContext = JSON.parse(job.last_context || '{}');

      // Only trust the stored index when there are stored segments to go with it.
      const startIndex = existingSegments.length > 0 ? job.current_index : 0;
      const startTimePosition = job.current_time_position || 0;

      const videoDuration = getVideoDuration(job.video_path);
      const totalUnits = config.batchTimeMode
        ? Math.floor(videoDuration / config.batchWindowDuration)
        : Math.floor(videoDuration / config.captureIntervalSeconds);

      saveCheckpoint(job.id, JSON.stringify(existingSegments), startIndex, totalUnits, startTimePosition, JSON.stringify(lastContext), 0);
      this.emitProgress(job.id);

      const result = await generateAudioDescriptionFromOptions(
        job.video_path,
        config,
        {
          startIndex,
          existingSegments,
          lastContext,
          currentTimePosition: startTimePosition,
          onProgress: (info) => {
            // Throwing from the callback is how a pause/cancel request stops the pipeline.
            if (this.pausedJobs.has(job.id)) {
              throw new Error('JOB_PAUSED');
            }

            // First callback of a resumed run appends to the segments loaded
            // above; later callbacks append to whatever the last checkpoint stored.
            let allSegments: AudioSegment[];
            if (existingSegments.length > 0 && info.index === startIndex) {
              allSegments = [...existingSegments, info.segment];
            } else {
              const currentJob = getJob(job.id);
              allSegments = currentJob ? JSON.parse(currentJob.segments || '[]') : [];
              allSegments.push(info.segment);
            }

            // Capped at 99 so only the completion checkpoint reports 100.
            const progress = totalUnits > 0 ? Math.min(((info.index + 1) / totalUnits) * 100, 99) : 50;

            saveCheckpoint(
              job.id,
              JSON.stringify(allSegments),
              info.index + 1,
              totalUnits,
              info.segment.startTime + info.segment.duration + (config.batchTimeMode ? 0.5 : 0.25),
              JSON.stringify(lastContext),
              progress
            );

            this.emitProgress(job.id);
          }
        }
      );

      // All segments from the result
      const segments = result.segments || [];

      // Final audio is the pre-combined file reported by the pipeline.
      const outputAudio = result.audioDescriptionFile;

      let outputSubtitlesSrt: string | null = null;
      let outputSubtitlesVtt: string | null = null;
      let outputMuxed: string | null = null;

      const baseName = path.basename(job.video_path, path.extname(job.video_path));
      const outputDir = config.outputDir;

      if (outputOptions.subtitles && segments.length > 0) {
        const srtPath = path.join(outputDir, `${baseName}_description.srt`);
        const vttPath = path.join(outputDir, `${baseName}_description.vtt`);
        fs.writeFileSync(srtPath, generateSRT(segments, videoDuration));
        fs.writeFileSync(vttPath, generateVTT(segments, videoDuration));
        outputSubtitlesSrt = srtPath;
        outputSubtitlesVtt = vttPath;
      }

      if (outputOptions.muxed && fs.existsSync(outputAudio)) {
        const isMixed = outputOptions.muxMode === 'mixed';
        const muxedPath = path.join(
          outputDir,
          `${baseName}${isMixed ? '_described_mixed' : '_described'}.mkv`
        );
        if (isMixed) {
          muxMixedAudioDescription(job.video_path, outputAudio, muxedPath);
        } else {
          muxAudioDescription(job.video_path, outputAudio, muxedPath);
        }
        outputMuxed = muxedPath;
      }

      saveJobOutputs(job.id, {
        audio: outputAudio,
        subtitlesSrt: outputSubtitlesSrt || undefined,
        subtitlesVtt: outputSubtitlesVtt || undefined,
        muxed: outputMuxed || undefined
      });

      saveCheckpoint(job.id, JSON.stringify(segments), totalUnits, totalUnits, 0, '{}', 100);
      updateJobStatus(job.id, 'completed');
      this.emitProgress(job.id);

      safeCleanupJobTmp(config.tempDir);
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);

      if (message === 'JOB_PAUSED') {
        // Keep the temp dir intact — restart will resume into the same dir.
        // cancelJob() stops the pipeline through the same flag, so leave a
        // 'cancelled' status alone instead of overwriting it with 'paused'.
        const current = getJob(job.id);
        if (current?.status !== 'cancelled') {
          updateJobStatus(job.id, 'paused');
        }
        this.emitProgress(job.id);
        return;
      }

      const errorMsg = message || 'Unknown error';
      updateJobStatus(job.id, 'failed', errorMsg);
      this.emitProgress(job.id);

      // NOTE(review): failed jobs can be restarted from their checkpoint, yet
      // their scratch dir is removed here — confirm a resume does not need it.
      if (tempDir !== null) {
        safeCleanupJobTmp(tempDir);
      }
    } finally {
      // The abort request (if any) has been honoured or has become moot.
      this.pausedJobs.delete(job.id);
    }
  }
}
|