From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- .../schedulers/runner-job-watch-dog-scheduler.ts | 42 ++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 server/lib/schedulers/runner-job-watch-dog-scheduler.ts (limited to 'server/lib/schedulers') diff --git a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts new file mode 100644 index 000000000..f7a26d2bc --- /dev/null +++ b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts @@ -0,0 +1,42 @@ +import { CONFIG } from '@server/initializers/config' +import { RunnerJobModel } from '@server/models/runner/runner-job' +import { logger, loggerTagsFactory } from '../../helpers/logger' +import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { getRunnerJobHandlerClass } from '../runners' +import { AbstractScheduler } from './abstract-scheduler' + +const lTags = loggerTagsFactory('runner') + +export class RunnerJobWatchDogScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.RUNNER_JOB_WATCH_DOG + + private constructor () { + super() + } + + protected async internalExecute () { + const vodStalledJobs = await RunnerJobModel.listStalledJobs({ + staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.VOD, + types: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ] + }) + + const liveStalledJobs = await RunnerJobModel.listStalledJobs({ + staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.LIVE, + types: [ 'live-rtmp-hls-transcoding' ] + }) + + for (const stalled of [ ...vodStalledJobs, ...liveStalledJobs ]) { + logger.info('Abort stalled runner job %s (%s)', stalled.uuid, stalled.type, lTags(stalled.uuid, stalled.type)) + + const Handler = getRunnerJobHandlerClass(stalled) + await new Handler().abort({ runnerJob: stalled }) + } + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} -- cgit v1.2.3