]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Force stop remote live transcoding
authorChocobozzz <me@florianbigard.com>
Mon, 22 May 2023 11:44:22 +0000 (13:44 +0200)
committerChocobozzz <me@florianbigard.com>
Mon, 22 May 2023 11:54:46 +0000 (13:54 +0200)
packages/peertube-runner/server/process/shared/process-live.ts
server/lib/live/live-manager.ts
server/lib/live/shared/muxing-session.ts
server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts
server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts
server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
server/middlewares/validators/runners/jobs.ts
server/tests/api/live/live-constraints.ts
server/tests/api/search/search-videos.ts
shared/models/runners/runner-job-private-payload.model.ts

index df1b677f0c81a89e9fd227cda6cff9720a053565..6edb1f1e9c116020a99849e0fb3b5d9adbba7437 100644 (file)
@@ -34,6 +34,8 @@ export class ProcessLiveRTMPHLSTranscoding {
 
   constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
     this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
+
+    logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
   }
 
   process () {
@@ -289,6 +291,7 @@ export class ProcessLiveRTMPHLSTranscoding {
       })
     } catch (err) {
       if (currentTry >= 3) throw err
+      if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
 
       logger.warn({ err }, 'Will retry update after error')
       await wait(250)
@@ -310,6 +313,8 @@ export class ProcessLiveRTMPHLSTranscoding {
   // ---------------------------------------------------------------------------
 
   private cleanup () {
+    logger.debug(`Cleaning up job ${this.options.job.uuid}`)
+
     for (const fsWatcher of this.fsWatchers) {
       fsWatcher.close()
         .catch(err => logger.error({ err }, 'Cannot close watcher'))
index 68aa184431e3c96e21b8d986cad26321d935f00e..f062e2fd3219059bebe405814fa42178ed5ee8f0 100644 (file)
@@ -178,6 +178,10 @@ class LiveManager {
     return !!this.rtmpServer
   }
 
+  hasSession (sessionId: string) {
+    return this.getContext().sessions.has(sessionId)
+  }
+
   stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
     const sessionId = this.videoSessions.get(videoUUID)
     if (!sessionId) {
index 6632499ffdeb351b9366e2cede9ab5b897ccb77b..c672ec4d6c28db87bfe39f3cbd2a15422f1abcc4 100644 (file)
@@ -477,6 +477,7 @@ class MuxingSession extends EventEmitter {
 
       lTags: this.lTags,
 
+      sessionId: this.sessionId,
       inputLocalUrl: this.inputLocalUrl,
       inputPublicUrl: this.inputPublicUrl,
 
index ee61d769072c6dd405ac50000e37fceb951ae08b..95168745dacd21de175443ae77453e096c9a9ae1 100644 (file)
@@ -25,6 +25,7 @@ interface AbstractTranscodingWrapperOptions {
 
   lTags: LoggerTagsFn
 
+  sessionId: string
   inputLocalUrl: string
   inputPublicUrl: string
 
@@ -52,6 +53,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter {
     fps: number
   }[]
 
+  protected readonly sessionId: string
   protected readonly inputLocalUrl: string
   protected readonly inputPublicUrl: string
 
@@ -80,6 +82,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter {
     this.videoUUID = options.videoLive.Video.uuid
     this.streamingPlaylist = options.streamingPlaylist
 
+    this.sessionId = options.sessionId
     this.inputLocalUrl = options.inputLocalUrl
     this.inputPublicUrl = options.inputPublicUrl
 
index 6770a5e6f1df73af2dda6ccf8484be3ff74bd968..2aeeb31fb1941dcd0cfa9fc5b0ec7ad894f02bb5 100644 (file)
@@ -5,6 +5,7 @@ export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
   async run () {
     await new LiveRTMPHLSTranscodingJobHandler().create({
       rtmpUrl: this.inputPublicUrl,
+      sessionId: this.sessionId,
       toTranscode: this.toTranscode,
       video: this.videoLive.Video,
       outputDirectory: this.outDirectory,
index 87b6f07025aaa6bb68c3f2bd17f34eaf7e96eb17..6b2894f8c82196dbadf72c005b6dfc5bb12bcf4a 100644 (file)
@@ -20,6 +20,7 @@ type CreateOptions = {
   video: MVideo
   playlist: MStreamingPlaylist
 
+  sessionId: string
   rtmpUrl: string
 
   toTranscode: {
@@ -37,7 +38,7 @@ type CreateOptions = {
 export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> {
 
   async create (options: CreateOptions) {
-    const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options
+    const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory, sessionId } = options
 
     const jobUUID = buildUUID()
     const payload: RunnerJobLiveRTMPHLSTranscodingPayload = {
@@ -54,6 +55,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
     const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = {
       videoUUID: video.uuid,
       masterPlaylistName: playlist.playlistFilename,
+      sessionId,
       outputDirectory
     }
 
index de956a1ca01fb49359efb5d9db68537bb493ffcb..8174d428929fc003e79e6bd90ae84364143175c8 100644 (file)
@@ -11,8 +11,16 @@ import {
 } from '@server/helpers/custom-validators/runners/jobs'
 import { isRunnerTokenValid } from '@server/helpers/custom-validators/runners/runners'
 import { cleanUpReqFiles } from '@server/helpers/express-utils'
+import { LiveManager } from '@server/lib/live'
 import { RunnerJobModel } from '@server/models/runner/runner-job'
-import { HttpStatusCode, RunnerJobState, RunnerJobSuccessBody, RunnerJobUpdateBody, ServerErrorCode } from '@shared/models'
+import {
+  HttpStatusCode,
+  RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
+  RunnerJobState,
+  RunnerJobSuccessBody,
+  RunnerJobUpdateBody,
+  ServerErrorCode
+} from '@shared/models'
 import { areValidationErrors } from '../shared'
 
 const tags = [ 'runner' ]
@@ -48,8 +56,9 @@ export const updateRunnerJobValidator = [
     if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req)
 
     const body = req.body as RunnerJobUpdateBody
+    const job = res.locals.runnerJob
 
-    if (isRunnerJobUpdatePayloadValid(body.payload, res.locals.runnerJob.type, req.files) !== true) {
+    if (isRunnerJobUpdatePayloadValid(body.payload, job.type, req.files) !== true) {
       cleanUpReqFiles(req)
 
       return res.fail({
@@ -59,6 +68,20 @@ export const updateRunnerJobValidator = [
       })
     }
 
+    if (res.locals.runnerJob.type === 'live-rtmp-hls-transcoding') {
+      const privatePayload = job.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
+
+      if (!LiveManager.Instance.hasSession(privatePayload.sessionId)) {
+        cleanUpReqFiles(req)
+
+        return res.fail({
+          status: HttpStatusCode.BAD_REQUEST_400,
+          message: 'Session of this live ended',
+          tags
+        })
+      }
+    }
+
     return next()
   }
 ]
index 59e27cc62680e2015d160e7f33929f1b05f83547..37fd5b2a639c15bd0806f9af8762ffc650d9782d 100644 (file)
@@ -197,7 +197,7 @@ describe('Test live constraints', function () {
         live: {
           enabled: true,
           allowReplay: true,
-          maxDuration: 1,
+          maxDuration: 3,
           transcoding: {
             enabled: true,
             resolutions: ConfigCommand.getCustomConfigResolutions(true)
index 66c462942f4b1f0fb45d2c2b0305815d73ce3edd..f7f50147d524f4c10e413e1788303eaad85f272f 100644 (file)
@@ -525,7 +525,7 @@ describe('Test videos search', function () {
   })
 
   it('Should search by live', async function () {
-    this.timeout(60000)
+    this.timeout(120000)
 
     {
       const newConfig = {
index 32b837de482db9062fed14e39a9cd2155f8a3535..529034db886eb4010885b0aa985bb7ad5e20f134 100644 (file)
@@ -34,6 +34,7 @@ export interface RunnerJobLiveRTMPHLSTranscodingPrivatePayload {
   videoUUID: string
   masterPlaylistName: string
   outputDirectory: string
+  sessionId: string
 }
 
 // ---------------------------------------------------------------------------