aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-09-25 10:04:21 +0200
committerChocobozzz <chocobozzz@cpy.re>2020-11-09 15:33:04 +0100
commita5cf76afa378aae81af2a9b0ce548e5d2582f832 (patch)
tree58da320232bee7c9656774c5d6811e82bbf6c696 /server/lib/job-queue
parentde6310b2fcbb8a6b79c546b23dfa1920724faaa7 (diff)
downloadPeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.gz
PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.zst
PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.zip
Add watch messages if live has not started
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts47
-rw-r--r--server/lib/job-queue/job-queue.ts20
2 files changed, 62 insertions, 5 deletions
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
new file mode 100644
index 000000000..1a58a9f7e
--- /dev/null
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -0,0 +1,47 @@
1import * as Bull from 'bull'
2import { readdir, remove } from 'fs-extra'
3import { join } from 'path'
4import { getHLSDirectory } from '@server/lib/video-paths'
5import { VideoModel } from '@server/models/video/video'
6import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
7import { VideoLiveEndingPayload } from '@shared/models'
8import { logger } from '../../../helpers/logger'
9
10async function processVideoLiveEnding (job: Bull.Job) {
11 const payload = job.data as VideoLiveEndingPayload
12
13 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId)
14 if (!video) {
15 logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId)
16 return
17 }
18
19 const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
20 const hlsDirectory = getHLSDirectory(video, false)
21
22 const files = await readdir(hlsDirectory)
23
24 for (const filename of files) {
25 if (
26 filename.endsWith('.ts') ||
27 filename.endsWith('.m3u8') ||
28 filename.endsWith('.mpd') ||
29 filename.endsWith('.m4s') ||
30 filename.endsWith('.tmp')
31 ) {
32 const p = join(hlsDirectory, filename)
33
34 remove(p)
35 .catch(err => logger.error('Cannot remove %s.', p, { err }))
36 }
37 }
38
39 streamingPlaylist.destroy()
40 .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
41}
42
43// ---------------------------------------------------------------------------
44
45export {
46 processVideoLiveEnding
47}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 14e181835..8d97434ac 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -10,6 +10,7 @@ import {
10 RefreshPayload, 10 RefreshPayload,
11 VideoFileImportPayload, 11 VideoFileImportPayload,
12 VideoImportPayload, 12 VideoImportPayload,
13 VideoLiveEndingPayload,
13 VideoRedundancyPayload, 14 VideoRedundancyPayload,
14 VideoTranscodingPayload 15 VideoTranscodingPayload
15} from '../../../shared/models' 16} from '../../../shared/models'
@@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views'
27import { refreshAPObject } from './handlers/activitypub-refresher' 28import { refreshAPObject } from './handlers/activitypub-refresher'
28import { processVideoFileImport } from './handlers/video-file-import' 29import { processVideoFileImport } from './handlers/video-file-import'
29import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 30import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
31import { processVideoLiveEnding } from './handlers/video-live-ending'
30 32
31type CreateJobArgument = 33type CreateJobArgument =
32 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 34 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -39,8 +41,13 @@ type CreateJobArgument =
39 { type: 'video-import', payload: VideoImportPayload } | 41 { type: 'video-import', payload: VideoImportPayload } |
40 { type: 'activitypub-refresher', payload: RefreshPayload } | 42 { type: 'activitypub-refresher', payload: RefreshPayload } |
41 { type: 'videos-views', payload: {} } | 43 { type: 'videos-views', payload: {} } |
44 { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
42 { type: 'video-redundancy', payload: VideoRedundancyPayload } 45 { type: 'video-redundancy', payload: VideoRedundancyPayload }
43 46
47type CreateJobOptions = {
48 delay?: number
49}
50
44const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { 51const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
45 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 52 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
46 'activitypub-http-unicast': processActivityPubHttpUnicast, 53 'activitypub-http-unicast': processActivityPubHttpUnicast,
@@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
52 'video-import': processVideoImport, 59 'video-import': processVideoImport,
53 'videos-views': processVideosViews, 60 'videos-views': processVideosViews,
54 'activitypub-refresher': refreshAPObject, 61 'activitypub-refresher': refreshAPObject,
62 'video-live-ending': processVideoLiveEnding,
55 'video-redundancy': processVideoRedundancy 63 'video-redundancy': processVideoRedundancy
56} 64}
57 65
@@ -66,7 +74,8 @@ const jobTypes: JobType[] = [
66 'video-import', 74 'video-import',
67 'videos-views', 75 'videos-views',
68 'activitypub-refresher', 76 'activitypub-refresher',
69 'video-redundancy' 77 'video-redundancy',
78 'video-live-ending'
70] 79]
71 80
72class JobQueue { 81class JobQueue {
@@ -122,12 +131,12 @@ class JobQueue {
122 } 131 }
123 } 132 }
124 133
125 createJob (obj: CreateJobArgument): void { 134 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
126 this.createJobWithPromise(obj) 135 this.createJobWithPromise(obj, options)
127 .catch(err => logger.error('Cannot create job.', { err, obj })) 136 .catch(err => logger.error('Cannot create job.', { err, obj }))
128 } 137 }
129 138
130 createJobWithPromise (obj: CreateJobArgument) { 139 createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
131 const queue = this.queues[obj.type] 140 const queue = this.queues[obj.type]
132 if (queue === undefined) { 141 if (queue === undefined) {
133 logger.error('Unknown queue %s: cannot create job.', obj.type) 142 logger.error('Unknown queue %s: cannot create job.', obj.type)
@@ -137,7 +146,8 @@ class JobQueue {
137 const jobArgs: Bull.JobOptions = { 146 const jobArgs: Bull.JobOptions = {
138 backoff: { delay: 60 * 1000, type: 'exponential' }, 147 backoff: { delay: 60 * 1000, type: 'exponential' },
139 attempts: JOB_ATTEMPTS[obj.type], 148 attempts: JOB_ATTEMPTS[obj.type],
140 timeout: JOB_TTL[obj.type] 149 timeout: JOB_TTL[obj.type],
150 delay: options.delay
141 } 151 }
142 152
143 return queue.add(obj.payload, jobArgs) 153 return queue.add(obj.payload, jobArgs)