]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame_incremental - shared/extra-utils/server/jobs.ts
Prepare changelog v3
[github/Chocobozzz/PeerTube.git] / shared / extra-utils / server / jobs.ts
... / ...
CommitLineData
1import * as request from 'supertest'
2import { Job, JobState, JobType } from '../../models'
3import { wait } from '../miscs/miscs'
4import { ServerInfo } from './servers'
5import { makeGetRequest } from '../../../shared/extra-utils'
6import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
7
8function getJobsList (url: string, accessToken: string, state: JobState) {
9 const path = '/api/v1/jobs/' + state
10
11 return request(url)
12 .get(path)
13 .set('Accept', 'application/json')
14 .set('Authorization', 'Bearer ' + accessToken)
15 .expect(HttpStatusCode.OK_200)
16 .expect('Content-Type', /json/)
17}
18
19function getJobsListPaginationAndSort (options: {
20 url: string
21 accessToken: string
22 state: JobState
23 start: number
24 count: number
25 sort: string
26 jobType?: JobType
27}) {
28 const { url, accessToken, state, start, count, sort, jobType } = options
29 const path = '/api/v1/jobs/' + state
30
31 const query = {
32 start,
33 count,
34 sort,
35 jobType
36 }
37
38 return makeGetRequest({
39 url,
40 path,
41 token: accessToken,
42 statusCodeExpected: HttpStatusCode.OK_200,
43 query
44 })
45}
46
47async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
48 const pendingJobWait = process.env.NODE_PENDING_JOB_WAIT ? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10) : 2000
49 let servers: ServerInfo[]
50
51 if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ]
52 else servers = serversArg as ServerInfo[]
53
54 const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
55 let pendingRequests: boolean
56
57 function tasksBuilder () {
58 const tasks: Promise<any>[] = []
59
60 // Check if each server has pending request
61 for (const server of servers) {
62 for (const state of states) {
63 const p = getJobsListPaginationAndSort({
64 url: server.url,
65 accessToken: server.accessToken,
66 state: state,
67 start: 0,
68 count: 10,
69 sort: '-createdAt'
70 })
71 .then(res => res.body.data)
72 .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views'))
73 .then(jobs => {
74 if (jobs.length !== 0) {
75 pendingRequests = true
76 }
77 })
78 tasks.push(p)
79 }
80 }
81
82 return tasks
83 }
84
85 do {
86 pendingRequests = false
87 await Promise.all(tasksBuilder())
88
89 // Retry, in case of new jobs were created
90 if (pendingRequests === false) {
91 await wait(pendingJobWait)
92 await Promise.all(tasksBuilder())
93 }
94
95 if (pendingRequests) {
96 await wait(1000)
97 }
98 } while (pendingRequests)
99}
100
101// ---------------------------------------------------------------------------
102
103export {
104 getJobsList,
105 waitJobs,
106 getJobsListPaginationAndSort
107}