aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
blob: 7a2b6c78d43883bf4fc9c5a10ae7050c62b3b51b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import * as kue from 'kue'
import { JobType, JobState } from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { processVideoFile, VideoFilePayload } from './handlers/video-file'

type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  { type: 'video-file', payload: VideoFilePayload }

const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
  'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'video-file': processVideoFile
}

class JobQueue {

  private static instance: JobQueue

  private jobQueue: kue.Queue
  private initialized = false

  private constructor () {}

  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobQueue = kue.createQueue({
      prefix: 'q-' + CONFIG.WEBSERVER.HOST,
      redis: {
        host: CONFIG.REDIS.HOSTNAME,
        port: CONFIG.REDIS.PORT,
        auth: CONFIG.REDIS.AUTH
      }
    })

    this.jobQueue.on('error', err => {
      logger.error('Error in job queue.', err)
      process.exit(-1)
    })
    this.jobQueue.watchStuckJobs(5000)

    for (const handlerName of Object.keys(handlers)) {
      this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
        try {
          const res = await handlers[ handlerName ](job)
          return done(null, res)
        } catch (err) {
          return done(err)
        }
      })
    }
  }

  createJob (obj: CreateJobArgument, priority = 'normal') {
    return new Promise((res, rej) => {
      this.jobQueue
        .create(obj.type, obj.payload)
        .priority(priority)
        .attempts(JOB_ATTEMPTS[obj.type])
        .backoff({ type: 'exponential' })
        .save(err => {
          if (err) return rej(err)

          return res()
        })
    })
  }

  listForApi (state: JobState, start: number, count: number, sort: string) {
    return new Promise<kue.Job[]>((res, rej) => {
      kue.Job.rangeByState(state, start, count, sort, (err, jobs) => {
        if (err) return rej(err)

        return res(jobs)
      })
    })
  }

  count (state: JobState) {
    return new Promise<number>((res, rej) => {
      this.jobQueue[state + 'Count']((err, total) => {
        if (err) return rej(err)

        return res(total)
      })
    })
  }

  removeOldJobs () {
    const now = new Date().getTime()
    kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
      if (err) {
        logger.error('Cannot get jobs when removing old jobs.', err)
        return
      }

      for (const job of jobs) {
        if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
          job.remove()
        }
      }
    })
  }

  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}

// ---------------------------------------------------------------------------

export {
  JobQueue
}