aboutsummaryrefslogtreecommitdiffhomepage
path: root/shared/extra-utils/server/jobs.ts
blob: 763374e030e39323a0ef8983ebc437f57c186bfb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import * as request from 'supertest'
import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
import { getDebug, makeGetRequest } from '../../../shared/extra-utils'
import { Job, JobState, JobType, ServerDebug } from '../../models'
import { wait } from '../miscs/miscs'
import { ServerInfo } from './servers'

/**
 * Builds the API path used to list jobs.
 *
 * @param state - optional job state; when provided (and truthy) it is appended
 *   as an extra path segment
 * @returns '/api/v1/jobs', or '/api/v1/jobs/<state>' when a state is given
 */
function buildJobsUrl (state?: JobState) {
  const basePath = '/api/v1/jobs'

  // No state requested: target the unfiltered jobs endpoint
  if (!state) return basePath

  return basePath + '/' + state
}

/**
 * Issues a GET request on the jobs endpoint of a server and expects a
 * 200 response whose Content-Type matches /json/.
 *
 * @param url - base URL handed to supertest's `request()`
 * @param accessToken - token sent in the `Authorization: Bearer ...` header
 * @param state - optional job state used to build the request path
 * @returns the supertest request chain, with its expectations attached
 */
function getJobsList (url: string, accessToken: string, state?: JobState) {
  const authorization = 'Bearer ' + accessToken

  // Build the request first...
  const jobsRequest = request(url)
    .get(buildJobsUrl(state))
    .set('Accept', 'application/json')
    .set('Authorization', authorization)

  // ...then attach the expectations on the response
  return jobsRequest
    .expect(HttpStatusCode.OK_200)
    .expect('Content-Type', /json/)
}

/**
 * Lists jobs of a server through `makeGetRequest`, forwarding pagination,
 * sort and job type as query parameters, and expecting a 200 status code.
 *
 * @param options.url - server base URL
 * @param options.accessToken - token forwarded as `token` to `makeGetRequest`
 * @param options.start - forwarded as the `start` query parameter
 * @param options.count - forwarded as the `count` query parameter
 * @param options.sort - forwarded as the `sort` query parameter
 * @param options.state - optional job state used to build the request path
 * @param options.jobType - optional job type, forwarded as the `jobType` query parameter
 * @returns whatever `makeGetRequest` returns for this request
 */
function getJobsListPaginationAndSort (options: {
  url: string
  accessToken: string
  start: number
  count: number
  sort: string
  state?: JobState
  jobType?: JobType
}) {
  return makeGetRequest({
    url: options.url,
    path: buildJobsUrl(options.state),
    token: options.accessToken,
    statusCodeExpected: HttpStatusCode.OK_200,
    query: {
      start: options.start,
      count: options.count,
      sort: options.sort,
      jobType: options.jobType
    }
  })
}

/**
 * Resolves once none of the given servers reports pending work.
 *
 * For every server, the jobs in the 'waiting', 'active' and 'delayed' states
 * are listed (first 10 entries, sorted by '-createdAt') and the debug endpoint
 * is queried. A server is considered busy when one of these lists contains a
 * job whose type is not in `repeatableJobs`, or when
 * `activityPubMessagesWaiting` is not 0.
 *
 * The delay passed to `wait()` between two polls comes from the
 * NODE_PENDING_JOB_WAIT environment variable (parsed as a base 10 integer),
 * and defaults to 250 when the variable is unset, empty or not a number.
 *
 * The returned promise rejects if any of the underlying requests rejects.
 *
 * @param serversArg - a single server or an array of servers to poll
 */
async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
  const defaultPendingJobWait = 250

  const parsedPendingJobWait = process.env.NODE_PENDING_JOB_WAIT
    ? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10)
    : defaultPendingJobWait

  // parseInt() returns NaN on non numeric input: fall back to the default
  // instead of handing NaN to wait()
  const pendingJobWait = Number.isNaN(parsedPendingJobWait)
    ? defaultPendingJobWait
    : parsedPendingJobWait

  // Accept a single server as well as a list of servers
  const servers: ServerInfo[] = Array.isArray(serversArg)
    ? serversArg
    : [ serversArg ]

  const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
  // Job types that are ignored when deciding whether a server is still busy
  const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]
  // Shared flag: set to true by any check that finds pending work
  let pendingRequests: boolean

  // Starts all the checks (in parallel) and returns their promises
  function tasksBuilder () {
    const tasks: Promise<any>[] = []

    // Check if each server has pending request
    for (const server of servers) {
      for (const state of states) {
        const p = getJobsListPaginationAndSort({
          url: server.url,
          accessToken: server.accessToken,
          state: state,
          start: 0,
          count: 10,
          sort: '-createdAt'
        }).then(res => res.body.data)
          .then((jobs: Job[]) => jobs.filter(j => !repeatableJobs.includes(j.type)))
          .then(jobs => {
            if (jobs.length !== 0) {
              pendingRequests = true
            }
          })

        tasks.push(p)
      }

      // Also check the ActivityPub messages still waiting on this server
      const p = getDebug(server.url, server.accessToken)
        .then(res => res.body)
        .then((obj: ServerDebug) => {
          if (obj.activityPubMessagesWaiting !== 0) {
            pendingRequests = true
          }
        })

      tasks.push(p)
    }

    return tasks
  }

  do {
    pendingRequests = false
    await Promise.all(tasksBuilder())

    // Retry, in case new jobs were created in the meantime
    if (pendingRequests === false) {
      await wait(pendingJobWait)
      await Promise.all(tasksBuilder())
    }

    // Something is still pending: pause before polling again
    if (pendingRequests) {
      await wait(pendingJobWait)
    }
  } while (pendingRequests)
}

// ---------------------------------------------------------------------------

export {
  getJobsList,
  waitJobs,
  getJobsListPaginationAndSort
}