]>
Commit | Line | Data |
---|---|---|
1772b383 C |
1 | import { FSWatcher, watch } from 'chokidar' |
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | |
3 | import { ensureDir, remove } from 'fs-extra' | |
4 | import { logger } from 'packages/peertube-runner/shared' | |
5 | import { basename, join } from 'path' | |
6 | import { wait } from '@shared/core-utils' | |
7 | import { buildUUID } from '@shared/extra-utils' | |
8 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg' | |
9 | import { | |
10 | LiveRTMPHLSTranscodingSuccess, | |
11 | LiveRTMPHLSTranscodingUpdatePayload, | |
12 | PeerTubeProblemDocument, | |
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | |
14 | ServerErrorCode | |
15 | } from '@shared/models' | |
16 | import { ConfigManager } from '../../../shared/config-manager' | |
17 | import { buildFFmpegLive, ProcessOptions } from './common' | |
18 | ||
19 | export class ProcessLiveRTMPHLSTranscoding { | |
20 | ||
21 | private readonly outputPath: string | |
22 | private readonly fsWatchers: FSWatcher[] = [] | |
23 | ||
def4ea4f C |
24 | // Playlist name -> chunks |
25 | private readonly pendingChunksPerPlaylist = new Map<string, string[]>() | |
26 | ||
1772b383 C |
27 | private readonly playlistsCreated = new Set<string>() |
28 | private allPlaylistsCreated = false | |
29 | ||
30 | private ffmpegCommand: FfmpegCommand | |
31 | ||
32 | private ended = false | |
33 | private errored = false | |
34 | ||
35 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | |
36 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | |
37 | } | |
38 | ||
39 | process () { | |
40 | const job = this.options.job | |
41 | const payload = job.payload | |
42 | ||
43 | return new Promise<void>(async (res, rej) => { | |
44 | try { | |
45 | await ensureDir(this.outputPath) | |
46 | ||
47 | logger.info(`Probing ${payload.input.rtmpUrl}`) | |
48 | const probe = await ffprobePromise(payload.input.rtmpUrl) | |
49 | logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) | |
50 | ||
51 | const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) | |
52 | const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) | |
53 | const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) | |
54 | ||
55 | const m3u8Watcher = watch(this.outputPath + '/*.m3u8') | |
56 | this.fsWatchers.push(m3u8Watcher) | |
57 | ||
58 | const tsWatcher = watch(this.outputPath + '/*.ts') | |
59 | this.fsWatchers.push(tsWatcher) | |
60 | ||
61 | m3u8Watcher.on('change', p => { | |
62 | logger.debug(`${p} m3u8 playlist changed`) | |
63 | }) | |
64 | ||
65 | m3u8Watcher.on('add', p => { | |
66 | this.playlistsCreated.add(p) | |
67 | ||
68 | if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { | |
69 | this.allPlaylistsCreated = true | |
70 | logger.info('All m3u8 playlists are created.') | |
71 | } | |
72 | }) | |
73 | ||
def4ea4f C |
74 | tsWatcher.on('add', async p => { |
75 | try { | |
76 | await this.sendPendingChunks() | |
77 | } catch (err) { | |
78 | this.onUpdateError(err, rej) | |
79 | } | |
80 | ||
81 | const playlistName = this.getPlaylistIdFromTS(p) | |
82 | ||
83 | const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || [] | |
84 | pendingChunks.push(p) | |
85 | ||
86 | this.pendingChunksPerPlaylist.set(playlistName, pendingChunks) | |
1772b383 C |
87 | }) |
88 | ||
89 | tsWatcher.on('unlink', p => { | |
90 | this.sendDeletedChunkUpdate(p) | |
91 | .catch(err => this.onUpdateError(err, rej)) | |
92 | }) | |
93 | ||
94 | this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ | |
95 | inputUrl: payload.input.rtmpUrl, | |
96 | ||
97 | outPath: this.outputPath, | |
98 | masterPlaylistName: 'master.m3u8', | |
99 | ||
100 | segmentListSize: payload.output.segmentListSize, | |
101 | segmentDuration: payload.output.segmentDuration, | |
102 | ||
103 | toTranscode: payload.output.toTranscode, | |
104 | ||
105 | bitrate, | |
106 | ratio, | |
107 | ||
108 | hasAudio | |
109 | }) | |
110 | ||
111 | logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) | |
112 | ||
113 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | |
114 | this.onFFmpegError({ err, stdout, stderr }) | |
115 | ||
116 | res() | |
117 | }) | |
118 | ||
119 | this.ffmpegCommand.on('end', () => { | |
120 | this.onFFmpegEnded() | |
121 | .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) | |
122 | ||
123 | res() | |
124 | }) | |
125 | ||
126 | this.ffmpegCommand.run() | |
127 | } catch (err) { | |
128 | rej(err) | |
129 | } | |
130 | }) | |
131 | } | |
132 | ||
133 | // --------------------------------------------------------------------------- | |
134 | ||
135 | private onUpdateError (err: Error, reject: (reason?: any) => void) { | |
136 | if (this.errored) return | |
137 | if (this.ended) return | |
138 | ||
139 | this.errored = true | |
140 | ||
141 | reject(err) | |
142 | this.ffmpegCommand.kill('SIGINT') | |
143 | ||
144 | const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code | |
145 | if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | |
146 | logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') | |
147 | } else { | |
148 | logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') | |
149 | ||
150 | this.sendError(err) | |
151 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | |
152 | } | |
153 | ||
154 | this.cleanup() | |
155 | } | |
156 | ||
157 | // --------------------------------------------------------------------------- | |
158 | ||
159 | private onFFmpegError (options: { | |
160 | err: any | |
161 | stdout: string | |
162 | stderr: string | |
163 | }) { | |
164 | const { err, stdout, stderr } = options | |
165 | ||
166 | // Don't care that we killed the ffmpeg process | |
167 | if (err?.message?.includes('Exiting normally')) return | |
168 | if (this.errored) return | |
169 | if (this.ended) return | |
170 | ||
171 | this.errored = true | |
172 | ||
173 | logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') | |
174 | ||
175 | this.sendError(err) | |
176 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | |
177 | ||
178 | this.cleanup() | |
179 | } | |
180 | ||
181 | private async sendError (err: Error) { | |
182 | await this.options.server.runnerJobs.error({ | |
183 | jobToken: this.options.job.jobToken, | |
184 | jobUUID: this.options.job.uuid, | |
185 | runnerToken: this.options.runnerToken, | |
186 | message: err.message | |
187 | }) | |
188 | } | |
189 | ||
190 | // --------------------------------------------------------------------------- | |
191 | ||
192 | private async onFFmpegEnded () { | |
193 | if (this.ended) return | |
194 | ||
195 | this.ended = true | |
196 | logger.info('FFmpeg ended, sending success to server') | |
197 | ||
198 | // Wait last ffmpeg chunks generation | |
199 | await wait(1500) | |
200 | ||
201 | this.sendSuccess() | |
202 | .catch(err => logger.error({ err }, 'Cannot send success')) | |
203 | ||
204 | this.cleanup() | |
205 | } | |
206 | ||
207 | private async sendSuccess () { | |
208 | const successBody: LiveRTMPHLSTranscodingSuccess = {} | |
209 | ||
210 | await this.options.server.runnerJobs.success({ | |
211 | jobToken: this.options.job.jobToken, | |
212 | jobUUID: this.options.job.uuid, | |
213 | runnerToken: this.options.runnerToken, | |
214 | payload: successBody | |
215 | }) | |
216 | } | |
217 | ||
218 | // --------------------------------------------------------------------------- | |
219 | ||
3a0c2a77 C |
220 | private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> { |
221 | if (this.ended) return Promise.resolve() | |
1772b383 C |
222 | |
223 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | |
224 | ||
225 | const videoChunkFilename = basename(deletedChunk) | |
226 | ||
227 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | |
228 | type: 'remove-chunk', | |
229 | videoChunkFilename | |
230 | } | |
231 | ||
232 | if (this.allPlaylistsCreated) { | |
233 | const playlistName = this.getPlaylistName(videoChunkFilename) | |
234 | ||
235 | payload = { | |
236 | ...payload, | |
237 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | |
238 | resolutionPlaylistFilename: playlistName, | |
239 | resolutionPlaylistFile: join(this.outputPath, playlistName) | |
240 | } | |
241 | } | |
242 | ||
243 | return this.updateWithRetry(payload) | |
244 | } | |
245 | ||
def4ea4f | 246 | private async sendPendingChunks (): Promise<any> { |
3a0c2a77 | 247 | if (this.ended) return Promise.resolve() |
1772b383 | 248 | |
def4ea4f C |
249 | for (const playlist of this.pendingChunksPerPlaylist.keys()) { |
250 | for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) { | |
251 | logger.debug(`Sending added live chunk ${chunk} update`) | |
1772b383 | 252 | |
def4ea4f | 253 | const videoChunkFilename = basename(chunk) |
1772b383 | 254 | |
def4ea4f C |
255 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { |
256 | type: 'add-chunk', | |
257 | videoChunkFilename, | |
258 | videoChunkFile: chunk | |
259 | } | |
1772b383 | 260 | |
def4ea4f C |
261 | if (this.allPlaylistsCreated) { |
262 | const playlistName = this.getPlaylistName(videoChunkFilename) | |
1772b383 | 263 | |
def4ea4f C |
264 | payload = { |
265 | ...payload, | |
266 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | |
267 | resolutionPlaylistFilename: playlistName, | |
268 | resolutionPlaylistFile: join(this.outputPath, playlistName) | |
269 | } | |
270 | } | |
271 | ||
272 | this.updateWithRetry(payload) | |
273 | .catch(err => logger.error({ err }, 'Cannot update with retry')) | |
1772b383 | 274 | } |
1772b383 | 275 | |
def4ea4f C |
276 | this.pendingChunksPerPlaylist.set(playlist, []) |
277 | } | |
1772b383 C |
278 | } |
279 | ||
3a0c2a77 | 280 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { |
1772b383 C |
281 | if (this.ended || this.errored) return |
282 | ||
283 | try { | |
284 | await this.options.server.runnerJobs.update({ | |
285 | jobToken: this.options.job.jobToken, | |
286 | jobUUID: this.options.job.uuid, | |
287 | runnerToken: this.options.runnerToken, | |
288 | payload | |
289 | }) | |
290 | } catch (err) { | |
291 | if (currentTry >= 3) throw err | |
292 | ||
293 | logger.warn({ err }, 'Will retry update after error') | |
294 | await wait(250) | |
295 | ||
296 | return this.updateWithRetry(payload, currentTry + 1) | |
297 | } | |
298 | } | |
299 | ||
300 | private getPlaylistName (videoChunkFilename: string) { | |
301 | return `${videoChunkFilename.split('-')[0]}.m3u8` | |
302 | } | |
303 | ||
def4ea4f C |
304 | private getPlaylistIdFromTS (segmentPath: string) { |
305 | const playlistIdMatcher = /^([\d+])-/ | |
306 | ||
307 | return basename(segmentPath).match(playlistIdMatcher)[1] | |
308 | } | |
309 | ||
1772b383 C |
310 | // --------------------------------------------------------------------------- |
311 | ||
312 | private cleanup () { | |
313 | for (const fsWatcher of this.fsWatchers) { | |
314 | fsWatcher.close() | |
315 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | |
316 | } | |
317 | ||
318 | remove(this.outputPath) | |
319 | .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) | |
320 | } | |
321 | } |