]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - packages/peertube-runner/server/process/shared/process-live.ts
Enable external plugins to test the PR
[github/Chocobozzz/PeerTube.git] / packages / peertube-runner / server / process / shared / process-live.ts
CommitLineData
1772b383
C
1import { FSWatcher, watch } from 'chokidar'
2import { FfmpegCommand } from 'fluent-ffmpeg'
3import { ensureDir, remove } from 'fs-extra'
4import { logger } from 'packages/peertube-runner/shared'
5import { basename, join } from 'path'
6import { wait } from '@shared/core-utils'
7import { buildUUID } from '@shared/extra-utils'
8import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg'
9import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 PeerTubeProblemDocument,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 ServerErrorCode
15} from '@shared/models'
16import { ConfigManager } from '../../../shared/config-manager'
17import { buildFFmpegLive, ProcessOptions } from './common'
18
19export class ProcessLiveRTMPHLSTranscoding {
20
21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = []
23
24 private readonly playlistsCreated = new Set<string>()
25 private allPlaylistsCreated = false
26
27 private ffmpegCommand: FfmpegCommand
28
29 private ended = false
30 private errored = false
31
32 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
33 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
34 }
35
36 process () {
37 const job = this.options.job
38 const payload = job.payload
39
40 return new Promise<void>(async (res, rej) => {
41 try {
42 await ensureDir(this.outputPath)
43
44 logger.info(`Probing ${payload.input.rtmpUrl}`)
45 const probe = await ffprobePromise(payload.input.rtmpUrl)
46 logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
47
48 const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
49 const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
50 const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
51
52 const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
53 this.fsWatchers.push(m3u8Watcher)
54
55 const tsWatcher = watch(this.outputPath + '/*.ts')
56 this.fsWatchers.push(tsWatcher)
57
58 m3u8Watcher.on('change', p => {
59 logger.debug(`${p} m3u8 playlist changed`)
60 })
61
62 m3u8Watcher.on('add', p => {
63 this.playlistsCreated.add(p)
64
65 if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
66 this.allPlaylistsCreated = true
67 logger.info('All m3u8 playlists are created.')
68 }
69 })
70
71 tsWatcher.on('add', p => {
72 this.sendAddedChunkUpdate(p)
73 .catch(err => this.onUpdateError(err, rej))
74 })
75
76 tsWatcher.on('unlink', p => {
77 this.sendDeletedChunkUpdate(p)
78 .catch(err => this.onUpdateError(err, rej))
79 })
80
81 this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
82 inputUrl: payload.input.rtmpUrl,
83
84 outPath: this.outputPath,
85 masterPlaylistName: 'master.m3u8',
86
87 segmentListSize: payload.output.segmentListSize,
88 segmentDuration: payload.output.segmentDuration,
89
90 toTranscode: payload.output.toTranscode,
91
92 bitrate,
93 ratio,
94
95 hasAudio
96 })
97
98 logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
99
100 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
101 this.onFFmpegError({ err, stdout, stderr })
102
103 res()
104 })
105
106 this.ffmpegCommand.on('end', () => {
107 this.onFFmpegEnded()
108 .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
109
110 res()
111 })
112
113 this.ffmpegCommand.run()
114 } catch (err) {
115 rej(err)
116 }
117 })
118 }
119
120 // ---------------------------------------------------------------------------
121
122 private onUpdateError (err: Error, reject: (reason?: any) => void) {
123 if (this.errored) return
124 if (this.ended) return
125
126 this.errored = true
127
128 reject(err)
129 this.ffmpegCommand.kill('SIGINT')
130
131 const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
132 if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
133 logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
134 } else {
135 logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
136
137 this.sendError(err)
138 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
139 }
140
141 this.cleanup()
142 }
143
144 // ---------------------------------------------------------------------------
145
146 private onFFmpegError (options: {
147 err: any
148 stdout: string
149 stderr: string
150 }) {
151 const { err, stdout, stderr } = options
152
153 // Don't care that we killed the ffmpeg process
154 if (err?.message?.includes('Exiting normally')) return
155 if (this.errored) return
156 if (this.ended) return
157
158 this.errored = true
159
160 logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
161
162 this.sendError(err)
163 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
164
165 this.cleanup()
166 }
167
168 private async sendError (err: Error) {
169 await this.options.server.runnerJobs.error({
170 jobToken: this.options.job.jobToken,
171 jobUUID: this.options.job.uuid,
172 runnerToken: this.options.runnerToken,
173 message: err.message
174 })
175 }
176
177 // ---------------------------------------------------------------------------
178
179 private async onFFmpegEnded () {
180 if (this.ended) return
181
182 this.ended = true
183 logger.info('FFmpeg ended, sending success to server')
184
185 // Wait last ffmpeg chunks generation
186 await wait(1500)
187
188 this.sendSuccess()
189 .catch(err => logger.error({ err }, 'Cannot send success'))
190
191 this.cleanup()
192 }
193
194 private async sendSuccess () {
195 const successBody: LiveRTMPHLSTranscodingSuccess = {}
196
197 await this.options.server.runnerJobs.success({
198 jobToken: this.options.job.jobToken,
199 jobUUID: this.options.job.uuid,
200 runnerToken: this.options.runnerToken,
201 payload: successBody
202 })
203 }
204
205 // ---------------------------------------------------------------------------
206
3a0c2a77
C
207 private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
208 if (this.ended) return Promise.resolve()
1772b383
C
209
210 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
211
212 const videoChunkFilename = basename(deletedChunk)
213
214 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
215 type: 'remove-chunk',
216 videoChunkFilename
217 }
218
219 if (this.allPlaylistsCreated) {
220 const playlistName = this.getPlaylistName(videoChunkFilename)
221
222 payload = {
223 ...payload,
224 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
225 resolutionPlaylistFilename: playlistName,
226 resolutionPlaylistFile: join(this.outputPath, playlistName)
227 }
228 }
229
230 return this.updateWithRetry(payload)
231 }
232
3a0c2a77
C
233 private sendAddedChunkUpdate (addedChunk: string): Promise<any> {
234 if (this.ended) return Promise.resolve()
1772b383
C
235
236 logger.debug(`Sending added live chunk ${addedChunk} update`)
237
238 const videoChunkFilename = basename(addedChunk)
239
240 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
241 type: 'add-chunk',
242 videoChunkFilename,
243 videoChunkFile: addedChunk
244 }
245
246 if (this.allPlaylistsCreated) {
247 const playlistName = this.getPlaylistName(videoChunkFilename)
248
249 payload = {
250 ...payload,
251 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
252 resolutionPlaylistFilename: playlistName,
253 resolutionPlaylistFile: join(this.outputPath, playlistName)
254 }
255 }
256
257 return this.updateWithRetry(payload)
258 }
259
3a0c2a77 260 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
1772b383
C
261 if (this.ended || this.errored) return
262
263 try {
264 await this.options.server.runnerJobs.update({
265 jobToken: this.options.job.jobToken,
266 jobUUID: this.options.job.uuid,
267 runnerToken: this.options.runnerToken,
268 payload
269 })
270 } catch (err) {
271 if (currentTry >= 3) throw err
272
273 logger.warn({ err }, 'Will retry update after error')
274 await wait(250)
275
276 return this.updateWithRetry(payload, currentTry + 1)
277 }
278 }
279
280 private getPlaylistName (videoChunkFilename: string) {
281 return `${videoChunkFilename.split('-')[0]}.m3u8`
282 }
283
284 // ---------------------------------------------------------------------------
285
286 private cleanup () {
287 for (const fsWatcher of this.fsWatchers) {
288 fsWatcher.close()
289 .catch(err => logger.error({ err }, 'Cannot close watcher'))
290 }
291
292 remove(this.outputPath)
293 .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
294 }
295}