]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Improve youtube import script
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
94a5ff8a
C
1import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
ecb4e35f 8import { EmailPayload, processEmail } from './handlers/email'
94a5ff8a
C
9import { processVideoFile, VideoFilePayload } from './handlers/video-file'
10
// Argument accepted by JobQueue.createJob: a union discriminated on `type`,
// where each job type is paired with the payload its handler module exports.
type CreateJobArgument =
  | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload }
  | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload }
  | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload }
  | { type: 'video-file', payload: VideoFilePayload }
  | { type: 'email', payload: EmailPayload }
94a5ff8a
C
17
// Dispatch table: one processing function per job type. Typing it over every
// JobType key makes the compiler reject a job type that has no handler.
const handlers: Record<JobType, (job: kue.Job) => Promise<any>> = {
  'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'video-file': processVideoFile,
  email: processEmail
}
25
/**
 * Singleton wrapper around a kue queue.
 *
 * Obtain the shared object through `JobQueue.Instance` and call `init()` once
 * before creating, listing or counting jobs: the underlying queue only exists
 * after `init()` has run.
 */
class JobQueue {

  private static instance: JobQueue

  private jobQueue: kue.Queue
  private initialized = false

  // Private so that the only way to get an instance is the `Instance` getter
  private constructor () {}

  /**
   * Creates the kue queue from the Redis settings in CONFIG and registers one
   * processor per entry of `handlers`. Subsequent calls are no-ops.
   */
  init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobQueue = kue.createQueue({
      // Queue keys are prefixed with the webserver host
      prefix: 'q-' + CONFIG.WEBSERVER.HOST,
      redis: {
        host: CONFIG.REDIS.HOSTNAME,
        port: CONFIG.REDIS.PORT,
        auth: CONFIG.REDIS.AUTH
      }
    })

    // NOTE(review): presumably raised to avoid listener-count warnings once
    // every handler is registered -- confirm
    this.jobQueue.setMaxListeners(15)

    // A queue-level error is treated as fatal: log it and stop the process
    this.jobQueue.on('error', err => {
      logger.error('Error in job queue.', err)
      process.exit(-1)
    })
    // NOTE(review): 5000 is presumably the check interval in milliseconds -- confirm against kue docs
    this.jobQueue.watchStuckJobs(5000)

    for (const handlerName of Object.keys(handlers)) {
      this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
        // Bridge the promise-based handler to kue's `done` callback
        try {
          const res = await handlers[ handlerName ](job)
          return done(null, res)
        } catch (err) {
          return done(err)
        }
      })
    }
  }

  /**
   * Enqueues a job.
   *
   * @param obj job type and its matching payload
   * @param priority priority forwarded to kue (defaults to 'normal')
   * @returns a promise resolved once the job is saved, rejected with the save error otherwise
   */
  createJob (obj: CreateJobArgument, priority = 'normal') {
    // Explicit <void>: the promise carries no value, so `res()` takes no argument
    return new Promise<void>((res, rej) => {
      this.jobQueue
        .create(obj.type, obj.payload)
        .priority(priority)
        .attempts(JOB_ATTEMPTS[obj.type])
        // Failed attempts are retried with an exponential backoff configured with a 60 * 1000 delay
        .backoff({ delay: 60 * 1000, type: 'exponential' })
        .save(err => {
          if (err) return rej(err)

          return res()
        })
    })
  }

  /**
   * Lists jobs in the given state for the API.
   *
   * @param state job state to filter on
   * @param start index of the first job to return
   * @param count number of jobs requested
   * @param sort sort order forwarded to kue
   * @returns a promise of the matching jobs
   */
  listForApi (state: JobState, start: number, count: number, sort: string) {
    return new Promise<kue.Job[]>((res, rej) => {
      // The range is expressed as [ first index, last index ], hence the `- 1`
      kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => {
        if (err) return rej(err)

        return res(jobs)
      })
    })
  }

  /**
   * Counts the jobs currently in the given state.
   *
   * @param state job state to count
   * @returns a promise of the total
   */
  count (state: JobState) {
    return new Promise<number>((res, rej) => {
      // Calls the queue method named after the state followed by "Count"
      this.jobQueue[state + 'Count']((err, total) => {
        if (err) return rej(err)

        return res(total)
      })
    })
  }

  /**
   * Removes completed jobs created more than JOB_COMPLETED_LIFETIME ago.
   * Fire-and-forget: failures are logged, never thrown to the caller.
   */
  removeOldJobs () {
    const now = new Date().getTime()
    kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
      if (err) {
        logger.error('Cannot get jobs when removing old jobs.', err)
        return
      }

      for (const job of jobs) {
        if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
          // Report removal failures instead of silently dropping them
          job.remove((removeErr?: Error) => {
            if (removeErr) logger.error('Cannot remove old job ' + job.id + '.', removeErr)
          })
        }
      }
    })
  }

  /** Shared instance, created on first access. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
124
125// ---------------------------------------------------------------------------
126
127export {
128 JobQueue
129}