]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - shared/extra-utils/server/jobs.ts
Use an object to represent a server
[github/Chocobozzz/PeerTube.git] / shared / extra-utils / server / jobs.ts
1
2 import { JobState } from '../../models'
3 import { wait } from '../miscs'
4 import { PeerTubeServer } from './server'
5
/**
 * Resolves once none of the given servers reports pending work.
 *
 * A server is considered busy when either:
 *  - its job list contains, for one of the states `waiting`, `active` or `delayed`,
 *    at least one job (among the 10 most recently created) whose type is not
 *    listed in `repeatableJobs`, or
 *  - its debug endpoint reports `activityPubMessagesWaiting !== 0`.
 *
 * The delay between two polling rounds is read from the `NODE_PENDING_JOB_WAIT`
 * environment variable (milliseconds, base 10). It falls back to 250 when the
 * variable is unset, empty, not a number or negative.
 *
 * @param serversArg - a single server or an array of servers to poll
 * @returns a promise that resolves when two consecutive polling rounds found no pending work
 * @throws rejects with the first error raised by any of the underlying requests
 */
async function waitJobs (serversArg: PeerTubeServer[] | PeerTubeServer) {
  const defaultPendingJobWait = 250

  const rawPendingJobWait = process.env.NODE_PENDING_JOB_WAIT
  const parsedPendingJobWait = rawPendingJobWait
    ? parseInt(rawPendingJobWait, 10)
    : defaultPendingJobWait

  // parseInt() yields NaN on garbage input: never forward NaN or a negative delay to wait()
  const pendingJobWait = Number.isNaN(parsedPendingJobWait) || parsedPendingJobWait < 0
    ? defaultPendingJobWait
    : parsedPendingJobWait

  // Array.isArray() narrows the union, so no type assertion is needed
  const servers = Array.isArray(serversArg)
    ? serversArg
    : [ serversArg ]

  const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
  // Job types that must not keep us waiting
  // NOTE(review): presumably because they are rescheduled forever and would never drain - confirm
  const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]

  // True if the server lists at least one non repeatable job in the given state
  async function hasPendingJobs (server: PeerTubeServer, state: JobState) {
    const body = await server.jobs.getJobsList({
      state,
      start: 0,
      count: 10,
      sort: '-createdAt'
    })

    return body.data.some(job => !repeatableJobs.includes(job.type))
  }

  // True if the server still has ActivityPub messages waiting
  async function hasPendingMessages (server: PeerTubeServer) {
    const debug = await server.debug.getDebug()

    return debug.activityPubMessagesWaiting !== 0
  }

  // Fire every check of every server in parallel and tell whether one of them found pending work
  async function hasPendingRequests () {
    const checks: Promise<boolean>[] = []

    for (const server of servers) {
      for (const state of states) {
        checks.push(hasPendingJobs(server, state))
      }

      checks.push(hasPendingMessages(server))
    }

    const results = await Promise.all(checks)

    return results.includes(true)
  }

  let pendingRequests: boolean

  do {
    pendingRequests = await hasPendingRequests()

    // Nothing pending: check a second time after a pause, in case new jobs were created meanwhile
    if (!pendingRequests) {
      await wait(pendingJobWait)
      pendingRequests = await hasPendingRequests()
    }

    // Still busy: pause before the next polling round
    if (pendingRequests) {
      await wait(pendingJobWait)
    }
  } while (pendingRequests)
}
70
71 // ---------------------------------------------------------------------------
72
73 export {
74 waitJobs
75 }