]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - packages/peertube-runner/server/process/shared/process-live.ts
Bumped to version v5.2.1
[github/Chocobozzz/PeerTube.git] / packages / peertube-runner / server / process / shared / process-live.ts
1 import { FSWatcher, watch } from 'chokidar'
2 import { FfmpegCommand } from 'fluent-ffmpeg'
3 import { ensureDir, remove } from 'fs-extra'
4 import { logger } from 'packages/peertube-runner/shared'
5 import { basename, join } from 'path'
6 import { wait } from '@shared/core-utils'
7 import { buildUUID } from '@shared/extra-utils'
8 import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg'
9 import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 PeerTubeProblemDocument,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 ServerErrorCode
15 } from '@shared/models'
16 import { ConfigManager } from '../../../shared/config-manager'
17 import { buildFFmpegLive, ProcessOptions } from './common'
18
19 export class ProcessLiveRTMPHLSTranscoding {
20
21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = []
23
24 // Playlist name -> chunks
25 private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
26
27 private readonly playlistsCreated = new Set<string>()
28 private allPlaylistsCreated = false
29
30 private ffmpegCommand: FfmpegCommand
31
32 private ended = false
33 private errored = false
34
35 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
36 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
37
38 logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
39 }
40
41 process () {
42 const job = this.options.job
43 const payload = job.payload
44
45 return new Promise<void>(async (res, rej) => {
46 try {
47 await ensureDir(this.outputPath)
48
49 logger.info(`Probing ${payload.input.rtmpUrl}`)
50 const probe = await ffprobePromise(payload.input.rtmpUrl)
51 logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
52
53 const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
54 const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
55 const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
56
57 const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
58 this.fsWatchers.push(m3u8Watcher)
59
60 const tsWatcher = watch(this.outputPath + '/*.ts')
61 this.fsWatchers.push(tsWatcher)
62
63 m3u8Watcher.on('change', p => {
64 logger.debug(`${p} m3u8 playlist changed`)
65 })
66
67 m3u8Watcher.on('add', p => {
68 this.playlistsCreated.add(p)
69
70 if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
71 this.allPlaylistsCreated = true
72 logger.info('All m3u8 playlists are created.')
73 }
74 })
75
76 tsWatcher.on('add', async p => {
77 try {
78 await this.sendPendingChunks()
79 } catch (err) {
80 this.onUpdateError({ err, rej, res })
81 }
82
83 const playlistName = this.getPlaylistIdFromTS(p)
84
85 const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
86 pendingChunks.push(p)
87
88 this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
89 })
90
91 tsWatcher.on('unlink', p => {
92 this.sendDeletedChunkUpdate(p)
93 .catch(err => this.onUpdateError({ err, rej, res }))
94 })
95
96 this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
97 inputUrl: payload.input.rtmpUrl,
98
99 outPath: this.outputPath,
100 masterPlaylistName: 'master.m3u8',
101
102 segmentListSize: payload.output.segmentListSize,
103 segmentDuration: payload.output.segmentDuration,
104
105 toTranscode: payload.output.toTranscode,
106
107 bitrate,
108 ratio,
109
110 hasAudio
111 })
112
113 logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
114
115 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
116 this.onFFmpegError({ err, stdout, stderr })
117
118 res()
119 })
120
121 this.ffmpegCommand.on('end', () => {
122 this.onFFmpegEnded()
123 .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
124
125 res()
126 })
127
128 this.ffmpegCommand.run()
129 } catch (err) {
130 rej(err)
131 }
132 })
133 }
134
135 // ---------------------------------------------------------------------------
136
137 private onUpdateError (options: {
138 err: Error
139 res: () => void
140 rej: (reason?: any) => void
141 }) {
142 const { err, res, rej } = options
143
144 if (this.errored) return
145 if (this.ended) return
146
147 this.errored = true
148
149 this.ffmpegCommand.kill('SIGINT')
150
151 const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
152 if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
153 logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
154
155 res()
156 } else {
157 logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
158
159 this.sendError(err)
160 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
161
162 rej(err)
163 }
164
165 this.cleanup()
166 }
167
168 // ---------------------------------------------------------------------------
169
170 private onFFmpegError (options: {
171 err: any
172 stdout: string
173 stderr: string
174 }) {
175 const { err, stdout, stderr } = options
176
177 // Don't care that we killed the ffmpeg process
178 if (err?.message?.includes('Exiting normally')) return
179 if (this.errored) return
180 if (this.ended) return
181
182 this.errored = true
183
184 logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
185
186 this.sendError(err)
187 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
188
189 this.cleanup()
190 }
191
192 private async sendError (err: Error) {
193 await this.options.server.runnerJobs.error({
194 jobToken: this.options.job.jobToken,
195 jobUUID: this.options.job.uuid,
196 runnerToken: this.options.runnerToken,
197 message: err.message
198 })
199 }
200
201 // ---------------------------------------------------------------------------
202
203 private async onFFmpegEnded () {
204 if (this.ended) return
205
206 this.ended = true
207 logger.info('FFmpeg ended, sending success to server')
208
209 // Wait last ffmpeg chunks generation
210 await wait(1500)
211
212 this.sendSuccess()
213 .catch(err => logger.error({ err }, 'Cannot send success'))
214
215 this.cleanup()
216 }
217
218 private async sendSuccess () {
219 const successBody: LiveRTMPHLSTranscodingSuccess = {}
220
221 await this.options.server.runnerJobs.success({
222 jobToken: this.options.job.jobToken,
223 jobUUID: this.options.job.uuid,
224 runnerToken: this.options.runnerToken,
225 payload: successBody
226 })
227 }
228
229 // ---------------------------------------------------------------------------
230
231 private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
232 if (this.ended) return Promise.resolve()
233
234 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
235
236 const videoChunkFilename = basename(deletedChunk)
237
238 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
239 type: 'remove-chunk',
240 videoChunkFilename
241 }
242
243 if (this.allPlaylistsCreated) {
244 const playlistName = this.getPlaylistName(videoChunkFilename)
245
246 payload = {
247 ...payload,
248 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
249 resolutionPlaylistFilename: playlistName,
250 resolutionPlaylistFile: join(this.outputPath, playlistName)
251 }
252 }
253
254 return this.updateWithRetry(payload)
255 }
256
257 private async sendPendingChunks (): Promise<any> {
258 if (this.ended) return Promise.resolve()
259
260 const promises: Promise<any>[] = []
261
262 for (const playlist of this.pendingChunksPerPlaylist.keys()) {
263 for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
264 logger.debug(`Sending added live chunk ${chunk} update`)
265
266 const videoChunkFilename = basename(chunk)
267
268 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
269 type: 'add-chunk',
270 videoChunkFilename,
271 videoChunkFile: chunk
272 }
273
274 if (this.allPlaylistsCreated) {
275 const playlistName = this.getPlaylistName(videoChunkFilename)
276
277 payload = {
278 ...payload,
279 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
280 resolutionPlaylistFilename: playlistName,
281 resolutionPlaylistFile: join(this.outputPath, playlistName)
282 }
283 }
284
285 promises.push(this.updateWithRetry(payload))
286 }
287
288 this.pendingChunksPerPlaylist.set(playlist, [])
289 }
290
291 await Promise.all(promises)
292 }
293
294 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
295 if (this.ended || this.errored) return
296
297 try {
298 await this.options.server.runnerJobs.update({
299 jobToken: this.options.job.jobToken,
300 jobUUID: this.options.job.uuid,
301 runnerToken: this.options.runnerToken,
302 payload
303 })
304 } catch (err) {
305 if (currentTry >= 3) throw err
306 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
307
308 logger.warn({ err }, 'Will retry update after error')
309 await wait(250)
310
311 return this.updateWithRetry(payload, currentTry + 1)
312 }
313 }
314
315 private getPlaylistName (videoChunkFilename: string) {
316 return `${videoChunkFilename.split('-')[0]}.m3u8`
317 }
318
319 private getPlaylistIdFromTS (segmentPath: string) {
320 const playlistIdMatcher = /^([\d+])-/
321
322 return basename(segmentPath).match(playlistIdMatcher)[1]
323 }
324
325 // ---------------------------------------------------------------------------
326
327 private cleanup () {
328 logger.debug(`Cleaning up job ${this.options.job.uuid}`)
329
330 for (const fsWatcher of this.fsWatchers) {
331 fsWatcher.close()
332 .catch(err => logger.error({ err }, 'Cannot close watcher'))
333 }
334
335 remove(this.outputPath)
336 .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
337 }
338 }