aboutsummaryrefslogblamecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
blob: 695fe0eea2bec605b5a1f573e10414397174c6c1 (plain) (tree)
1
2
3
4
5
6
7
8
9
                          
                                                          
                                             
                                                                                                                   
                                


                                                                                                                        
                                                             
                                                                                                                          
                                                                                                  




                                                                                    
                                                                     
                                                                  

                                                     




                                                                        
                                                 
                                              

                                 

 






                                





                                   
                                


                           
                 



                                         

                                                      
                                     
                                  


                                    

                                


       
                                     
 
                                      
                                                  



                                      

                                  





                                                                                             
                                                                 







                                                           
                             


                                         
                                                           
 








                                                 


      
                                                                                                              
                                                                                                             
 




                                                             















                                                                   
                                                                        










                                                            
























                                               
                               
                                               







                                     









                                                                              
import * as kue from 'kue'
import { JobState, JobType } from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
import { Redis } from '../redis'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { EmailPayload, processEmail } from './handlers/email'
import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file'
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'

type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-file', payload: VideoFilePayload } |
  { type: 'email', payload: EmailPayload }

const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
  'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-follow': processActivityPubFollow,
  'video-file-import': processVideoFileImport,
  'video-file': processVideoFile,
  'email': processEmail
}

const jobsWithTLL: JobType[] = [
  'activitypub-http-broadcast',
  'activitypub-http-unicast',
  'activitypub-http-fetcher',
  'activitypub-follow'
]

class JobQueue {

  private static instance: JobQueue

  private jobQueue: kue.Queue
  private initialized = false
  private jobRedisPrefix: string

  private constructor () {}

  async init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST

    this.jobQueue = kue.createQueue({
      prefix: this.jobRedisPrefix,
      redis: {
        host: CONFIG.REDIS.HOSTNAME,
        port: CONFIG.REDIS.PORT,
        auth: CONFIG.REDIS.AUTH,
        db: CONFIG.REDIS.DB
      }
    })

    this.jobQueue.setMaxListeners(20)

    this.jobQueue.on('error', err => {
      logger.error('Error in job queue.', { err })
      process.exit(-1)
    })
    this.jobQueue.watchStuckJobs(5000)

    await this.reactiveStuckJobs()

    for (const handlerName of Object.keys(handlers)) {
      this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
        try {
          const res = await handlers[ handlerName ](job)
          return done(null, res)
        } catch (err) {
          logger.error('Cannot execute job %d.', job.id, { err })
          return done(err)
        }
      })
    }
  }

  createJob (obj: CreateJobArgument, priority = 'normal') {
    return new Promise((res, rej) => {
      let job = this.jobQueue
        .create(obj.type, obj.payload)
        .priority(priority)
        .attempts(JOB_ATTEMPTS[obj.type])
        .backoff({ delay: 60 * 1000, type: 'exponential' })

      if (jobsWithTLL.indexOf(obj.type) !== -1) {
        job = job.ttl(JOB_REQUEST_TTL)
      }

      return job.save(err => {
        if (err) return rej(err)

        return res()
      })
    })
  }

  async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> {
    const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count)

    const jobPromises = jobStrings
      .map(s => s.split('|'))
      .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10)))

    return Promise.all(jobPromises)
  }

  count (state: JobState) {
    return new Promise<number>((res, rej) => {
      this.jobQueue[state + 'Count']((err, total) => {
        if (err) return rej(err)

        return res(total)
      })
    })
  }

  removeOldJobs () {
    const now = new Date().getTime()
    kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
      if (err) {
        logger.error('Cannot get jobs when removing old jobs.', { err })
        return
      }

      for (const job of jobs) {
        if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
          job.remove()
        }
      }
    })
  }

  private reactiveStuckJobs () {
    const promises: Promise<any>[] = []

    this.jobQueue.active((err, ids) => {
      if (err) throw err

      for (const id of ids) {
        kue.Job.get(id, (err, job) => {
          if (err) throw err

          const p = new Promise((res, rej) => {
            job.inactive(err => {
              if (err) return rej(err)
              return res()
            })
          })

          promises.push(p)
        })
      }
    })

    return Promise.all(promises)
  }

  private getJob (id: number) {
    return new Promise<kue.Job>((res, rej) => {
      kue.Job.get(id, (err, job) => {
        if (err) return rej(err)

        return res(job)
      })
    })
  }

  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}

// ---------------------------------------------------------------------------

export {
  JobQueue
}