git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
Implement remote runner jobs in server
[github/Chocobozzz/PeerTube.git] / server / lib / runners / job-handlers / live-rtmp-hls-transcoding-job-handler.ts
1 import { move, remove } from 'fs-extra'
2 import { join } from 'path'
3 import { logger } from '@server/helpers/logger'
4 import { JOB_PRIORITY } from '@server/initializers/constants'
5 import { LiveManager } from '@server/lib/live'
6 import { MStreamingPlaylist, MVideo } from '@server/types/models'
7 import { MRunnerJob } from '@server/types/models/runners'
8 import { buildUUID } from '@shared/extra-utils'
9 import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 LiveVideoError,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
15 RunnerJobState
16 } from '@shared/models'
17 import { AbstractJobHandler } from './abstract-job-handler'
18
/**
 * Options accepted by `LiveRTMPHLSTranscodingJobHandler.create()`.
 */
type CreateOptions = {
  // Video being streamed: its UUID is stored in the job private payload
  video: MVideo
  // Streaming playlist of the video: its `playlistFilename` is stored as the master playlist name
  playlist: MStreamingPlaylist

  // RTMP URL sent to the runner as the job input
  rtmpUrl: string

  // Resolution/FPS pairs sent to the runner as the expected outputs
  toTranscode: Array<{
    resolution: number
    fps: number
  }>

  // HLS segment settings, forwarded as is in the job payload
  segmentListSize: number
  segmentDuration: number

  // Directory in which the files sent by the runner in job updates are stored
  outputDirectory: string
}
35
/**
 * Handler of 'live-rtmp-hls-transcoding' runner jobs.
 *
 * It creates the runner job, stores the files sent by the runner in job updates
 * in the job output directory, and stops the live session of the video when the
 * job is completed, errored or cancelled.
 */
// eslint-disable-next-line max-len
export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> {

  /**
   * Create the runner job.
   *
   * The public payload contains the RTMP input URL and the expected outputs.
   * The private payload keeps the video UUID, the master playlist filename and
   * the output directory so job updates can be handled later.
   *
   * @returns the runner job returned by `createRunnerJob()`
   */
  async create (options: CreateOptions) {
    const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options

    const jobUUID = buildUUID()
    const payload: RunnerJobLiveRTMPHLSTranscodingPayload = {
      input: {
        rtmpUrl
      },
      output: {
        toTranscode,
        segmentListSize,
        segmentDuration
      }
    }

    const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = {
      videoUUID: video.uuid,
      masterPlaylistName: playlist.playlistFilename,
      outputDirectory
    }

    const job = await this.createRunnerJob({
      type: 'live-rtmp-hls-transcoding',
      jobUUID,
      payload,
      privatePayload,
      priority: JOB_PRIORITY.TRANSCODING
    })

    return job
  }

  // ---------------------------------------------------------------------------

  /**
   * Handle a job update sent by the runner: add or remove a video chunk, then
   * replace the resolution playlist and/or the master playlist if they are provided.
   *
   * @throws Error if a filename provided by the runner is not a plain file name
   * (empty, '.', '..', or containing a path separator or a NUL character)
   */
  async specificUpdate (options: {
    runnerJob: MRunnerJob
    updatePayload: LiveRTMPHLSTranscodingUpdatePayload
  }) {
    const { runnerJob, updatePayload } = options

    const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
    const outputDirectory = privatePayload.outputDirectory
    const videoUUID = privatePayload.videoUUID

    // The chunk is handled before the playlists
    // NOTE(review): presumably so a playlist never references a chunk that is not on disk yet - confirm
    if (updatePayload.type === 'add-chunk') {
      await move(
        updatePayload.videoChunkFile as string,
        buildSafeOutputPath(outputDirectory, updatePayload.videoChunkFilename),
        { overwrite: true }
      )
    } else if (updatePayload.type === 'remove-chunk') {
      // remove() deletes recursively: the target must not be able to escape the output directory
      await remove(buildSafeOutputPath(outputDirectory, updatePayload.videoChunkFilename))
    }

    if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) {
      await move(
        updatePayload.resolutionPlaylistFile as string,
        buildSafeOutputPath(outputDirectory, updatePayload.resolutionPlaylistFilename),
        { overwrite: true }
      )
    }

    if (updatePayload.masterPlaylistFile) {
      // The master playlist name comes from the private payload built by create(), not from the runner
      await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true })
    }

    logger.info(
      'Runner live RTMP to HLS job %s for %s updated.',
      runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) }
    )
  }

  // ---------------------------------------------------------------------------

  /** Job completed: stop the live session without error. */
  protected specificComplete (options: {
    runnerJob: MRunnerJob
  }) {
    return this.stopLive({
      runnerJob: options.runnerJob,
      type: 'ended'
    })
  }

  // ---------------------------------------------------------------------------

  /** This job type cannot be aborted. */
  protected isAbortSupported () {
    return false
  }

  /** Always throws: abort is not supported (see isAbortSupported()). */
  protected specificAbort () {
    throw new Error('Not implemented')
  }

  /** Job errored: stop the live session with a RUNNER_JOB_ERROR error. */
  protected specificError (options: {
    runnerJob: MRunnerJob
    nextState: RunnerJobState
  }) {
    return this.stopLive({
      runnerJob: options.runnerJob,
      type: 'errored'
    })
  }

  /** Job cancelled: stop the live session with a RUNNER_JOB_CANCEL error. */
  protected specificCancel (options: {
    runnerJob: MRunnerJob
  }) {
    return this.stopLive({
      runnerJob: options.runnerJob,
      type: 'cancelled'
    })
  }

  /**
   * Ask the live manager to stop the session of the video associated with the job,
   * using the error matching the reason the job stopped (no error for 'ended').
   */
  private stopLive (options: {
    runnerJob: MRunnerJob
    type: 'ended' | 'errored' | 'cancelled'
  }) {
    const { runnerJob, type } = options

    const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
    const videoUUID = privatePayload.videoUUID

    const errorType = {
      ended: null,
      errored: LiveVideoError.RUNNER_JOB_ERROR,
      cancelled: LiveVideoError.RUNNER_JOB_CANCEL
    }

    LiveManager.Instance.stopSessionOf(videoUUID, errorType[type])

    logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID))
  }
}

/**
 * Build the path of a file sent by a runner inside the job output directory.
 *
 * The filename comes from the runner update payload, and join() resolves '..'
 * segments: only accept plain file names so the resulting path cannot point
 * outside of `outputDirectory`.
 *
 * @throws Error if `filename` is not a plain file name
 */
function buildSafeOutputPath (outputDirectory: string, filename: string) {
  const forbiddenCharacters = [ '/', '\\', '\0' ]

  const isPlainFilename = typeof filename === 'string' &&
    filename.length !== 0 &&
    filename !== '.' &&
    filename !== '..' &&
    !forbiddenCharacters.some(c => filename.includes(c))

  if (!isPlainFilename) {
    throw new Error(`Invalid filename ${JSON.stringify(filename)} in runner job update: a plain file name is expected`)
  }

  return join(outputDirectory, filename)
}