]>
Commit | Line | Data |
---|---|---|
1 | ||
2 | import { JobState } from '../../models' | |
3 | import { wait } from '../miscs' | |
4 | import { PeerTubeServer } from './server' | |
5 | ||
6 | async function waitJobs (serversArg: PeerTubeServer[] | PeerTubeServer) { | |
7 | const pendingJobWait = process.env.NODE_PENDING_JOB_WAIT | |
8 | ? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10) | |
9 | : 250 | |
10 | ||
11 | let servers: PeerTubeServer[] | |
12 | ||
13 | if (Array.isArray(serversArg) === false) servers = [ serversArg as PeerTubeServer ] | |
14 | else servers = serversArg as PeerTubeServer[] | |
15 | ||
16 | const states: JobState[] = [ 'waiting', 'active', 'delayed' ] | |
17 | const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ] | |
18 | let pendingRequests: boolean | |
19 | ||
20 | function tasksBuilder () { | |
21 | const tasks: Promise<any>[] = [] | |
22 | ||
23 | // Check if each server has pending request | |
24 | for (const server of servers) { | |
25 | for (const state of states) { | |
26 | const p = server.jobs.getJobsList({ | |
27 | state, | |
28 | start: 0, | |
29 | count: 10, | |
30 | sort: '-createdAt' | |
31 | }).then(body => body.data) | |
32 | .then(jobs => jobs.filter(j => !repeatableJobs.includes(j.type))) | |
33 | .then(jobs => { | |
34 | if (jobs.length !== 0) { | |
35 | pendingRequests = true | |
36 | } | |
37 | }) | |
38 | ||
39 | tasks.push(p) | |
40 | } | |
41 | ||
42 | const p = server.debug.getDebug() | |
43 | .then(obj => { | |
44 | if (obj.activityPubMessagesWaiting !== 0) { | |
45 | pendingRequests = true | |
46 | } | |
47 | }) | |
48 | ||
49 | tasks.push(p) | |
50 | } | |
51 | ||
52 | return tasks | |
53 | } | |
54 | ||
55 | do { | |
56 | pendingRequests = false | |
57 | await Promise.all(tasksBuilder()) | |
58 | ||
59 | // Retry, in case of new jobs were created | |
60 | if (pendingRequests === false) { | |
61 | await wait(pendingJobWait) | |
62 | await Promise.all(tasksBuilder()) | |
63 | } | |
64 | ||
65 | if (pendingRequests) { | |
66 | await wait(pendingJobWait) | |
67 | } | |
68 | } while (pendingRequests) | |
69 | } | |
70 | ||
71 | // --------------------------------------------------------------------------- | |
72 | ||
73 | export { | |
74 | waitJobs | |
75 | } |