-import * as Bull from 'bull'
+import Bull, { Job, JobOptions, Queue } from 'bull'
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
ActivitypubHttpFetcherPayload,
ActivitypubHttpUnicastPayload,
ActorKeysPayload,
+ DeleteResumableUploadMetaFilePayload,
EmailPayload,
JobState,
JobType,
+ MoveObjectStoragePayload,
RefreshPayload,
VideoFileImportPayload,
VideoImportPayload,
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
import { processEmail } from './handlers/email'
+import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
{ type: 'videos-views', payload: {} } |
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } |
{ type: 'actor-keys', payload: ActorKeysPayload } |
- { type: 'video-redundancy', payload: VideoRedundancyPayload }
+ { type: 'video-redundancy', payload: VideoRedundancyPayload } |
+ { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
+ { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
-type CreateJobOptions = {
+export type CreateJobOptions = {
delay?: number
priority?: number
}
-const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
+const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-refresher': refreshAPObject,
'video-live-ending': processVideoLiveEnding,
'actor-keys': processActorKeys,
- 'video-redundancy': processVideoRedundancy
+ 'video-redundancy': processVideoRedundancy,
+ 'move-to-object-storage': processMoveToObjectStorage
}
const jobTypes: JobType[] = [
'activitypub-refresher',
'video-redundancy',
'actor-keys',
- 'video-live-ending'
+ 'video-live-ending',
+ 'move-to-object-storage'
]
class JobQueue {
private static instance: JobQueue
- private queues: { [id in JobType]?: Bull.Queue } = {}
+ private queues: { [id in JobType]?: Queue } = {}
private initialized = false
private jobRedisPrefix: string
return
}
- const jobArgs: Bull.JobOptions = {
+ const jobArgs: JobOptions = {
backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[obj.type],
timeout: JOB_TTL[obj.type],
count: number
asc?: boolean
jobType: JobType
- }): Promise<Bull.Job[]> {
+ }): Promise<Job[]> {
const { state, start, count, asc, jobType } = options
const states = state ? [ state ] : jobStates
- let results: Bull.Job[] = []
+ let results: Job[] = []
const filteredJobTypes = this.filterJobTypes(jobType)