From 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 25 Jan 2018 15:05:18 +0100 Subject: Move job queue to redis We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore. --- .../handlers/activitypub-http-broadcast.ts | 49 ++++++++ .../job-queue/handlers/activitypub-http-fetcher.ts | 63 +++++++++++ .../job-queue/handlers/activitypub-http-unicast.ts | 43 +++++++ .../handlers/utils/activitypub-http-utils.ts | 39 +++++++ server/lib/job-queue/handlers/video-file.ts | 110 ++++++++++++++++++ server/lib/job-queue/index.ts | 1 + server/lib/job-queue/job-queue.ts | 124 +++++++++++++++++++++ 7 files changed, 429 insertions(+) create mode 100644 server/lib/job-queue/handlers/activitypub-http-broadcast.ts create mode 100644 server/lib/job-queue/handlers/activitypub-http-fetcher.ts create mode 100644 server/lib/job-queue/handlers/activitypub-http-unicast.ts create mode 100644 server/lib/job-queue/handlers/utils/activitypub-http-utils.ts create mode 100644 server/lib/job-queue/handlers/video-file.ts create mode 100644 server/lib/job-queue/index.ts create mode 100644 server/lib/job-queue/job-queue.ts (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts new file mode 100644 index 000000000..159856cda --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -0,0 +1,49 @@ +import * as kue from 'kue' +import { logger } from '../../../helpers/logger' +import { doRequest } from '../../../helpers/requests' +import { ActorFollowModel } from '../../../models/activitypub/actor-follow' +import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' + +export type ActivitypubHttpBroadcastPayload = { + uris: string[] + signatureActorId?: number + body: any +} + +async function processActivityPubHttpBroadcast (job: kue.Job) { + logger.info('Processing ActivityPub broadcast in job %d.', job.id) + + const payload = job.data as ActivitypubHttpBroadcastPayload + + const body = await computeBody(payload) + const httpSignatureOptions = await buildSignedRequestOptions(payload) + + const options = { + method: 'POST', + uri: '', + json: body, + httpSignature: httpSignatureOptions + } + + const badUrls: string[] = [] + const goodUrls: string[] = [] + + for (const uri of payload.uris) { + options.uri = uri + + try { + await doRequest(options) + goodUrls.push(uri) + } catch (err) { + badUrls.push(uri) + } + } + + return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubHttpBroadcast +} diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts new file mode 100644 index 000000000..062211c85 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -0,0 +1,63 @@ +import * as kue from 'kue' +import { logger } from '../../../helpers/logger' +import { doRequest } from '../../../helpers/requests' +import { ACTIVITY_PUB } from '../../../initializers' +import { processActivities } from '../../activitypub/process' +import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' + +export type ActivitypubHttpFetcherPayload = { + uris: string[] +} + +async function processActivityPubHttpFetcher (job: kue.Job) { + logger.info('Processing ActivityPub fetcher in job %d.', job.id) + + const payload = job.data as ActivitypubHttpBroadcastPayload + + const options = { + method: 'GET', + uri: '', + json: true, + activityPub: true + } + + for (const uri of payload.uris) { + options.uri = uri + logger.info('Fetching ActivityPub data on %s.', uri) + + const response = await doRequest(options) + const firstBody = response.body + + if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { + const activities = firstBody.first.orderedItems + + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) + + await processActivities(activities) + } + + let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT + let i = 0 + let nextLink = firstBody.first.next + while (nextLink && i < limit) { + options.uri = nextLink + + const { body } = await doRequest(options) + nextLink = body.next + i++ + + if (Array.isArray(body.orderedItems)) { + const activities = body.orderedItems + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) + + await processActivities(activities) + } + } + } +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubHttpFetcher +} diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts new file mode 100644 index 000000000..9b4188c50 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -0,0 +1,43 @@ +import * as kue from 'kue' +import { logger } from '../../../helpers/logger' +import { doRequest } from '../../../helpers/requests' +import { ActorFollowModel } from '../../../models/activitypub/actor-follow' +import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' + +export type ActivitypubHttpUnicastPayload = { + uri: string + signatureActorId?: number + body: any +} + +async function processActivityPubHttpUnicast (job: kue.Job) { + logger.info('Processing ActivityPub unicast in job %d.', job.id) + + const payload = job.data as ActivitypubHttpUnicastPayload + const uri = payload.uri + + const body = await computeBody(payload) + const httpSignatureOptions = await buildSignedRequestOptions(payload) + + const options = { + method: 'POST', + uri, + json: body, + httpSignature: httpSignatureOptions + } + + try { + await doRequest(options) + ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) + } catch (err) { + ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) + + throw err + } +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubHttpUnicast +} diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts new file mode 100644 index 000000000..c087371c6 --- /dev/null +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts @@ -0,0 +1,39 @@ +import { buildSignedActivity } from '../../../../helpers/activitypub' +import { getServerActor } from '../../../../helpers/utils' +import { ActorModel } from '../../../../models/activitypub/actor' + +async function computeBody (payload: { body: any, signatureActorId?: number }) { + let body = payload.body + + if (payload.signatureActorId) { + const actorSignature = await ActorModel.load(payload.signatureActorId) + if (!actorSignature) throw new Error('Unknown signature actor id.') + body = await buildSignedActivity(actorSignature, payload.body) + } + + return body +} + +async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { + let actor: ActorModel + if (payload.signatureActorId) { + actor = await ActorModel.load(payload.signatureActorId) + if (!actor) throw new Error('Unknown signature actor id.') + } else { + // We need to sign the request, so use the server + actor = await getServerActor() + } + + const keyId = actor.getWebfingerUrl() + return { + algorithm: 'rsa-sha256', + authorizationHeaderName: 'Signature', + keyId, + key: actor.privateKey + } +} + +export { + computeBody, + buildSignedRequestOptions +} diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts new file mode 100644 index 000000000..5294483bd --- /dev/null +++ b/server/lib/job-queue/handlers/video-file.ts @@ -0,0 +1,110 @@ +import * as kue from 'kue' +import { VideoResolution } from '../../../../shared' +import { VideoPrivacy } from '../../../../shared/models/videos' +import { logger } from '../../../helpers/logger' +import { computeResolutionsToTranscode } from '../../../helpers/utils' +import { sequelizeTypescript } from '../../../initializers' +import { VideoModel } from '../../../models/video/video' +import { shareVideoByServerAndChannel } from '../../activitypub' +import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' +import { JobQueue } from '../job-queue' + +export type VideoFilePayload = { + videoUUID: string + resolution?: VideoResolution +} + +async function processVideoFile (job: kue.Job) { + const payload = job.data as VideoFilePayload + logger.info('Processing video file in job %d.', job.id) + + const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid }) + return undefined + } + + // Transcoding in other resolution + if (payload.resolution) { + await video.transcodeOriginalVideofile(payload.resolution) + await onVideoFileTranscoderSuccess(video) + } else { + await video.optimizeOriginalVideofile() + await onVideoFileOptimizerSuccess(video) + } + + return video +} + +async function onVideoFileTranscoderSuccess (video: VideoModel) { + if (video === undefined) return undefined + + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + if (video.privacy !== VideoPrivacy.PRIVATE) { + await sendUpdateVideo(video, undefined) + } + + return undefined +} + +async function onVideoFileOptimizerSuccess (video: VideoModel) { + if (video === undefined) return undefined + + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + if (video.privacy !== VideoPrivacy.PRIVATE) { + // Now we'll add the video's meta data to our followers + await sendCreateVideo(video, undefined) + await shareVideoByServerAndChannel(video, undefined) + } + + const originalFileHeight = await videoDatabase.getOriginalFileHeight() + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) + logger.info( + 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, + { resolutions: resolutionsEnabled } + ) + + if (resolutionsEnabled.length !== 0) { + try { + await sequelizeTypescript.transaction(async t => { + const tasks: Promise[] = [] + + for (const resolution of resolutionsEnabled) { + const dataInput = { + videoUUID: videoDatabase.uuid, + resolution + } + + const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + tasks.push(p) + } + + await Promise.all(tasks) + }) + + logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) + } catch (err) { + logger.warn('Cannot transcode the video.', err) + } + } else { + logger.info('No transcoding jobs created for video %s (no resolutions enabled).') + return undefined + } +} + +// --------------------------------------------------------------------------- + +export { + processVideoFile +} diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts new file mode 100644 index 000000000..57231e649 --- /dev/null +++ b/server/lib/job-queue/index.ts @@ -0,0 +1 @@ +export * from './job-queue' diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts new file mode 100644 index 000000000..7a2b6c78d --- /dev/null +++ b/server/lib/job-queue/job-queue.ts @@ -0,0 +1,124 @@ +import * as kue from 'kue' +import { JobType, JobState } from '../../../shared/models' +import { logger } from '../../helpers/logger' +import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' +import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' +import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' +import { processVideoFile, VideoFilePayload } from './handlers/video-file' + +type CreateJobArgument = + { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | + { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | + { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | + { type: 'video-file', payload: VideoFilePayload } + +const handlers: { [ id in JobType ]: (job: kue.Job) => Promise} = { + 'activitypub-http-broadcast': processActivityPubHttpBroadcast, + 'activitypub-http-unicast': processActivityPubHttpUnicast, + 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'video-file': processVideoFile +} + +class JobQueue { + + private static instance: JobQueue + + private jobQueue: kue.Queue + private initialized = false + + private constructor () {} + + init () { + // Already initialized + if (this.initialized === true) return + this.initialized = true + + this.jobQueue = kue.createQueue({ + prefix: 'q-' + CONFIG.WEBSERVER.HOST, + redis: { + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + auth: CONFIG.REDIS.AUTH + } + }) + + this.jobQueue.on('error', err => { + logger.error('Error in job queue.', err) + process.exit(-1) + }) + this.jobQueue.watchStuckJobs(5000) + + for (const handlerName of Object.keys(handlers)) { + this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { + try { + const res = await handlers[ handlerName ](job) + return done(null, res) + } catch (err) { + return done(err) + } + }) + } + } + + createJob (obj: CreateJobArgument, priority = 'normal') { + return new Promise((res, rej) => { + this.jobQueue + .create(obj.type, obj.payload) + .priority(priority) + .attempts(JOB_ATTEMPTS[obj.type]) + .backoff({ type: 'exponential' }) + .save(err => { + if (err) return rej(err) + + return res() + }) + }) + } + + listForApi (state: JobState, start: number, count: number, sort: string) { + return new Promise((res, rej) => { + kue.Job.rangeByState(state, start, count, sort, (err, jobs) => { + if (err) return rej(err) + + return res(jobs) + }) + }) + } + + count (state: JobState) { + return new Promise((res, rej) => { + this.jobQueue[state + 'Count']((err, total) => { + if (err) return rej(err) + + return res(total) + }) + }) + } + + removeOldJobs () { + const now = new Date().getTime() + kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { + if (err) { + logger.error('Cannot get jobs when removing old jobs.', err) + return + } + + for (const job of jobs) { + if (now - job.created_at > JOB_COMPLETED_LIFETIME) { + job.remove() + } + } + }) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +// --------------------------------------------------------------------------- + +export { + JobQueue +} -- cgit v1.2.3