diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-22 13:44:22 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-05-22 13:54:46 +0200 |
commit | 17ecdf61ce1d374cc8ba17601b93c9bda08112b2 (patch) | |
tree | d3405d11636f661815c3ad09821944a5ae26ebfa | |
parent | f3bc1b541619673f14db7de220b9c520a4f35ca8 (diff) | |
download | PeerTube-17ecdf61ce1d374cc8ba17601b93c9bda08112b2.tar.gz PeerTube-17ecdf61ce1d374cc8ba17601b93c9bda08112b2.tar.zst PeerTube-17ecdf61ce1d374cc8ba17601b93c9bda08112b2.zip |
Force stop remote live transcoding
10 files changed, 45 insertions, 5 deletions
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts index df1b677f0..6edb1f1e9 100644 --- a/packages/peertube-runner/server/process/shared/process-live.ts +++ b/packages/peertube-runner/server/process/shared/process-live.ts | |||
@@ -34,6 +34,8 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
34 | 34 | ||
35 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { | 35 | constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { |
36 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) | 36 | this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) |
37 | |||
38 | logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`) | ||
37 | } | 39 | } |
38 | 40 | ||
39 | process () { | 41 | process () { |
@@ -289,6 +291,7 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
289 | }) | 291 | }) |
290 | } catch (err) { | 292 | } catch (err) { |
291 | if (currentTry >= 3) throw err | 293 | if (currentTry >= 3) throw err |
294 | if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err | ||
292 | 295 | ||
293 | logger.warn({ err }, 'Will retry update after error') | 296 | logger.warn({ err }, 'Will retry update after error') |
294 | await wait(250) | 297 | await wait(250) |
@@ -310,6 +313,8 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
310 | // --------------------------------------------------------------------------- | 313 | // --------------------------------------------------------------------------- |
311 | 314 | ||
312 | private cleanup () { | 315 | private cleanup () { |
316 | logger.debug(`Cleaning up job ${this.options.job.uuid}`) | ||
317 | |||
313 | for (const fsWatcher of this.fsWatchers) { | 318 | for (const fsWatcher of this.fsWatchers) { |
314 | fsWatcher.close() | 319 | fsWatcher.close() |
315 | .catch(err => logger.error({ err }, 'Cannot close watcher')) | 320 | .catch(err => logger.error({ err }, 'Cannot close watcher')) |
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 68aa18443..f062e2fd3 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -178,6 +178,10 @@ class LiveManager { | |||
178 | return !!this.rtmpServer | 178 | return !!this.rtmpServer |
179 | } | 179 | } |
180 | 180 | ||
181 | hasSession (sessionId: string) { | ||
182 | return this.getContext().sessions.has(sessionId) | ||
183 | } | ||
184 | |||
181 | stopSessionOf (videoUUID: string, error: LiveVideoError | null) { | 185 | stopSessionOf (videoUUID: string, error: LiveVideoError | null) { |
182 | const sessionId = this.videoSessions.get(videoUUID) | 186 | const sessionId = this.videoSessions.get(videoUUID) |
183 | if (!sessionId) { | 187 | if (!sessionId) { |
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 6632499ff..c672ec4d6 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -477,6 +477,7 @@ class MuxingSession extends EventEmitter { | |||
477 | 477 | ||
478 | lTags: this.lTags, | 478 | lTags: this.lTags, |
479 | 479 | ||
480 | sessionId: this.sessionId, | ||
480 | inputLocalUrl: this.inputLocalUrl, | 481 | inputLocalUrl: this.inputLocalUrl, |
481 | inputPublicUrl: this.inputPublicUrl, | 482 | inputPublicUrl: this.inputPublicUrl, |
482 | 483 | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts index ee61d7690..95168745d 100644 --- a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts +++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts | |||
@@ -25,6 +25,7 @@ interface AbstractTranscodingWrapperOptions { | |||
25 | 25 | ||
26 | lTags: LoggerTagsFn | 26 | lTags: LoggerTagsFn |
27 | 27 | ||
28 | sessionId: string | ||
28 | inputLocalUrl: string | 29 | inputLocalUrl: string |
29 | inputPublicUrl: string | 30 | inputPublicUrl: string |
30 | 31 | ||
@@ -52,6 +53,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter { | |||
52 | fps: number | 53 | fps: number |
53 | }[] | 54 | }[] |
54 | 55 | ||
56 | protected readonly sessionId: string | ||
55 | protected readonly inputLocalUrl: string | 57 | protected readonly inputLocalUrl: string |
56 | protected readonly inputPublicUrl: string | 58 | protected readonly inputPublicUrl: string |
57 | 59 | ||
@@ -80,6 +82,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter { | |||
80 | this.videoUUID = options.videoLive.Video.uuid | 82 | this.videoUUID = options.videoLive.Video.uuid |
81 | this.streamingPlaylist = options.streamingPlaylist | 83 | this.streamingPlaylist = options.streamingPlaylist |
82 | 84 | ||
85 | this.sessionId = options.sessionId | ||
83 | this.inputLocalUrl = options.inputLocalUrl | 86 | this.inputLocalUrl = options.inputLocalUrl |
84 | this.inputPublicUrl = options.inputPublicUrl | 87 | this.inputPublicUrl = options.inputPublicUrl |
85 | 88 | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts index 6770a5e6f..2aeeb31fb 100644 --- a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts +++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts | |||
@@ -5,6 +5,7 @@ export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper { | |||
5 | async run () { | 5 | async run () { |
6 | await new LiveRTMPHLSTranscodingJobHandler().create({ | 6 | await new LiveRTMPHLSTranscodingJobHandler().create({ |
7 | rtmpUrl: this.inputPublicUrl, | 7 | rtmpUrl: this.inputPublicUrl, |
8 | sessionId: this.sessionId, | ||
8 | toTranscode: this.toTranscode, | 9 | toTranscode: this.toTranscode, |
9 | video: this.videoLive.Video, | 10 | video: this.videoLive.Video, |
10 | outputDirectory: this.outDirectory, | 11 | outputDirectory: this.outDirectory, |
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts index 87b6f0702..6b2894f8c 100644 --- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts | |||
@@ -20,6 +20,7 @@ type CreateOptions = { | |||
20 | video: MVideo | 20 | video: MVideo |
21 | playlist: MStreamingPlaylist | 21 | playlist: MStreamingPlaylist |
22 | 22 | ||
23 | sessionId: string | ||
23 | rtmpUrl: string | 24 | rtmpUrl: string |
24 | 25 | ||
25 | toTranscode: { | 26 | toTranscode: { |
@@ -37,7 +38,7 @@ type CreateOptions = { | |||
37 | export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> { | 38 | export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> { |
38 | 39 | ||
39 | async create (options: CreateOptions) { | 40 | async create (options: CreateOptions) { |
40 | const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options | 41 | const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory, sessionId } = options |
41 | 42 | ||
42 | const jobUUID = buildUUID() | 43 | const jobUUID = buildUUID() |
43 | const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { | 44 | const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { |
@@ -54,6 +55,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO | |||
54 | const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { | 55 | const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { |
55 | videoUUID: video.uuid, | 56 | videoUUID: video.uuid, |
56 | masterPlaylistName: playlist.playlistFilename, | 57 | masterPlaylistName: playlist.playlistFilename, |
58 | sessionId, | ||
57 | outputDirectory | 59 | outputDirectory |
58 | } | 60 | } |
59 | 61 | ||
diff --git a/server/middlewares/validators/runners/jobs.ts b/server/middlewares/validators/runners/jobs.ts index de956a1ca..8174d4289 100644 --- a/server/middlewares/validators/runners/jobs.ts +++ b/server/middlewares/validators/runners/jobs.ts | |||
@@ -11,8 +11,16 @@ import { | |||
11 | } from '@server/helpers/custom-validators/runners/jobs' | 11 | } from '@server/helpers/custom-validators/runners/jobs' |
12 | import { isRunnerTokenValid } from '@server/helpers/custom-validators/runners/runners' | 12 | import { isRunnerTokenValid } from '@server/helpers/custom-validators/runners/runners' |
13 | import { cleanUpReqFiles } from '@server/helpers/express-utils' | 13 | import { cleanUpReqFiles } from '@server/helpers/express-utils' |
14 | import { LiveManager } from '@server/lib/live' | ||
14 | import { RunnerJobModel } from '@server/models/runner/runner-job' | 15 | import { RunnerJobModel } from '@server/models/runner/runner-job' |
15 | import { HttpStatusCode, RunnerJobState, RunnerJobSuccessBody, RunnerJobUpdateBody, ServerErrorCode } from '@shared/models' | 16 | import { |
17 | HttpStatusCode, | ||
18 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | ||
19 | RunnerJobState, | ||
20 | RunnerJobSuccessBody, | ||
21 | RunnerJobUpdateBody, | ||
22 | ServerErrorCode | ||
23 | } from '@shared/models' | ||
16 | import { areValidationErrors } from '../shared' | 24 | import { areValidationErrors } from '../shared' |
17 | 25 | ||
18 | const tags = [ 'runner' ] | 26 | const tags = [ 'runner' ] |
@@ -48,8 +56,9 @@ export const updateRunnerJobValidator = [ | |||
48 | if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req) | 56 | if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req) |
49 | 57 | ||
50 | const body = req.body as RunnerJobUpdateBody | 58 | const body = req.body as RunnerJobUpdateBody |
59 | const job = res.locals.runnerJob | ||
51 | 60 | ||
52 | if (isRunnerJobUpdatePayloadValid(body.payload, res.locals.runnerJob.type, req.files) !== true) { | 61 | if (isRunnerJobUpdatePayloadValid(body.payload, job.type, req.files) !== true) { |
53 | cleanUpReqFiles(req) | 62 | cleanUpReqFiles(req) |
54 | 63 | ||
55 | return res.fail({ | 64 | return res.fail({ |
@@ -59,6 +68,20 @@ export const updateRunnerJobValidator = [ | |||
59 | }) | 68 | }) |
60 | } | 69 | } |
61 | 70 | ||
71 | if (res.locals.runnerJob.type === 'live-rtmp-hls-transcoding') { | ||
72 | const privatePayload = job.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
73 | |||
74 | if (!LiveManager.Instance.hasSession(privatePayload.sessionId)) { | ||
75 | cleanUpReqFiles(req) | ||
76 | |||
77 | return res.fail({ | ||
78 | status: HttpStatusCode.BAD_REQUEST_400, | ||
79 | message: 'Session of this live ended', | ||
80 | tags | ||
81 | }) | ||
82 | } | ||
83 | } | ||
84 | |||
62 | return next() | 85 | return next() |
63 | } | 86 | } |
64 | ] | 87 | ] |
diff --git a/server/tests/api/live/live-constraints.ts b/server/tests/api/live/live-constraints.ts index 59e27cc62..37fd5b2a6 100644 --- a/server/tests/api/live/live-constraints.ts +++ b/server/tests/api/live/live-constraints.ts | |||
@@ -197,7 +197,7 @@ describe('Test live constraints', function () { | |||
197 | live: { | 197 | live: { |
198 | enabled: true, | 198 | enabled: true, |
199 | allowReplay: true, | 199 | allowReplay: true, |
200 | maxDuration: 1, | 200 | maxDuration: 3, |
201 | transcoding: { | 201 | transcoding: { |
202 | enabled: true, | 202 | enabled: true, |
203 | resolutions: ConfigCommand.getCustomConfigResolutions(true) | 203 | resolutions: ConfigCommand.getCustomConfigResolutions(true) |
diff --git a/server/tests/api/search/search-videos.ts b/server/tests/api/search/search-videos.ts index 66c462942..f7f50147d 100644 --- a/server/tests/api/search/search-videos.ts +++ b/server/tests/api/search/search-videos.ts | |||
@@ -525,7 +525,7 @@ describe('Test videos search', function () { | |||
525 | }) | 525 | }) |
526 | 526 | ||
527 | it('Should search by live', async function () { | 527 | it('Should search by live', async function () { |
528 | this.timeout(60000) | 528 | this.timeout(120000) |
529 | 529 | ||
530 | { | 530 | { |
531 | const newConfig = { | 531 | const newConfig = { |
diff --git a/shared/models/runners/runner-job-private-payload.model.ts b/shared/models/runners/runner-job-private-payload.model.ts index 32b837de4..529034db8 100644 --- a/shared/models/runners/runner-job-private-payload.model.ts +++ b/shared/models/runners/runner-job-private-payload.model.ts | |||
@@ -34,6 +34,7 @@ export interface RunnerJobLiveRTMPHLSTranscodingPrivatePayload { | |||
34 | videoUUID: string | 34 | videoUUID: string |
35 | masterPlaylistName: string | 35 | masterPlaylistName: string |
36 | outputDirectory: string | 36 | outputDirectory: string |
37 | sessionId: string | ||
37 | } | 38 | } |
38 | 39 | ||
39 | // --------------------------------------------------------------------------- | 40 | // --------------------------------------------------------------------------- |