]>
Commit | Line | Data |
---|---|---|
1 | import { FSWatcher, watch } from 'chokidar' | |
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | |
3 | import { ensureDir, remove } from 'fs-extra' | |
4 | import { logger } from 'packages/peertube-runner/shared' | |
5 | import { basename, join } from 'path' | |
6 | import { wait } from '@shared/core-utils' | |
7 | import { buildUUID } from '@shared/extra-utils' | |
8 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg' | |
9 | import { | |
10 | LiveRTMPHLSTranscodingSuccess, | |
11 | LiveRTMPHLSTranscodingUpdatePayload, | |
12 | PeerTubeProblemDocument, | |
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | |
14 | ServerErrorCode | |
15 | } from '@shared/models' | |
16 | import { ConfigManager } from '../../../shared/config-manager' | |
17 | import { buildFFmpegLive, ProcessOptions } from './common' | |
18 | ||
19 | export class ProcessLiveRTMPHLSTranscoding { | |
20 | ||
21 | private readonly outputPath: string | |
22 | private readonly fsWatchers: FSWatcher[] = [] | |
23 | ||
24 | private readonly playlistsCreated = new Set<string>() | |
25 | private allPlaylistsCreated = false | |
26 | ||
27 | private ffmpegCommand: FfmpegCommand | |
28 | ||
29 | private ended = false | |
30 | private errored = false | |
31 | ||
32 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | |
33 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | |
34 | } | |
35 | ||
36 | process () { | |
37 | const job = this.options.job | |
38 | const payload = job.payload | |
39 | ||
40 | return new Promise<void>(async (res, rej) => { | |
41 | try { | |
42 | await ensureDir(this.outputPath) | |
43 | ||
44 | logger.info(`Probing ${payload.input.rtmpUrl}`) | |
45 | const probe = await ffprobePromise(payload.input.rtmpUrl) | |
46 | logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) | |
47 | ||
48 | const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) | |
49 | const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) | |
50 | const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) | |
51 | ||
52 | const m3u8Watcher = watch(this.outputPath + '/*.m3u8') | |
53 | this.fsWatchers.push(m3u8Watcher) | |
54 | ||
55 | const tsWatcher = watch(this.outputPath + '/*.ts') | |
56 | this.fsWatchers.push(tsWatcher) | |
57 | ||
58 | m3u8Watcher.on('change', p => { | |
59 | logger.debug(`${p} m3u8 playlist changed`) | |
60 | }) | |
61 | ||
62 | m3u8Watcher.on('add', p => { | |
63 | this.playlistsCreated.add(p) | |
64 | ||
65 | if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { | |
66 | this.allPlaylistsCreated = true | |
67 | logger.info('All m3u8 playlists are created.') | |
68 | } | |
69 | }) | |
70 | ||
71 | tsWatcher.on('add', p => { | |
72 | this.sendAddedChunkUpdate(p) | |
73 | .catch(err => this.onUpdateError(err, rej)) | |
74 | }) | |
75 | ||
76 | tsWatcher.on('unlink', p => { | |
77 | this.sendDeletedChunkUpdate(p) | |
78 | .catch(err => this.onUpdateError(err, rej)) | |
79 | }) | |
80 | ||
81 | this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ | |
82 | inputUrl: payload.input.rtmpUrl, | |
83 | ||
84 | outPath: this.outputPath, | |
85 | masterPlaylistName: 'master.m3u8', | |
86 | ||
87 | segmentListSize: payload.output.segmentListSize, | |
88 | segmentDuration: payload.output.segmentDuration, | |
89 | ||
90 | toTranscode: payload.output.toTranscode, | |
91 | ||
92 | bitrate, | |
93 | ratio, | |
94 | ||
95 | hasAudio | |
96 | }) | |
97 | ||
98 | logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) | |
99 | ||
100 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | |
101 | this.onFFmpegError({ err, stdout, stderr }) | |
102 | ||
103 | res() | |
104 | }) | |
105 | ||
106 | this.ffmpegCommand.on('end', () => { | |
107 | this.onFFmpegEnded() | |
108 | .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) | |
109 | ||
110 | res() | |
111 | }) | |
112 | ||
113 | this.ffmpegCommand.run() | |
114 | } catch (err) { | |
115 | rej(err) | |
116 | } | |
117 | }) | |
118 | } | |
119 | ||
120 | // --------------------------------------------------------------------------- | |
121 | ||
122 | private onUpdateError (err: Error, reject: (reason?: any) => void) { | |
123 | if (this.errored) return | |
124 | if (this.ended) return | |
125 | ||
126 | this.errored = true | |
127 | ||
128 | reject(err) | |
129 | this.ffmpegCommand.kill('SIGINT') | |
130 | ||
131 | const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code | |
132 | if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | |
133 | logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') | |
134 | } else { | |
135 | logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') | |
136 | ||
137 | this.sendError(err) | |
138 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | |
139 | } | |
140 | ||
141 | this.cleanup() | |
142 | } | |
143 | ||
144 | // --------------------------------------------------------------------------- | |
145 | ||
146 | private onFFmpegError (options: { | |
147 | err: any | |
148 | stdout: string | |
149 | stderr: string | |
150 | }) { | |
151 | const { err, stdout, stderr } = options | |
152 | ||
153 | // Don't care that we killed the ffmpeg process | |
154 | if (err?.message?.includes('Exiting normally')) return | |
155 | if (this.errored) return | |
156 | if (this.ended) return | |
157 | ||
158 | this.errored = true | |
159 | ||
160 | logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') | |
161 | ||
162 | this.sendError(err) | |
163 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | |
164 | ||
165 | this.cleanup() | |
166 | } | |
167 | ||
168 | private async sendError (err: Error) { | |
169 | await this.options.server.runnerJobs.error({ | |
170 | jobToken: this.options.job.jobToken, | |
171 | jobUUID: this.options.job.uuid, | |
172 | runnerToken: this.options.runnerToken, | |
173 | message: err.message | |
174 | }) | |
175 | } | |
176 | ||
177 | // --------------------------------------------------------------------------- | |
178 | ||
179 | private async onFFmpegEnded () { | |
180 | if (this.ended) return | |
181 | ||
182 | this.ended = true | |
183 | logger.info('FFmpeg ended, sending success to server') | |
184 | ||
185 | // Wait last ffmpeg chunks generation | |
186 | await wait(1500) | |
187 | ||
188 | this.sendSuccess() | |
189 | .catch(err => logger.error({ err }, 'Cannot send success')) | |
190 | ||
191 | this.cleanup() | |
192 | } | |
193 | ||
194 | private async sendSuccess () { | |
195 | const successBody: LiveRTMPHLSTranscodingSuccess = {} | |
196 | ||
197 | await this.options.server.runnerJobs.success({ | |
198 | jobToken: this.options.job.jobToken, | |
199 | jobUUID: this.options.job.uuid, | |
200 | runnerToken: this.options.runnerToken, | |
201 | payload: successBody | |
202 | }) | |
203 | } | |
204 | ||
205 | // --------------------------------------------------------------------------- | |
206 | ||
207 | private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> { | |
208 | if (this.ended) return Promise.resolve() | |
209 | ||
210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | |
211 | ||
212 | const videoChunkFilename = basename(deletedChunk) | |
213 | ||
214 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | |
215 | type: 'remove-chunk', | |
216 | videoChunkFilename | |
217 | } | |
218 | ||
219 | if (this.allPlaylistsCreated) { | |
220 | const playlistName = this.getPlaylistName(videoChunkFilename) | |
221 | ||
222 | payload = { | |
223 | ...payload, | |
224 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | |
225 | resolutionPlaylistFilename: playlistName, | |
226 | resolutionPlaylistFile: join(this.outputPath, playlistName) | |
227 | } | |
228 | } | |
229 | ||
230 | return this.updateWithRetry(payload) | |
231 | } | |
232 | ||
233 | private sendAddedChunkUpdate (addedChunk: string): Promise<any> { | |
234 | if (this.ended) return Promise.resolve() | |
235 | ||
236 | logger.debug(`Sending added live chunk ${addedChunk} update`) | |
237 | ||
238 | const videoChunkFilename = basename(addedChunk) | |
239 | ||
240 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | |
241 | type: 'add-chunk', | |
242 | videoChunkFilename, | |
243 | videoChunkFile: addedChunk | |
244 | } | |
245 | ||
246 | if (this.allPlaylistsCreated) { | |
247 | const playlistName = this.getPlaylistName(videoChunkFilename) | |
248 | ||
249 | payload = { | |
250 | ...payload, | |
251 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | |
252 | resolutionPlaylistFilename: playlistName, | |
253 | resolutionPlaylistFile: join(this.outputPath, playlistName) | |
254 | } | |
255 | } | |
256 | ||
257 | return this.updateWithRetry(payload) | |
258 | } | |
259 | ||
260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { | |
261 | if (this.ended || this.errored) return | |
262 | ||
263 | try { | |
264 | await this.options.server.runnerJobs.update({ | |
265 | jobToken: this.options.job.jobToken, | |
266 | jobUUID: this.options.job.uuid, | |
267 | runnerToken: this.options.runnerToken, | |
268 | payload | |
269 | }) | |
270 | } catch (err) { | |
271 | if (currentTry >= 3) throw err | |
272 | ||
273 | logger.warn({ err }, 'Will retry update after error') | |
274 | await wait(250) | |
275 | ||
276 | return this.updateWithRetry(payload, currentTry + 1) | |
277 | } | |
278 | } | |
279 | ||
280 | private getPlaylistName (videoChunkFilename: string) { | |
281 | return `${videoChunkFilename.split('-')[0]}.m3u8` | |
282 | } | |
283 | ||
284 | // --------------------------------------------------------------------------- | |
285 | ||
286 | private cleanup () { | |
287 | for (const fsWatcher of this.fsWatchers) { | |
288 | fsWatcher.close() | |
289 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | |
290 | } | |
291 | ||
292 | remove(this.outputPath) | |
293 | .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) | |
294 | } | |
295 | } |