diff options
Diffstat (limited to 'packages/peertube-runner/server/process/shared/process-live.ts')
-rw-r--r-- | packages/peertube-runner/server/process/shared/process-live.ts | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts new file mode 100644 index 000000000..5a3b596a2 --- /dev/null +++ b/packages/peertube-runner/server/process/shared/process-live.ts | |||
@@ -0,0 +1,295 @@ | |||
1 | import { FSWatcher, watch } from 'chokidar' | ||
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
3 | import { ensureDir, remove } from 'fs-extra' | ||
4 | import { logger } from 'packages/peertube-runner/shared' | ||
5 | import { basename, join } from 'path' | ||
6 | import { wait } from '@shared/core-utils' | ||
7 | import { buildUUID } from '@shared/extra-utils' | ||
8 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg' | ||
9 | import { | ||
10 | LiveRTMPHLSTranscodingSuccess, | ||
11 | LiveRTMPHLSTranscodingUpdatePayload, | ||
12 | PeerTubeProblemDocument, | ||
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
14 | ServerErrorCode | ||
15 | } from '@shared/models' | ||
16 | import { ConfigManager } from '../../../shared/config-manager' | ||
17 | import { buildFFmpegLive, ProcessOptions } from './common' | ||
18 | |||
19 | export class ProcessLiveRTMPHLSTranscoding { | ||
20 | |||
21 | private readonly outputPath: string | ||
22 | private readonly fsWatchers: FSWatcher[] = [] | ||
23 | |||
24 | private readonly playlistsCreated = new Set<string>() | ||
25 | private allPlaylistsCreated = false | ||
26 | |||
27 | private ffmpegCommand: FfmpegCommand | ||
28 | |||
29 | private ended = false | ||
30 | private errored = false | ||
31 | |||
32 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | ||
33 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | ||
34 | } | ||
35 | |||
36 | process () { | ||
37 | const job = this.options.job | ||
38 | const payload = job.payload | ||
39 | |||
40 | return new Promise<void>(async (res, rej) => { | ||
41 | try { | ||
42 | await ensureDir(this.outputPath) | ||
43 | |||
44 | logger.info(`Probing ${payload.input.rtmpUrl}`) | ||
45 | const probe = await ffprobePromise(payload.input.rtmpUrl) | ||
46 | logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) | ||
47 | |||
48 | const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) | ||
49 | const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) | ||
50 | const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) | ||
51 | |||
52 | const m3u8Watcher = watch(this.outputPath + '/*.m3u8') | ||
53 | this.fsWatchers.push(m3u8Watcher) | ||
54 | |||
55 | const tsWatcher = watch(this.outputPath + '/*.ts') | ||
56 | this.fsWatchers.push(tsWatcher) | ||
57 | |||
58 | m3u8Watcher.on('change', p => { | ||
59 | logger.debug(`${p} m3u8 playlist changed`) | ||
60 | }) | ||
61 | |||
62 | m3u8Watcher.on('add', p => { | ||
63 | this.playlistsCreated.add(p) | ||
64 | |||
65 | if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { | ||
66 | this.allPlaylistsCreated = true | ||
67 | logger.info('All m3u8 playlists are created.') | ||
68 | } | ||
69 | }) | ||
70 | |||
71 | tsWatcher.on('add', p => { | ||
72 | this.sendAddedChunkUpdate(p) | ||
73 | .catch(err => this.onUpdateError(err, rej)) | ||
74 | }) | ||
75 | |||
76 | tsWatcher.on('unlink', p => { | ||
77 | this.sendDeletedChunkUpdate(p) | ||
78 | .catch(err => this.onUpdateError(err, rej)) | ||
79 | }) | ||
80 | |||
81 | this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ | ||
82 | inputUrl: payload.input.rtmpUrl, | ||
83 | |||
84 | outPath: this.outputPath, | ||
85 | masterPlaylistName: 'master.m3u8', | ||
86 | |||
87 | segmentListSize: payload.output.segmentListSize, | ||
88 | segmentDuration: payload.output.segmentDuration, | ||
89 | |||
90 | toTranscode: payload.output.toTranscode, | ||
91 | |||
92 | bitrate, | ||
93 | ratio, | ||
94 | |||
95 | hasAudio | ||
96 | }) | ||
97 | |||
98 | logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) | ||
99 | |||
100 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
101 | this.onFFmpegError({ err, stdout, stderr }) | ||
102 | |||
103 | res() | ||
104 | }) | ||
105 | |||
106 | this.ffmpegCommand.on('end', () => { | ||
107 | this.onFFmpegEnded() | ||
108 | .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) | ||
109 | |||
110 | res() | ||
111 | }) | ||
112 | |||
113 | this.ffmpegCommand.run() | ||
114 | } catch (err) { | ||
115 | rej(err) | ||
116 | } | ||
117 | }) | ||
118 | } | ||
119 | |||
120 | // --------------------------------------------------------------------------- | ||
121 | |||
122 | private onUpdateError (err: Error, reject: (reason?: any) => void) { | ||
123 | if (this.errored) return | ||
124 | if (this.ended) return | ||
125 | |||
126 | this.errored = true | ||
127 | |||
128 | reject(err) | ||
129 | this.ffmpegCommand.kill('SIGINT') | ||
130 | |||
131 | const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code | ||
132 | if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
133 | logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') | ||
134 | } else { | ||
135 | logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') | ||
136 | |||
137 | this.sendError(err) | ||
138 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
139 | } | ||
140 | |||
141 | this.cleanup() | ||
142 | } | ||
143 | |||
144 | // --------------------------------------------------------------------------- | ||
145 | |||
146 | private onFFmpegError (options: { | ||
147 | err: any | ||
148 | stdout: string | ||
149 | stderr: string | ||
150 | }) { | ||
151 | const { err, stdout, stderr } = options | ||
152 | |||
153 | // Don't care that we killed the ffmpeg process | ||
154 | if (err?.message?.includes('Exiting normally')) return | ||
155 | if (this.errored) return | ||
156 | if (this.ended) return | ||
157 | |||
158 | this.errored = true | ||
159 | |||
160 | logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') | ||
161 | |||
162 | this.sendError(err) | ||
163 | .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) | ||
164 | |||
165 | this.cleanup() | ||
166 | } | ||
167 | |||
168 | private async sendError (err: Error) { | ||
169 | await this.options.server.runnerJobs.error({ | ||
170 | jobToken: this.options.job.jobToken, | ||
171 | jobUUID: this.options.job.uuid, | ||
172 | runnerToken: this.options.runnerToken, | ||
173 | message: err.message | ||
174 | }) | ||
175 | } | ||
176 | |||
177 | // --------------------------------------------------------------------------- | ||
178 | |||
179 | private async onFFmpegEnded () { | ||
180 | if (this.ended) return | ||
181 | |||
182 | this.ended = true | ||
183 | logger.info('FFmpeg ended, sending success to server') | ||
184 | |||
185 | // Wait last ffmpeg chunks generation | ||
186 | await wait(1500) | ||
187 | |||
188 | this.sendSuccess() | ||
189 | .catch(err => logger.error({ err }, 'Cannot send success')) | ||
190 | |||
191 | this.cleanup() | ||
192 | } | ||
193 | |||
194 | private async sendSuccess () { | ||
195 | const successBody: LiveRTMPHLSTranscodingSuccess = {} | ||
196 | |||
197 | await this.options.server.runnerJobs.success({ | ||
198 | jobToken: this.options.job.jobToken, | ||
199 | jobUUID: this.options.job.uuid, | ||
200 | runnerToken: this.options.runnerToken, | ||
201 | payload: successBody | ||
202 | }) | ||
203 | } | ||
204 | |||
205 | // --------------------------------------------------------------------------- | ||
206 | |||
207 | private sendDeletedChunkUpdate (deletedChunk: string) { | ||
208 | if (this.ended) return | ||
209 | |||
210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | ||
211 | |||
212 | const videoChunkFilename = basename(deletedChunk) | ||
213 | |||
214 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
215 | type: 'remove-chunk', | ||
216 | videoChunkFilename | ||
217 | } | ||
218 | |||
219 | if (this.allPlaylistsCreated) { | ||
220 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
221 | |||
222 | payload = { | ||
223 | ...payload, | ||
224 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
225 | resolutionPlaylistFilename: playlistName, | ||
226 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
227 | } | ||
228 | } | ||
229 | |||
230 | return this.updateWithRetry(payload) | ||
231 | } | ||
232 | |||
233 | private sendAddedChunkUpdate (addedChunk: string) { | ||
234 | if (this.ended) return | ||
235 | |||
236 | logger.debug(`Sending added live chunk ${addedChunk} update`) | ||
237 | |||
238 | const videoChunkFilename = basename(addedChunk) | ||
239 | |||
240 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | ||
241 | type: 'add-chunk', | ||
242 | videoChunkFilename, | ||
243 | videoChunkFile: addedChunk | ||
244 | } | ||
245 | |||
246 | if (this.allPlaylistsCreated) { | ||
247 | const playlistName = this.getPlaylistName(videoChunkFilename) | ||
248 | |||
249 | payload = { | ||
250 | ...payload, | ||
251 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | ||
252 | resolutionPlaylistFilename: playlistName, | ||
253 | resolutionPlaylistFile: join(this.outputPath, playlistName) | ||
254 | } | ||
255 | } | ||
256 | |||
257 | return this.updateWithRetry(payload) | ||
258 | } | ||
259 | |||
260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) { | ||
261 | if (this.ended || this.errored) return | ||
262 | |||
263 | try { | ||
264 | await this.options.server.runnerJobs.update({ | ||
265 | jobToken: this.options.job.jobToken, | ||
266 | jobUUID: this.options.job.uuid, | ||
267 | runnerToken: this.options.runnerToken, | ||
268 | payload | ||
269 | }) | ||
270 | } catch (err) { | ||
271 | if (currentTry >= 3) throw err | ||
272 | |||
273 | logger.warn({ err }, 'Will retry update after error') | ||
274 | await wait(250) | ||
275 | |||
276 | return this.updateWithRetry(payload, currentTry + 1) | ||
277 | } | ||
278 | } | ||
279 | |||
280 | private getPlaylistName (videoChunkFilename: string) { | ||
281 | return `${videoChunkFilename.split('-')[0]}.m3u8` | ||
282 | } | ||
283 | |||
284 | // --------------------------------------------------------------------------- | ||
285 | |||
286 | private cleanup () { | ||
287 | for (const fsWatcher of this.fsWatchers) { | ||
288 | fsWatcher.close() | ||
289 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | ||
290 | } | ||
291 | |||
292 | remove(this.outputPath) | ||
293 | .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) | ||
294 | } | ||
295 | } | ||