]>
Commit | Line | Data |
---|---|---|
1 | ||
2 | import { JobState } from '../../models' | |
3 | import { wait } from '../miscs' | |
4 | import { PeerTubeServer } from './server' | |
5 | ||
6 | async function waitJobs (serversArg: PeerTubeServer[] | PeerTubeServer, skipDelayed = false) { | |
7 | const pendingJobWait = process.env.NODE_PENDING_JOB_WAIT | |
8 | ? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10) | |
9 | : 250 | |
10 | ||
11 | let servers: PeerTubeServer[] | |
12 | ||
13 | if (Array.isArray(serversArg) === false) servers = [ serversArg as PeerTubeServer ] | |
14 | else servers = serversArg as PeerTubeServer[] | |
15 | ||
16 | const states: JobState[] = [ 'waiting', 'active' ] | |
17 | if (!skipDelayed) states.push('delayed') | |
18 | ||
19 | const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ] | |
20 | let pendingRequests: boolean | |
21 | ||
22 | function tasksBuilder () { | |
23 | const tasks: Promise<any>[] = [] | |
24 | ||
25 | // Check if each server has pending request | |
26 | for (const server of servers) { | |
27 | for (const state of states) { | |
28 | const p = server.jobs.list({ | |
29 | state, | |
30 | start: 0, | |
31 | count: 10, | |
32 | sort: '-createdAt' | |
33 | }).then(body => body.data) | |
34 | .then(jobs => jobs.filter(j => !repeatableJobs.includes(j.type))) | |
35 | .then(jobs => { | |
36 | if (jobs.length !== 0) { | |
37 | pendingRequests = true | |
38 | } | |
39 | }) | |
40 | ||
41 | tasks.push(p) | |
42 | } | |
43 | ||
44 | const p = server.debug.getDebug() | |
45 | .then(obj => { | |
46 | if (obj.activityPubMessagesWaiting !== 0) { | |
47 | pendingRequests = true | |
48 | } | |
49 | }) | |
50 | ||
51 | tasks.push(p) | |
52 | } | |
53 | ||
54 | return tasks | |
55 | } | |
56 | ||
57 | do { | |
58 | pendingRequests = false | |
59 | await Promise.all(tasksBuilder()) | |
60 | ||
61 | // Retry, in case of new jobs were created | |
62 | if (pendingRequests === false) { | |
63 | await wait(pendingJobWait) | |
64 | await Promise.all(tasksBuilder()) | |
65 | } | |
66 | ||
67 | if (pendingRequests) { | |
68 | await wait(pendingJobWait) | |
69 | } | |
70 | } while (pendingRequests) | |
71 | } | |
72 | ||
73 | // --------------------------------------------------------------------------- | |
74 | ||
75 | export { | |
76 | waitJobs | |
77 | } |