From 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 25 Jan 2018 15:05:18 +0100 Subject: Move job queue to redis We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore. --- server/controllers/api/jobs.ts | 38 +++++- server/controllers/api/server/follows.ts | 2 +- server/controllers/api/videos/index.ts | 24 ++-- server/helpers/custom-validators/jobs.ts | 14 ++ server/helpers/database-utils.ts | 1 + server/initializers/constants.ts | 49 ++++--- server/initializers/database.ts | 2 - server/initializers/migrations/0100-activitypub.ts | 5 +- .../migrations/0180-job-table-delete.ts | 18 +++ server/lib/activitypub/actor.ts | 59 +++++---- server/lib/activitypub/fetch.ts | 9 +- server/lib/activitypub/process/process-accept.ts | 2 +- server/lib/activitypub/process/process-follow.ts | 2 +- server/lib/activitypub/send/misc.ts | 26 ++-- server/lib/activitypub/send/send-accept.ts | 5 +- server/lib/activitypub/send/send-announce.ts | 2 +- server/lib/activitypub/send/send-create.ts | 22 ++-- server/lib/activitypub/send/send-follow.ts | 5 +- server/lib/activitypub/send/send-like.ts | 2 +- server/lib/activitypub/send/send-undo.ts | 14 +- .../handlers/activitypub-http-broadcast.ts | 49 +++++++ .../job-queue/handlers/activitypub-http-fetcher.ts | 63 +++++++++ .../job-queue/handlers/activitypub-http-unicast.ts | 43 ++++++ .../handlers/utils/activitypub-http-utils.ts | 39 ++++++ server/lib/job-queue/handlers/video-file.ts | 110 ++++++++++++++++ server/lib/job-queue/index.ts | 1 + server/lib/job-queue/job-queue.ts | 124 ++++++++++++++++++ .../activitypub-http-broadcast-handler.ts | 53 -------- .../activitypub-http-fetcher-handler.ts | 68 ---------- .../activitypub-http-job-scheduler.ts | 94 -------------- .../activitypub-http-unicast-handler.ts | 50 ------- .../jobs/activitypub-http-job-scheduler/index.ts | 1 - server/lib/jobs/index.ts | 2 - server/lib/jobs/job-scheduler.ts | 144 --------------------- server/lib/jobs/transcoding-job-scheduler/index.ts | 1 - .../transcoding-job-scheduler.ts | 23 ---- .../video-file-optimizer-handler.ts | 90 ------------- .../video-file-transcoder-handler.ts | 48 ------- server/lib/schedulers/remove-old-jobs-scheduler.ts | 19 +++ server/middlewares/validators/jobs.ts | 23 ++++ server/models/job/job.ts | 80 ------------ server/tests/api/check-params/jobs.ts | 11 +- server/tests/api/server/handle-down.ts | 10 +- server/tests/api/server/jobs.ts | 10 +- server/tests/api/videos/multiple-servers.ts | 21 +-- server/tests/real-world/real-world.ts | 21 ++- server/tests/utils/server/jobs.ts | 9 +- 47 files changed, 697 insertions(+), 811 deletions(-) create mode 100644 server/helpers/custom-validators/jobs.ts create mode 100644 server/initializers/migrations/0180-job-table-delete.ts create mode 100644 server/lib/job-queue/handlers/activitypub-http-broadcast.ts create mode 100644 server/lib/job-queue/handlers/activitypub-http-fetcher.ts create mode 100644 server/lib/job-queue/handlers/activitypub-http-unicast.ts create mode 100644 server/lib/job-queue/handlers/utils/activitypub-http-utils.ts create mode 100644 server/lib/job-queue/handlers/video-file.ts create mode 100644 server/lib/job-queue/index.ts create mode 100644 server/lib/job-queue/job-queue.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts delete mode 100644 server/lib/jobs/activitypub-http-job-scheduler/index.ts delete mode 100644 server/lib/jobs/index.ts delete mode 100644 server/lib/jobs/job-scheduler.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/index.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts delete mode 100644 server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts create mode 100644 server/lib/schedulers/remove-old-jobs-scheduler.ts create mode 100644 server/middlewares/validators/jobs.ts delete mode 100644 server/models/job/job.ts (limited to 'server') diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index de37dea39..132d110ad 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts @@ -1,22 +1,29 @@ import * as express from 'express' +import { ResultList } from '../../../shared' +import { Job, JobType, JobState } from '../../../shared/models' import { UserRight } from '../../../shared/models/users' -import { getFormattedObjects } from '../../helpers/utils' +import { JobQueue } from '../../lib/job-queue' import { - asyncMiddleware, authenticate, ensureUserHasRight, jobsSortValidator, setDefaultPagination, + asyncMiddleware, + authenticate, + ensureUserHasRight, + jobsSortValidator, + setDefaultPagination, setDefaultSort } from '../../middlewares' import { paginationValidator } from '../../middlewares/validators' -import { JobModel } from '../../models/job/job' +import { listJobsValidator } from '../../middlewares/validators/jobs' const jobsRouter = express.Router() -jobsRouter.get('/', +jobsRouter.get('/:state', authenticate, ensureUserHasRight(UserRight.MANAGE_JOBS), paginationValidator, jobsSortValidator, setDefaultSort, setDefaultPagination, + asyncMiddleware(listJobsValidator), asyncMiddleware(listJobs) ) @@ -29,7 +36,26 @@ export { // --------------------------------------------------------------------------- async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { - const resultList = await JobModel.listForApi(req.query.start, req.query.count, req.query.sort) + const sort = req.query.sort === 'createdAt' ? 'asc' : 'desc' + + const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) + const total = await JobQueue.Instance.count(req.params.state) + + const result: ResultList = { + total, + data: jobs.map(j => formatJob(j.toJSON())) + } + return res.json(result) +} - return res.json(getFormattedObjects(resultList.data, resultList.total)) +function formatJob (job: any): Job { + return { + id: job.id, + state: job.state as JobState, + type: job.type as JobType, + data: job.data, + error: job.error, + createdAt: new Date(parseInt(job.created_at, 10)), + updatedAt: new Date(parseInt(job.updated_at, 10)) + } } diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index 506b9668e..bb8713e7a 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts @@ -123,7 +123,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { actorFollow.ActorFollower = fromActor // Send a notification to remote server - await sendFollow(actorFollow, t) + await sendFollow(actorFollow) }) } diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index c2fdb4f95..459795141 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -12,7 +12,7 @@ import { } from '../../../initializers' import { fetchRemoteVideoDescription, getVideoActivityPubUrl, shareVideoByServerAndChannel } from '../../../lib/activitypub' import { sendCreateVideo, sendCreateViewToOrigin, sendCreateViewToVideoFollowers, sendUpdateVideo } from '../../../lib/activitypub/send' -import { transcodingJobScheduler } from '../../../lib/jobs/transcoding-job-scheduler' +import { JobQueue } from '../../../lib/job-queue' import { asyncMiddleware, authenticate, paginationValidator, setDefaultSort, setDefaultPagination, videosAddValidator, videosGetValidator, videosRemoveValidator, videosSearchValidator, videosSortValidator, videosUpdateValidator @@ -176,18 +176,9 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi ) await Promise.all(tasks) - return sequelizeTypescript.transaction(async t => { + const videoCreated = await sequelizeTypescript.transaction(async t => { const sequelizeOptions = { transaction: t } - if (CONFIG.TRANSCODING.ENABLED === true) { - // Put uuid because we don't have id auto incremented for now - const dataInput = { - videoUUID: video.uuid - } - - await transcodingJobScheduler.createJob(t, 'videoFileOptimizer', dataInput) - } - const videoCreated = await video.save(sequelizeOptions) // Do not forget to add video channel information to the created video videoCreated.VideoChannel = res.locals.videoChannel @@ -216,6 +207,17 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi return videoCreated }) + + if (CONFIG.TRANSCODING.ENABLED === true) { + // Put uuid because we don't have id auto incremented for now + const dataInput = { + videoUUID: videoCreated.uuid + } + + await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + } + + return videoCreated } async function updateVideoRetryWrapper (req: express.Request, res: express.Response, next: express.NextFunction) { diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts new file mode 100644 index 000000000..9700fbd12 --- /dev/null +++ b/server/helpers/custom-validators/jobs.ts @@ -0,0 +1,14 @@ +import { JobState } from '../../../shared/models' +import { exists } from './misc' + +const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] + +function isValidJobState (value: JobState) { + return exists(value) && jobStates.indexOf(value) !== -1 +} + +// --------------------------------------------------------------------------- + +export { + isValidJobState +} diff --git a/server/helpers/database-utils.ts b/server/helpers/database-utils.ts index 78ca768b9..b4adaf9cc 100644 --- a/server/helpers/database-utils.ts +++ b/server/helpers/database-utils.ts @@ -16,6 +16,7 @@ function retryTransactionWrapper ( .catch(err => callback(err)) }) .catch(err => { + console.error(err) logger.error(options.errorMessage, err) throw err }) diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index cb043251a..329d0ffe8 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -1,6 +1,6 @@ import { IConfig } from 'config' import { dirname, join } from 'path' -import { JobCategory, JobState, VideoRateType } from '../../shared/models' +import { JobType, VideoRateType } from '../../shared/models' import { ActivityPubActorType } from '../../shared/models/activitypub' import { FollowState } from '../../shared/models/actors' import { VideoPrivacy } from '../../shared/models/videos' @@ -12,7 +12,7 @@ let config: IConfig = require('config') // --------------------------------------------------------------------------- -const LAST_MIGRATION_VERSION = 175 +const LAST_MIGRATION_VERSION = 180 // --------------------------------------------------------------------------- @@ -26,7 +26,7 @@ const PAGINATION_COUNT_DEFAULT = 15 const SORTABLE_COLUMNS = { USERS: [ 'id', 'username', 'createdAt' ], ACCOUNTS: [ 'createdAt' ], - JOBS: [ 'id', 'createdAt' ], + JOBS: [ 'createdAt' ], VIDEO_ABUSES: [ 'id', 'createdAt' ], VIDEO_CHANNELS: [ 'id', 'name', 'updatedAt', 'createdAt' ], VIDEOS: [ 'name', 'duration', 'createdAt', 'views', 'likes' ], @@ -61,23 +61,20 @@ const REMOTE_SCHEME = { WS: 'wss' } -const JOB_STATES: { [ id: string ]: JobState } = { - PENDING: 'pending', - PROCESSING: 'processing', - ERROR: 'error', - SUCCESS: 'success' -} -const JOB_CATEGORIES: { [ id: string ]: JobCategory } = { - TRANSCODING: 'transcoding', - ACTIVITYPUB_HTTP: 'activitypub-http' +const JOB_ATTEMPTS: { [ id in JobType ]: number } = { + 'activitypub-http-broadcast': 5, + 'activitypub-http-unicast': 5, + 'activitypub-http-fetcher': 5, + 'video-file': 1 } -// How many maximum jobs we fetch from the database per cycle -const JOBS_FETCH_LIMIT_PER_CYCLE = { - transcoding: 10, - httpRequest: 20 +const JOB_CONCURRENCY: { [ id in JobType ]: number } = { + 'activitypub-http-broadcast': 1, + 'activitypub-http-unicast': 5, + 'activitypub-http-fetcher': 1, + 'video-file': 1 } -// 1 minutes -let JOBS_FETCHING_INTERVAL = 60000 +// 2 days +const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 1 hour let SCHEDULER_INTERVAL = 60000 * 60 @@ -96,6 +93,11 @@ const CONFIG = { USERNAME: config.get('database.username'), PASSWORD: config.get('database.password') }, + REDIS: { + HOSTNAME: config.get('redis.hostname'), + PORT: config.get('redis.port'), + AUTH: config.get('redis.auth') + }, STORAGE: { AVATARS_DIR: buildPath(config.get('storage.avatars')), LOG_DIR: buildPath(config.get('storage.logs')), @@ -284,7 +286,6 @@ const ACTIVITY_PUB = { PUBLIC: 'https://www.w3.org/ns/activitystreams#Public', COLLECTION_ITEMS_PER_PAGE: 10, FETCH_PAGE_LIMIT: 100, - MAX_HTTP_ATTEMPT: 5, URL_MIME_TYPES: { VIDEO: Object.keys(VIDEO_MIMETYPE_EXT), TORRENT: [ 'application/x-bittorrent' ], @@ -358,7 +359,6 @@ const OPENGRAPH_AND_OEMBED_COMMENT = '' // Special constants for a test instance if (isTestInstance() === true) { ACTOR_FOLLOW_SCORE.BASE = 20 - JOBS_FETCHING_INTERVAL = 1000 REMOTE_SCHEME.HTTP = 'http' REMOTE_SCHEME.WS = 'ws' STATIC_MAX_AGE = '0' @@ -381,10 +381,8 @@ export { CONFIG, CONSTRAINTS_FIELDS, EMBED_SIZE, - JOB_STATES, - JOBS_FETCH_LIMIT_PER_CYCLE, - JOBS_FETCHING_INTERVAL, - JOB_CATEGORIES, + JOB_CONCURRENCY, + JOB_ATTEMPTS, LAST_MIGRATION_VERSION, OAUTH_LIFETIME, OPENGRAPH_AND_OEMBED_COMMENT, @@ -408,7 +406,8 @@ export { VIDEO_RATE_TYPES, VIDEO_MIMETYPE_EXT, AVATAR_MIMETYPE_EXT, - SCHEDULER_INTERVAL + SCHEDULER_INTERVAL, + JOB_COMPLETED_LIFETIME } // --------------------------------------------------------------------------- diff --git a/server/initializers/database.ts b/server/initializers/database.ts index 852db68a0..b537ee59a 100644 --- a/server/initializers/database.ts +++ b/server/initializers/database.ts @@ -9,7 +9,6 @@ import { ActorModel } from '../models/activitypub/actor' import { ActorFollowModel } from '../models/activitypub/actor-follow' import { ApplicationModel } from '../models/application/application' import { AvatarModel } from '../models/avatar/avatar' -import { JobModel } from '../models/job/job' import { OAuthClientModel } from '../models/oauth/oauth-client' import { OAuthTokenModel } from '../models/oauth/oauth-token' import { ServerModel } from '../models/server/server' @@ -61,7 +60,6 @@ async function initDatabaseModels (silent: boolean) { ActorFollowModel, AvatarModel, AccountModel, - JobModel, OAuthClientModel, OAuthTokenModel, ServerModel, diff --git a/server/initializers/migrations/0100-activitypub.ts b/server/initializers/migrations/0100-activitypub.ts index 8c5198f85..a7ebd804c 100644 --- a/server/initializers/migrations/0100-activitypub.ts +++ b/server/initializers/migrations/0100-activitypub.ts @@ -1,11 +1,10 @@ -import { values } from 'lodash' import * as Sequelize from 'sequelize' import { createPrivateAndPublicKeys } from '../../helpers/peertube-crypto' import { shareVideoByServerAndChannel } from '../../lib/activitypub/share' import { getVideoActivityPubUrl, getVideoChannelActivityPubUrl } from '../../lib/activitypub/url' import { createLocalAccountWithoutKeys } from '../../lib/user' import { ApplicationModel } from '../../models/application/application' -import { JOB_CATEGORIES, SERVER_ACTOR_NAME } from '../constants' +import { SERVER_ACTOR_NAME } from '../constants' async function up (utils: { transaction: Sequelize.Transaction, @@ -161,7 +160,7 @@ async function up (utils: { { const data = { - type: Sequelize.ENUM(values(JOB_CATEGORIES)), + type: Sequelize.ENUM('transcoding', 'activitypub-http'), defaultValue: 'transcoding', allowNull: false } diff --git a/server/initializers/migrations/0180-job-table-delete.ts b/server/initializers/migrations/0180-job-table-delete.ts new file mode 100644 index 000000000..df29145d0 --- /dev/null +++ b/server/initializers/migrations/0180-job-table-delete.ts @@ -0,0 +1,18 @@ +import * as Sequelize from 'sequelize' + +async function up (utils: { + transaction: Sequelize.Transaction, + queryInterface: Sequelize.QueryInterface, + sequelize: Sequelize.Sequelize +}): Promise { + await utils.queryInterface.dropTable('job') +} + +function down (options) { + throw new Error('Not implemented.') +} + +export { + up, + down +} diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts index c708b38ba..712de7d0d 100644 --- a/server/lib/activitypub/actor.ts +++ b/server/lib/activitypub/actor.ts @@ -64,7 +64,11 @@ async function getOrCreateActorAndServerAndModel (actorUrl: string, recurseIfNee actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options) } - return refreshActorIfNeeded(actor) + const options = { + arguments: [ actor ], + errorMessage: 'Cannot refresh actor if needed with many retries.' + } + return retryTransactionWrapper(refreshActorIfNeeded, options) } function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) { @@ -325,38 +329,43 @@ async function saveVideoChannel (actor: ActorModel, result: FetchRemoteActorResu async function refreshActorIfNeeded (actor: ActorModel) { if (!actor.isOutdated()) return actor - const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) - const result = await fetchRemoteActor(actorUrl) - if (result === undefined) { - logger.warn('Cannot fetch remote actor in refresh actor.') - return actor - } - - return sequelizeTypescript.transaction(async t => { - updateInstanceWithAnother(actor, result.actor) - - if (result.avatarName !== undefined) { - await updateActorAvatarInstance(actor, result.avatarName, t) + try { + const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) + const result = await fetchRemoteActor(actorUrl) + if (result === undefined) { + logger.warn('Cannot fetch remote actor in refresh actor.') + return actor } - // Force update - actor.setDataValue('updatedAt', new Date()) - await actor.save({ transaction: t }) + return sequelizeTypescript.transaction(async t => { + updateInstanceWithAnother(actor, result.actor) - if (actor.Account) { - await actor.save({ transaction: t }) + if (result.avatarName !== undefined) { + await updateActorAvatarInstance(actor, result.avatarName, t) + } - actor.Account.set('name', result.name) - await actor.Account.save({ transaction: t }) - } else if (actor.VideoChannel) { + // Force update + actor.setDataValue('updatedAt', new Date()) await actor.save({ transaction: t }) - actor.VideoChannel.set('name', result.name) - await actor.VideoChannel.save({ transaction: t }) - } + if (actor.Account) { + await actor.save({ transaction: t }) + + actor.Account.set('name', result.name) + await actor.Account.save({ transaction: t }) + } else if (actor.VideoChannel) { + await actor.save({ transaction: t }) + + actor.VideoChannel.set('name', result.name) + await actor.VideoChannel.save({ transaction: t }) + } + return actor + }) + } catch (err) { + logger.warn('Cannot refresh actor.', err) return actor - }) + } } function normalizeActor (actor: any) { diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts index 4fc97cc38..b1b370a1a 100644 --- a/server/lib/activitypub/fetch.ts +++ b/server/lib/activitypub/fetch.ts @@ -1,13 +1,12 @@ -import { Transaction } from 'sequelize' import { ActorModel } from '../../models/activitypub/actor' -import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler' +import { JobQueue } from '../job-queue' -async function addFetchOutboxJob (actor: ActorModel, t: Transaction) { - const jobPayload: ActivityPubHttpPayload = { +async function addFetchOutboxJob (actor: ActorModel) { + const payload = { uris: [ actor.outboxUrl ] } - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) } export { diff --git a/server/lib/activitypub/process/process-accept.ts b/server/lib/activitypub/process/process-accept.ts index 551f09ea7..7db2f8ff0 100644 --- a/server/lib/activitypub/process/process-accept.ts +++ b/server/lib/activitypub/process/process-accept.ts @@ -26,6 +26,6 @@ async function processAccept (actor: ActorModel, targetActor: ActorModel) { if (follow.state !== 'accepted') { follow.set('state', 'accepted') await follow.save() - await addFetchOutboxJob(targetActor, undefined) + await addFetchOutboxJob(targetActor) } } diff --git a/server/lib/activitypub/process/process-follow.ts b/server/lib/activitypub/process/process-follow.ts index 69f5c51b5..dc1d542b5 100644 --- a/server/lib/activitypub/process/process-follow.ts +++ b/server/lib/activitypub/process/process-follow.ts @@ -63,7 +63,7 @@ async function follow (actor: ActorModel, targetActorURL: string) { actorFollow.ActorFollowing = targetActor // Target sends to actor he accepted the follow request - return sendAccept(actorFollow, t) + return sendAccept(actorFollow) }) logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url) diff --git a/server/lib/activitypub/send/misc.ts b/server/lib/activitypub/send/misc.ts index dc0d3de57..7a21f0c94 100644 --- a/server/lib/activitypub/send/misc.ts +++ b/server/lib/activitypub/send/misc.ts @@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { VideoModel } from '../../../models/video/video' import { VideoCommentModel } from '../../../models/video/video-comment' import { VideoShareModel } from '../../../models/video/video-share' -import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler' +import { JobQueue } from '../../job-queue' async function forwardActivity ( activity: Activity, @@ -35,12 +35,11 @@ async function forwardActivity ( logger.debug('Creating forwarding job.', { uris }) - const jobPayload: ActivityPubHttpPayload = { + const payload = { uris, body: activity } - - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } async function broadcastToFollowers ( @@ -51,44 +50,43 @@ async function broadcastToFollowers ( actorsException: ActorModel[] = [] ) { const uris = await computeFollowerUris(toActorFollowers, actorsException, t) - return broadcastTo(uris, data, byActor, t) + return broadcastTo(uris, data, byActor) } async function broadcastToActors ( data: any, byActor: ActorModel, toActors: ActorModel[], - t: Transaction, actorsException: ActorModel[] = [] ) { const uris = await computeUris(toActors, actorsException) - return broadcastTo(uris, data, byActor, t) + return broadcastTo(uris, data, byActor) } -async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) { +async function broadcastTo (uris: string[], data: any, byActor: ActorModel) { if (uris.length === 0) return undefined logger.debug('Creating broadcast job.', { uris }) - const jobPayload: ActivityPubHttpPayload = { + const payload = { uris, signatureActorId: byActor.id, body: data } - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } -async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) { +async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) { logger.debug('Creating unicast job.', { uri: toActorUrl }) - const jobPayload: ActivityPubHttpPayload = { - uris: [ toActorUrl ], + const payload = { + uri: toActorUrl, signatureActorId: byActor.id, body: data } - return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) + return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) } function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { diff --git a/server/lib/activitypub/send/send-accept.ts b/server/lib/activitypub/send/send-accept.ts index 4eaa329d9..064fd88d2 100644 --- a/server/lib/activitypub/send/send-accept.ts +++ b/server/lib/activitypub/send/send-accept.ts @@ -1,4 +1,3 @@ -import { Transaction } from 'sequelize' import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' import { ActorModel } from '../../../models/activitypub/actor' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' @@ -6,7 +5,7 @@ import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from import { unicastTo } from './misc' import { followActivityData } from './send-follow' -async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { +async function sendAccept (actorFollow: ActorFollowModel) { const follower = actorFollow.ActorFollower const me = actorFollow.ActorFollowing @@ -16,7 +15,7 @@ async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { const url = getActorFollowAcceptActivityPubUrl(actorFollow) const data = acceptActivityData(url, me, followData) - return unicastTo(data, me, follower.inboxUrl, t) + return unicastTo(data, me, follower.inboxUrl) } // --------------------------------------------------------------------------- diff --git a/server/lib/activitypub/send/send-announce.ts b/server/lib/activitypub/send/send-announce.ts index 578fbc630..93b5668d2 100644 --- a/server/lib/activitypub/send/send-announce.ts +++ b/server/lib/activitypub/send/send-announce.ts @@ -42,7 +42,7 @@ async function sendVideoAnnounceToOrigin (byActor: ActorModel, video: VideoModel const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) const data = await createActivityData(url, byActor, announcedActivity, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function announceActivityData ( diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 9db663be1..b92615e9b 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts @@ -8,8 +8,14 @@ import { VideoAbuseModel } from '../../../models/video/video-abuse' import { VideoCommentModel } from '../../../models/video/video-comment' import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' import { - audiencify, broadcastToActors, broadcastToFollowers, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience, - getOriginVideoAudience, getOriginVideoCommentAudience, + audiencify, + broadcastToActors, + broadcastToFollowers, + getActorsInvolvedInVideo, + getAudience, + getObjectFollowersAudience, + getOriginVideoAudience, + getOriginVideoCommentAudience, unicastTo } from './misc' @@ -31,7 +37,7 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel, const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { @@ -47,13 +53,13 @@ async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Tr // This was a reply, send it to the parent actors const actorsException = [ byActor ] - await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) + await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) // Broadcast to our followers await broadcastToFollowers(data, byActor, [ byActor ], t) // Send to origin - return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { @@ -69,7 +75,7 @@ async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentMode // This was a reply, send it to the parent actors const actorsException = [ byActor ] - await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) + await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) // Broadcast to our followers await broadcastToFollowers(data, byActor, [ byActor ], t) @@ -86,7 +92,7 @@ async function sendCreateViewToOrigin (byActor: ActorModel, video: VideoModel, t const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) const data = await createActivityData(url, byActor, viewActivityData, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -111,7 +117,7 @@ async function sendCreateDislikeToOrigin (byActor: ActorModel, video: VideoModel const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { diff --git a/server/lib/activitypub/send/send-follow.ts b/server/lib/activitypub/send/send-follow.ts index eac60e94f..4e9865af4 100644 --- a/server/lib/activitypub/send/send-follow.ts +++ b/server/lib/activitypub/send/send-follow.ts @@ -1,18 +1,17 @@ -import { Transaction } from 'sequelize' import { ActivityFollow } from '../../../../shared/models/activitypub' import { ActorModel } from '../../../models/activitypub/actor' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { getActorFollowActivityPubUrl } from '../url' import { unicastTo } from './misc' -function sendFollow (actorFollow: ActorFollowModel, t: Transaction) { +function sendFollow (actorFollow: ActorFollowModel) { const me = actorFollow.ActorFollower const following = actorFollow.ActorFollowing const url = getActorFollowActivityPubUrl(actorFollow) const data = followActivityData(url, me, following) - return unicastTo(data, me, following.inboxUrl, t) + return unicastTo(data, me, following.inboxUrl) } function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { diff --git a/server/lib/activitypub/send/send-like.ts b/server/lib/activitypub/send/send-like.ts index 743646455..78ed1aaf2 100644 --- a/server/lib/activitypub/send/send-like.ts +++ b/server/lib/activitypub/send/send-like.ts @@ -20,7 +20,7 @@ async function sendLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Tran const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) const data = await likeActivityData(url, byActor, video, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { diff --git a/server/lib/activitypub/send/send-undo.ts b/server/lib/activitypub/send/send-undo.ts index 3a0597fba..4a08b5ca1 100644 --- a/server/lib/activitypub/send/send-undo.ts +++ b/server/lib/activitypub/send/send-undo.ts @@ -1,11 +1,5 @@ import { Transaction } from 'sequelize' -import { - ActivityAudience, - ActivityCreate, - ActivityFollow, - ActivityLike, - ActivityUndo -} from '../../../../shared/models/activitypub' +import { ActivityAudience, ActivityCreate, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub' import { ActorModel } from '../../../models/activitypub/actor' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { VideoModel } from '../../../models/video/video' @@ -33,7 +27,7 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) { const object = followActivityData(followUrl, me, following) const data = await undoActivityData(undoUrl, me, object, t) - return unicastTo(data, me, following.inboxUrl, t) + return unicastTo(data, me, following.inboxUrl) } async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -45,7 +39,7 @@ async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: const object = await likeActivityData(likeUrl, byActor, video, t) const data = await undoActivityData(undoUrl, byActor, object, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -72,7 +66,7 @@ async function sendUndoDislikeToOrigin (byActor: ActorModel, video: VideoModel, const data = await undoActivityData(undoUrl, byActor, object, t, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) + return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts new file mode 100644 index 000000000..159856cda --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -0,0 +1,49 @@ +import * as kue from 'kue' +import { logger } from '../../../helpers/logger' +import { doRequest } from '../../../helpers/requests' +import { ActorFollowModel } from '../../../models/activitypub/actor-follow' +import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' + +export type ActivitypubHttpBroadcastPayload = { + uris: string[] + signatureActorId?: number + body: any +} + +async function processActivityPubHttpBroadcast (job: kue.Job) { + logger.info('Processing ActivityPub broadcast in job %d.', job.id) + + const payload = job.data as ActivitypubHttpBroadcastPayload + + const body = await computeBody(payload) + const httpSignatureOptions = await buildSignedRequestOptions(payload) + + const options = { + method: 'POST', + uri: '', + json: body, + httpSignature: httpSignatureOptions + } + + const badUrls: string[] = [] + const goodUrls: string[] = [] + + for (const uri of payload.uris) { + options.uri = uri + + try { + await doRequest(options) + goodUrls.push(uri) + } catch (err) { + badUrls.push(uri) + } + } + + return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubHttpBroadcast +} diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts new file mode 100644 index 000000000..062211c85 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -0,0 +1,63 @@ +import * as kue from 'kue' +import { logger } from '../../../helpers/logger' +import { doRequest } from '../../../helpers/requests' +import { ACTIVITY_PUB } from '../../../initializers' +import { processActivities } from '../../activitypub/process' +import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' + +export type ActivitypubHttpFetcherPayload = { + uris: string[] +} + +async function processActivityPubHttpFetcher (job: kue.Job) { + logger.info('Processing ActivityPub fetcher in job %d.', job.id) + + const payload = job.data as ActivitypubHttpBroadcastPayload + + const options = { + method: 'GET', + uri: '', + json: true, + activityPub: true + } + + for (const uri of payload.uris) { + options.uri = uri + logger.info('Fetching ActivityPub data on %s.', uri) + + const response = await doRequest(options) + const firstBody = response.body + + if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { + const activities = firstBody.first.orderedItems + + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) + + await processActivities(activities) + } + + let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT + let i = 0 + let nextLink = firstBody.first.next + while (nextLink && i < limit) { + options.uri = nextLink + + const { body } = await doRequest(options) + nextLink = body.next + i++ + + if (Array.isArray(body.orderedItems)) { + const activities = body.orderedItems + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) + + await processActivities(activities) + } + } + } +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubHttpFetcher +} diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts new file mode 100644 index 000000000..9b4188c50 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -0,0 +1,43 @@ +import * as kue from 'kue' +import { logger } from '../../../helpers/logger' +import { doRequest } from '../../../helpers/requests' +import { ActorFollowModel } from '../../../models/activitypub/actor-follow' +import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' + +export type ActivitypubHttpUnicastPayload = { + uri: string + signatureActorId?: number + body: any +} + +async function processActivityPubHttpUnicast (job: kue.Job) { + logger.info('Processing ActivityPub unicast in job %d.', job.id) + + const payload = job.data as ActivitypubHttpUnicastPayload + const uri = payload.uri + + const body = await computeBody(payload) + const httpSignatureOptions = await buildSignedRequestOptions(payload) + + const options = { + method: 'POST', + uri, + json: body, + httpSignature: httpSignatureOptions + } + + try { + await doRequest(options) + ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) + } catch (err) { + ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) + + throw err + } +} + +// --------------------------------------------------------------------------- + +export { + processActivityPubHttpUnicast +} diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts new file mode 100644 index 000000000..c087371c6 --- /dev/null +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts @@ -0,0 +1,39 @@ +import { buildSignedActivity } from '../../../../helpers/activitypub' +import { getServerActor } from '../../../../helpers/utils' +import { ActorModel } from '../../../../models/activitypub/actor' + +async function computeBody (payload: { body: any, signatureActorId?: number }) { + let body = payload.body + + if (payload.signatureActorId) { + const actorSignature = await ActorModel.load(payload.signatureActorId) + if (!actorSignature) throw new Error('Unknown signature actor id.') + body = await buildSignedActivity(actorSignature, payload.body) + } + + return body +} + +async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { + let actor: ActorModel + if (payload.signatureActorId) { + actor = await ActorModel.load(payload.signatureActorId) + if (!actor) throw new Error('Unknown signature actor id.') + } else { + // We need to sign the request, so use the server + actor = await getServerActor() + } + + const keyId = actor.getWebfingerUrl() + return { + algorithm: 'rsa-sha256', + authorizationHeaderName: 'Signature', + keyId, + key: actor.privateKey + } +} + +export { + computeBody, + buildSignedRequestOptions +} diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts new file mode 100644 index 000000000..5294483bd --- /dev/null +++ b/server/lib/job-queue/handlers/video-file.ts @@ -0,0 +1,110 @@ +import * as kue from 'kue' +import { VideoResolution } from '../../../../shared' +import { VideoPrivacy } from '../../../../shared/models/videos' +import { logger } from '../../../helpers/logger' +import { computeResolutionsToTranscode } from '../../../helpers/utils' +import { sequelizeTypescript } from '../../../initializers' +import { VideoModel } from '../../../models/video/video' +import { shareVideoByServerAndChannel } from '../../activitypub' +import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' +import { JobQueue } from '../job-queue' + +export type VideoFilePayload = { + videoUUID: string + resolution?: VideoResolution +} + +async function processVideoFile (job: kue.Job) { + const payload = job.data as VideoFilePayload + logger.info('Processing video file in job %d.', job.id) + + const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid }) + return undefined + } + + // Transcoding in other resolution + if (payload.resolution) { + await video.transcodeOriginalVideofile(payload.resolution) + await onVideoFileTranscoderSuccess(video) + } else { + await video.optimizeOriginalVideofile() + await onVideoFileOptimizerSuccess(video) + } + + return video +} + +async function onVideoFileTranscoderSuccess (video: VideoModel) { + if (video === undefined) return undefined + + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + if (video.privacy !== VideoPrivacy.PRIVATE) { + await sendUpdateVideo(video, undefined) + } + + return undefined +} + +async function onVideoFileOptimizerSuccess (video: VideoModel) { + if (video === undefined) return undefined + + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) + // Video does not exist anymore + if (!videoDatabase) return undefined + + if (video.privacy !== VideoPrivacy.PRIVATE) { + // Now we'll add the video's meta data to our followers + await sendCreateVideo(video, undefined) + await shareVideoByServerAndChannel(video, undefined) + } + + const originalFileHeight = await videoDatabase.getOriginalFileHeight() + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) + logger.info( + 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, + { resolutions: resolutionsEnabled } + ) + + if (resolutionsEnabled.length !== 0) { + try { + await sequelizeTypescript.transaction(async t => { + const tasks: Promise[] = [] + + for (const resolution of resolutionsEnabled) { + const dataInput = { + videoUUID: videoDatabase.uuid, + resolution + } + + const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + tasks.push(p) + } + + await Promise.all(tasks) + }) + + logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) + } catch (err) { + logger.warn('Cannot transcode the video.', err) + } + } else { + logger.info('No transcoding jobs created for video %s (no resolutions enabled).') + return undefined + } +} + +// --------------------------------------------------------------------------- + +export { + processVideoFile +} diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts new file mode 100644 index 000000000..57231e649 --- /dev/null +++ b/server/lib/job-queue/index.ts @@ -0,0 +1 @@ +export * from './job-queue' diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts new file mode 100644 index 000000000..7a2b6c78d --- /dev/null +++ b/server/lib/job-queue/job-queue.ts @@ -0,0 +1,124 @@ +import * as kue from 'kue' +import { JobType, JobState } from '../../../shared/models' +import { logger } from '../../helpers/logger' +import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' +import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' +import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' +import { processVideoFile, VideoFilePayload } from './handlers/video-file' + +type CreateJobArgument = + { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | + { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | + { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | + { type: 'video-file', payload: VideoFilePayload } + +const handlers: { [ id in JobType ]: (job: kue.Job) => Promise} = { + 'activitypub-http-broadcast': processActivityPubHttpBroadcast, + 'activitypub-http-unicast': processActivityPubHttpUnicast, + 'activitypub-http-fetcher': processActivityPubHttpFetcher, + 'video-file': processVideoFile +} + +class JobQueue { + + private static instance: JobQueue + + private jobQueue: kue.Queue + private initialized = false + + private constructor () {} + + init () { + // Already initialized + if (this.initialized === true) return + this.initialized = true + + this.jobQueue = kue.createQueue({ + prefix: 'q-' + CONFIG.WEBSERVER.HOST, + redis: { + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + auth: CONFIG.REDIS.AUTH + } + }) + + this.jobQueue.on('error', err => { + logger.error('Error in job queue.', err) + process.exit(-1) + }) + this.jobQueue.watchStuckJobs(5000) + + for (const handlerName of Object.keys(handlers)) { + this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { + try { + const res = await handlers[ handlerName ](job) + return done(null, res) + } catch (err) { + return done(err) + } + }) + } + } + + createJob (obj: CreateJobArgument, priority = 'normal') { + return new Promise((res, rej) => { + this.jobQueue + .create(obj.type, obj.payload) + .priority(priority) + .attempts(JOB_ATTEMPTS[obj.type]) + .backoff({ type: 'exponential' }) + .save(err => { + if (err) return rej(err) + + return res() + }) + }) + } + + listForApi (state: JobState, start: number, count: number, sort: string) { + return new Promise((res, rej) => { + kue.Job.rangeByState(state, start, count, sort, (err, jobs) => { + if (err) return rej(err) + + return res(jobs) + }) + }) + } + + count (state: JobState) { + return new Promise((res, rej) => { + this.jobQueue[state + 'Count']((err, total) => { + if (err) return rej(err) + + return res(total) + }) + }) + } + + removeOldJobs () { + const now = new Date().getTime() + kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { + if (err) { + logger.error('Cannot get jobs when removing old jobs.', err) + return + } + + for (const job of jobs) { + if (now - job.created_at > JOB_COMPLETED_LIFETIME) { + job.remove() + } + } + }) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +// --------------------------------------------------------------------------- + +export { + JobQueue +} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts deleted file mode 100644 index 3f780e319..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' -import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' - -async function process (payload: ActivityPubHttpPayload, jobId: number) { - logger.info('Processing ActivityPub broadcast in job %d.', jobId) - - const body = await computeBody(payload) - const httpSignatureOptions = await buildSignedRequestOptions(payload) - - const options = { - method: 'POST', - uri: '', - json: body, - httpSignature: httpSignatureOptions - } - - const badUrls: string[] = [] - const goodUrls: string[] = [] - - for (const uri of payload.uris) { - options.uri = uri - - try { - await doRequest(options) - goodUrls.push(uri) - } catch (err) { - const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) - if (isRetryingLater === false) badUrls.push(uri) - } - } - - return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) -} - -function onError (err: Error, jobId: number) { - logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts deleted file mode 100644 index a7b5aabd0..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { ACTIVITY_PUB } from '../../../initializers' -import { processActivities } from '../../activitypub/process' -import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' - -async function process (payload: ActivityPubHttpPayload, jobId: number) { - logger.info('Processing ActivityPub fetcher in job %d.', jobId) - - const options = { - method: 'GET', - uri: '', - json: true, - activityPub: true - } - - for (const uri of payload.uris) { - options.uri = uri - logger.info('Fetching ActivityPub data on %s.', uri) - - const response = await doRequest(options) - const firstBody = response.body - - if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { - const activities = firstBody.first.orderedItems - - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) - - await processActivities(activities) - } - - let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT - let i = 0 - let nextLink = firstBody.first.next - while (nextLink && i < limit) { - options.uri = nextLink - - const { body } = await doRequest(options) - nextLink = body.next - i++ - - if (Array.isArray(body.orderedItems)) { - const activities = body.orderedItems - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) - - await processActivities(activities) - } - } - } -} - -function onError (err: Error, jobId: number) { - logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts deleted file mode 100644 index 4459152db..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { JobCategory } from '../../../../shared' -import { buildSignedActivity } from '../../../helpers/activitypub' -import { logger } from '../../../helpers/logger' -import { getServerActor } from '../../../helpers/utils' -import { ACTIVITY_PUB } from '../../../initializers' -import { ActorModel } from '../../../models/activitypub/actor' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' -import { JobHandler, JobScheduler } from '../job-scheduler' - -import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' -import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' -import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' - -type ActivityPubHttpPayload = { - uris: string[] - signatureActorId?: number - body?: any - attemptNumber?: number -} - -const jobHandlers: { [ handlerName: string ]: JobHandler } = { - activitypubHttpBroadcastHandler, - activitypubHttpUnicastHandler, - activitypubHttpFetcherHandler -} -const jobCategory: JobCategory = 'activitypub-http' - -const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) - -async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) { - logger.warn('Cannot make request to %s.', uri, err) - - let attemptNumber = payload.attemptNumber || 1 - attemptNumber += 1 - - if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) { - logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err) - - const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined) - if (!actor) { - logger.debug('Actor %s is not a follower, do not retry the request.', uri) - return false - } - - const newPayload = Object.assign(payload, { - uris: [ uri ], - attemptNumber - }) - await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload) - - return true - } - - return false -} - -async function computeBody (payload: ActivityPubHttpPayload) { - let body = payload.body - - if (payload.signatureActorId) { - const actorSignature = await ActorModel.load(payload.signatureActorId) - if (!actorSignature) throw new Error('Unknown signature actor id.') - body = await buildSignedActivity(actorSignature, payload.body) - } - - return body -} - -async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) { - let actor: ActorModel - if (payload.signatureActorId) { - actor = await ActorModel.load(payload.signatureActorId) - if (!actor) throw new Error('Unknown signature actor id.') - } else { - // We need to sign the request, so use the server - actor = await getServerActor() - } - - const keyId = actor.getWebfingerUrl() - return { - algorithm: 'rsa-sha256', - authorizationHeaderName: 'Signature', - keyId, - key: actor.privateKey - } -} - -export { - ActivityPubHttpPayload, - activitypubHttpJobScheduler, - maybeRetryRequestLater, - computeBody, - buildSignedRequestOptions -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts deleted file mode 100644 index 54a7504e8..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' -import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' - -async function process (payload: ActivityPubHttpPayload, jobId: number) { - logger.info('Processing ActivityPub unicast in job %d.', jobId) - - const uri = payload.uris[0] - - const body = await computeBody(payload) - const httpSignatureOptions = await buildSignedRequestOptions(payload) - - const options = { - method: 'POST', - uri, - json: body, - httpSignature: httpSignatureOptions - } - - try { - await doRequest(options) - ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) - } catch (err) { - const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) - if (isRetryingLater === false) { - ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) - } - - throw err - } -} - -function onError (err: Error, jobId: number) { - logger.error('Error when sending ActivityPub request in job %d.', jobId, err) - return Promise.resolve() -} - -function onSuccess (jobId: number) { - logger.info('Job %d is a success.', jobId) - return Promise.resolve() -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts deleted file mode 100644 index ad8f527b4..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './activitypub-http-job-scheduler' diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts deleted file mode 100644 index 394264ec1..000000000 --- a/server/lib/jobs/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './activitypub-http-job-scheduler' -export * from './transcoding-job-scheduler' diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts deleted file mode 100644 index 9d55880e6..000000000 --- a/server/lib/jobs/job-scheduler.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { AsyncQueue, forever, queue } from 'async' -import * as Sequelize from 'sequelize' -import { JobCategory } from '../../../shared' -import { logger } from '../../helpers/logger' -import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' -import { JobModel } from '../../models/job/job' - -export interface JobHandler { - process (data: object, jobId: number): Promise - onError (err: Error, jobId: number) - onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler): Promise -} -type JobQueueCallback = (err: Error) => void - -class JobScheduler { - - constructor ( - private jobCategory: JobCategory, - private jobHandlers: { [ id: string ]: JobHandler } - ) {} - - async activate () { - const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory] - - logger.info('Jobs scheduler %s activated.', this.jobCategory) - - const jobsQueue = queue(this.processJob.bind(this)) - - // Finish processing jobs from a previous start - const state = JOB_STATES.PROCESSING - try { - const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) - - this.enqueueJobs(jobsQueue, jobs) - } catch (err) { - logger.error('Cannot list pending jobs.', err) - } - - forever( - async next => { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, JOBS_FETCHING_INTERVAL) - } - - const state = JOB_STATES.PENDING - try { - const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) - - this.enqueueJobs(jobsQueue, jobs) - } catch (err) { - logger.error('Cannot list pending jobs.', err) - } - - // Optimization: we could use "drain" from queue object - return setTimeout(next, JOBS_FETCHING_INTERVAL) - }, - - err => logger.error('Error in job scheduler queue.', err) - ) - } - - createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) { - const createQuery = { - state: JOB_STATES.PENDING, - category: this.jobCategory, - handlerName, - handlerInputData - } - - const options = { transaction } - - return JobModel.create(createQuery, options) - } - - private enqueueJobs (jobsQueue: AsyncQueue, jobs: JobModel[]) { - jobs.forEach(job => jobsQueue.push(job)) - } - - private async processJob (job: JobModel, callback: (err: Error) => void) { - const jobHandler = this.jobHandlers[job.handlerName] - if (jobHandler === undefined) { - const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id - logger.error(errorString) - - const error = new Error(errorString) - await this.onJobError(jobHandler, job, error) - return callback(error) - } - - logger.info('Processing job %d with handler %s.', job.id, job.handlerName) - - job.state = JOB_STATES.PROCESSING - await job.save() - - try { - const result: T = await jobHandler.process(job.handlerInputData, job.id) - await this.onJobSuccess(jobHandler, job, result) - } catch (err) { - logger.error('Error in job handler %s.', job.handlerName, err) - - try { - await this.onJobError(jobHandler, job, err) - } catch (innerErr) { - this.cannotSaveJobError(innerErr) - return callback(innerErr) - } - } - - return callback(null) - } - - private async onJobError (jobHandler: JobHandler, job: JobModel, err: Error) { - job.state = JOB_STATES.ERROR - - try { - await job.save() - if (jobHandler) await jobHandler.onError(err, job.id) - } catch (err) { - this.cannotSaveJobError(err) - } - } - - private async onJobSuccess (jobHandler: JobHandler, job: JobModel, jobResult: T) { - job.state = JOB_STATES.SUCCESS - - try { - await job.save() - await jobHandler.onSuccess(job.id, jobResult, this) - } catch (err) { - this.cannotSaveJobError(err) - } - } - - private cannotSaveJobError (err: Error) { - logger.error('Cannot save new job state.', err) - } -} - -// --------------------------------------------------------------------------- - -export { - JobScheduler -} diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts deleted file mode 100644 index 73152a1be..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './transcoding-job-scheduler' diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts deleted file mode 100644 index e5530a73c..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { JobCategory } from '../../../../shared' -import { VideoModel } from '../../../models/video/video' -import { JobHandler, JobScheduler } from '../job-scheduler' - -import * as videoFileOptimizer from './video-file-optimizer-handler' -import * as videoFileTranscoder from './video-file-transcoder-handler' - -type TranscodingJobPayload = { - videoUUID: string - resolution?: number -} -const jobHandlers: { [ handlerName: string ]: JobHandler } = { - videoFileOptimizer, - videoFileTranscoder -} -const jobCategory: JobCategory = 'transcoding' - -const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) - -export { - TranscodingJobPayload, - transcodingJobScheduler -} diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts deleted file mode 100644 index f224a31b4..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts +++ /dev/null @@ -1,90 +0,0 @@ -import * as Bluebird from 'bluebird' -import { VideoPrivacy } from '../../../../shared/models/videos' -import { logger } from '../../../helpers/logger' -import { computeResolutionsToTranscode } from '../../../helpers/utils' -import { sequelizeTypescript } from '../../../initializers' -import { JobModel } from '../../../models/job/job' -import { VideoModel } from '../../../models/video/video' -import { shareVideoByServerAndChannel } from '../../activitypub' -import { sendCreateVideo } from '../../activitypub/send' -import { JobScheduler } from '../job-scheduler' -import { TranscodingJobPayload } from './transcoding-job-scheduler' - -async function process (data: TranscodingJobPayload, jobId: number) { - const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } - - await video.optimizeOriginalVideofile() - - return video -} - -function onError (err: Error, jobId: number) { - logger.error('Error when optimized video file in job %d.', jobId, err) - return Promise.resolve() -} - -async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler) { - if (video === undefined) return undefined - - logger.info('Job %d is a success.', jobId) - - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - if (video.privacy !== VideoPrivacy.PRIVATE) { - // Now we'll add the video's meta data to our followers - await sendCreateVideo(video, undefined) - await shareVideoByServerAndChannel(video, undefined) - } - - const originalFileHeight = await videoDatabase.getOriginalFileHeight() - - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, - { resolutions: resolutionsEnabled } - ) - - if (resolutionsEnabled.length !== 0) { - try { - await sequelizeTypescript.transaction(async t => { - const tasks: Bluebird[] = [] - - for (const resolution of resolutionsEnabled) { - const dataInput = { - videoUUID: videoDatabase.uuid, - resolution - } - - const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput) - tasks.push(p) - } - - await Promise.all(tasks) - }) - - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } catch (err) { - logger.warn('Cannot transcode the video.', err) - } - } else { - logger.info('No transcoding jobs created for video %s (no resolutions enabled).') - return undefined - } -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts deleted file mode 100644 index 883d3eba8..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { VideoResolution } from '../../../../shared' -import { VideoPrivacy } from '../../../../shared/models/videos' -import { logger } from '../../../helpers/logger' -import { VideoModel } from '../../../models/video/video' -import { sendUpdateVideo } from '../../activitypub/send' - -async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { - const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) - return undefined - } - - await video.transcodeOriginalVideofile(data.resolution) - - return video -} - -function onError (err: Error, jobId: number) { - logger.error('Error when transcoding video file in job %d.', jobId, err) - return Promise.resolve() -} - -async function onSuccess (jobId: number, video: VideoModel) { - if (video === undefined) return undefined - - logger.info('Job %d is a success.', jobId) - - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - if (video.privacy !== VideoPrivacy.PRIVATE) { - await sendUpdateVideo(video, undefined) - } - - return undefined -} - -// --------------------------------------------------------------------------- - -export { - process, - onError, - onSuccess -} diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts new file mode 100644 index 000000000..add5677ac --- /dev/null +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts @@ -0,0 +1,19 @@ +import { JobQueue } from '../job-queue' +import { AbstractScheduler } from './abstract-scheduler' + +export class RemoveOldJobsScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + private constructor () { + super() + } + + async execute () { + JobQueue.Instance.removeOldJobs() + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/middlewares/validators/jobs.ts b/server/middlewares/validators/jobs.ts new file mode 100644 index 000000000..2f8b1738c --- /dev/null +++ b/server/middlewares/validators/jobs.ts @@ -0,0 +1,23 @@ +import * as express from 'express' +import { param } from 'express-validator/check' +import { isValidJobState } from '../../helpers/custom-validators/jobs' +import { logger } from '../../helpers/logger' +import { areValidationErrors } from './utils' + +const listJobsValidator = [ + param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'), + + async (req: express.Request, res: express.Response, next: express.NextFunction) => { + logger.debug('Checking listJobsValidator parameters.', { parameters: req.params }) + + if (areValidationErrors(req, res)) return + + return next() + } +] + +// --------------------------------------------------------------------------- + +export { + listJobsValidator +} diff --git a/server/models/job/job.ts b/server/models/job/job.ts deleted file mode 100644 index ba1c6737e..000000000 --- a/server/models/job/job.ts +++ /dev/null @@ -1,80 +0,0 @@ -import { values } from 'lodash' -import { AllowNull, Column, CreatedAt, DataType, Model, Table, UpdatedAt } from 'sequelize-typescript' -import { JobCategory, JobState } from '../../../shared/models' -import { JOB_CATEGORIES, JOB_STATES } from '../../initializers' -import { getSort } from '../utils' - -@Table({ - tableName: 'job', - indexes: [ - { - fields: [ 'state', 'category' ] - } - ] -}) -export class JobModel extends Model { - @AllowNull(false) - @Column(DataType.ENUM(values(JOB_STATES))) - state: JobState - - @AllowNull(false) - @Column(DataType.ENUM(values(JOB_CATEGORIES))) - category: JobCategory - - @AllowNull(false) - @Column - handlerName: string - - @AllowNull(true) - @Column(DataType.JSON) - handlerInputData: any - - @CreatedAt - createdAt: Date - - @UpdatedAt - updatedAt: Date - - static listWithLimitByCategory (limit: number, state: JobState, jobCategory: JobCategory) { - const query = { - order: [ - [ 'id', 'ASC' ] - ], - limit: limit, - where: { - state, - category: jobCategory - }, - logging: false - } - - return JobModel.findAll(query) - } - - static listForApi (start: number, count: number, sort: string) { - const query = { - offset: start, - limit: count, - order: [ getSort(sort) ] - } - - return JobModel.findAndCountAll(query).then(({ rows, count }) => { - return { - data: rows, - total: count - } - }) - } - - toFormattedJSON () { - return { - id: this.id, - state: this.state, - category: this.category, - handlerName: this.handlerName, - handlerInputData: this.handlerInputData, - createdAt: this.createdAt, - updatedAt: this.updatedAt - } - } -} diff --git a/server/tests/api/check-params/jobs.ts b/server/tests/api/check-params/jobs.ts index b12818bb1..ce3ac8809 100644 --- a/server/tests/api/check-params/jobs.ts +++ b/server/tests/api/check-params/jobs.ts @@ -7,7 +7,7 @@ import { checkBadCountPagination, checkBadSortPagination, checkBadStartPaginatio import { makeGetRequest } from '../../utils/requests/requests' describe('Test jobs API validators', function () { - const path = '/api/v1/jobs/' + const path = '/api/v1/jobs/failed' let server: ServerInfo let userAccessToken = '' @@ -31,6 +31,15 @@ describe('Test jobs API validators', function () { }) describe('When listing jobs', function () { + + it('Should fail with a bad state', async function () { + await makeGetRequest({ + url: server.url, + token: server.accessToken, + path: path + 'ade' + }) + }) + it('Should fail with a bad start pagination', async function () { await checkBadStartPagination(server.url, path, server.accessToken) }) diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts index de4e77b2f..4cedeb89e 100644 --- a/server/tests/api/server/handle-down.ts +++ b/server/tests/api/server/handle-down.ts @@ -2,6 +2,7 @@ import * as chai from 'chai' import 'mocha' +import { JobState } from '../../../../shared/models' import { VideoPrivacy } from '../../../../shared/models/videos' import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' @@ -139,12 +140,11 @@ describe('Test handle downs', function () { }) it('Should not have pending/processing jobs anymore', async function () { - const res = await getJobsListPaginationAndSort(servers[0].url, servers[0].accessToken, 0, 50, '-createdAt') - const jobs = res.body.data + const states: JobState[] = [ 'inactive', 'active' ] - for (const job of jobs) { - expect(job.state).not.to.equal('pending') - expect(job.state).not.to.equal('processing') + for (const state of states) { + const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') + expect(res.body.data).to.have.length(0) } }) diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 2e17e71a4..671498769 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts @@ -35,20 +35,20 @@ describe('Test jobs', function () { }) it('Should list jobs', async function () { - const res = await getJobsList(servers[1].url, servers[1].accessToken) + const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete') expect(res.body.total).to.be.above(2) expect(res.body.data).to.have.length.above(2) }) it('Should list jobs with sort and pagination', async function () { - const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 4, 1, 'createdAt') + const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt') expect(res.body.total).to.be.above(2) expect(res.body.data).to.have.lengthOf(1) const job = res.body.data[0] - expect(job.state).to.equal('success') - expect(job.category).to.equal('transcoding') - expect(job.handlerName).to.have.length.above(3) + + expect(job.state).to.equal('complete') + expect(job.type).to.equal('activitypub-http-unicast') expect(dateIsValid(job.createdAt)).to.be.true expect(dateIsValid(job.updatedAt)).to.be.true }) diff --git a/server/tests/api/videos/multiple-servers.ts b/server/tests/api/videos/multiple-servers.ts index 4c4b5123d..0215b3011 100644 --- a/server/tests/api/videos/multiple-servers.ts +++ b/server/tests/api/videos/multiple-servers.ts @@ -475,16 +475,17 @@ describe('Test multiple servers', function () { it('Should like and dislikes videos on different services', async function () { this.timeout(20000) - const tasks: Promise[] = [] - tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')) - tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike')) - tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')) - tasks.push(rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like')) - tasks.push(rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike')) - tasks.push(rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike')) - tasks.push(rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like')) - - await Promise.all(tasks) + await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like') + await wait(200) + await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike') + await wait(200) + await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like') + await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like') + await wait(200) + await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike') + await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike') + await wait(200) + await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like') await wait(10000) diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts index e41203351..7f67525ed 100644 --- a/server/tests/real-world/real-world.ts +++ b/server/tests/real-world/real-world.ts @@ -3,6 +3,7 @@ process.env.NODE_ENV = 'test' import * as program from 'commander' import { Video, VideoFile, VideoRateType } from '../../../shared' +import { JobState } from '../../../shared/models' import { flushAndRunMultipleServers, flushTests, follow, @@ -346,23 +347,19 @@ function goodbye () { } async function isTherePendingRequests (servers: ServerInfo[]) { + const states: JobState[] = [ 'inactive', 'active' ] const tasks: Promise[] = [] let pendingRequests = false // Check if each server has pending request for (const server of servers) { - const p = getJobsListPaginationAndSort(server.url, server.accessToken, 0, 10, '-createdAt') - .then(res => { - const jobs = res.body.data - - for (const job of jobs) { - if (job.state === 'pending' || job.state === 'processing') { - pendingRequests = true - } - } - }) - - tasks.push(p) + for (const state of states) { + const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt') + .then(res => { + if (res.body.total > 0) pendingRequests = true + }) + tasks.push(p) + } } await Promise.all(tasks) diff --git a/server/tests/utils/server/jobs.ts b/server/tests/utils/server/jobs.ts index 0a8c51575..4053dd40b 100644 --- a/server/tests/utils/server/jobs.ts +++ b/server/tests/utils/server/jobs.ts @@ -1,7 +1,8 @@ import * as request from 'supertest' +import { JobState } from '../../../../shared/models' -function getJobsList (url: string, accessToken: string) { - const path = '/api/v1/jobs' +function getJobsList (url: string, accessToken: string, state: JobState) { + const path = '/api/v1/jobs/' + state return request(url) .get(path) @@ -11,8 +12,8 @@ function getJobsList (url: string, accessToken: string) { .expect('Content-Type', /json/) } -function getJobsListPaginationAndSort (url: string, accessToken: string, start: number, count: number, sort: string) { - const path = '/api/v1/jobs' +function getJobsListPaginationAndSort (url: string, accessToken: string, state: JobState, start: number, count: number, sort: string) { + const path = '/api/v1/jobs/' + state return request(url) .get(path) -- cgit v1.2.3