diff options
-rw-r--r-- | .github/workflows/test.yml | 2 | ||||
-rw-r--r-- | client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html | 2 | ||||
-rw-r--r-- | packages/peertube-runner/README.md | 22 | ||||
-rw-r--r-- | packages/peertube-runner/peertube-runner.ts | 8 | ||||
-rw-r--r-- | packages/peertube-runner/server/process/shared/process-live.ts | 10 | ||||
-rw-r--r-- | packages/peertube-runner/server/server.ts | 17 | ||||
-rw-r--r-- | packages/peertube-runner/shared/http.ts | 2 | ||||
-rw-r--r-- | packages/peertube-runner/shared/ipc/ipc-server.ts | 4 | ||||
-rwxr-xr-x | scripts/build/peertube-runner.sh | 2 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 7 | ||||
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 40 | ||||
-rw-r--r-- | server/lib/runners/job-handlers/abstract-job-handler.ts | 22 | ||||
-rw-r--r-- | server/tests/api/videos/resumable-upload.ts | 2 | ||||
-rw-r--r-- | server/tests/cli/create-import-video-file-job.ts | 2 | ||||
-rw-r--r-- | shared/server-commands/requests/requests.ts | 4 |
15 files changed, 97 insertions, 49 deletions
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 678b0674b..2288bcd3f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml | |||
@@ -77,7 +77,7 @@ jobs: | |||
77 | 77 | ||
78 | - name: Run Test | 78 | - name: Run Test |
79 | # external-plugins tests only run on schedule | 79 | # external-plugins tests only run on schedule |
80 | if: github.event_name == 'schedule' || matrix.test_suite != 'external-plugins' | 80 | # if: github.event_name == 'schedule' || matrix.test_suite != 'external-plugins' |
81 | env: | 81 | env: |
82 | AKISMET_KEY: ${{ secrets.AKISMET_KEY }} | 82 | AKISMET_KEY: ${{ secrets.AKISMET_KEY }} |
83 | run: npm run ci -- ${{ matrix.test_suite }} | 83 | run: npm run ci -- ${{ matrix.test_suite }} |
diff --git a/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html b/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html index 7858b4bca..d42f600e0 100644 --- a/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html +++ b/client/src/app/+admin/system/runners/runner-job-list/runner-job-list.component.html | |||
@@ -84,7 +84,7 @@ | |||
84 | <pre>{{ runnerJob.privatePayload }}</pre> | 84 | <pre>{{ runnerJob.privatePayload }}</pre> |
85 | </div> | 85 | </div> |
86 | 86 | ||
87 | <pre *ngIf="runnerJob.error" class=".text-danger" >{{ runnerJob.error }}</pre> | 87 | <pre *ngIf="runnerJob.error" class="text-danger" >{{ runnerJob.error }}</pre> |
88 | </td> | 88 | </td> |
89 | </tr> | 89 | </tr> |
90 | </ng-template> | 90 | </ng-template> |
diff --git a/packages/peertube-runner/README.md b/packages/peertube-runner/README.md index b7cf174d5..87c3c5354 100644 --- a/packages/peertube-runner/README.md +++ b/packages/peertube-runner/README.md | |||
@@ -1 +1,23 @@ | |||
1 | # PeerTube runner | 1 | # PeerTube runner |
2 | |||
3 | Runner program to execute jobs (transcoding...) of remote PeerTube instances. | ||
4 | |||
5 | Commands below has to be run at the root of PeerTube git repository. | ||
6 | |||
7 | ## Develop | ||
8 | |||
9 | ```bash | ||
10 | npm run dev:peertube-runner | ||
11 | ``` | ||
12 | |||
13 | ## Build | ||
14 | |||
15 | ```bash | ||
16 | npm run build:peertube-runner | ||
17 | ``` | ||
18 | |||
19 | ## Run | ||
20 | |||
21 | ```bash | ||
22 | node packages/peertube-runner/dist/peertube-runner.js --help | ||
23 | ``` | ||
diff --git a/packages/peertube-runner/peertube-runner.ts b/packages/peertube-runner/peertube-runner.ts index 6bfd9ac0f..f02526ef9 100644 --- a/packages/peertube-runner/peertube-runner.ts +++ b/packages/peertube-runner/peertube-runner.ts | |||
@@ -26,7 +26,7 @@ program.command('server') | |||
26 | try { | 26 | try { |
27 | await RunnerServer.Instance.run() | 27 | await RunnerServer.Instance.run() |
28 | } catch (err) { | 28 | } catch (err) { |
29 | console.error('Cannot run PeerTube runner as server mode', err) | 29 | logger.error('Cannot run PeerTube runner as server mode', err) |
30 | process.exit(-1) | 30 | process.exit(-1) |
31 | } | 31 | } |
32 | }) | 32 | }) |
@@ -41,7 +41,7 @@ program.command('register') | |||
41 | try { | 41 | try { |
42 | await registerRunner(options) | 42 | await registerRunner(options) |
43 | } catch (err) { | 43 | } catch (err) { |
44 | console.error('Cannot register this PeerTube runner.', err) | 44 | logger.error('Cannot register this PeerTube runner.', err) |
45 | process.exit(-1) | 45 | process.exit(-1) |
46 | } | 46 | } |
47 | }) | 47 | }) |
@@ -53,7 +53,7 @@ program.command('unregister') | |||
53 | try { | 53 | try { |
54 | await unregisterRunner(options) | 54 | await unregisterRunner(options) |
55 | } catch (err) { | 55 | } catch (err) { |
56 | console.error('Cannot unregister this PeerTube runner.', err) | 56 | logger.error('Cannot unregister this PeerTube runner.', err) |
57 | process.exit(-1) | 57 | process.exit(-1) |
58 | } | 58 | } |
59 | }) | 59 | }) |
@@ -64,7 +64,7 @@ program.command('list-registered') | |||
64 | try { | 64 | try { |
65 | await listRegistered() | 65 | await listRegistered() |
66 | } catch (err) { | 66 | } catch (err) { |
67 | console.error('Cannot list registered PeerTube instances.', err) | 67 | logger.error('Cannot list registered PeerTube instances.', err) |
68 | process.exit(-1) | 68 | process.exit(-1) |
69 | } | 69 | } |
70 | }) | 70 | }) |
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts index 5a3b596a2..b17b51c7c 100644 --- a/packages/peertube-runner/server/process/shared/process-live.ts +++ b/packages/peertube-runner/server/process/shared/process-live.ts | |||
@@ -204,8 +204,8 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
204 | 204 | ||
205 | // --------------------------------------------------------------------------- | 205 | // --------------------------------------------------------------------------- |
206 | 206 | ||
207 | private sendDeletedChunkUpdate (deletedChunk: string) { | 207 | private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> { |
208 | if (this.ended) return | 208 | if (this.ended) return Promise.resolve() |
209 | 209 | ||
210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | 210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) |
211 | 211 | ||
@@ -230,8 +230,8 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
230 | return this.updateWithRetry(payload) | 230 | return this.updateWithRetry(payload) |
231 | } | 231 | } |
232 | 232 | ||
233 | private sendAddedChunkUpdate (addedChunk: string) { | 233 | private sendAddedChunkUpdate (addedChunk: string): Promise<any> { |
234 | if (this.ended) return | 234 | if (this.ended) return Promise.resolve() |
235 | 235 | ||
236 | logger.debug(`Sending added live chunk ${addedChunk} update`) | 236 | logger.debug(`Sending added live chunk ${addedChunk} update`) |
237 | 237 | ||
@@ -257,7 +257,7 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
257 | return this.updateWithRetry(payload) | 257 | return this.updateWithRetry(payload) |
258 | } | 258 | } |
259 | 259 | ||
260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) { | 260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { |
261 | if (this.ended || this.errored) return | 261 | if (this.ended || this.errored) return |
262 | 262 | ||
263 | try { | 263 | try { |
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index 724f359bd..e851dfc7c 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts | |||
@@ -23,6 +23,8 @@ export class RunnerServer { | |||
23 | 23 | ||
24 | private checkingAvailableJobs = false | 24 | private checkingAvailableJobs = false |
25 | 25 | ||
26 | private cleaningUp = false | ||
27 | |||
26 | private readonly sockets = new Map<PeerTubeServer, Socket>() | 28 | private readonly sockets = new Map<PeerTubeServer, Socket>() |
27 | 29 | ||
28 | private constructor () {} | 30 | private constructor () {} |
@@ -45,13 +47,17 @@ export class RunnerServer { | |||
45 | try { | 47 | try { |
46 | await ipcServer.run(this) | 48 | await ipcServer.run(this) |
47 | } catch (err) { | 49 | } catch (err) { |
48 | console.error('Cannot start local socket for IPC communication', err) | 50 | logger.error('Cannot start local socket for IPC communication', err) |
49 | process.exit(-1) | 51 | process.exit(-1) |
50 | } | 52 | } |
51 | 53 | ||
52 | // Cleanup on exit | 54 | // Cleanup on exit |
53 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { | 55 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { |
54 | process.on(code, async () => { | 56 | process.on(code, async (err, origin) => { |
57 | if (code === 'uncaughtException') { | ||
58 | logger.error({ err, origin }, 'uncaughtException') | ||
59 | } | ||
60 | |||
55 | await this.onExit() | 61 | await this.onExit() |
56 | }) | 62 | }) |
57 | } | 63 | } |
@@ -244,6 +250,11 @@ export class RunnerServer { | |||
244 | } | 250 | } |
245 | 251 | ||
246 | private async onExit () { | 252 | private async onExit () { |
253 | if (this.cleaningUp) return | ||
254 | this.cleaningUp = true | ||
255 | |||
256 | logger.info('Cleaning up after program exit') | ||
257 | |||
247 | try { | 258 | try { |
248 | for (const { server, job } of this.processingJobs) { | 259 | for (const { server, job } of this.processingJobs) { |
249 | await server.runnerJobs.abort({ | 260 | await server.runnerJobs.abort({ |
@@ -256,7 +267,7 @@ export class RunnerServer { | |||
256 | 267 | ||
257 | await this.cleanupTMP() | 268 | await this.cleanupTMP() |
258 | } catch (err) { | 269 | } catch (err) { |
259 | console.error(err) | 270 | logger.error(err) |
260 | process.exit(-1) | 271 | process.exit(-1) |
261 | } | 272 | } |
262 | 273 | ||
diff --git a/packages/peertube-runner/shared/http.ts b/packages/peertube-runner/shared/http.ts index d3fff70d1..df64dc168 100644 --- a/packages/peertube-runner/shared/http.ts +++ b/packages/peertube-runner/shared/http.ts | |||
@@ -47,7 +47,7 @@ export function downloadFile (options: { | |||
47 | 47 | ||
48 | request.on('error', err => { | 48 | request.on('error', err => { |
49 | remove(destination) | 49 | remove(destination) |
50 | .catch(err => console.error(err)) | 50 | .catch(err => logger.error(err)) |
51 | 51 | ||
52 | return rej(err) | 52 | return rej(err) |
53 | }) | 53 | }) |
diff --git a/packages/peertube-runner/shared/ipc/ipc-server.ts b/packages/peertube-runner/shared/ipc/ipc-server.ts index bc340198b..922dc93e5 100644 --- a/packages/peertube-runner/shared/ipc/ipc-server.ts +++ b/packages/peertube-runner/shared/ipc/ipc-server.ts | |||
@@ -27,7 +27,7 @@ export class IPCServer { | |||
27 | 27 | ||
28 | this.sendReponse(res, { success: true, data }) | 28 | this.sendReponse(res, { success: true, data }) |
29 | } catch (err) { | 29 | } catch (err) { |
30 | console.error('Cannot execute RPC call', err) | 30 | logger.error('Cannot execute RPC call', err) |
31 | this.sendReponse(res, { success: false, error: err.message }) | 31 | this.sendReponse(res, { success: false, error: err.message }) |
32 | } | 32 | } |
33 | }) | 33 | }) |
@@ -56,6 +56,6 @@ export class IPCServer { | |||
56 | body: IPCReponse<T> | 56 | body: IPCReponse<T> |
57 | ) { | 57 | ) { |
58 | response(body) | 58 | response(body) |
59 | .catch(err => console.error('Cannot send response after IPC request', err)) | 59 | .catch(err => logger.error('Cannot send response after IPC request', err)) |
60 | } | 60 | } |
61 | } | 61 | } |
diff --git a/scripts/build/peertube-runner.sh b/scripts/build/peertube-runner.sh index 690031af5..9c326747b 100755 --- a/scripts/build/peertube-runner.sh +++ b/scripts/build/peertube-runner.sh | |||
@@ -10,4 +10,4 @@ rm -rf ./dist | |||
10 | rm -rf ./dist | 10 | rm -rf ./dist |
11 | mkdir ./dist | 11 | mkdir ./dist |
12 | 12 | ||
13 | ./node_modules/.bin/esbuild ./peertube-runner.ts --bundle --platform=node --external:"./lib-cov/fluent-ffmpeg" --external:pg-hstore --outfile=dist/peertube-runner.js | 13 | ./node_modules/.bin/esbuild ./peertube-runner.ts --bundle --platform=node --target=node14 --external:"./lib-cov/fluent-ffmpeg" --external:pg-hstore --outfile=dist/peertube-runner.js |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 21bf0f226..03f6fbea7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -96,6 +96,7 @@ export type CreateJobArgument = | |||
96 | export type CreateJobOptions = { | 96 | export type CreateJobOptions = { |
97 | delay?: number | 97 | delay?: number |
98 | priority?: number | 98 | priority?: number |
99 | failParentOnFailure?: boolean | ||
99 | } | 100 | } |
100 | 101 | ||
101 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | 102 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { |
@@ -363,7 +364,11 @@ class JobQueue { | |||
363 | name: 'job', | 364 | name: 'job', |
364 | data: job.payload, | 365 | data: job.payload, |
365 | queueName: job.type, | 366 | queueName: job.type, |
366 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) | 367 | opts: { |
368 | failParentOnFailure: true, | ||
369 | |||
370 | ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ])) | ||
371 | } | ||
367 | } | 372 | } |
368 | } | 373 | } |
369 | 374 | ||
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index f3f8fc886..ef4ecb83e 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -79,9 +79,7 @@ class MuxingSession extends EventEmitter { | |||
79 | private streamingPlaylist: MStreamingPlaylistVideo | 79 | private streamingPlaylist: MStreamingPlaylistVideo |
80 | private liveSegmentShaStore: LiveSegmentShaStore | 80 | private liveSegmentShaStore: LiveSegmentShaStore |
81 | 81 | ||
82 | private tsWatcher: FSWatcher | 82 | private filesWatcher: FSWatcher |
83 | private masterWatcher: FSWatcher | ||
84 | private m3u8Watcher: FSWatcher | ||
85 | 83 | ||
86 | private masterPlaylistCreated = false | 84 | private masterPlaylistCreated = false |
87 | private liveReady = false | 85 | private liveReady = false |
@@ -149,6 +147,8 @@ class MuxingSession extends EventEmitter { | |||
149 | 147 | ||
150 | await this.transcodingWrapper.run() | 148 | await this.transcodingWrapper.run() |
151 | 149 | ||
150 | this.filesWatcher = watch(this.outDirectory, { depth: 0 }) | ||
151 | |||
152 | this.watchMasterFile() | 152 | this.watchMasterFile() |
153 | this.watchTSFiles() | 153 | this.watchTSFiles() |
154 | this.watchM3U8File() | 154 | this.watchM3U8File() |
@@ -168,9 +168,10 @@ class MuxingSession extends EventEmitter { | |||
168 | } | 168 | } |
169 | 169 | ||
170 | private watchMasterFile () { | 170 | private watchMasterFile () { |
171 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) | 171 | this.filesWatcher.on('add', async path => { |
172 | if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return | ||
173 | if (this.masterPlaylistCreated === true) return | ||
172 | 174 | ||
173 | this.masterWatcher.on('add', async () => { | ||
174 | try { | 175 | try { |
175 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | 176 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { |
176 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) | 177 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) |
@@ -188,20 +189,18 @@ class MuxingSession extends EventEmitter { | |||
188 | this.masterPlaylistCreated = true | 189 | this.masterPlaylistCreated = true |
189 | 190 | ||
190 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) | 191 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) |
191 | |||
192 | this.masterWatcher.close() | ||
193 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) | ||
194 | }) | 192 | }) |
195 | } | 193 | } |
196 | 194 | ||
197 | private watchM3U8File () { | 195 | private watchM3U8File () { |
198 | this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') | ||
199 | |||
200 | const sendQueues = new Map<string, PQueue>() | 196 | const sendQueues = new Map<string, PQueue>() |
201 | 197 | ||
202 | const onChangeOrAdd = async (m3u8Path: string) => { | 198 | const onChange = async (m3u8Path: string) => { |
199 | if (m3u8Path.endsWith('.m3u8') !== true) return | ||
203 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | 200 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return |
204 | 201 | ||
202 | logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) | ||
203 | |||
205 | try { | 204 | try { |
206 | if (!sendQueues.has(m3u8Path)) { | 205 | if (!sendQueues.has(m3u8Path)) { |
207 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | 206 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) |
@@ -214,18 +213,18 @@ class MuxingSession extends EventEmitter { | |||
214 | } | 213 | } |
215 | } | 214 | } |
216 | 215 | ||
217 | this.m3u8Watcher.on('change', onChangeOrAdd) | 216 | this.filesWatcher.on('change', onChange) |
218 | } | 217 | } |
219 | 218 | ||
220 | private watchTSFiles () { | 219 | private watchTSFiles () { |
221 | const startStreamDateTime = new Date().getTime() | 220 | const startStreamDateTime = new Date().getTime() |
222 | 221 | ||
223 | this.tsWatcher = watch(this.outDirectory + '/*.ts') | ||
224 | |||
225 | const playlistIdMatcher = /^([\d+])-/ | 222 | const playlistIdMatcher = /^([\d+])-/ |
226 | 223 | ||
227 | const addHandler = async (segmentPath: string) => { | 224 | const addHandler = async (segmentPath: string) => { |
228 | logger.debug('Live add handler of %s.', segmentPath, this.lTags()) | 225 | if (segmentPath.endsWith('.ts') !== true) return |
226 | |||
227 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) | ||
229 | 228 | ||
230 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] | 229 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] |
231 | 230 | ||
@@ -252,6 +251,10 @@ class MuxingSession extends EventEmitter { | |||
252 | } | 251 | } |
253 | 252 | ||
254 | const deleteHandler = async (segmentPath: string) => { | 253 | const deleteHandler = async (segmentPath: string) => { |
254 | if (segmentPath.endsWith('.ts') !== true) return | ||
255 | |||
256 | logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags()) | ||
257 | |||
255 | try { | 258 | try { |
256 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) | 259 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) |
257 | } catch (err) { | 260 | } catch (err) { |
@@ -267,8 +270,8 @@ class MuxingSession extends EventEmitter { | |||
267 | } | 270 | } |
268 | } | 271 | } |
269 | 272 | ||
270 | this.tsWatcher.on('add', p => addHandler(p)) | 273 | this.filesWatcher.on('add', p => addHandler(p)) |
271 | this.tsWatcher.on('unlink', p => deleteHandler(p)) | 274 | this.filesWatcher.on('unlink', p => deleteHandler(p)) |
272 | } | 275 | } |
273 | 276 | ||
274 | private async isQuotaExceeded (segmentPath: string) { | 277 | private async isQuotaExceeded (segmentPath: string) { |
@@ -371,7 +374,8 @@ class MuxingSession extends EventEmitter { | |||
371 | setTimeout(() => { | 374 | setTimeout(() => { |
372 | // Wait latest segments generation, and close watchers | 375 | // Wait latest segments generation, and close watchers |
373 | 376 | ||
374 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) | 377 | const promise = this.filesWatcher?.close() || Promise.resolve() |
378 | promise | ||
375 | .then(() => { | 379 | .then(() => { |
376 | // Process remaining segments hash | 380 | // Process remaining segments hash |
377 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | 381 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { |
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts index 73fc14574..74b455107 100644 --- a/server/lib/runners/job-handlers/abstract-job-handler.ts +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts | |||
@@ -21,6 +21,7 @@ import { | |||
21 | RunnerJobVODWebVideoTranscodingPayload, | 21 | RunnerJobVODWebVideoTranscodingPayload, |
22 | RunnerJobVODWebVideoTranscodingPrivatePayload | 22 | RunnerJobVODWebVideoTranscodingPrivatePayload |
23 | } from '@shared/models' | 23 | } from '@shared/models' |
24 | import { throttle } from 'lodash' | ||
24 | 25 | ||
25 | type CreateRunnerJobArg = | 26 | type CreateRunnerJobArg = |
26 | { | 27 | { |
@@ -48,6 +49,8 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S | |||
48 | 49 | ||
49 | protected readonly lTags = loggerTagsFactory('runner') | 50 | protected readonly lTags = loggerTagsFactory('runner') |
50 | 51 | ||
52 | static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000) | ||
53 | |||
51 | // --------------------------------------------------------------------------- | 54 | // --------------------------------------------------------------------------- |
52 | 55 | ||
53 | abstract create (options: C): Promise<MRunnerJob> | 56 | abstract create (options: C): Promise<MRunnerJob> |
@@ -102,16 +105,19 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S | |||
102 | 105 | ||
103 | if (progress) runnerJob.progress = progress | 106 | if (progress) runnerJob.progress = progress |
104 | 107 | ||
108 | if (!runnerJob.changed()) { | ||
109 | try { | ||
110 | await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id }) | ||
111 | } catch (err) { | ||
112 | logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | ||
113 | } | ||
114 | |||
115 | return | ||
116 | } | ||
117 | |||
105 | await retryTransactionWrapper(() => { | 118 | await retryTransactionWrapper(() => { |
106 | return sequelizeTypescript.transaction(async transaction => { | 119 | return sequelizeTypescript.transaction(async transaction => { |
107 | if (runnerJob.changed()) { | 120 | return runnerJob.save({ transaction }) |
108 | return runnerJob.save({ transaction }) | ||
109 | } | ||
110 | |||
111 | // Don't update the job too often | ||
112 | if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { | ||
113 | await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) | ||
114 | } | ||
115 | }) | 121 | }) |
116 | }) | 122 | }) |
117 | } | 123 | } |
diff --git a/server/tests/api/videos/resumable-upload.ts b/server/tests/api/videos/resumable-upload.ts index a70a7258b..2fbefb392 100644 --- a/server/tests/api/videos/resumable-upload.ts +++ b/server/tests/api/videos/resumable-upload.ts | |||
@@ -79,7 +79,7 @@ describe('Test resumable upload', function () { | |||
79 | async function checkFileSize (uploadIdArg: string, expectedSize: number | null) { | 79 | async function checkFileSize (uploadIdArg: string, expectedSize: number | null) { |
80 | const uploadId = uploadIdArg.replace(/^upload_id=/, '') | 80 | const uploadId = uploadIdArg.replace(/^upload_id=/, '') |
81 | 81 | ||
82 | const subPath = join('tmp', 'resumable-uploads', uploadId) | 82 | const subPath = join('tmp', 'resumable-uploads', `${rootId}-${uploadId}.mp4`) |
83 | const filePath = server.servers.buildDirectory(subPath) | 83 | const filePath = server.servers.buildDirectory(subPath) |
84 | const exists = await pathExists(filePath) | 84 | const exists = await pathExists(filePath) |
85 | 85 | ||
diff --git a/server/tests/cli/create-import-video-file-job.ts b/server/tests/cli/create-import-video-file-job.ts index 43f53035b..3ece4f2ec 100644 --- a/server/tests/cli/create-import-video-file-job.ts +++ b/server/tests/cli/create-import-video-file-job.ts | |||
@@ -73,7 +73,7 @@ function runTests (objectStorage: boolean) { | |||
73 | }) | 73 | }) |
74 | 74 | ||
75 | it('Should run a import job on video 1 with a lower resolution', async function () { | 75 | it('Should run a import job on video 1 with a lower resolution', async function () { |
76 | const command = `npm run create-import-video-file-job -- -v ${video1ShortId} -i server/tests/fixtures/video_short-480.webm` | 76 | const command = `npm run create-import-video-file-job -- -v ${video1ShortId} -i server/tests/fixtures/video_short_480.webm` |
77 | await servers[0].cli.execWithEnv(command) | 77 | await servers[0].cli.execWithEnv(command) |
78 | 78 | ||
79 | await waitJobs(servers) | 79 | await waitJobs(servers) |
diff --git a/shared/server-commands/requests/requests.ts b/shared/server-commands/requests/requests.ts index 96f67b4c7..e3f1817f1 100644 --- a/shared/server-commands/requests/requests.ts +++ b/shared/server-commands/requests/requests.ts | |||
@@ -159,7 +159,7 @@ function unwrapBodyOrDecodeToJSON <T> (test: request.Test): Promise<T> { | |||
159 | try { | 159 | try { |
160 | return JSON.parse(new TextDecoder().decode(res.body)) | 160 | return JSON.parse(new TextDecoder().decode(res.body)) |
161 | } catch (err) { | 161 | } catch (err) { |
162 | console.error('Cannot decode JSON.', res.body instanceof Buffer ? res.body.toString() : res.body) | 162 | console.error('Cannot decode JSON.', { res, body: res.body instanceof Buffer ? res.body.toString() : res.body }) |
163 | throw err | 163 | throw err |
164 | } | 164 | } |
165 | } | 165 | } |
@@ -168,7 +168,7 @@ function unwrapBodyOrDecodeToJSON <T> (test: request.Test): Promise<T> { | |||
168 | try { | 168 | try { |
169 | return JSON.parse(res.text) | 169 | return JSON.parse(res.text) |
170 | } catch (err) { | 170 | } catch (err) { |
171 | console.error('Cannot decode json', res.text) | 171 | console.error('Cannot decode json', { res, text: res.text }) |
172 | throw err | 172 | throw err |
173 | } | 173 | } |
174 | } | 174 | } |