From 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 25 Jan 2018 15:05:18 +0100 Subject: Move job queue to redis We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore. --- .../activitypub-http-broadcast-handler.ts | 53 -------- .../activitypub-http-fetcher-handler.ts | 68 ---------- .../activitypub-http-job-scheduler.ts | 94 -------------- .../activitypub-http-unicast-handler.ts | 50 ------- .../jobs/activitypub-http-job-scheduler/index.ts | 1 - server/lib/jobs/index.ts | 2 - server/lib/jobs/job-scheduler.ts | 144 --------------------- server/lib/jobs/transcoding-job-scheduler/index.ts | 1 - .../transcoding-job-scheduler.ts | 23 ---- .../video-file-optimizer-handler.ts | 90 ------------- .../video-file-transcoder-handler.ts | 48 ------- 11 files changed, 574 deletions(-) delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/index.ts delete mode 100644 server/lib/jobs/index.ts delete mode 100644 server/lib/jobs/job-scheduler.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/index.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts (limited to 'server/lib/jobs') diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts deleted file mode 100644 index 3f780e319..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' -import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' - -async function process (payload: ActivityPubHttpPayload, jobId: number) { - logger.info('Processing ActivityPub broadcast in job %d.', jobId) - - const body = await computeBody(payload) - const httpSignatureOptions = await buildSignedRequestOptions(payload) - - const options = { - method: 'POST', - uri: '', - json: body, - httpSignature: httpSignatureOptions - } - - const badUrls: string[] = [] - const goodUrls: string[] = [] - - for (const uri of payload.uris) { - options.uri = uri - - try { - await doRequest(options) - goodUrls.push(uri) - } catch (err) { - const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) - if (isRetryingLater === false) badUrls.push(uri) - } - } - - return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) -} - -function onError (err: Error, jobId: number) { - logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts deleted file mode 100644 index a7b5aabd0..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { ACTIVITY_PUB } from '../../../initializers' -import { processActivities } from '../../activitypub/process' -import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' - -async function process (payload: ActivityPubHttpPayload, jobId: number) { - logger.info('Processing ActivityPub fetcher in job %d.', jobId) - - const options = { - method: 'GET', - uri: '', - json: true, - activityPub: true - } - - for (const uri of payload.uris) { - options.uri = uri - logger.info('Fetching ActivityPub data on %s.', uri) - - const response = await doRequest(options) - const firstBody = response.body - - if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { - const activities = firstBody.first.orderedItems - - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) - - await processActivities(activities) - } - - let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT - let i = 0 - let nextLink = firstBody.first.next - while (nextLink && i < limit) { - options.uri = nextLink - - const { body } = await doRequest(options) - nextLink = body.next - i++ - - if (Array.isArray(body.orderedItems)) { - const activities = body.orderedItems - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) - - await processActivities(activities) - } - } - } -} - -function onError (err: Error, jobId: number) { - logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts deleted file mode 100644 index 4459152db..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { JobCategory } from '../../../../shared' -import { buildSignedActivity } from '../../../helpers/activitypub' -import { logger } from '../../../helpers/logger' -import { getServerActor } from '../../../helpers/utils' -import { ACTIVITY_PUB } from '../../../initializers' -import { ActorModel } from '../../../models/activitypub/actor' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' -import { JobHandler, JobScheduler } from '../job-scheduler' - -import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' -import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' -import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' - -type ActivityPubHttpPayload = { - uris: string[] - signatureActorId?: number - body?: any - attemptNumber?: number -} - -const jobHandlers: { [ handlerName: string ]: JobHandler } = { - activitypubHttpBroadcastHandler, - activitypubHttpUnicastHandler, - activitypubHttpFetcherHandler -} -const jobCategory: JobCategory = 'activitypub-http' - -const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) - -async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) { - logger.warn('Cannot make request to %s.', uri, err) - - let attemptNumber = payload.attemptNumber || 1 - attemptNumber += 1 - - if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) { - logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err) - - const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined) - if (!actor) { - logger.debug('Actor %s is not a follower, do not retry the request.', uri) - return false - } - - const newPayload = Object.assign(payload, { - uris: [ uri ], - attemptNumber - }) - await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload) - - return true - } - - return false -} - -async function computeBody (payload: ActivityPubHttpPayload) { - let body = payload.body - - if (payload.signatureActorId) { - const actorSignature = await ActorModel.load(payload.signatureActorId) - if (!actorSignature) throw new Error('Unknown signature actor id.') - body = await buildSignedActivity(actorSignature, payload.body) - } - - return body -} - -async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) { - let actor: ActorModel - if (payload.signatureActorId) { - actor = await ActorModel.load(payload.signatureActorId) - if (!actor) throw new Error('Unknown signature actor id.') - } else { - // We need to sign the request, so use the server - actor = await getServerActor() - } - - const keyId = actor.getWebfingerUrl() - return { - algorithm: 'rsa-sha256', - authorizationHeaderName: 'Signature', - keyId, - key: actor.privateKey - } -} - -export { - ActivityPubHttpPayload, - activitypubHttpJobScheduler, - maybeRetryRequestLater, - computeBody, - buildSignedRequestOptions -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts deleted file mode 100644 index 54a7504e8..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' -import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' - -async function process (payload: ActivityPubHttpPayload, jobId: number) { - logger.info('Processing ActivityPub unicast in job %d.', jobId) - - const uri = payload.uris[0] - - const body = await computeBody(payload) - const httpSignatureOptions = await buildSignedRequestOptions(payload) - - const options = { - method: 'POST', - uri, - json: body, - httpSignature: httpSignatureOptions - } - - try { - await doRequest(options) - ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) - } catch (err) { - const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) - if (isRetryingLater === false) { - ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) - } - - throw err - } -} - -function onError (err: Error, jobId: number) { - logger.error('Error when sending ActivityPub request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts deleted file mode 100644 index ad8f527b4..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './activitypub-http-job-scheduler' diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts deleted file mode 100644 index 394264ec1..000000000 --- a/server/lib/jobs/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './activitypub-http-job-scheduler' -export * from './transcoding-job-scheduler' diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts deleted file mode 100644 index 9d55880e6..000000000 --- a/server/lib/jobs/job-scheduler.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { AsyncQueue, forever, queue } from 'async' -import * as Sequelize from 'sequelize' -import { JobCategory } from '../../../shared' -import { logger } from '../../helpers/logger' -import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' -import { JobModel } from '../../models/job/job' - -export interface JobHandler { - process (data: object, jobId: number): Promise - onError (err: Error, jobId: number) - onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler): Promise -} -type JobQueueCallback = (err: Error) => void - -class JobScheduler { - - constructor ( - private jobCategory: JobCategory, - private jobHandlers: { [ id: string ]: JobHandler } - ) {} - - async activate () { - const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory] - - logger.info('Jobs scheduler %s activated.', this.jobCategory) - - const jobsQueue = queue(this.processJob.bind(this)) - - // Finish processing jobs from a previous start - const state = JOB_STATES.PROCESSING - try { - const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) - - this.enqueueJobs(jobsQueue, jobs) - } catch (err) { - logger.error('Cannot list pending jobs.', err) - } - - forever( - async next => { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, JOBS_FETCHING_INTERVAL) - } - - const state = JOB_STATES.PENDING - try { - const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) - - this.enqueueJobs(jobsQueue, jobs) - } catch (err) { - logger.error('Cannot list pending jobs.', err) - } - - // Optimization: we could use "drain" from queue object - return setTimeout(next, JOBS_FETCHING_INTERVAL) - }, - - err => logger.error('Error in job scheduler queue.', err) - ) - } - - createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) { - const createQuery = { - state: JOB_STATES.PENDING, - category: this.jobCategory, - handlerName, - handlerInputData - } - - const options = { transaction } - - return JobModel.create(createQuery, options) - } - - private enqueueJobs (jobsQueue: AsyncQueue, jobs: JobModel[]) { - jobs.forEach(job => jobsQueue.push(job)) - } - - private async processJob (job: JobModel, callback: (err: Error) => void) { - const jobHandler = this.jobHandlers[job.handlerName] - if (jobHandler === undefined) { - const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id - logger.error(errorString) - - const error = new Error(errorString) - await this.onJobError(jobHandler, job, error) - return callback(error) - } - - logger.info('Processing job %d with handler %s.', job.id, job.handlerName) - - job.state = JOB_STATES.PROCESSING - await job.save() - - try { - const result: T = await jobHandler.process(job.handlerInputData, job.id) - await this.onJobSuccess(jobHandler, job, result) - } catch (err) { - logger.error('Error in job handler %s.', job.handlerName, err) - - try { - await this.onJobError(jobHandler, job, err) - } catch (innerErr) { - this.cannotSaveJobError(innerErr) - return callback(innerErr) - } - } - - return callback(null) - } - - private async onJobError (jobHandler: JobHandler, job: JobModel, err: Error) { - job.state = JOB_STATES.ERROR - - try { - await job.save() - if (jobHandler) await jobHandler.onError(err, job.id) - } catch (err) { - this.cannotSaveJobError(err) - } - } - - private async onJobSuccess (jobHandler: JobHandler, job: JobModel, jobResult: T) { - job.state = JOB_STATES.SUCCESS - - try { - await job.save() - await jobHandler.onSuccess(job.id, jobResult, this) - } catch (err) { - this.cannotSaveJobError(err) - } - } - - private cannotSaveJobError (err: Error) { - logger.error('Cannot save new job state.', err) - } -} - -// --------------------------------------------------------------------------- - -export { - JobScheduler -} diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts deleted file mode 100644 index 73152a1be..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './transcoding-job-scheduler' diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts deleted file mode 100644 index e5530a73c..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { JobCategory } from '../../../../shared' -import { VideoModel } from '../../../models/video/video' -import { JobHandler, JobScheduler } from '../job-scheduler' - -import * as videoFileOptimizer from './video-file-optimizer-handler' -import * as videoFileTranscoder from './video-file-transcoder-handler' - -type TranscodingJobPayload = { - videoUUID: string - resolution?: number -} -const jobHandlers: { [ handlerName: string ]: JobHandler } = { - videoFileOptimizer, - videoFileTranscoder -} -const jobCategory: JobCategory = 'transcoding' - -const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) - -export { - TranscodingJobPayload, - transcodingJobScheduler -} diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts deleted file mode 100644 index f224a31b4..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts +++ /dev/null @@ -1,90 +0,0 @@ -import * as Bluebird from 'bluebird' -import { VideoPrivacy } from '../../../../shared/models/videos' -import { logger } from '../../../helpers/logger' -import { computeResolutionsToTranscode } from '../../../helpers/utils' -import { sequelizeTypescript } from '../../../initializers' -import { JobModel } from '../../../models/job/job' -import { VideoModel } from '../../../models/video/video' -import { shareVideoByServerAndChannel } from '../../activitypub' -import { sendCreateVideo } from '../../activitypub/send' -import { JobScheduler } from '../job-scheduler' -import { TranscodingJobPayload } from './transcoding-job-scheduler' - -async function process (data: TranscodingJobPayload, jobId: number) { - const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } - - await video.optimizeOriginalVideofile() - - return video -} - -function onError (err: Error, jobId: number) { - logger.error('Error when optimized video file in job %d.', jobId, err) - return Promise.resolve() -} - -async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler) { - if (video === undefined) return undefined - - logger.info('Job %d is a success.', jobId) - - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - if (video.privacy !== VideoPrivacy.PRIVATE) { - // Now we'll add the video's meta data to our followers - await sendCreateVideo(video, undefined) - await shareVideoByServerAndChannel(video, undefined) - } - - const originalFileHeight = await videoDatabase.getOriginalFileHeight() - - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, - { resolutions: resolutionsEnabled } - ) - - if (resolutionsEnabled.length !== 0) { - try { - await sequelizeTypescript.transaction(async t => { - const tasks: Bluebird[] = [] - - for (const resolution of resolutionsEnabled) { - const dataInput = { - videoUUID: videoDatabase.uuid, - resolution - } - - const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput) - tasks.push(p) - } - - await Promise.all(tasks) - }) - - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } catch (err) { - logger.warn('Cannot transcode the video.', err) - } - } else { - logger.info('No transcoding jobs created for video %s (no resolutions enabled).') - return undefined - } -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts deleted file mode 100644 index 883d3eba8..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { VideoResolution } from '../../../../shared' -import { VideoPrivacy } from '../../../../shared/models/videos' -import { logger } from '../../../helpers/logger' -import { VideoModel } from '../../../models/video/video' -import { sendUpdateVideo } from '../../activitypub/send' - -async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { - const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } - - await video.transcodeOriginalVideofile(data.resolution) - - return video -} - -function onError (err: Error, jobId: number) { - logger.error('Error when transcoding video file in job %d.', jobId, err) - return Promise.resolve() -} - -async function onSuccess (jobId: number, video: VideoModel) { - if (video === undefined) return undefined - - logger.info('Job %d is a success.', jobId) - - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - if (video.privacy !== VideoPrivacy.PRIVATE) { - await sendUpdateVideo(video, undefined) - } - - return undefined -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} -- cgit v1.2.3