aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-06-19 13:45:26 +0200
committerChocobozzz <me@florianbigard.com>2023-06-19 13:45:26 +0200
commita34c612f38af9c5f2c9f53931ed9df35ac834e90 (patch)
tree3d8f2b8a0d5c93dbe14a668cfe76cb974ce9ae74
parent7d758898dc907b8643bab6ae2e70c3ca2a021fac (diff)
downloadPeerTube-a34c612f38af9c5f2c9f53931ed9df35ac834e90.tar.gz
PeerTube-a34c612f38af9c5f2c9f53931ed9df35ac834e90.tar.zst
PeerTube-a34c612f38af9c5f2c9f53931ed9df35ac834e90.zip
More robust runner update handler
-rw-r--r--packages/peertube-runner/server/process/shared/common.ts18
-rw-r--r--shared/ffmpeg/ffmpeg-command-wrapper.ts12
2 files changed, 25 insertions, 5 deletions
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts
index 88f7c33f1..dbeb9dfc1 100644
--- a/packages/peertube-runner/server/process/shared/common.ts
+++ b/packages/peertube-runner/server/process/shared/common.ts
@@ -1,5 +1,4 @@
1import { remove } from 'fs-extra' 1import { remove } from 'fs-extra'
2import { throttle } from 'lodash'
3import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' 2import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared'
4import { join } from 'path' 3import { join } from 'path'
5import { buildUUID } from '@shared/extra-utils' 4import { buildUUID } from '@shared/extra-utils'
@@ -60,17 +59,26 @@ export function buildFFmpegVOD (options: {
60 ? 500 59 ? 500
61 : 60000 60 : 60000
62 61
63 const updateJobProgress = throttle((progress: number) => { 62 let progress: number
64 if (progress < 0 || progress > 100) progress = undefined
65 63
64 const interval = setInterval(() => {
66 updateTranscodingProgress({ server, job, runnerToken, progress }) 65 updateTranscodingProgress({ server, job, runnerToken, progress })
67 .catch(err => logger.error({ err }, 'Cannot send job progress')) 66 .catch(err => logger.error({ err }, 'Cannot send job progress'))
68 }, updateInterval, { trailing: false }) 67 }, updateInterval)
69 68
70 return new FFmpegVOD({ 69 return new FFmpegVOD({
71 ...getCommonFFmpegOptions(), 70 ...getCommonFFmpegOptions(),
72 71
73 updateJobProgress 72 onError: () => clearInterval(interval),
73 onEnd: () => clearInterval(interval),
74
75 updateJobProgress: arg => {
76 if (arg < 0 || arg > 100) {
77 progress = undefined
78 } else {
79 progress = arg
80 }
81 }
74 }) 82 })
75} 83}
76 84
diff --git a/shared/ffmpeg/ffmpeg-command-wrapper.ts b/shared/ffmpeg/ffmpeg-command-wrapper.ts
index 7a8c19d4b..efb75c198 100644
--- a/shared/ffmpeg/ffmpeg-command-wrapper.ts
+++ b/shared/ffmpeg/ffmpeg-command-wrapper.ts
@@ -21,6 +21,8 @@ export interface FFmpegCommandWrapperOptions {
21 lTags?: { tags: string[] } 21 lTags?: { tags: string[] }
22 22
23 updateJobProgress?: (progress?: number) => void 23 updateJobProgress?: (progress?: number) => void
24 onEnd?: () => void
25 onError?: (err: Error) => void
24} 26}
25 27
26export class FFmpegCommandWrapper { 28export class FFmpegCommandWrapper {
@@ -37,6 +39,8 @@ export class FFmpegCommandWrapper {
37 private readonly lTags: { tags: string[] } 39 private readonly lTags: { tags: string[] }
38 40
39 private readonly updateJobProgress: (progress?: number) => void 41 private readonly updateJobProgress: (progress?: number) => void
42 private readonly onEnd?: () => void
43 private readonly onError?: (err: Error) => void
40 44
41 private command: FfmpegCommand 45 private command: FfmpegCommand
42 46
@@ -48,7 +52,11 @@ export class FFmpegCommandWrapper {
48 this.threads = options.threads 52 this.threads = options.threads
49 this.logger = options.logger 53 this.logger = options.logger
50 this.lTags = options.lTags || { tags: [] } 54 this.lTags = options.lTags || { tags: [] }
55
51 this.updateJobProgress = options.updateJobProgress 56 this.updateJobProgress = options.updateJobProgress
57
58 this.onEnd = options.onEnd
59 this.onError = options.onError
52 } 60 }
53 61
54 getAvailableEncoders () { 62 getAvailableEncoders () {
@@ -101,12 +109,16 @@ export class FFmpegCommandWrapper {
101 this.command.on('error', (err, stdout, stderr) => { 109 this.command.on('error', (err, stdout, stderr) => {
102 if (silent !== true) this.logger.error('Error in ffmpeg.', { stdout, stderr, shellCommand, ...this.lTags }) 110 if (silent !== true) this.logger.error('Error in ffmpeg.', { stdout, stderr, shellCommand, ...this.lTags })
103 111
112 if (this.onError) this.onError(err)
113
104 rej(err) 114 rej(err)
105 }) 115 })
106 116
107 this.command.on('end', (stdout, stderr) => { 117 this.command.on('end', (stdout, stderr) => {
108 this.logger.debug('FFmpeg command ended.', { stdout, stderr, shellCommand, ...this.lTags }) 118 this.logger.debug('FFmpeg command ended.', { stdout, stderr, shellCommand, ...this.lTags })
109 119
120 if (this.onEnd) this.onEnd()
121
110 res() 122 res()
111 }) 123 })
112 124