]>
Commit | Line | Data |
---|---|---|
9c6327f8 C |
1 | |
2 | import { JobState } from '../../models' | |
d175a6f7 | 3 | import { wait } from '../miscs/miscs' |
bc22d608 | 4 | import { ServerInfo } from './servers' |
5cd80545 | 5 | |
/**
 * Resolves once none of the given servers reports pending work.
 *
 * A server is considered busy when its job list contains, for any of the
 * 'waiting', 'active' or 'delayed' states, at least one job whose type is not
 * in the repeatable list, or when its debug command reports a non-zero
 * `activityPubMessagesWaiting`.
 *
 * The delay between two polling rounds defaults to 250 and can be overridden
 * with the NODE_PENDING_JOB_WAIT environment variable.
 *
 * @param serversArg a single server or an array of servers to poll
 * @returns a promise resolved when two consecutive rounds found nothing pending;
 *   it rejects if any of the underlying server commands rejects
 */
async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
  const defaultPendingJobWait = 250

  // A non-numeric NODE_PENDING_JOB_WAIT makes parseInt() return NaN: fall back
  // to the default instead of handing NaN to wait()
  // (presumably a timer-based delay, which NaN would reduce to almost nothing)
  const rawPendingJobWait = process.env.NODE_PENDING_JOB_WAIT
  const parsedPendingJobWait = rawPendingJobWait
    ? parseInt(rawPendingJobWait, 10)
    : NaN
  const pendingJobWait = Number.isNaN(parsedPendingJobWait)
    ? defaultPendingJobWait
    : parsedPendingJobWait

  // Array.isArray() narrows the union, so no type assertion is needed
  const servers = Array.isArray(serversArg)
    ? serversArg
    : [ serversArg ]

  const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
  // Job types that are ignored when deciding whether a server is still busy
  const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]

  // Only the first 10 jobs of the requested state, sorted by '-createdAt', are inspected
  async function hasPendingJobs (server: ServerInfo, state: JobState) {
    const body = await server.jobsCommand.getJobsList({
      state,
      start: 0,
      count: 10,
      sort: '-createdAt'
    })

    return body.data.some(j => !repeatableJobs.includes(j.type))
  }

  async function hasPendingActivityPubMessages (server: ServerInfo) {
    const debug = await server.debugCommand.getDebug()

    return debug.activityPubMessagesWaiting !== 0
  }

  // Check if any server has pending requests: every command is issued up front
  // and awaited together
  async function hasPendingRequests () {
    const checks: Promise<boolean>[] = []

    for (const server of servers) {
      for (const state of states) {
        checks.push(hasPendingJobs(server, state))
      }

      checks.push(hasPendingActivityPubMessages(server))
    }

    const results = await Promise.all(checks)

    return results.includes(true)
  }

  let pendingRequests: boolean

  do {
    pendingRequests = await hasPendingRequests()

    // Nothing pending: wait and check once more, in case new jobs were created in the meantime
    if (pendingRequests === false) {
      await wait(pendingJobWait)
      pendingRequests = await hasPendingRequests()
    }

    if (pendingRequests) {
      await wait(pendingJobWait)
    }
  } while (pendingRequests)
}
70 | ||
5cd80545 C |
71 | // --------------------------------------------------------------------------- |
72 | ||
73 | export { | |
9c6327f8 | 74 | waitJobs |
5cd80545 | 75 | } |