]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - shared/extra-utils/server/jobs.ts
Introduce jobs command
[github/Chocobozzz/PeerTube.git] / shared / extra-utils / server / jobs.ts
CommitLineData
9c6327f8
C
1
2import { JobState } from '../../models'
d175a6f7 3import { wait } from '../miscs/miscs'
bc22d608 4import { ServerInfo } from './servers'
5cd80545 5
/**
 * Poll the given server(s) until none of them reports pending work.
 *
 * A server is considered busy when either:
 *  - its jobs list contains, for one of the 'waiting', 'active' or 'delayed' states,
 *    at least one job whose type is not in the repeatable jobs list, or
 *  - its debug endpoint reports `activityPubMessagesWaiting !== 0`.
 *
 * The delay between two polls is read from the NODE_PENDING_JOB_WAIT environment
 * variable (parsed as a base-10 integer) and defaults to 250 when the variable is
 * unset, empty, non-numeric or negative.
 *
 * @param serversArg a single server or an array of servers to poll
 * @returns a promise resolved once two consecutive polls found no pending work
 */
async function waitJobs (serversArg: ServerInfo[] | ServerInfo): Promise<void> {
  const defaultPendingJobWait = 250

  const parsedJobWait = process.env.NODE_PENDING_JOB_WAIT
    ? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10)
    : defaultPendingJobWait

  // parseInt() yields NaN on a non-numeric value. Passing NaN or a negative delay to wait()
  // would presumably collapse the delay to the timer minimum (assuming wait() wraps setTimeout
  // -- TODO confirm) and hammer the servers, so fall back to the default instead
  const pendingJobWait = Number.isNaN(parsedJobWait) || parsedJobWait < 0
    ? defaultPendingJobWait
    : parsedJobWait

  // Array.isArray() narrows the union, so no type assertion is needed
  const servers = Array.isArray(serversArg)
    ? serversArg
    : [ serversArg ]

  const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
  // Job types that are ignored when deciding whether a server is still busy
  const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]

  // Query every server once (all requests run concurrently) and tell whether any of them has pending work
  async function hasPendingWork (): Promise<boolean> {
    const checks: Promise<boolean>[] = []

    for (const server of servers) {
      // One request per job state, looking at the first 10 jobs sorted by '-createdAt'
      for (const state of states) {
        const jobsCheck = server.jobsCommand.getJobsList({
          state,
          start: 0,
          count: 10,
          sort: '-createdAt'
        }).then(body => body.data.some(j => !repeatableJobs.includes(j.type)))

        checks.push(jobsCheck)
      }

      const activityPubCheck = server.debugCommand.getDebug()
        .then(obj => obj.activityPubMessagesWaiting !== 0)

      checks.push(activityPubCheck)
    }

    const results = await Promise.all(checks)

    return results.includes(true)
  }

  let pendingRequests: boolean

  do {
    pendingRequests = await hasPendingWork()

    // Retry, in case new jobs were created in the meantime
    if (pendingRequests === false) {
      await wait(pendingJobWait)
      pendingRequests = await hasPendingWork()
    }

    // Still busy: pause before the next polling round
    if (pendingRequests) {
      await wait(pendingJobWait)
    }
  } while (pendingRequests)
}
70
5cd80545
C
71// ---------------------------------------------------------------------------
72
73export {
9c6327f8 74 waitJobs
5cd80545 75}