diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-live-ending.ts | 47 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 20 |
2 files changed, 62 insertions, 5 deletions
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts new file mode 100644 index 000000000..1a58a9f7e --- /dev/null +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -0,0 +1,47 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { readdir, remove } from 'fs-extra' | ||
3 | import { join } from 'path' | ||
4 | import { getHLSDirectory } from '@server/lib/video-paths' | ||
5 | import { VideoModel } from '@server/models/video/video' | ||
6 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | ||
7 | import { VideoLiveEndingPayload } from '@shared/models' | ||
8 | import { logger } from '../../../helpers/logger' | ||
9 | |||
10 | async function processVideoLiveEnding (job: Bull.Job) { | ||
11 | const payload = job.data as VideoLiveEndingPayload | ||
12 | |||
13 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) | ||
14 | if (!video) { | ||
15 | logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId) | ||
16 | return | ||
17 | } | ||
18 | |||
19 | const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | ||
20 | const hlsDirectory = getHLSDirectory(video, false) | ||
21 | |||
22 | const files = await readdir(hlsDirectory) | ||
23 | |||
24 | for (const filename of files) { | ||
25 | if ( | ||
26 | filename.endsWith('.ts') || | ||
27 | filename.endsWith('.m3u8') || | ||
28 | filename.endsWith('.mpd') || | ||
29 | filename.endsWith('.m4s') || | ||
30 | filename.endsWith('.tmp') | ||
31 | ) { | ||
32 | const p = join(hlsDirectory, filename) | ||
33 | |||
34 | remove(p) | ||
35 | .catch(err => logger.error('Cannot remove %s.', p, { err })) | ||
36 | } | ||
37 | } | ||
38 | |||
39 | streamingPlaylist.destroy() | ||
40 | .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) | ||
41 | } | ||
42 | |||
43 | // --------------------------------------------------------------------------- | ||
44 | |||
45 | export { | ||
46 | processVideoLiveEnding | ||
47 | } | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 14e181835..8d97434ac 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -10,6 +10,7 @@ import { | |||
10 | RefreshPayload, | 10 | RefreshPayload, |
11 | VideoFileImportPayload, | 11 | VideoFileImportPayload, |
12 | VideoImportPayload, | 12 | VideoImportPayload, |
13 | VideoLiveEndingPayload, | ||
13 | VideoRedundancyPayload, | 14 | VideoRedundancyPayload, |
14 | VideoTranscodingPayload | 15 | VideoTranscodingPayload |
15 | } from '../../../shared/models' | 16 | } from '../../../shared/models' |
@@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views' | |||
27 | import { refreshAPObject } from './handlers/activitypub-refresher' | 28 | import { refreshAPObject } from './handlers/activitypub-refresher' |
28 | import { processVideoFileImport } from './handlers/video-file-import' | 29 | import { processVideoFileImport } from './handlers/video-file-import' |
29 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 30 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
31 | import { processVideoLiveEnding } from './handlers/video-live-ending' | ||
30 | 32 | ||
31 | type CreateJobArgument = | 33 | type CreateJobArgument = |
32 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 34 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -39,8 +41,13 @@ type CreateJobArgument = | |||
39 | { type: 'video-import', payload: VideoImportPayload } | | 41 | { type: 'video-import', payload: VideoImportPayload } | |
40 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 42 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
41 | { type: 'videos-views', payload: {} } | | 43 | { type: 'videos-views', payload: {} } | |
44 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | ||
42 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | 45 | { type: 'video-redundancy', payload: VideoRedundancyPayload } |
43 | 46 | ||
47 | type CreateJobOptions = { | ||
48 | delay?: number | ||
49 | } | ||
50 | |||
44 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | 51 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
45 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 52 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
46 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 53 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
@@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
52 | 'video-import': processVideoImport, | 59 | 'video-import': processVideoImport, |
53 | 'videos-views': processVideosViews, | 60 | 'videos-views': processVideosViews, |
54 | 'activitypub-refresher': refreshAPObject, | 61 | 'activitypub-refresher': refreshAPObject, |
62 | 'video-live-ending': processVideoLiveEnding, | ||
55 | 'video-redundancy': processVideoRedundancy | 63 | 'video-redundancy': processVideoRedundancy |
56 | } | 64 | } |
57 | 65 | ||
@@ -66,7 +74,8 @@ const jobTypes: JobType[] = [ | |||
66 | 'video-import', | 74 | 'video-import', |
67 | 'videos-views', | 75 | 'videos-views', |
68 | 'activitypub-refresher', | 76 | 'activitypub-refresher', |
69 | 'video-redundancy' | 77 | 'video-redundancy', |
78 | 'video-live-ending' | ||
70 | ] | 79 | ] |
71 | 80 | ||
72 | class JobQueue { | 81 | class JobQueue { |
@@ -122,12 +131,12 @@ class JobQueue { | |||
122 | } | 131 | } |
123 | } | 132 | } |
124 | 133 | ||
125 | createJob (obj: CreateJobArgument): void { | 134 | createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { |
126 | this.createJobWithPromise(obj) | 135 | this.createJobWithPromise(obj, options) |
127 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 136 | .catch(err => logger.error('Cannot create job.', { err, obj })) |
128 | } | 137 | } |
129 | 138 | ||
130 | createJobWithPromise (obj: CreateJobArgument) { | 139 | createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { |
131 | const queue = this.queues[obj.type] | 140 | const queue = this.queues[obj.type] |
132 | if (queue === undefined) { | 141 | if (queue === undefined) { |
133 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 142 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
@@ -137,7 +146,8 @@ class JobQueue { | |||
137 | const jobArgs: Bull.JobOptions = { | 146 | const jobArgs: Bull.JobOptions = { |
138 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 147 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
139 | attempts: JOB_ATTEMPTS[obj.type], | 148 | attempts: JOB_ATTEMPTS[obj.type], |
140 | timeout: JOB_TTL[obj.type] | 149 | timeout: JOB_TTL[obj.type], |
150 | delay: options.delay | ||
141 | } | 151 | } |
142 | 152 | ||
143 | return queue.add(obj.payload, jobArgs) | 153 | return queue.add(obj.payload, jobArgs) |