From 3a0c2a77b1a6626699514ddaf8135f4397175443 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 2 May 2023 13:51:06 +0200 Subject: Enable external plugins to test the PR --- server/lib/job-queue/job-queue.ts | 7 +++- server/lib/live/shared/muxing-session.ts | 40 ++++++++++++---------- .../runners/job-handlers/abstract-job-handler.ts | 22 +++++++----- 3 files changed, 42 insertions(+), 27 deletions(-) (limited to 'server/lib') diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 21bf0f226..03f6fbea7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -96,6 +96,7 @@ export type CreateJobArgument = export type CreateJobOptions = { delay?: number priority?: number + failParentOnFailure?: boolean } const handlers: { [id in JobType]: (job: Job) => Promise } = { @@ -363,7 +364,11 @@ class JobQueue { name: 'job', data: job.payload, queueName: job.type, - opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) + opts: { + failParentOnFailure: true, + + ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ])) + } } } diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index f3f8fc886..ef4ecb83e 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -79,9 +79,7 @@ class MuxingSession extends EventEmitter { private streamingPlaylist: MStreamingPlaylistVideo private liveSegmentShaStore: LiveSegmentShaStore - private tsWatcher: FSWatcher - private masterWatcher: FSWatcher - private m3u8Watcher: FSWatcher + private filesWatcher: FSWatcher private masterPlaylistCreated = false private liveReady = false @@ -149,6 +147,8 @@ class MuxingSession extends EventEmitter { await this.transcodingWrapper.run() + this.filesWatcher = watch(this.outDirectory, { depth: 0 }) + this.watchMasterFile() this.watchTSFiles() this.watchM3U8File() @@ -168,9 +168,10 @@ class MuxingSession extends EventEmitter { } private watchMasterFile () { - this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) + this.filesWatcher.on('add', async path => { + if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return + if (this.masterPlaylistCreated === true) return - this.masterWatcher.on('add', async () => { try { if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) @@ -188,20 +189,18 @@ class MuxingSession extends EventEmitter { this.masterPlaylistCreated = true logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) - - this.masterWatcher.close() - .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) }) } private watchM3U8File () { - this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') - const sendQueues = new Map() - const onChangeOrAdd = async (m3u8Path: string) => { + const onChange = async (m3u8Path: string) => { + if (m3u8Path.endsWith('.m3u8') !== true) return if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return + logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) + try { if (!sendQueues.has(m3u8Path)) { sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) @@ -214,18 +213,18 @@ class MuxingSession extends EventEmitter { } } - this.m3u8Watcher.on('change', onChangeOrAdd) + this.filesWatcher.on('change', onChange) } private watchTSFiles () { const startStreamDateTime = new Date().getTime() - this.tsWatcher = watch(this.outDirectory + '/*.ts') - const playlistIdMatcher = /^([\d+])-/ const addHandler = async (segmentPath: string) => { - logger.debug('Live add handler of %s.', segmentPath, this.lTags()) + if (segmentPath.endsWith('.ts') !== true) return + + logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] @@ -252,6 +251,10 @@ class MuxingSession extends EventEmitter { } const deleteHandler = async (segmentPath: string) => { + if (segmentPath.endsWith('.ts') !== true) return + + logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags()) + try { await this.liveSegmentShaStore.removeSegmentSha(segmentPath) } catch (err) { @@ -267,8 +270,8 @@ class MuxingSession extends EventEmitter { } } - this.tsWatcher.on('add', p => addHandler(p)) - this.tsWatcher.on('unlink', p => deleteHandler(p)) + this.filesWatcher.on('add', p => addHandler(p)) + this.filesWatcher.on('unlink', p => deleteHandler(p)) } private async isQuotaExceeded (segmentPath: string) { @@ -371,7 +374,8 @@ class MuxingSession extends EventEmitter { setTimeout(() => { // Wait latest segments generation, and close watchers - Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) + const promise = this.filesWatcher?.close() || Promise.resolve() + promise .then(() => { // Process remaining segments hash for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts index 73fc14574..74b455107 100644 --- a/server/lib/runners/job-handlers/abstract-job-handler.ts +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts @@ -21,6 +21,7 @@ import { RunnerJobVODWebVideoTranscodingPayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models' +import { throttle } from 'lodash' type CreateRunnerJobArg = { @@ -48,6 +49,8 @@ export abstract class AbstractJobHandler @@ -102,16 +105,19 @@ export abstract class AbstractJobHandler { return sequelizeTypescript.transaction(async transaction => { - if (runnerJob.changed()) { - return runnerJob.save({ transaction }) - } - - // Don't update the job too often - if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { - await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) - } + return runnerJob.save({ transaction }) }) }) } -- cgit v1.2.3