From 65fcc3119c334b75dd13bcfdebf186afdc580a8f Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 15 May 2017 22:22:03 +0200 Subject: First typescript iteration --- server/lib/jobs/handlers/index.js | 7 -- server/lib/jobs/handlers/index.ts | 9 ++ server/lib/jobs/handlers/video-transcoder.js | 43 --------- server/lib/jobs/handlers/video-transcoder.ts | 37 ++++++++ server/lib/jobs/index.ts | 1 + server/lib/jobs/job-scheduler.js | 129 ------------------------- server/lib/jobs/job-scheduler.ts | 137 +++++++++++++++++++++++++++ 7 files changed, 184 insertions(+), 179 deletions(-) delete mode 100644 server/lib/jobs/handlers/index.js create mode 100644 server/lib/jobs/handlers/index.ts delete mode 100644 server/lib/jobs/handlers/video-transcoder.js create mode 100644 server/lib/jobs/handlers/video-transcoder.ts create mode 100644 server/lib/jobs/index.ts delete mode 100644 server/lib/jobs/job-scheduler.js create mode 100644 server/lib/jobs/job-scheduler.ts (limited to 'server/lib/jobs') diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js deleted file mode 100644 index 59c1ccce5..000000000 --- a/server/lib/jobs/handlers/index.js +++ /dev/null @@ -1,7 +0,0 @@ -'use strict' - -const videoTranscoder = require('./video-transcoder') - -module.exports = { - videoTranscoder -} diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts new file mode 100644 index 000000000..ae5440031 --- /dev/null +++ b/server/lib/jobs/handlers/index.ts @@ -0,0 +1,9 @@ +import * as videoTranscoder from './video-transcoder' + +const jobHandlers = { + videoTranscoder +} + +export { + jobHandlers +} diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js deleted file mode 100644 index d2ad4f9c7..000000000 --- a/server/lib/jobs/handlers/video-transcoder.js +++ /dev/null @@ -1,43 +0,0 @@ -'use strict' - -const db = require('../../../initializers/database') -const logger = require('../../../helpers/logger') -const friends = require('../../../lib/friends') - -const VideoTranscoderHandler = { - process, - onError, - onSuccess -} - -// --------------------------------------------------------------------------- - -function process (data, callback) { - db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { - if (err) return callback(err) - - video.transcodeVideofile(function (err) { - return callback(err, video) - }) - }) -} - -function onError (err, jobId, video, callback) { - logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) - return callback() -} - -function onSuccess (data, jobId, video, callback) { - logger.info('Job %d is a success.', jobId) - - video.toAddRemoteJSON(function (err, remoteVideo) { - if (err) return callback(err) - - // Now we'll add the video's meta data to our friends - friends.addVideoToFriends(remoteVideo, null, callback) - }) -} - -// --------------------------------------------------------------------------- - -module.exports = VideoTranscoderHandler diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts new file mode 100644 index 000000000..35db5fb96 --- /dev/null +++ b/server/lib/jobs/handlers/video-transcoder.ts @@ -0,0 +1,37 @@ +const db = require('../../../initializers/database') +import { logger } from '../../../helpers' +import { addVideoToFriends } from '../../../lib' + +function process (data, callback) { + db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { + if (err) return callback(err) + + video.transcodeVideofile(function (err) { + return callback(err, video) + }) + }) +} + +function onError (err, jobId, video, callback) { + logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) + return callback() +} + +function onSuccess (data, jobId, video, callback) { + logger.info('Job %d is a success.', jobId) + + video.toAddRemoteJSON(function (err, remoteVideo) { + if (err) return callback(err) + + // Now we'll add the video's meta data to our friends + addVideoToFriends(remoteVideo, null, callback) + }) +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts new file mode 100644 index 000000000..b18a3d845 --- /dev/null +++ b/server/lib/jobs/index.ts @@ -0,0 +1 @@ +export * from './job-scheduler' diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js deleted file mode 100644 index 7b239577f..000000000 --- a/server/lib/jobs/job-scheduler.js +++ /dev/null @@ -1,129 +0,0 @@ -'use strict' - -const forever = require('async/forever') -const queue = require('async/queue') - -const constants = require('../../initializers/constants') -const db = require('../../initializers/database') -const logger = require('../../helpers/logger') - -const jobHandlers = require('./handlers') - -const jobScheduler = { - activate, - createJob -} - -function activate () { - const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE - - logger.info('Jobs scheduler activated.') - - const jobsQueue = queue(processJob) - - // Finish processing jobs from a previous start - const state = constants.JOB_STATES.PROCESSING - db.Job.listWithLimit(limit, state, function (err, jobs) { - enqueueJobs(err, jobsQueue, jobs) - - forever( - function (next) { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - } - - const state = constants.JOB_STATES.PENDING - db.Job.listWithLimit(limit, state, function (err, jobs) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(function (job) { - jobsQueue.push(job) - }) - } - - // Optimization: we could use "drain" from queue object - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - }) - } - ) - }) -} - -// --------------------------------------------------------------------------- - -module.exports = jobScheduler - -// --------------------------------------------------------------------------- - -function enqueueJobs (err, jobsQueue, jobs) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(function (job) { - jobsQueue.push(job) - }) - } -} - -function createJob (transaction, handlerName, handlerInputData, callback) { - const createQuery = { - state: constants.JOB_STATES.PENDING, - handlerName, - handlerInputData - } - const options = { transaction } - - db.Job.create(createQuery, options).asCallback(callback) -} - -function processJob (job, callback) { - const jobHandler = jobHandlers[job.handlerName] - - logger.info('Processing job %d with handler %s.', job.id, job.handlerName) - - job.state = constants.JOB_STATES.PROCESSING - job.save().asCallback(function (err) { - if (err) return cannotSaveJobError(err, callback) - - if (jobHandler === undefined) { - logger.error('Unknown job handler for job %s.', jobHandler.handlerName) - return callback() - } - - return jobHandler.process(job.handlerInputData, function (err, result) { - if (err) { - logger.error('Error in job handler %s.', job.handlerName, { error: err }) - return onJobError(jobHandler, job, result, callback) - } - - return onJobSuccess(jobHandler, job, result, callback) - }) - }) -} - -function onJobError (jobHandler, job, jobResult, callback) { - job.state = constants.JOB_STATES.ERROR - - job.save().asCallback(function (err) { - if (err) return cannotSaveJobError(err, callback) - - return jobHandler.onError(err, job.id, jobResult, callback) - }) -} - -function onJobSuccess (jobHandler, job, jobResult, callback) { - job.state = constants.JOB_STATES.SUCCESS - - job.save().asCallback(function (err) { - if (err) return cannotSaveJobError(err, callback) - - return jobHandler.onSuccess(err, job.id, jobResult, callback) - }) -} - -function cannotSaveJobError (err, callback) { - logger.error('Cannot save new job state.', { error: err }) - return callback(err) -} diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts new file mode 100644 index 000000000..7b8c6faf9 --- /dev/null +++ b/server/lib/jobs/job-scheduler.ts @@ -0,0 +1,137 @@ +import { forever, queue } from 'async' + +const db = require('../../initializers/database') +import { + JOBS_FETCHING_INTERVAL, + JOBS_FETCH_LIMIT_PER_CYCLE, + JOB_STATES +} from '../../initializers' +import { logger } from '../../helpers' +import { jobHandlers } from './handlers' + +class JobScheduler { + + private static instance: JobScheduler + + private constructor () { } + + static get Instance () { + return this.instance || (this.instance = new this()) + } + + activate () { + const limit = JOBS_FETCH_LIMIT_PER_CYCLE + + logger.info('Jobs scheduler activated.') + + const jobsQueue = queue(this.processJob) + + // Finish processing jobs from a previous start + const state = JOB_STATES.PROCESSING + db.Job.listWithLimit(limit, state, (err, jobs) => { + this.enqueueJobs(err, jobsQueue, jobs) + + forever( + next => { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, JOBS_FETCHING_INTERVAL) + } + + const state = JOB_STATES.PENDING + db.Job.listWithLimit(limit, state, (err, jobs) => { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(job => { + jobsQueue.push(job) + }) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, JOBS_FETCHING_INTERVAL) + }) + }, + + err => { logger.error('Error in job scheduler queue.', { error: err }) } + ) + }) + } + + createJob (transaction, handlerName, handlerInputData, callback) { + const createQuery = { + state: JOB_STATES.PENDING, + handlerName, + handlerInputData + } + const options = { transaction } + + db.Job.create(createQuery, options).asCallback(callback) + } + + private enqueueJobs (err, jobsQueue, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(job => { + jobsQueue.push(job) + }) + } + } + + private processJob (job, callback) { + const jobHandler = jobHandlers[job.handlerName] + + logger.info('Processing job %d with handler %s.', job.id, job.handlerName) + + job.state = JOB_STATES.PROCESSING + job.save().asCallback(err => { + if (err) return this.cannotSaveJobError(err, callback) + + if (jobHandler === undefined) { + logger.error('Unknown job handler for job %s.', jobHandler.handlerName) + return callback() + } + + return jobHandler.process(job.handlerInputData, (err, result) => { + if (err) { + logger.error('Error in job handler %s.', job.handlerName, { error: err }) + return this.onJobError(jobHandler, job, result, callback) + } + + return this.onJobSuccess(jobHandler, job, result, callback) + }) + }) + } + + private onJobError (jobHandler, job, jobResult, callback) { + job.state = JOB_STATES.ERROR + + job.save().asCallback(err => { + if (err) return this.cannotSaveJobError(err, callback) + + return jobHandler.onError(err, job.id, jobResult, callback) + }) + } + + private onJobSuccess (jobHandler, job, jobResult, callback) { + job.state = JOB_STATES.SUCCESS + + job.save().asCallback(err => { + if (err) return this.cannotSaveJobError(err, callback) + + return jobHandler.onSuccess(err, job.id, jobResult, callback) + }) + } + + private cannotSaveJobError (err, callback) { + logger.error('Cannot save new job state.', { error: err }) + return callback(err) + } +} + +// --------------------------------------------------------------------------- + +export { + JobScheduler +} -- cgit v1.2.3