From a5cf76afa378aae81af2a9b0ce548e5d2582f832 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 25 Sep 2020 10:04:21 +0200 Subject: Add watch messages if live has not started --- server/lib/job-queue/handlers/video-live-ending.ts | 47 ++++++++++++++++++++++ server/lib/job-queue/job-queue.ts | 20 ++++++--- 2 files changed, 62 insertions(+), 5 deletions(-) create mode 100644 server/lib/job-queue/handlers/video-live-ending.ts (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts new file mode 100644 index 000000000..1a58a9f7e --- /dev/null +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -0,0 +1,47 @@ +import * as Bull from 'bull' +import { readdir, remove } from 'fs-extra' +import { join } from 'path' +import { getHLSDirectory } from '@server/lib/video-paths' +import { VideoModel } from '@server/models/video/video' +import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' +import { VideoLiveEndingPayload } from '@shared/models' +import { logger } from '../../../helpers/logger' + +async function processVideoLiveEnding (job: Bull.Job) { + const payload = job.data as VideoLiveEndingPayload + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) + if (!video) { + logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId) + return + } + + const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) + const hlsDirectory = getHLSDirectory(video, false) + + const files = await readdir(hlsDirectory) + + for (const filename of files) { + if ( + filename.endsWith('.ts') || + filename.endsWith('.m3u8') || + filename.endsWith('.mpd') || + filename.endsWith('.m4s') || + filename.endsWith('.tmp') + ) { + const p = join(hlsDirectory, filename) + + remove(p) + .catch(err => logger.error('Cannot remove %s.', p, { err })) + } + } + + streamingPlaylist.destroy() + .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) +} + +// --------------------------------------------------------------------------- + +export { + processVideoLiveEnding +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 14e181835..8d97434ac 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -10,6 +10,7 @@ import { RefreshPayload, VideoFileImportPayload, VideoImportPayload, + VideoLiveEndingPayload, VideoRedundancyPayload, VideoTranscodingPayload } from '../../../shared/models' @@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views' import { refreshAPObject } from './handlers/activitypub-refresher' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' +import { processVideoLiveEnding } from './handlers/video-live-ending' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -39,8 +41,13 @@ type CreateJobArgument = { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views', payload: {} } | + { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } +type CreateJobOptions = { + delay?: number +} + const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, @@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'video-import': processVideoImport, 'videos-views': processVideosViews, 'activitypub-refresher': refreshAPObject, + 'video-live-ending': processVideoLiveEnding, 'video-redundancy': processVideoRedundancy } @@ -66,7 +74,8 @@ const jobTypes: JobType[] = [ 'video-import', 'videos-views', 'activitypub-refresher', - 'video-redundancy' + 'video-redundancy', + 'video-live-ending' ] class JobQueue { @@ -122,12 +131,12 @@ class JobQueue { } } - createJob (obj: CreateJobArgument): void { - this.createJobWithPromise(obj) + createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { + this.createJobWithPromise(obj, options) .catch(err => logger.error('Cannot create job.', { err, obj })) } - createJobWithPromise (obj: CreateJobArgument) { + createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { const queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) @@ -137,7 +146,8 @@ class JobQueue { const jobArgs: Bull.JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], - timeout: JOB_TTL[obj.type] + timeout: JOB_TTL[obj.type], + delay: options.delay } return queue.add(obj.payload, jobArgs) -- cgit v1.2.3