import { FSWatcher, watch } from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { ensureDir, remove } from 'fs-extra'
import { logger } from 'packages/peertube-runner/shared'
import { basename, join } from 'path'
import { wait } from '@shared/core-utils'
import { buildUUID } from '@shared/extra-utils'
import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg'
import {
  LiveRTMPHLSTranscodingSuccess,
  LiveRTMPHLSTranscodingUpdatePayload,
  PeerTubeProblemDocument,
  RunnerJobLiveRTMPHLSTranscodingPayload,
  ServerErrorCode
} from '@shared/models'
import { ConfigManager } from '../../../shared/config-manager'
import { buildFFmpegLive, ProcessOptions } from './common'

/**
 * Runs one live RTMP -> HLS transcoding runner job.
 *
 * FFmpeg writes its playlists (`*.m3u8`) and chunks (`*.ts`) into a per-job directory.
 * File watchers forward every added/removed chunk to the server through `runnerJobs.update`,
 * and the final outcome is reported through `runnerJobs.success` or `runnerJobs.error`.
 */
export class ProcessLiveRTMPHLSTranscoding {

  private readonly outputPath: string
  private readonly fsWatchers: FSWatcher[] = []

  private readonly playlistsCreated = new Set<string>()
  private allPlaylistsCreated = false

  private ffmpegCommand: FfmpegCommand

  // Set once FFmpeg ended normally: no more update is sent afterwards
  private ended = false
  // Set once an FFmpeg/update error has been handled, so it is reported only once
  private errored = false

