From e4f97babf701481b55cc10fb3448feab5f97c867 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 9 Nov 2017 17:51:58 +0100 Subject: Begin activitypub --- server/lib/jobs/handlers/index.ts | 17 ----- server/lib/jobs/handlers/video-file-optimizer.ts | 85 ---------------------- server/lib/jobs/handlers/video-file-transcoder.ts | 49 ------------- .../http-request-broadcast-handler.ts | 25 +++++++ .../http-request-job-scheduler.ts | 17 +++++ .../http-request-unicast-handler.ts | 25 +++++++ .../lib/jobs/http-request-job-scheduler/index.ts | 1 + server/lib/jobs/index.ts | 3 +- server/lib/jobs/job-scheduler.ts | 35 +++++---- server/lib/jobs/transcoding-job-scheduler/index.ts | 1 + .../transcoding-job-scheduler.ts | 17 +++++ .../video-file-optimizer-handler.ts | 85 ++++++++++++++++++++++ .../video-file-transcoder-handler.ts | 49 +++++++++++++ 13 files changed, 241 insertions(+), 168 deletions(-) delete mode 100644 server/lib/jobs/handlers/index.ts delete mode 100644 server/lib/jobs/handlers/video-file-optimizer.ts delete mode 100644 server/lib/jobs/handlers/video-file-transcoder.ts create mode 100644 server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts create mode 100644 server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts create mode 100644 server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts create mode 100644 server/lib/jobs/http-request-job-scheduler/index.ts create mode 100644 server/lib/jobs/transcoding-job-scheduler/index.ts create mode 100644 server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts create mode 100644 server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts create mode 100644 server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts (limited to 'server/lib/jobs') diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts deleted file mode 100644 index cef1f89a9..000000000 --- a/server/lib/jobs/handlers/index.ts +++ /dev/null @@ -1,17 +0,0 @@ -import * as videoFileOptimizer from './video-file-optimizer' -import * as videoFileTranscoder from './video-file-transcoder' - -export interface JobHandler { - process (data: object, jobId: number): T - onError (err: Error, jobId: number) - onSuccess (jobId: number, jobResult: T) -} - -const jobHandlers: { [ handlerName: string ]: JobHandler } = { - videoFileOptimizer, - videoFileTranscoder -} - -export { - jobHandlers -} diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/handlers/video-file-optimizer.ts deleted file mode 100644 index ccded4721..000000000 --- a/server/lib/jobs/handlers/video-file-optimizer.ts +++ /dev/null @@ -1,85 +0,0 @@ -import * as Bluebird from 'bluebird' - -import { database as db } from '../../../initializers/database' -import { logger, computeResolutionsToTranscode } from '../../../helpers' -import { VideoInstance } from '../../../models' -import { addVideoToFriends } from '../../friends' -import { JobScheduler } from '../job-scheduler' - -async function process (data: { videoUUID: string }, jobId: number) { - const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } - - await video.optimizeOriginalVideofile() - - return video -} - -function onError (err: Error, jobId: number) { - logger.error('Error when optimized video file in job %d.', jobId, err) - return Promise.resolve() -} - -async function onSuccess (jobId: number, video: VideoInstance) { - if (video === undefined) return undefined - - logger.info('Job %d is a success.', jobId) - - // Maybe the video changed in database, refresh it - const videoDatabase = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - const remoteVideo = await videoDatabase.toAddRemoteJSON() - - // Now we'll add the video's meta data to our friends - await addVideoToFriends(remoteVideo, null) - - const originalFileHeight = await videoDatabase.getOriginalFileHeight() - // Create transcoding jobs if there are enabled resolutions - - const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, - { resolutions: resolutionsEnabled } - ) - - if (resolutionsEnabled.length !== 0) { - try { - await db.sequelize.transaction(async t => { - const tasks: Bluebird[] = [] - - for (const resolution of resolutionsEnabled) { - const dataInput = { - videoUUID: videoDatabase.uuid, - resolution - } - - const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) - tasks.push(p) - } - - await Promise.all(tasks) - }) - - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } catch (err) { - logger.warn('Cannot transcode the video.', err) - } - } else { - logger.info('No transcoding jobs created for video %s (no resolutions enabled).') - return undefined - } -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/handlers/video-file-transcoder.ts deleted file mode 100644 index 853645510..000000000 --- a/server/lib/jobs/handlers/video-file-transcoder.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { database as db } from '../../../initializers/database' -import { updateVideoToFriends } from '../../friends' -import { logger } from '../../../helpers' -import { VideoInstance } from '../../../models' -import { VideoResolution } from '../../../../shared' - -async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { - const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } - - await video.transcodeOriginalVideofile(data.resolution) - - return video -} - -function onError (err: Error, jobId: number) { - logger.error('Error when transcoding video file in job %d.', jobId, err) - return Promise.resolve() -} - -async function onSuccess (jobId: number, video: VideoInstance) { - if (video === undefined) return undefined - - logger.info('Job %d is a success.', jobId) - - // Maybe the video changed in database, refresh it - const videoDatabase = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - const remoteVideo = videoDatabase.toUpdateRemoteJSON() - - // Now we'll add the video's meta data to our friends - await updateVideoToFriends(remoteVideo, null) - - return undefined -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts new file mode 100644 index 000000000..6b6946d02 --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts @@ -0,0 +1,25 @@ +import * as Bluebird from 'bluebird' + +import { database as db } from '../../../initializers/database' +import { logger } from '../../../helpers' + +async function process (data: { videoUUID: string }, jobId: number) { + +} + +function onError (err: Error, jobId: number) { + logger.error('Error when optimized video file in job %d.', jobId, err) + return Promise.resolve() +} + +async function onSuccess (jobId: number) { + +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts new file mode 100644 index 000000000..42cb9139c --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts @@ -0,0 +1,17 @@ +import { JobScheduler, JobHandler } from '../job-scheduler' + +import * as httpRequestBroadcastHandler from './http-request-broadcast-handler' +import * as httpRequestUnicastHandler from './http-request-unicast-handler' +import { JobCategory } from '../../../../shared' + +const jobHandlers: { [ handlerName: string ]: JobHandler } = { + httpRequestBroadcastHandler, + httpRequestUnicastHandler +} +const jobCategory: JobCategory = 'http-request' + +const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers) + +export { + httpRequestJobScheduler +} diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts new file mode 100644 index 000000000..6b6946d02 --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts @@ -0,0 +1,25 @@ +import * as Bluebird from 'bluebird' + +import { database as db } from '../../../initializers/database' +import { logger } from '../../../helpers' + +async function process (data: { videoUUID: string }, jobId: number) { + +} + +function onError (err: Error, jobId: number) { + logger.error('Error when optimized video file in job %d.', jobId, err) + return Promise.resolve() +} + +async function onSuccess (jobId: number) { + +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/http-request-job-scheduler/index.ts b/server/lib/jobs/http-request-job-scheduler/index.ts new file mode 100644 index 000000000..4d2573296 --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/index.ts @@ -0,0 +1 @@ +export * from './http-request-job-scheduler' diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts index b18a3d845..a92743707 100644 --- a/server/lib/jobs/index.ts +++ b/server/lib/jobs/index.ts @@ -1 +1,2 @@ -export * from './job-scheduler' +export * from './http-request-job-scheduler' +export * from './transcoding-job-scheduler' diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 61d483268..89a4bca88 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts @@ -1,39 +1,41 @@ import { AsyncQueue, forever, queue } from 'async' import * as Sequelize from 'sequelize' -import { database as db } from '../../initializers/database' import { + database as db, JOBS_FETCHING_INTERVAL, JOBS_FETCH_LIMIT_PER_CYCLE, JOB_STATES } from '../../initializers' import { logger } from '../../helpers' import { JobInstance } from '../../models' -import { JobHandler, jobHandlers } from './handlers' +import { JobCategory } from '../../../shared' +export interface JobHandler { + process (data: object, jobId: number): T + onError (err: Error, jobId: number) + onSuccess (jobId: number, jobResult: T) +} type JobQueueCallback = (err: Error) => void -class JobScheduler { - - private static instance: JobScheduler +class JobScheduler { - private constructor () { } - - static get Instance () { - return this.instance || (this.instance = new this()) - } + constructor ( + private jobCategory: JobCategory, + private jobHandlers: { [ id: string ]: JobHandler } + ) {} async activate () { - const limit = JOBS_FETCH_LIMIT_PER_CYCLE + const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory] - logger.info('Jobs scheduler activated.') + logger.info('Jobs scheduler %s activated.', this.jobCategory) const jobsQueue = queue(this.processJob.bind(this)) // Finish processing jobs from a previous start const state = JOB_STATES.PROCESSING try { - const jobs = await db.Job.listWithLimit(limit, state) + const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) this.enqueueJobs(jobsQueue, jobs) } catch (err) { @@ -49,7 +51,7 @@ class JobScheduler { const state = JOB_STATES.PENDING try { - const jobs = await db.Job.listWithLimit(limit, state) + const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) this.enqueueJobs(jobsQueue, jobs) } catch (err) { @@ -64,9 +66,10 @@ class JobScheduler { ) } - createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { + createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) { const createQuery = { state: JOB_STATES.PENDING, + category, handlerName, handlerInputData } @@ -80,7 +83,7 @@ class JobScheduler { } private async processJob (job: JobInstance, callback: (err: Error) => void) { - const jobHandler = jobHandlers[job.handlerName] + const jobHandler = this.jobHandlers[job.handlerName] if (jobHandler === undefined) { logger.error('Unknown job handler for job %s.', job.handlerName) return callback(null) diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts new file mode 100644 index 000000000..73152a1be --- /dev/null +++ b/server/lib/jobs/transcoding-job-scheduler/index.ts @@ -0,0 +1 @@ +export * from './transcoding-job-scheduler' diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts new file mode 100644 index 000000000..d7c614fb8 --- /dev/null +++ b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts @@ -0,0 +1,17 @@ +import { JobScheduler, JobHandler } from '../job-scheduler' + +import * as videoFileOptimizer from './video-file-optimizer-handler' +import * as videoFileTranscoder from './video-file-transcoder-handler' +import { JobCategory } from '../../../../shared' + +const jobHandlers: { [ handlerName: string ]: JobHandler } = { + videoFileOptimizer, + videoFileTranscoder +} +const jobCategory: JobCategory = 'transcoding' + +const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) + +export { + transcodingJobScheduler +} diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts new file mode 100644 index 000000000..ccded4721 --- /dev/null +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts @@ -0,0 +1,85 @@ +import * as Bluebird from 'bluebird' + +import { database as db } from '../../../initializers/database' +import { logger, computeResolutionsToTranscode } from '../../../helpers' +import { VideoInstance } from '../../../models' +import { addVideoToFriends } from '../../friends' +import { JobScheduler } from '../job-scheduler' + +async function process (data: { videoUUID: string }, jobId: number) { + const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) + return undefined + } + + await video.optimizeOriginalVideofile() + + return video +} + +function onError (err: Error, jobId: number) { + logger.error('Error when optimized video file in job %d.', jobId, err) + return Promise.resolve() +} + +async function onSuccess (jobId: number, video: VideoInstance) { + if (video === undefined) return undefined + + logger.info('Job %d is a success.', jobId) + + // Maybe the video changed in database, refresh it + const videoDatabase = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(video.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + const remoteVideo = await videoDatabase.toAddRemoteJSON() + + // Now we'll add the video's meta data to our friends + await addVideoToFriends(remoteVideo, null) + + const originalFileHeight = await videoDatabase.getOriginalFileHeight() + // Create transcoding jobs if there are enabled resolutions + + const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) + logger.info( + 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, + { resolutions: resolutionsEnabled } + ) + + if (resolutionsEnabled.length !== 0) { + try { + await db.sequelize.transaction(async t => { + const tasks: Bluebird[] = [] + + for (const resolution of resolutionsEnabled) { + const dataInput = { + videoUUID: videoDatabase.uuid, + resolution + } + + const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) + tasks.push(p) + } + + await Promise.all(tasks) + }) + + logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) + } catch (err) { + logger.warn('Cannot transcode the video.', err) + } + } else { + logger.info('No transcoding jobs created for video %s (no resolutions enabled).') + return undefined + } +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts new file mode 100644 index 000000000..853645510 --- /dev/null +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts @@ -0,0 +1,49 @@ +import { database as db } from '../../../initializers/database' +import { updateVideoToFriends } from '../../friends' +import { logger } from '../../../helpers' +import { VideoInstance } from '../../../models' +import { VideoResolution } from '../../../../shared' + +async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { + const video = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(data.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) + return undefined + } + + await video.transcodeOriginalVideofile(data.resolution) + + return video +} + +function onError (err: Error, jobId: number) { + logger.error('Error when transcoding video file in job %d.', jobId, err) + return Promise.resolve() +} + +async function onSuccess (jobId: number, video: VideoInstance) { + if (video === undefined) return undefined + + logger.info('Job %d is a success.', jobId) + + // Maybe the video changed in database, refresh it + const videoDatabase = await db.Video.loadByUUIDAndPopulateAuthorAndPodAndTags(video.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + const remoteVideo = videoDatabase.toUpdateRemoteJSON() + + // Now we'll add the video's meta data to our friends + await updateVideoToFriends(remoteVideo, null) + + return undefined +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} -- cgit v1.2.3