From 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 25 Jan 2018 15:05:18 +0100 Subject: Move job queue to redis We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore. --- server/controllers/api/jobs.ts | 38 +++++++++++++++++++++++++++----- server/controllers/api/server/follows.ts | 2 +- server/controllers/api/videos/index.ts | 24 +++++++++++--------- 3 files changed, 46 insertions(+), 18 deletions(-) (limited to 'server/controllers') diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index de37dea39..132d110ad 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts @@ -1,22 +1,29 @@ import * as express from 'express' +import { ResultList } from '../../../shared' +import { Job, JobType, JobState } from '../../../shared/models' import { UserRight } from '../../../shared/models/users' -import { getFormattedObjects } from '../../helpers/utils' +import { JobQueue } from '../../lib/job-queue' import { - asyncMiddleware, authenticate, ensureUserHasRight, jobsSortValidator, setDefaultPagination, + asyncMiddleware, + authenticate, + ensureUserHasRight, + jobsSortValidator, + setDefaultPagination, setDefaultSort } from '../../middlewares' import { paginationValidator } from '../../middlewares/validators' -import { JobModel } from '../../models/job/job' +import { listJobsValidator } from '../../middlewares/validators/jobs' const jobsRouter = express.Router() -jobsRouter.get('/', +jobsRouter.get('/:state', authenticate, ensureUserHasRight(UserRight.MANAGE_JOBS), paginationValidator, jobsSortValidator, setDefaultSort, setDefaultPagination, + asyncMiddleware(listJobsValidator), asyncMiddleware(listJobs) ) @@ -29,7 +36,26 @@ export { // --------------------------------------------------------------------------- async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { - const resultList = await JobModel.listForApi(req.query.start, req.query.count, req.query.sort) + const sort = req.query.sort === 'createdAt' ? 'asc' : 'desc' + + const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) + const total = await JobQueue.Instance.count(req.params.state) + + const result: ResultList = { + total, + data: jobs.map(j => formatJob(j.toJSON())) + } + return res.json(result) +} - return res.json(getFormattedObjects(resultList.data, resultList.total)) +function formatJob (job: any): Job { + return { + id: job.id, + state: job.state as JobState, + type: job.type as JobType, + data: job.data, + error: job.error, + createdAt: new Date(parseInt(job.created_at, 10)), + updatedAt: new Date(parseInt(job.updated_at, 10)) + } } diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index 506b9668e..bb8713e7a 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts @@ -123,7 +123,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { actorFollow.ActorFollower = fromActor // Send a notification to remote server - await sendFollow(actorFollow, t) + await sendFollow(actorFollow) }) } diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index c2fdb4f95..459795141 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -12,7 +12,7 @@ import { } from '../../../initializers' import { fetchRemoteVideoDescription, getVideoActivityPubUrl, shareVideoByServerAndChannel } from '../../../lib/activitypub' import { sendCreateVideo, sendCreateViewToOrigin, sendCreateViewToVideoFollowers, sendUpdateVideo } from '../../../lib/activitypub/send' -import { transcodingJobScheduler } from '../../../lib/jobs/transcoding-job-scheduler' +import { JobQueue } from '../../../lib/job-queue' import { asyncMiddleware, authenticate, paginationValidator, setDefaultSort, setDefaultPagination, videosAddValidator, videosGetValidator, videosRemoveValidator, videosSearchValidator, videosSortValidator, videosUpdateValidator @@ -176,18 +176,9 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi ) await Promise.all(tasks) - return sequelizeTypescript.transaction(async t => { + const videoCreated = await sequelizeTypescript.transaction(async t => { const sequelizeOptions = { transaction: t } - if (CONFIG.TRANSCODING.ENABLED === true) { - // Put uuid because we don't have id auto incremented for now - const dataInput = { - videoUUID: video.uuid - } - - await transcodingJobScheduler.createJob(t, 'videoFileOptimizer', dataInput) - } - const videoCreated = await video.save(sequelizeOptions) // Do not forget to add video channel information to the created video videoCreated.VideoChannel = res.locals.videoChannel @@ -216,6 +207,17 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi return videoCreated }) + + if (CONFIG.TRANSCODING.ENABLED === true) { + // Put uuid because we don't have id auto incremented for now + const dataInput = { + videoUUID: videoCreated.uuid + } + + await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + } + + return videoCreated } async function updateVideoRetryWrapper (req: express.Request, res: express.Response, next: express.NextFunction) { -- cgit v1.2.3