aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-22 13:44:22 +0200
committerChocobozzz <me@florianbigard.com>2023-05-22 13:54:46 +0200
commit17ecdf61ce1d374cc8ba17601b93c9bda08112b2 (patch)
treed3405d11636f661815c3ad09821944a5ae26ebfa
parentf3bc1b541619673f14db7de220b9c520a4f35ca8 (diff)
downloadPeerTube-17ecdf61ce1d374cc8ba17601b93c9bda08112b2.tar.gz
PeerTube-17ecdf61ce1d374cc8ba17601b93c9bda08112b2.tar.zst
PeerTube-17ecdf61ce1d374cc8ba17601b93c9bda08112b2.zip
Force stop remote live transcoding
-rw-r--r--packages/peertube-runner/server/process/shared/process-live.ts5
-rw-r--r--server/lib/live/live-manager.ts4
-rw-r--r--server/lib/live/shared/muxing-session.ts1
-rw-r--r--server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts3
-rw-r--r--server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts1
-rw-r--r--server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts4
-rw-r--r--server/middlewares/validators/runners/jobs.ts27
-rw-r--r--server/tests/api/live/live-constraints.ts2
-rw-r--r--server/tests/api/search/search-videos.ts2
-rw-r--r--shared/models/runners/runner-job-private-payload.model.ts1
10 files changed, 45 insertions, 5 deletions
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts
index df1b677f0..6edb1f1e9 100644
--- a/packages/peertube-runner/server/process/shared/process-live.ts
+++ b/packages/peertube-runner/server/process/shared/process-live.ts
@@ -34,6 +34,8 @@ export class ProcessLiveRTMPHLSTranscoding {
34 34
35 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) { 35 constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
36 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) 36 this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
37
38 logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
37 } 39 }
38 40
39 process () { 41 process () {
@@ -289,6 +291,7 @@ export class ProcessLiveRTMPHLSTranscoding {
289 }) 291 })
290 } catch (err) { 292 } catch (err) {
291 if (currentTry >= 3) throw err 293 if (currentTry >= 3) throw err
294 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
292 295
293 logger.warn({ err }, 'Will retry update after error') 296 logger.warn({ err }, 'Will retry update after error')
294 await wait(250) 297 await wait(250)
@@ -310,6 +313,8 @@ export class ProcessLiveRTMPHLSTranscoding {
310 // --------------------------------------------------------------------------- 313 // ---------------------------------------------------------------------------
311 314
312 private cleanup () { 315 private cleanup () {
316 logger.debug(`Cleaning up job ${this.options.job.uuid}`)
317
313 for (const fsWatcher of this.fsWatchers) { 318 for (const fsWatcher of this.fsWatchers) {
314 fsWatcher.close() 319 fsWatcher.close()
315 .catch(err => logger.error({ err }, 'Cannot close watcher')) 320 .catch(err => logger.error({ err }, 'Cannot close watcher'))
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 68aa18443..f062e2fd3 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -178,6 +178,10 @@ class LiveManager {
178 return !!this.rtmpServer 178 return !!this.rtmpServer
179 } 179 }
180 180
181 hasSession (sessionId: string) {
182 return this.getContext().sessions.has(sessionId)
183 }
184
181 stopSessionOf (videoUUID: string, error: LiveVideoError | null) { 185 stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
182 const sessionId = this.videoSessions.get(videoUUID) 186 const sessionId = this.videoSessions.get(videoUUID)
183 if (!sessionId) { 187 if (!sessionId) {
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 6632499ff..c672ec4d6 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -477,6 +477,7 @@ class MuxingSession extends EventEmitter {
477 477
478 lTags: this.lTags, 478 lTags: this.lTags,
479 479
480 sessionId: this.sessionId,
480 inputLocalUrl: this.inputLocalUrl, 481 inputLocalUrl: this.inputLocalUrl,
481 inputPublicUrl: this.inputPublicUrl, 482 inputPublicUrl: this.inputPublicUrl,
482 483
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
index ee61d7690..95168745d 100644
--- a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
+++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
@@ -25,6 +25,7 @@ interface AbstractTranscodingWrapperOptions {
25 25
26 lTags: LoggerTagsFn 26 lTags: LoggerTagsFn
27 27
28 sessionId: string
28 inputLocalUrl: string 29 inputLocalUrl: string
29 inputPublicUrl: string 30 inputPublicUrl: string
30 31
@@ -52,6 +53,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter {
52 fps: number 53 fps: number
53 }[] 54 }[]
54 55
56 protected readonly sessionId: string
55 protected readonly inputLocalUrl: string 57 protected readonly inputLocalUrl: string
56 protected readonly inputPublicUrl: string 58 protected readonly inputPublicUrl: string
57 59
@@ -80,6 +82,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter {
80 this.videoUUID = options.videoLive.Video.uuid 82 this.videoUUID = options.videoLive.Video.uuid
81 this.streamingPlaylist = options.streamingPlaylist 83 this.streamingPlaylist = options.streamingPlaylist
82 84
85 this.sessionId = options.sessionId
83 this.inputLocalUrl = options.inputLocalUrl 86 this.inputLocalUrl = options.inputLocalUrl
84 this.inputPublicUrl = options.inputPublicUrl 87 this.inputPublicUrl = options.inputPublicUrl
85 88
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
index 6770a5e6f..2aeeb31fb 100644
--- a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
+++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
@@ -5,6 +5,7 @@ export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
5 async run () { 5 async run () {
6 await new LiveRTMPHLSTranscodingJobHandler().create({ 6 await new LiveRTMPHLSTranscodingJobHandler().create({
7 rtmpUrl: this.inputPublicUrl, 7 rtmpUrl: this.inputPublicUrl,
8 sessionId: this.sessionId,
8 toTranscode: this.toTranscode, 9 toTranscode: this.toTranscode,
9 video: this.videoLive.Video, 10 video: this.videoLive.Video,
10 outputDirectory: this.outDirectory, 11 outputDirectory: this.outDirectory,
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
index 87b6f0702..6b2894f8c 100644
--- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
+++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
@@ -20,6 +20,7 @@ type CreateOptions = {
20 video: MVideo 20 video: MVideo
21 playlist: MStreamingPlaylist 21 playlist: MStreamingPlaylist
22 22
23 sessionId: string
23 rtmpUrl: string 24 rtmpUrl: string
24 25
25 toTranscode: { 26 toTranscode: {
@@ -37,7 +38,7 @@ type CreateOptions = {
37export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> { 38export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> {
38 39
39 async create (options: CreateOptions) { 40 async create (options: CreateOptions) {
40 const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options 41 const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory, sessionId } = options
41 42
42 const jobUUID = buildUUID() 43 const jobUUID = buildUUID()
43 const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { 44 const payload: RunnerJobLiveRTMPHLSTranscodingPayload = {
@@ -54,6 +55,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
54 const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { 55 const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = {
55 videoUUID: video.uuid, 56 videoUUID: video.uuid,
56 masterPlaylistName: playlist.playlistFilename, 57 masterPlaylistName: playlist.playlistFilename,
58 sessionId,
57 outputDirectory 59 outputDirectory
58 } 60 }
59 61
diff --git a/server/middlewares/validators/runners/jobs.ts b/server/middlewares/validators/runners/jobs.ts
index de956a1ca..8174d4289 100644
--- a/server/middlewares/validators/runners/jobs.ts
+++ b/server/middlewares/validators/runners/jobs.ts
@@ -11,8 +11,16 @@ import {
11} from '@server/helpers/custom-validators/runners/jobs' 11} from '@server/helpers/custom-validators/runners/jobs'
12import { isRunnerTokenValid } from '@server/helpers/custom-validators/runners/runners' 12import { isRunnerTokenValid } from '@server/helpers/custom-validators/runners/runners'
13import { cleanUpReqFiles } from '@server/helpers/express-utils' 13import { cleanUpReqFiles } from '@server/helpers/express-utils'
14import { LiveManager } from '@server/lib/live'
14import { RunnerJobModel } from '@server/models/runner/runner-job' 15import { RunnerJobModel } from '@server/models/runner/runner-job'
15import { HttpStatusCode, RunnerJobState, RunnerJobSuccessBody, RunnerJobUpdateBody, ServerErrorCode } from '@shared/models' 16import {
17 HttpStatusCode,
18 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
19 RunnerJobState,
20 RunnerJobSuccessBody,
21 RunnerJobUpdateBody,
22 ServerErrorCode
23} from '@shared/models'
16import { areValidationErrors } from '../shared' 24import { areValidationErrors } from '../shared'
17 25
18const tags = [ 'runner' ] 26const tags = [ 'runner' ]
@@ -48,8 +56,9 @@ export const updateRunnerJobValidator = [
48 if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req) 56 if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req)
49 57
50 const body = req.body as RunnerJobUpdateBody 58 const body = req.body as RunnerJobUpdateBody
59 const job = res.locals.runnerJob
51 60
52 if (isRunnerJobUpdatePayloadValid(body.payload, res.locals.runnerJob.type, req.files) !== true) { 61 if (isRunnerJobUpdatePayloadValid(body.payload, job.type, req.files) !== true) {
53 cleanUpReqFiles(req) 62 cleanUpReqFiles(req)
54 63
55 return res.fail({ 64 return res.fail({
@@ -59,6 +68,20 @@ export const updateRunnerJobValidator = [
59 }) 68 })
60 } 69 }
61 70
71 if (res.locals.runnerJob.type === 'live-rtmp-hls-transcoding') {
72 const privatePayload = job.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
73
74 if (!LiveManager.Instance.hasSession(privatePayload.sessionId)) {
75 cleanUpReqFiles(req)
76
77 return res.fail({
78 status: HttpStatusCode.BAD_REQUEST_400,
79 message: 'Session of this live ended',
80 tags
81 })
82 }
83 }
84
62 return next() 85 return next()
63 } 86 }
64] 87]
diff --git a/server/tests/api/live/live-constraints.ts b/server/tests/api/live/live-constraints.ts
index 59e27cc62..37fd5b2a6 100644
--- a/server/tests/api/live/live-constraints.ts
+++ b/server/tests/api/live/live-constraints.ts
@@ -197,7 +197,7 @@ describe('Test live constraints', function () {
197 live: { 197 live: {
198 enabled: true, 198 enabled: true,
199 allowReplay: true, 199 allowReplay: true,
200 maxDuration: 1, 200 maxDuration: 3,
201 transcoding: { 201 transcoding: {
202 enabled: true, 202 enabled: true,
203 resolutions: ConfigCommand.getCustomConfigResolutions(true) 203 resolutions: ConfigCommand.getCustomConfigResolutions(true)
diff --git a/server/tests/api/search/search-videos.ts b/server/tests/api/search/search-videos.ts
index 66c462942..f7f50147d 100644
--- a/server/tests/api/search/search-videos.ts
+++ b/server/tests/api/search/search-videos.ts
@@ -525,7 +525,7 @@ describe('Test videos search', function () {
525 }) 525 })
526 526
527 it('Should search by live', async function () { 527 it('Should search by live', async function () {
528 this.timeout(60000) 528 this.timeout(120000)
529 529
530 { 530 {
531 const newConfig = { 531 const newConfig = {
diff --git a/shared/models/runners/runner-job-private-payload.model.ts b/shared/models/runners/runner-job-private-payload.model.ts
index 32b837de4..529034db8 100644
--- a/shared/models/runners/runner-job-private-payload.model.ts
+++ b/shared/models/runners/runner-job-private-payload.model.ts
@@ -34,6 +34,7 @@ export interface RunnerJobLiveRTMPHLSTranscodingPrivatePayload {
34 videoUUID: string 34 videoUUID: string
35 masterPlaylistName: string 35 masterPlaylistName: string
36 outputDirectory: string 36 outputDirectory: string
37 sessionId: string
37} 38}
38 39
39// --------------------------------------------------------------------------- 40// ---------------------------------------------------------------------------