1 import { FSWatcher, watch } from 'chokidar'
2 import { FfmpegCommand } from 'fluent-ffmpeg'
3 import { ensureDir, remove } from 'fs-extra'
4 import { logger } from 'packages/peertube-runner/shared'
5 import { basename, join } from 'path'
6 import { wait } from '@shared/core-utils'
7 import { buildUUID } from '@shared/extra-utils'
8 import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg'
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 PeerTubeProblemDocument,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
15 } from '@shared/models'
16 import { ConfigManager } from '../../../shared/config-manager'
17 import { buildFFmpegLive, ProcessOptions } from './common'
19 export class ProcessLiveRTMPHLSTranscoding {
21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = []
24 // Playlist name -> chunks
25 private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
27 private readonly playlistsCreated = new Set<string>()
28 private allPlaylistsCreated = false
30 private ffmpegCommand: FfmpegCommand
33 private errored = false
35 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
36 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
38 logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
42 const job = this.options.job
43 const payload = job.payload
45 return new Promise<void>(async (res, rej) => {
47 await ensureDir(this.outputPath)
49 logger.info(`Probing ${payload.input.rtmpUrl}`)
50 const probe = await ffprobePromise(payload.input.rtmpUrl)
51 logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
53 const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
54 const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
55 const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
57 const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
58 this.fsWatchers.push(m3u8Watcher)
60 const tsWatcher = watch(this.outputPath + '/*.ts')
61 this.fsWatchers.push(tsWatcher)
63 m3u8Watcher.on('change', p => {
64 logger.debug(`${p} m3u8 playlist changed`)
67 m3u8Watcher.on('add', p => {
68 this.playlistsCreated.add(p)
70 if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
71 this.allPlaylistsCreated = true
72 logger.info('All m3u8 playlists are created.')
76 tsWatcher.on('add', async p => {
78 await this.sendPendingChunks()
80 this.onUpdateError({ err, rej, res })
83 const playlistName = this.getPlaylistIdFromTS(p)
85 const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
88 this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
91 tsWatcher.on('unlink', p => {
92 this.sendDeletedChunkUpdate(p)
93 .catch(err => this.onUpdateError({ err, rej, res }))
96 this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
97 inputUrl: payload.input.rtmpUrl,
99 outPath: this.outputPath,
100 masterPlaylistName: 'master.m3u8',
102 segmentListSize: payload.output.segmentListSize,
103 segmentDuration: payload.output.segmentDuration,
105 toTranscode: payload.output.toTranscode,
113 logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
115 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
116 this.onFFmpegError({ err, stdout, stderr })
121 this.ffmpegCommand.on('end', () => {
123 .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
128 this.ffmpegCommand.run()
135 // ---------------------------------------------------------------------------
137 private onUpdateError (options: {
140 rej: (reason?: any) => void
142 const { err, res, rej } = options
144 if (this.errored) return
145 if (this.ended) return
149 this.ffmpegCommand.kill('SIGINT')
151 const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
152 if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
153 logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
157 logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
160 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
168 // ---------------------------------------------------------------------------
170 private onFFmpegError (options: {
175 const { err, stdout, stderr } = options
177 // Don't care that we killed the ffmpeg process
178 if (err?.message?.includes('Exiting normally')) return
179 if (this.errored) return
180 if (this.ended) return
184 logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
187 .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
192 private async sendError (err: Error) {
193 await this.options.server.runnerJobs.error({
194 jobToken: this.options.job.jobToken,
195 jobUUID: this.options.job.uuid,
196 runnerToken: this.options.runnerToken,
201 // ---------------------------------------------------------------------------
203 private async onFFmpegEnded () {
204 if (this.ended) return
207 logger.info('FFmpeg ended, sending success to server')
209 // Wait last ffmpeg chunks generation
213 .catch(err => logger.error({ err }, 'Cannot send success'))
218 private async sendSuccess () {
219 const successBody: LiveRTMPHLSTranscodingSuccess = {}
221 await this.options.server.runnerJobs.success({
222 jobToken: this.options.job.jobToken,
223 jobUUID: this.options.job.uuid,
224 runnerToken: this.options.runnerToken,
229 // ---------------------------------------------------------------------------
231 private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
232 if (this.ended) return Promise.resolve()
234 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
236 const videoChunkFilename = basename(deletedChunk)
238 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
239 type: 'remove-chunk',
243 if (this.allPlaylistsCreated) {
244 const playlistName = this.getPlaylistName(videoChunkFilename)
248 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
249 resolutionPlaylistFilename: playlistName,
250 resolutionPlaylistFile: join(this.outputPath, playlistName)
254 return this.updateWithRetry(payload)
257 private async sendPendingChunks (): Promise<any> {
258 if (this.ended) return Promise.resolve()
260 const promises: Promise<any>[] = []
262 for (const playlist of this.pendingChunksPerPlaylist.keys()) {
263 for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
264 logger.debug(`Sending added live chunk ${chunk} update`)
266 const videoChunkFilename = basename(chunk)
268 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
271 videoChunkFile: chunk
274 if (this.allPlaylistsCreated) {
275 const playlistName = this.getPlaylistName(videoChunkFilename)
279 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
280 resolutionPlaylistFilename: playlistName,
281 resolutionPlaylistFile: join(this.outputPath, playlistName)
285 promises.push(this.updateWithRetry(payload))
288 this.pendingChunksPerPlaylist.set(playlist, [])
291 await Promise.all(promises)
294 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
295 if (this.ended || this.errored) return
298 await this.options.server.runnerJobs.update({
299 jobToken: this.options.job.jobToken,
300 jobUUID: this.options.job.uuid,
301 runnerToken: this.options.runnerToken,
305 if (currentTry >= 3) throw err
306 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
308 logger.warn({ err }, 'Will retry update after error')
311 return this.updateWithRetry(payload, currentTry + 1)
315 private getPlaylistName (videoChunkFilename: string) {
316 return `${videoChunkFilename.split('-')[0]}.m3u8`
319 private getPlaylistIdFromTS (segmentPath: string) {
320 const playlistIdMatcher = /^([\d+])-/
322 return basename(segmentPath).match(playlistIdMatcher)[1]
325 // ---------------------------------------------------------------------------
328 logger.debug(`Cleaning up job ${this.options.job.uuid}`)
330 for (const fsWatcher of this.fsWatchers) {
332 .catch(err => logger.error({ err }, 'Cannot close watcher'))
335 remove(this.outputPath)
336 .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))