]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Enable external plugins to test the PR
authorChocobozzz <me@florianbigard.com>
Tue, 2 May 2023 11:51:06 +0000 (13:51 +0200)
committerChocobozzz <chocobozzz@cpy.re>
Tue, 9 May 2023 06:57:34 +0000 (08:57 +0200)
15 files changed:
.github/workflows/test.yml
client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html
packages/peertube-runner/README.md
packages/peertube-runner/peertube-runner.ts
packages/peertube-runner/server/process/shared/process-live.ts
packages/peertube-runner/server/server.ts
packages/peertube-runner/shared/http.ts
packages/peertube-runner/shared/ipc/ipc-server.ts
scripts/build/peertube-runner.sh
server/lib/job-queue/job-queue.ts
server/lib/live/shared/muxing-session.ts
server/lib/runners/job-handlers/abstract-job-handler.ts
server/tests/api/videos/resumable-upload.ts
server/tests/cli/create-import-video-file-job.ts
shared/server-commands/requests/requests.ts

index 678b0674bc24c3cd4396817d9a7be3812cc1b54d..2288bcd3f699d7be53475b255e753d459ab792d8 100644 (file)
@@ -77,7 +77,7 @@ jobs:
 
       - name: Run Test
         # external-plugins tests only run on schedule
-        if: github.event_name == 'schedule' || matrix.test_suite != 'external-plugins'
+        if: github.event_name == 'schedule' || matrix.test_suite != 'external-plugins'
         env:
           AKISMET_KEY: ${{ secrets.AKISMET_KEY }}
         run: npm run ci -- ${{ matrix.test_suite }}
index 7858b4bcaadd8606107a702b7346b575ede32e7e..d42f600e0d933a8bce3911d89d00bbb67726ff5a 100644 (file)
@@ -84,7 +84,7 @@
           <pre>{{ runnerJob.privatePayload }}</pre>
         </div>
 
-        <pre *ngIf="runnerJob.error" class=".text-danger" >{{ runnerJob.error }}</pre>
+        <pre *ngIf="runnerJob.error" class="text-danger" >{{ runnerJob.error }}</pre>
       </td>
     </tr>
   </ng-template>
index b7cf174d5ed2f47901b178ac4f0ef3c27243165d..87c3c53540db9b27d7bdb5e7f97343315935b01a 100644 (file)
@@ -1 +1,23 @@
 # PeerTube runner
+
+Runner program to execute jobs (transcoding...) of remote PeerTube instances.
+
+Commands below has to be run at the root of PeerTube git repository.
+
+## Develop
+
+```bash
+npm run dev:peertube-runner
+```
+
+## Build
+
+```bash
+npm run build:peertube-runner
+```
+
+## Run
+
+```bash
+node packages/peertube-runner/dist/peertube-runner.js --help
+```
index 6bfd9ac0fa45ed0a01365bf5b22865cf53641e45..f02526ef9b5b6ac601195379d47c2f4c50115991 100644 (file)
@@ -26,7 +26,7 @@ program.command('server')
     try {
       await RunnerServer.Instance.run()
     } catch (err) {
-      console.error('Cannot run PeerTube runner as server mode', err)
+      logger.error('Cannot run PeerTube runner as server mode', err)
       process.exit(-1)
     }
   })
@@ -41,7 +41,7 @@ program.command('register')
     try {
       await registerRunner(options)
     } catch (err) {
-      console.error('Cannot register this PeerTube runner.', err)
+      logger.error('Cannot register this PeerTube runner.', err)
       process.exit(-1)
     }
   })
@@ -53,7 +53,7 @@ program.command('unregister')
     try {
       await unregisterRunner(options)
     } catch (err) {
-      console.error('Cannot unregister this PeerTube runner.', err)
+      logger.error('Cannot unregister this PeerTube runner.', err)
       process.exit(-1)
     }
   })
@@ -64,7 +64,7 @@ program.command('list-registered')
     try {
       await listRegistered()
     } catch (err) {
-      console.error('Cannot list registered PeerTube instances.', err)
+      logger.error('Cannot list registered PeerTube instances.', err)
       process.exit(-1)
     }
   })