  /**
   * @param options runner job description and server client used to report progress
   */
  constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
    this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
  }

  /**
   * Probes the RTMP input, starts the chunk/playlist watchers and runs FFmpeg.
   *
   * @returns a promise resolved when FFmpeg ends or fails, and rejected when the setup
   * throws or when a chunk update could not be sent to the server
   */
  process (): Promise<void> {
    const payload = this.options.job.payload

    // Async executor is only safe because the whole body is wrapped in try/catch -> rej
    return new Promise<void>(async (res, rej) => {
      try {
        await ensureDir(this.outputPath)

        logger.info(`Probing ${payload.input.rtmpUrl}`)
        const probe = await ffprobePromise(payload.input.rtmpUrl)
        logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)

        const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
        const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
        const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)

        const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
        this.fsWatchers.push(m3u8Watcher)

        const tsWatcher = watch(this.outputPath + '/*.ts')
        this.fsWatchers.push(tsWatcher)

        m3u8Watcher.on('change', p => {
          logger.debug(`${p} m3u8 playlist changed`)
        })

        m3u8Watcher.on('add', p => {
          this.playlistsCreated.add(p)

          // NOTE(review): "+ 1" presumably accounts for the master playlist on top of one playlist per resolution
          if (this.playlistsCreated.size === payload.output.toTranscode.length + 1) {
            this.allPlaylistsCreated = true
            logger.info('All m3u8 playlists are created.')
          }
        })

        tsWatcher.on('add', p => {
          this.sendAddedChunkUpdate(p)
            .catch(err => this.onUpdateError(err, rej))
        })

        tsWatcher.on('unlink', p => {
          this.sendDeletedChunkUpdate(p)
            .catch(err => this.onUpdateError(err, rej))
        })

        this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
          inputUrl: payload.input.rtmpUrl,

          outPath: this.outputPath,
          masterPlaylistName: 'master.m3u8',

          segmentListSize: payload.output.segmentListSize,
          segmentDuration: payload.output.segmentDuration,

          toTranscode: payload.output.toTranscode,

          // NOTE(review): option names assumed to match the probed values - confirm against getLiveTranscodingCommand
          bitrate,
          ratio,

          hasAudio
        })

        logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)

        this.ffmpegCommand.on('error', (err, stdout, stderr) => {
          this.onFFmpegError({ err, stdout, stderr })

          // The failure is reported to the server by onFFmpegError, so the job promise itself settles normally
          res()
        })

        this.ffmpegCommand.on('end', () => {
          this.onFFmpegEnded()
            .catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))

          res()
        })

        this.ffmpegCommand.run()
      } catch (err) {
        rej(err)
      }
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Handles a chunk update that could not be sent: rejects the job promise, stops FFmpeg,
   * reports the error to the server (unless the job left the processing state) and cleans up.
   *
   * @param err error raised by the update request
   * @param reject reject callback of the promise returned by `process()`
   */
  private onUpdateError (err: Error, reject: (reason?: any) => void) {
    if (this.errored) return
    if (this.ended) return

    this.errored = true

    reject(err)
    this.ffmpegCommand.kill('SIGINT')

    const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
    if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
      logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore')
    } else {
      logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')

      this.sendError(err)
        .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
    }

    this.cleanup()
  }

  // ---------------------------------------------------------------------------

  /**
   * Handles the FFmpeg `error` event: logs it, reports it to the server and cleans up.
   * Ignored when FFmpeg was stopped on purpose or when the job already ended/errored.
   */
  private onFFmpegError (options: {
    err: any
    stdout: string
    stderr: string
  }) {
    const { err, stdout, stderr } = options

    // Don't care that we killed the ffmpeg process
    if (err?.message?.includes('Exiting normally')) return
    if (this.errored) return
    if (this.ended) return

    this.errored = true

    logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')

    this.sendError(err)
      .catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))

    this.cleanup()
  }

  /**
   * Reports the job as errored to the server.
   *
   * @param err error whose message is sent to the server
   */
  private async sendError (err: Error) {
    await this.options.server.runnerJobs.error({
      jobToken: this.options.job.jobToken,
      jobUUID: this.options.job.uuid,
      runnerToken: this.options.runnerToken,
      // NOTE(review): field name assumed - confirm against the runnerJobs.error() options
      message: err.message
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Handles the FFmpeg `end` event: marks the job as ended, reports success to the server
   * and cleans up. Does nothing if the job already ended.
   */
  private async onFFmpegEnded () {
    if (this.ended) return

    this.ended = true
    logger.info('FFmpeg ended, sending success to server')

    // Wait last ffmpeg chunks generation
    // NOTE(review): delay value assumed - confirm
    await wait(1500)

    this.sendSuccess()
      .catch(err => logger.error({ err }, 'Cannot send success'))

    this.cleanup()
  }

  /**
   * Reports the job as successfully finished to the server (empty success payload).
   */
  private async sendSuccess () {
    const successBody: LiveRTMPHLSTranscodingSuccess = {}

    await this.options.server.runnerJobs.success({
      jobToken: this.options.job.jobToken,
      jobUUID: this.options.job.uuid,
      runnerToken: this.options.runnerToken,
      payload: successBody
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Notifies the server that a chunk was removed from the output directory.
   *
   * @param deletedChunk path of the removed chunk, as reported by the watcher
   * @returns a promise settled once the update has been sent; resolved immediately if the job ended
   */
  private sendDeletedChunkUpdate (deletedChunk: string): Promise<void> {
    // Always return a promise: callers chain .catch() on the result
    if (this.ended) return Promise.resolve()

    logger.debug(`Sending removed live chunk ${deletedChunk} update`)

    const videoChunkFilename = basename(deletedChunk)

    let payload: LiveRTMPHLSTranscodingUpdatePayload = {
      type: 'remove-chunk',
      videoChunkFilename
    }

    // Playlists are only attached once all of them have been created
    if (this.allPlaylistsCreated) {
      const playlistName = this.getPlaylistName(videoChunkFilename)

      payload = {
        ...payload,
        masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
        resolutionPlaylistFilename: playlistName,
        resolutionPlaylistFile: join(this.outputPath, playlistName)
      }
    }

    return this.updateWithRetry(payload)
  }

  /**
   * Notifies the server that a new chunk was written in the output directory.
   *
   * @param addedChunk path of the new chunk, as reported by the watcher
   * @returns a promise settled once the update has been sent; resolved immediately if the job ended
   */
  private sendAddedChunkUpdate (addedChunk: string): Promise<void> {
    // Always return a promise: callers chain .catch() on the result
    if (this.ended) return Promise.resolve()

    logger.debug(`Sending added live chunk ${addedChunk} update`)

    const videoChunkFilename = basename(addedChunk)

    let payload: LiveRTMPHLSTranscodingUpdatePayload = {
      type: 'add-chunk',
      videoChunkFilename,
      videoChunkFile: addedChunk
    }

    // Playlists are only attached once all of them have been created
    if (this.allPlaylistsCreated) {
      const playlistName = this.getPlaylistName(videoChunkFilename)

      payload = {
        ...payload,
        masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
        resolutionPlaylistFilename: playlistName,
        resolutionPlaylistFile: join(this.outputPath, playlistName)
      }
    }

    return this.updateWithRetry(payload)
  }

  /**
   * Sends a job update to the server, retrying on failure.
   *
   * @param payload update to send
   * @param currentTry 1-based attempt counter; the error is rethrown once it reaches 3
   */
  private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<void> {
    if (this.ended || this.errored) return

    try {
      await this.options.server.runnerJobs.update({
        jobToken: this.options.job.jobToken,
        jobUUID: this.options.job.uuid,
        runnerToken: this.options.runnerToken,
        payload
      })
    } catch (err) {
      if (currentTry >= 3) throw err

      logger.warn({ err }, 'Will retry update after error')
      // NOTE(review): delay value assumed - confirm
      await wait(250)

      return this.updateWithRetry(payload, currentTry + 1)
    }
  }

  /**
   * Builds the resolution playlist filename from a chunk filename:
   * everything before the first `-` of the chunk name, followed by `.m3u8`.
   */
  private getPlaylistName (videoChunkFilename: string) {
    return `${videoChunkFilename.split('-')[0]}.m3u8`
  }

  // ---------------------------------------------------------------------------

  /**
   * Closes the file watchers and removes the job output directory.
   * Failures are only logged: cleanup is best-effort.
   */
  // NOTE(review): method name not visible in the reviewed extract - confirm it is `cleanup`
  private cleanup () {
    for (const fsWatcher of this.fsWatchers) {
      fsWatcher.close()
        .catch(err => logger.error({ err }, 'Cannot close watcher'))
    }

    remove(this.outputPath)
      .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
  }
}