]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - packages/peertube-runner/server/process/shared/process-live.ts
More robust chunk handler
[github/Chocobozzz/PeerTube.git] / packages / peertube-runner / server / process / shared / process-live.ts
CommitLineData
1772b383
C
1import { FSWatcher, watch } from 'chokidar'
2import { FfmpegCommand } from 'fluent-ffmpeg'
3import { ensureDir, remove } from 'fs-extra'
4import { logger } from 'packages/peertube-runner/shared'
5import { basename, join } from 'path'
6import { wait } from '@shared/core-utils'
7import { buildUUID } from '@shared/extra-utils'
8import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg'
9import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 PeerTubeProblemDocument,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 ServerErrorCode
15} from '@shared/models'
16import { ConfigManager } from '../../../shared/config-manager'
17import { buildFFmpegLive, ProcessOptions } from './common'
18
19export class ProcessLiveRTMPHLSTranscoding {
20
21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = []
23
def4ea4f
C
24 // Playlist name -> chunks
25 private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
26
1772b383
C
27 private readonly playlistsCreated = new Set<string>()
28 private allPlaylistsCreated = false
29
30 private ffmpegCommand: FfmpegCommand
31
32 private ended = false
33 private errored = false
34
35 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
36 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
37 }
38
39 process () {
40 const job = this.options.job
41 const payload = job.payload
42
43 return new Promise<void>(async (res, rej) => {
44 try {
45 await ensureDir(this.outputPath)
46
47 logger.info(`Probing ${payload.input.rtmpUrl}`)
48 const probe = await ffprobePromise(payload.input.rtmpUrl)
49 logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
50
51 const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
52 const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
53 const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
54
55 const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
56 this.fsWatchers.push(m3u8Watcher)
57
58 const tsWatcher = watch(this.outputPath + '/*.ts')
59 this.fsWatchers.push(tsWatcher)
60
61 m3u8Watcher.on('change', p => {
62 logger.debug(`${p} m3u8 playlist changed`)
63 })
64
65 m3u8Watcher.on('add', p => {
66 this.playlistsCreated.add(p)
67
68 if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
69 this.allPlaylistsCreated = true
70 logger.info('All m3u8 playlists are created.')
71 }
72 })
73
def4ea4f
C
74 tsWatcher.on('add', async p => {
75 try {
76 await this.sendPendingChunks()
77 } catch (err) {
78 this.onUpdateError(err, rej)
79 }
80
81 const playlistName = this.getPlaylistIdFromTS(p)
82
83 const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
84 pendingChunks.push(p)
85
86 this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
1772b383
C
87 })
88
89 tsWatcher.on('unlink', p => {
90 this.sendDeletedChunkUpdate(p)
91 .catch(err => this.onUpdateError(err, rej))
92 })
93
94 this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
95 inputUrl: payload.input.rtmpUrl,
96
97 outPath: this.outputPath,
98 masterPlaylistName: 'master.m3u8',
99
100 segmentListSize: payload.output.segmentListSize,
101 segmentDuration: payload.output.segmentDuration,
102
103 toTranscode: payload.output.toTranscode,
104
105 bitrate,
106 ratio,
107
108 hasAudio
109 })
110
111 logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
112
113 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
114 this.onFFmpegError({ err, stdout, stderr })
115
116 res()
117 })
118
119 this.ffmpegCommand.on('end', () => {
120 this.onFFmpegEnded()
121 .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
122
123 res()
124 })
125
126 this.ffmpegCommand.run()
127 } catch (err) {
128 rej(err)
129 }
130 })
131 }
132
133 // ---------------------------------------------------------------------------
134
135 private onUpdateError (err: Error, reject: (reason?: any) => void) {
136 if (this.errored) return
137 if (this.ended) return
138
139 this.errored = true
140
141 reject(err)
142 this.ffmpegCommand.kill('SIGINT')
143
144 const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
145 if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
146 logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
147 } else {
148 logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
149
150 this.sendError(err)
151 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
152 }
153
154 this.cleanup()
155 }
156
157 // ---------------------------------------------------------------------------
158
159 private onFFmpegError (options: {
160 err: any
161 stdout: string
162 stderr: string
163 }) {
164 const { err, stdout, stderr } = options
165
166 // Don't care that we killed the ffmpeg process
167 if (err?.message?.includes('Exiting normally')) return
168 if (this.errored) return
169 if (this.ended) return
170
171 this.errored = true
172
173 logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
174
175 this.sendError(err)
176 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
177
178 this.cleanup()
179 }
180
181 private async sendError (err: Error) {
182 await this.options.server.runnerJobs.error({
183 jobToken: this.options.job.jobToken,
184 jobUUID: this.options.job.uuid,
185 runnerToken: this.options.runnerToken,
186 message: err.message
187 })
188 }
189
190 // ---------------------------------------------------------------------------
191
192 private async onFFmpegEnded () {
193 if (this.ended) return
194
195 this.ended = true
196 logger.info('FFmpeg ended, sending success to server')
197
198 // Wait last ffmpeg chunks generation
199 await wait(1500)
200
201 this.sendSuccess()
202 .catch(err => logger.error({ err }, 'Cannot send success'))
203
204 this.cleanup()
205 }
206
207 private async sendSuccess () {
208 const successBody: LiveRTMPHLSTranscodingSuccess = {}
209
210 await this.options.server.runnerJobs.success({
211 jobToken: this.options.job.jobToken,
212 jobUUID: this.options.job.uuid,
213 runnerToken: this.options.runnerToken,
214 payload: successBody
215 })
216 }
217
218 // ---------------------------------------------------------------------------
219
3a0c2a77
C
220 private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
221 if (this.ended) return Promise.resolve()
1772b383
C
222
223 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
224
225 const videoChunkFilename = basename(deletedChunk)
226
227 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
228 type: 'remove-chunk',
229 videoChunkFilename
230 }
231
232 if (this.allPlaylistsCreated) {
233 const playlistName = this.getPlaylistName(videoChunkFilename)
234
235 payload = {
236 ...payload,
237 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
238 resolutionPlaylistFilename: playlistName,
239 resolutionPlaylistFile: join(this.outputPath, playlistName)
240 }
241 }
242
243 return this.updateWithRetry(payload)
244 }
245
def4ea4f 246 private async sendPendingChunks (): Promise<any> {
3a0c2a77 247 if (this.ended) return Promise.resolve()
1772b383 248
def4ea4f
C
249 for (const playlist of this.pendingChunksPerPlaylist.keys()) {
250 for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
251 logger.debug(`Sending added live chunk ${chunk} update`)
1772b383 252
def4ea4f 253 const videoChunkFilename = basename(chunk)
1772b383 254
def4ea4f
C
255 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
256 type: 'add-chunk',
257 videoChunkFilename,
258 videoChunkFile: chunk
259 }
1772b383 260
def4ea4f
C
261 if (this.allPlaylistsCreated) {
262 const playlistName = this.getPlaylistName(videoChunkFilename)
1772b383 263
def4ea4f
C
264 payload = {
265 ...payload,
266 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
267 resolutionPlaylistFilename: playlistName,
268 resolutionPlaylistFile: join(this.outputPath, playlistName)
269 }
270 }
271
272 this.updateWithRetry(payload)
273 .catch(err => logger.error({ err }, 'Cannot update with retry'))
1772b383 274 }
1772b383 275
def4ea4f
C
276 this.pendingChunksPerPlaylist.set(playlist, [])
277 }
1772b383
C
278 }
279
3a0c2a77 280 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
1772b383
C
281 if (this.ended || this.errored) return
282
283 try {
284 await this.options.server.runnerJobs.update({
285 jobToken: this.options.job.jobToken,
286 jobUUID: this.options.job.uuid,
287 runnerToken: this.options.runnerToken,
288 payload
289 })
290 } catch (err) {
291 if (currentTry >= 3) throw err
292
293 logger.warn({ err }, 'Will retry update after error')
294 await wait(250)
295
296 return this.updateWithRetry(payload, currentTry + 1)
297 }
298 }
299
300 private getPlaylistName (videoChunkFilename: string) {
301 return `${videoChunkFilename.split('-')[0]}.m3u8`
302 }
303
def4ea4f
C
304 private getPlaylistIdFromTS (segmentPath: string) {
305 const playlistIdMatcher = /^([\d+])-/
306
307 return basename(segmentPath).match(playlistIdMatcher)[1]
308 }
309
1772b383
C
310 // ---------------------------------------------------------------------------
311
312 private cleanup () {
313 for (const fsWatcher of this.fsWatchers) {
314 fsWatcher.close()
315 .catch(err => logger.error({ err }, 'Cannot close watcher'))
316 }
317
318 remove(this.outputPath)
319 .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
320 }
321}