index 5a3b596a2e5c1cfd3323c9eabfcdf83decdf5518..b17b51c7c5639892f2363835326847c6904d674e 100644 (file)
@@ -204,8 +204,8 @@ export class ProcessLiveRTMPHLSTranscoding {
 
   // ---------------------------------------------------------------------------
 
-  private sendDeletedChunkUpdate (deletedChunk: string) {
-    if (this.ended) return
+  private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
+    if (this.ended) return Promise.resolve()
 
     logger.debug(`Sending removed live chunk ${deletedChunk} update`)
 
@@ -230,8 +230,8 @@ export class ProcessLiveRTMPHLSTranscoding {
     return this.updateWithRetry(payload)
   }
 
-  private sendAddedChunkUpdate (addedChunk: string) {
-    if (this.ended) return
+  private sendAddedChunkUpdate (addedChunk: string): Promise<any> {
+    if (this.ended) return Promise.resolve()
 
     logger.debug(`Sending added live chunk ${addedChunk} update`)
 
@@ -257,7 +257,7 @@ export class ProcessLiveRTMPHLSTranscoding {
     return this.updateWithRetry(payload)
   }
 
-  private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) {
+  private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
     if (this.ended || this.errored) return
 
     try {
index 724f359bd36dd7f1dd4ad7c0ddd208d493163cb4..e851dfc7cc743521a217c357c101760949dd4394 100644 (file)
@@ -23,6 +23,8 @@ export class RunnerServer {
 
   private checkingAvailableJobs = false
 
+  private cleaningUp = false
+
   private readonly sockets = new Map<PeerTubeServer, Socket>()
 
   private constructor () {}
@@ -45,13 +47,17 @@ export class RunnerServer {
     try {
       await ipcServer.run(this)
     } catch (err) {
-      console.error('Cannot start local socket for IPC communication', err)
+      logger.error('Cannot start local socket for IPC communication', err)
       process.exit(-1)
     }
 
     // Cleanup on exit
     for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
-      process.on(code, async () => {
+      process.on(code, async (err, origin) => {
+        if (code === 'uncaughtException') {
+          logger.error({ err, origin }, 'uncaughtException')
+        }
+
         await this.onExit()
       })
     }
@@ -244,6 +250,11 @@ export class RunnerServer {
   }
 
   private async onExit () {
+    if (this.cleaningUp) return
+    this.cleaningUp = true
+
+    logger.info('Cleaning up after program exit')
+
     try {
       for (const { server, job } of this.processingJobs) {
         await server.runnerJobs.abort({
@@ -256,7 +267,7 @@ export class RunnerServer {
 
       await this.cleanupTMP()
     } catch (err) {
-      console.error(err)
+      logger.error(err)
       process.exit(-1)
     }
 
index d3fff70d1ccf92a3ce6d1a716c44e4cf97e4dd7a..df64dc168c4e33fe94476e65152a9c6190f44474 100644 (file)
@@ -47,7 +47,7 @@ export function downloadFile (options: {
 
     request.on('error', err => {
       remove(destination)
-        .catch(err => console.error(err))
+        .catch(err => logger.error(err))
 
       return rej(err)
     })
index bc340198b3de56c86c2204d13c392cd92b6e58c8..922dc93e5881eef04c2f41a52d64929fff14d850 100644 (file)
@@ -27,7 +27,7 @@ export class IPCServer {
 
         this.sendReponse(res, { success: true, data })
       } catch (err) {
-        console.error('Cannot execute RPC call', err)
+        logger.error('Cannot execute RPC call', err)
         this.sendReponse(res, { success: false, error: err.message })
       }
     })
@@ -56,6 +56,6 @@ export class IPCServer {
     body: IPCReponse<T>
   ) {
     response(body)
-      .catch(err => console.error('Cannot send response after IPC request', err))
+      .catch(err => logger.error('Cannot send response after IPC request', err))
   }
 }
index 690031af55854e802f69e76e198caf8426f5947f..9c326747bac0c3d5165bc4c1b85579b0d33070a3 100755 (executable)
@@ -10,4 +10,4 @@ rm -rf ./dist
 rm -rf ./dist
 mkdir ./dist
 
-./node_modules/.bin/esbuild ./peertube-runner.ts --bundle --platform=node --external:"./lib-cov/fluent-ffmpeg" --external:pg-hstore --outfile=dist/peertube-runner.js
+./node_modules/.bin/esbuild ./peertube-runner.ts --bundle --platform=node --target=node14 --external:"./lib-cov/fluent-ffmpeg" --external:pg-hstore --outfile=dist/peertube-runner.js
index 21bf0f22651eb10fbbc03d465cf1737d24e4c997..03f6fbea75a382f1f8b656d4e8525cf9dbf0c87d 100644 (file)
@@ -96,6 +96,7 @@ export type CreateJobArgument =
 export type CreateJobOptions = {
   delay?: number
   priority?: number
+  failParentOnFailure?: boolean
 }
 
 const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
@@ -363,7 +364,11 @@ class JobQueue {
       name: 'job',
       data: job.payload,
       queueName: job.type,
-      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
+      opts: {
+        failParentOnFailure: true,
+
+        ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ]))
+      }
     }
   }
 
index f3f8fc8863fc9c65ef1f8ed8c6f959159aef9223..ef4ecb83e9affea3e63e566bc1aaf085a483b6f2 100644 (file)
@@ -79,9 +79,7 @@ class MuxingSession extends EventEmitter {
   private streamingPlaylist: MStreamingPlaylistVideo
   private liveSegmentShaStore: LiveSegmentShaStore
 
-  private tsWatcher: FSWatcher
-  private masterWatcher: FSWatcher
-  private m3u8Watcher: FSWatcher
+  private filesWatcher: FSWatcher
 
   private masterPlaylistCreated = false
   private liveReady = false
@@ -149,6 +147,8 @@ class MuxingSession extends EventEmitter {
 
     await this.transcodingWrapper.run()
 
+    this.filesWatcher = watch(this.outDirectory, { depth: 0 })
+
     this.watchMasterFile()
     this.watchTSFiles()
     this.watchM3U8File()
@@ -168,9 +168,10 @@ class MuxingSession extends EventEmitter {
   }
 
   private watchMasterFile () {
-    this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
+    this.filesWatcher.on('add', async path => {
+      if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return
+      if (this.masterPlaylistCreated === true) return
 
-    this.masterWatcher.on('add', async () => {
       try {
         if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
           const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
@@ -188,20 +189,18 @@ class MuxingSession extends EventEmitter {
       this.masterPlaylistCreated = true
 
       logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
-
-      this.masterWatcher.close()
-        .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
     })
   }
 
   private watchM3U8File () {
-    this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
-
     const sendQueues = new Map<string, PQueue>()
 
-    const onChangeOrAdd = async (m3u8Path: string) => {
+    const onChange = async (m3u8Path: string) => {
+      if (m3u8Path.endsWith('.m3u8') !== true) return
       if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
 
+      logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
+
       try {
         if (!sendQueues.has(m3u8Path)) {
           sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
@@ -214,18 +213,18 @@ class MuxingSession extends EventEmitter {
       }
     }
 
-    this.m3u8Watcher.on('change', onChangeOrAdd)
+    this.filesWatcher.on('change', onChange)
   }
 
   private watchTSFiles () {
     const startStreamDateTime = new Date().getTime()
 
-    this.tsWatcher = watch(this.outDirectory + '/*.ts')
-
     const playlistIdMatcher = /^([\d+])-/
 
     const addHandler = async (segmentPath: string) => {
-      logger.debug('Live add handler of %s.', segmentPath, this.lTags())
+      if (segmentPath.endsWith('.ts') !== true) return
+
+      logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
 
       const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
 
@@ -252,6 +251,10 @@ class MuxingSession extends EventEmitter {
     }
 
     const deleteHandler = async (segmentPath: string) => {
+      if (segmentPath.endsWith('.ts') !== true) return
+
+      logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags())
+
       try {
         await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
       } catch (err) {
@@ -267,8 +270,8 @@ class MuxingSession extends EventEmitter {
       }
     }
 
-    this.tsWatcher.on('add', p => addHandler(p))
-    this.tsWatcher.on('unlink', p => deleteHandler(p))
+    this.filesWatcher.on('add', p => addHandler(p))
+    this.filesWatcher.on('unlink', p => deleteHandler(p))
   }
 
   private async isQuotaExceeded (segmentPath: string) {
@@ -371,7 +374,8 @@ class MuxingSession extends EventEmitter {
     setTimeout(() => {
       // Wait latest segments generation, and close watchers
 
-      Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
+      const promise = this.filesWatcher?.close() || Promise.resolve()
+      promise
         .then(() => {
           // Process remaining segments hash
           for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
index 73fc1457407f9719cd686d10c987bc42ed27453d..74b455107ea254fe95393421ac95a463dc1761cf 100644 (file)
@@ -21,6 +21,7 @@ import {
   RunnerJobVODWebVideoTranscodingPayload,
   RunnerJobVODWebVideoTranscodingPrivatePayload
 } from '@shared/models'
+import { throttle } from 'lodash'
 
 type CreateRunnerJobArg =
   {
@@ -48,6 +49,8 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
 
   protected readonly lTags = loggerTagsFactory('runner')
 
+  static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)
+
   // ---------------------------------------------------------------------------
 
   abstract create (options: C): Promise<MRunnerJob>
@@ -102,16 +105,19 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
 
     if (progress) runnerJob.progress = progress
 
+    if (!runnerJob.changed()) {
+      try {
+        await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
+      } catch (err) {
+        logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
+      }
+
+      return
+    }
+
     await retryTransactionWrapper(() => {
       return sequelizeTypescript.transaction(async transaction => {
-        if (runnerJob.changed()) {
-          return runnerJob.save({ transaction })
-        }
-
-        // Don't update the job too often
-        if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) {
-          await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction })
-        }
+        return runnerJob.save({ transaction })
       })
     })
   }
index a70a7258b9d87f3738d89a74bcc41a08cc7a7ddc..2fbefb39282d7ce372c20068a9d1354f7d98344e 100644 (file)
@@ -79,7 +79,7 @@ describe('Test resumable upload', function () {
   async function checkFileSize (uploadIdArg: string, expectedSize: number | null) {
     const uploadId = uploadIdArg.replace(/^upload_id=/, '')
 
-    const subPath = join('tmp', 'resumable-uploads', uploadId)
+    const subPath = join('tmp', 'resumable-uploads', `${rootId}-${uploadId}.mp4`)
     const filePath = server.servers.buildDirectory(subPath)
     const exists = await pathExists(filePath)
 
index 43f53035b947fe5f23655aa60e736448e41c2b03..3ece4f2ec293ef75932c9833ebb58078f6737597 100644 (file)
@@ -73,7 +73,7 @@ function runTests (objectStorage: boolean) {
   })
 
   it('Should run a import job on video 1 with a lower resolution', async function () {
-    const command = `npm run create-import-video-file-job -- -v ${video1ShortId} -i server/tests/fixtures/video_short-480.webm`
+    const command = `npm run create-import-video-file-job -- -v ${video1ShortId} -i server/tests/fixtures/video_short_480.webm`
     await servers[0].cli.execWithEnv(command)
 
     await waitJobs(servers)
index 96f67b4c7fbc1e0973c0b3cc8806619491c6cd98..e3f1817f1b45c934599f9905b2a2e8fb7a028905 100644 (file)
@@ -159,7 +159,7 @@ function unwrapBodyOrDecodeToJSON <T> (test: request.Test): Promise<T> {
       try {
         return JSON.parse(new TextDecoder().decode(res.body))
       } catch (err) {
-        console.error('Cannot decode JSON.', res.body instanceof Buffer ? res.body.toString() : res.body)
+        console.error('Cannot decode JSON.', { res, body: res.body instanceof Buffer ? res.body.toString() : res.body })
         throw err
       }
     }
@@ -168,7 +168,7 @@ function unwrapBodyOrDecodeToJSON <T> (test: request.Test): Promise<T> {
       try {
         return JSON.parse(res.text)
       } catch (err) {
-        console.error('Cannot decode json', res.text)
+        console.error('Cannot decode json', { res, text: res.text })
         throw err
       }
     }