diff options
author | Chocobozzz <me@florianbigard.com> | 2023-06-19 13:45:26 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-06-19 13:45:26 +0200 |
commit | a34c612f38af9c5f2c9f53931ed9df35ac834e90 (patch) | |
tree | 3d8f2b8a0d5c93dbe14a668cfe76cb974ce9ae74 | |
parent | 7d758898dc907b8643bab6ae2e70c3ca2a021fac (diff) | |
download | PeerTube-a34c612f38af9c5f2c9f53931ed9df35ac834e90.tar.gz PeerTube-a34c612f38af9c5f2c9f53931ed9df35ac834e90.tar.zst PeerTube-a34c612f38af9c5f2c9f53931ed9df35ac834e90.zip |
More robust runner update handler
-rw-r--r-- | packages/peertube-runner/server/process/shared/common.ts | 18 | ||||
-rw-r--r-- | shared/ffmpeg/ffmpeg-command-wrapper.ts | 12 |
2 files changed, 25 insertions, 5 deletions
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts index 88f7c33f1..dbeb9dfc1 100644 --- a/packages/peertube-runner/server/process/shared/common.ts +++ b/packages/peertube-runner/server/process/shared/common.ts | |||
@@ -1,5 +1,4 @@ | |||
1 | import { remove } from 'fs-extra' | 1 | import { remove } from 'fs-extra' |
2 | import { throttle } from 'lodash' | ||
3 | import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' | 2 | import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' |
4 | import { join } from 'path' | 3 | import { join } from 'path' |
5 | import { buildUUID } from '@shared/extra-utils' | 4 | import { buildUUID } from '@shared/extra-utils' |
@@ -60,17 +59,26 @@ export function buildFFmpegVOD (options: { | |||
60 | ? 500 | 59 | ? 500 |
61 | : 60000 | 60 | : 60000 |
62 | 61 | ||
63 | const updateJobProgress = throttle((progress: number) => { | 62 | let progress: number |
64 | if (progress < 0 || progress > 100) progress = undefined | ||
65 | 63 | ||
64 | const interval = setInterval(() => { | ||
66 | updateTranscodingProgress({ server, job, runnerToken, progress }) | 65 | updateTranscodingProgress({ server, job, runnerToken, progress }) |
67 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | 66 | .catch(err => logger.error({ err }, 'Cannot send job progress')) |
68 | }, updateInterval, { trailing: false }) | 67 | }, updateInterval) |
69 | 68 | ||
70 | return new FFmpegVOD({ | 69 | return new FFmpegVOD({ |
71 | ...getCommonFFmpegOptions(), | 70 | ...getCommonFFmpegOptions(), |
72 | 71 | ||
73 | updateJobProgress | 72 | onError: () => clearInterval(interval), |
73 | onEnd: () => clearInterval(interval), | ||
74 | |||
75 | updateJobProgress: arg => { | ||
76 | if (arg < 0 || arg > 100) { | ||
77 | progress = undefined | ||
78 | } else { | ||
79 | progress = arg | ||
80 | } | ||
81 | } | ||
74 | }) | 82 | }) |
75 | } | 83 | } |
76 | 84 | ||
diff --git a/shared/ffmpeg/ffmpeg-command-wrapper.ts b/shared/ffmpeg/ffmpeg-command-wrapper.ts index 7a8c19d4b..efb75c198 100644 --- a/shared/ffmpeg/ffmpeg-command-wrapper.ts +++ b/shared/ffmpeg/ffmpeg-command-wrapper.ts | |||
@@ -21,6 +21,8 @@ export interface FFmpegCommandWrapperOptions { | |||
21 | lTags?: { tags: string[] } | 21 | lTags?: { tags: string[] } |
22 | 22 | ||
23 | updateJobProgress?: (progress?: number) => void | 23 | updateJobProgress?: (progress?: number) => void |
24 | onEnd?: () => void | ||
25 | onError?: (err: Error) => void | ||
24 | } | 26 | } |
25 | 27 | ||
26 | export class FFmpegCommandWrapper { | 28 | export class FFmpegCommandWrapper { |
@@ -37,6 +39,8 @@ export class FFmpegCommandWrapper { | |||
37 | private readonly lTags: { tags: string[] } | 39 | private readonly lTags: { tags: string[] } |
38 | 40 | ||
39 | private readonly updateJobProgress: (progress?: number) => void | 41 | private readonly updateJobProgress: (progress?: number) => void |
42 | private readonly onEnd?: () => void | ||
43 | private readonly onError?: (err: Error) => void | ||
40 | 44 | ||
41 | private command: FfmpegCommand | 45 | private command: FfmpegCommand |
42 | 46 | ||
@@ -48,7 +52,11 @@ export class FFmpegCommandWrapper { | |||
48 | this.threads = options.threads | 52 | this.threads = options.threads |
49 | this.logger = options.logger | 53 | this.logger = options.logger |
50 | this.lTags = options.lTags || { tags: [] } | 54 | this.lTags = options.lTags || { tags: [] } |
55 | |||
51 | this.updateJobProgress = options.updateJobProgress | 56 | this.updateJobProgress = options.updateJobProgress |
57 | |||
58 | this.onEnd = options.onEnd | ||
59 | this.onError = options.onError | ||
52 | } | 60 | } |
53 | 61 | ||
54 | getAvailableEncoders () { | 62 | getAvailableEncoders () { |
@@ -101,12 +109,16 @@ export class FFmpegCommandWrapper { | |||
101 | this.command.on('error', (err, stdout, stderr) => { | 109 | this.command.on('error', (err, stdout, stderr) => { |
102 | if (silent !== true) this.logger.error('Error in ffmpeg.', { stdout, stderr, shellCommand, ...this.lTags }) | 110 | if (silent !== true) this.logger.error('Error in ffmpeg.', { stdout, stderr, shellCommand, ...this.lTags }) |
103 | 111 | ||
112 | if (this.onError) this.onError(err) | ||
113 | |||
104 | rej(err) | 114 | rej(err) |
105 | }) | 115 | }) |
106 | 116 | ||
107 | this.command.on('end', (stdout, stderr) => { | 117 | this.command.on('end', (stdout, stderr) => { |
108 | this.logger.debug('FFmpeg command ended.', { stdout, stderr, shellCommand, ...this.lTags }) | 118 | this.logger.debug('FFmpeg command ended.', { stdout, stderr, shellCommand, ...this.lTags }) |
109 | 119 | ||
120 | if (this.onEnd) this.onEnd() | ||
121 | |||
110 | res() | 122 | res() |
111 | }) | 123 | }) |
112 | 124 | ||