diff options
Diffstat (limited to 'server')
44 files changed, 540 insertions, 654 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index de37dea39..132d110ad 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts | |||
@@ -1,22 +1,29 @@ | |||
1 | import * as express from 'express' | 1 | import * as express from 'express' |
2 | import { ResultList } from '../../../shared' | ||
3 | import { Job, JobType, JobState } from '../../../shared/models' | ||
2 | import { UserRight } from '../../../shared/models/users' | 4 | import { UserRight } from '../../../shared/models/users' |
3 | import { getFormattedObjects } from '../../helpers/utils' | 5 | import { JobQueue } from '../../lib/job-queue' |
4 | import { | 6 | import { |
5 | asyncMiddleware, authenticate, ensureUserHasRight, jobsSortValidator, setDefaultPagination, | 7 | asyncMiddleware, |
8 | authenticate, | ||
9 | ensureUserHasRight, | ||
10 | jobsSortValidator, | ||
11 | setDefaultPagination, | ||
6 | setDefaultSort | 12 | setDefaultSort |
7 | } from '../../middlewares' | 13 | } from '../../middlewares' |
8 | import { paginationValidator } from '../../middlewares/validators' | 14 | import { paginationValidator } from '../../middlewares/validators' |
9 | import { JobModel } from '../../models/job/job' | 15 | import { listJobsValidator } from '../../middlewares/validators/jobs' |
10 | 16 | ||
11 | const jobsRouter = express.Router() | 17 | const jobsRouter = express.Router() |
12 | 18 | ||
13 | jobsRouter.get('/', | 19 | jobsRouter.get('/:state', |
14 | authenticate, | 20 | authenticate, |
15 | ensureUserHasRight(UserRight.MANAGE_JOBS), | 21 | ensureUserHasRight(UserRight.MANAGE_JOBS), |
16 | paginationValidator, | 22 | paginationValidator, |
17 | jobsSortValidator, | 23 | jobsSortValidator, |
18 | setDefaultSort, | 24 | setDefaultSort, |
19 | setDefaultPagination, | 25 | setDefaultPagination, |
26 | asyncMiddleware(listJobsValidator), | ||
20 | asyncMiddleware(listJobs) | 27 | asyncMiddleware(listJobs) |
21 | ) | 28 | ) |
22 | 29 | ||
@@ -29,7 +36,26 @@ export { | |||
29 | // --------------------------------------------------------------------------- | 36 | // --------------------------------------------------------------------------- |
30 | 37 | ||
31 | async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { | 38 | async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { |
32 | const resultList = await JobModel.listForApi(req.query.start, req.query.count, req.query.sort) | 39 | const sort = req.query.sort === 'createdAt' ? 'asc' : 'desc' |
40 | |||
41 | const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) | ||
42 | const total = await JobQueue.Instance.count(req.params.state) | ||
43 | |||
44 | const result: ResultList<any> = { | ||
45 | total, | ||
46 | data: jobs.map(j => formatJob(j.toJSON())) | ||
47 | } | ||
48 | return res.json(result) | ||
49 | } | ||
33 | 50 | ||
34 | return res.json(getFormattedObjects(resultList.data, resultList.total)) | 51 | function formatJob (job: any): Job { |
52 | return { | ||
53 | id: job.id, | ||
54 | state: job.state as JobState, | ||
55 | type: job.type as JobType, | ||
56 | data: job.data, | ||
57 | error: job.error, | ||
58 | createdAt: new Date(parseInt(job.created_at, 10)), | ||
59 | updatedAt: new Date(parseInt(job.updated_at, 10)) | ||
60 | } | ||
35 | } | 61 | } |
diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index 506b9668e..bb8713e7a 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts | |||
@@ -123,7 +123,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { | |||
123 | actorFollow.ActorFollower = fromActor | 123 | actorFollow.ActorFollower = fromActor |
124 | 124 | ||
125 | // Send a notification to remote server | 125 | // Send a notification to remote server |
126 | await sendFollow(actorFollow, t) | 126 | await sendFollow(actorFollow) |
127 | }) | 127 | }) |
128 | } | 128 | } |
129 | 129 | ||
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index c2fdb4f95..459795141 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts | |||
@@ -12,7 +12,7 @@ import { | |||
12 | } from '../../../initializers' | 12 | } from '../../../initializers' |
13 | import { fetchRemoteVideoDescription, getVideoActivityPubUrl, shareVideoByServerAndChannel } from '../../../lib/activitypub' | 13 | import { fetchRemoteVideoDescription, getVideoActivityPubUrl, shareVideoByServerAndChannel } from '../../../lib/activitypub' |
14 | import { sendCreateVideo, sendCreateViewToOrigin, sendCreateViewToVideoFollowers, sendUpdateVideo } from '../../../lib/activitypub/send' | 14 | import { sendCreateVideo, sendCreateViewToOrigin, sendCreateViewToVideoFollowers, sendUpdateVideo } from '../../../lib/activitypub/send' |
15 | import { transcodingJobScheduler } from '../../../lib/jobs/transcoding-job-scheduler' | 15 | import { JobQueue } from '../../../lib/job-queue' |
16 | import { | 16 | import { |
17 | asyncMiddleware, authenticate, paginationValidator, setDefaultSort, setDefaultPagination, videosAddValidator, videosGetValidator, | 17 | asyncMiddleware, authenticate, paginationValidator, setDefaultSort, setDefaultPagination, videosAddValidator, videosGetValidator, |
18 | videosRemoveValidator, videosSearchValidator, videosSortValidator, videosUpdateValidator | 18 | videosRemoveValidator, videosSearchValidator, videosSortValidator, videosUpdateValidator |
@@ -176,18 +176,9 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi | |||
176 | ) | 176 | ) |
177 | await Promise.all(tasks) | 177 | await Promise.all(tasks) |
178 | 178 | ||
179 | return sequelizeTypescript.transaction(async t => { | 179 | const videoCreated = await sequelizeTypescript.transaction(async t => { |
180 | const sequelizeOptions = { transaction: t } | 180 | const sequelizeOptions = { transaction: t } |
181 | 181 | ||
182 | if (CONFIG.TRANSCODING.ENABLED === true) { | ||
183 | // Put uuid because we don't have id auto incremented for now | ||
184 | const dataInput = { | ||
185 | videoUUID: video.uuid | ||
186 | } | ||
187 | |||
188 | await transcodingJobScheduler.createJob(t, 'videoFileOptimizer', dataInput) | ||
189 | } | ||
190 | |||
191 | const videoCreated = await video.save(sequelizeOptions) | 182 | const videoCreated = await video.save(sequelizeOptions) |
192 | // Do not forget to add video channel information to the created video | 183 | // Do not forget to add video channel information to the created video |
193 | videoCreated.VideoChannel = res.locals.videoChannel | 184 | videoCreated.VideoChannel = res.locals.videoChannel |
@@ -216,6 +207,17 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi | |||
216 | 207 | ||
217 | return videoCreated | 208 | return videoCreated |
218 | }) | 209 | }) |
210 | |||
211 | if (CONFIG.TRANSCODING.ENABLED === true) { | ||
212 | // Put uuid because we don't have id auto incremented for now | ||
213 | const dataInput = { | ||
214 | videoUUID: videoCreated.uuid | ||
215 | } | ||
216 | |||
217 | await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) | ||
218 | } | ||
219 | |||
220 | return videoCreated | ||
219 | } | 221 | } |
220 | 222 | ||
221 | async function updateVideoRetryWrapper (req: express.Request, res: express.Response, next: express.NextFunction) { | 223 | async function updateVideoRetryWrapper (req: express.Request, res: express.Response, next: express.NextFunction) { |
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts new file mode 100644 index 000000000..9700fbd12 --- /dev/null +++ b/server/helpers/custom-validators/jobs.ts | |||
@@ -0,0 +1,14 @@ | |||
1 | import { JobState } from '../../../shared/models' | ||
2 | import { exists } from './misc' | ||
3 | |||
4 | const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] | ||
5 | |||
6 | function isValidJobState (value: JobState) { | ||
7 | return exists(value) && jobStates.indexOf(value) !== -1 | ||
8 | } | ||
9 | |||
10 | // --------------------------------------------------------------------------- | ||
11 | |||
12 | export { | ||
13 | isValidJobState | ||
14 | } | ||
diff --git a/server/helpers/database-utils.ts b/server/helpers/database-utils.ts index 78ca768b9..b4adaf9cc 100644 --- a/server/helpers/database-utils.ts +++ b/server/helpers/database-utils.ts | |||
@@ -16,6 +16,7 @@ function retryTransactionWrapper <T> ( | |||
16 | .catch(err => callback(err)) | 16 | .catch(err => callback(err)) |
17 | }) | 17 | }) |
18 | .catch(err => { | 18 | .catch(err => { |
19 | console.error(err) | ||
19 | logger.error(options.errorMessage, err) | 20 | logger.error(options.errorMessage, err) |
20 | throw err | 21 | throw err |
21 | }) | 22 | }) |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index cb043251a..329d0ffe8 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -1,6 +1,6 @@ | |||
1 | import { IConfig } from 'config' | 1 | import { IConfig } from 'config' |
2 | import { dirname, join } from 'path' | 2 | import { dirname, join } from 'path' |
3 | import { JobCategory, JobState, VideoRateType } from '../../shared/models' | 3 | import { JobType, VideoRateType } from '../../shared/models' |
4 | import { ActivityPubActorType } from '../../shared/models/activitypub' | 4 | import { ActivityPubActorType } from '../../shared/models/activitypub' |
5 | import { FollowState } from '../../shared/models/actors' | 5 | import { FollowState } from '../../shared/models/actors' |
6 | import { VideoPrivacy } from '../../shared/models/videos' | 6 | import { VideoPrivacy } from '../../shared/models/videos' |
@@ -12,7 +12,7 @@ let config: IConfig = require('config') | |||
12 | 12 | ||
13 | // --------------------------------------------------------------------------- | 13 | // --------------------------------------------------------------------------- |
14 | 14 | ||
15 | const LAST_MIGRATION_VERSION = 175 | 15 | const LAST_MIGRATION_VERSION = 180 |
16 | 16 | ||
17 | // --------------------------------------------------------------------------- | 17 | // --------------------------------------------------------------------------- |
18 | 18 | ||
@@ -26,7 +26,7 @@ const PAGINATION_COUNT_DEFAULT = 15 | |||
26 | const SORTABLE_COLUMNS = { | 26 | const SORTABLE_COLUMNS = { |
27 | USERS: [ 'id', 'username', 'createdAt' ], | 27 | USERS: [ 'id', 'username', 'createdAt' ], |
28 | ACCOUNTS: [ 'createdAt' ], | 28 | ACCOUNTS: [ 'createdAt' ], |
29 | JOBS: [ 'id', 'createdAt' ], | 29 | JOBS: [ 'createdAt' ], |
30 | VIDEO_ABUSES: [ 'id', 'createdAt' ], | 30 | VIDEO_ABUSES: [ 'id', 'createdAt' ], |
31 | VIDEO_CHANNELS: [ 'id', 'name', 'updatedAt', 'createdAt' ], | 31 | VIDEO_CHANNELS: [ 'id', 'name', 'updatedAt', 'createdAt' ], |
32 | VIDEOS: [ 'name', 'duration', 'createdAt', 'views', 'likes' ], | 32 | VIDEOS: [ 'name', 'duration', 'createdAt', 'views', 'likes' ], |
@@ -61,23 +61,20 @@ const REMOTE_SCHEME = { | |||
61 | WS: 'wss' | 61 | WS: 'wss' |
62 | } | 62 | } |
63 | 63 | ||
64 | const JOB_STATES: { [ id: string ]: JobState } = { | 64 | const JOB_ATTEMPTS: { [ id in JobType ]: number } = { |
65 | PENDING: 'pending', | 65 | 'activitypub-http-broadcast': 5, |
66 | PROCESSING: 'processing', | 66 | 'activitypub-http-unicast': 5, |
67 | ERROR: 'error', | 67 | 'activitypub-http-fetcher': 5, |
68 | SUCCESS: 'success' | 68 | 'video-file': 1 |
69 | } | ||
70 | const JOB_CATEGORIES: { [ id: string ]: JobCategory } = { | ||
71 | TRANSCODING: 'transcoding', | ||
72 | ACTIVITYPUB_HTTP: 'activitypub-http' | ||
73 | } | 69 | } |
74 | // How many maximum jobs we fetch from the database per cycle | 70 | const JOB_CONCURRENCY: { [ id in JobType ]: number } = { |
75 | const JOBS_FETCH_LIMIT_PER_CYCLE = { | 71 | 'activitypub-http-broadcast': 1, |
76 | transcoding: 10, | 72 | 'activitypub-http-unicast': 5, |
77 | httpRequest: 20 | 73 | 'activitypub-http-fetcher': 1, |
74 | 'video-file': 1 | ||
78 | } | 75 | } |
79 | // 1 minutes | 76 | // 2 days |
80 | let JOBS_FETCHING_INTERVAL = 60000 | 77 | const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 |
81 | 78 | ||
82 | // 1 hour | 79 | // 1 hour |
83 | let SCHEDULER_INTERVAL = 60000 * 60 | 80 | let SCHEDULER_INTERVAL = 60000 * 60 |
@@ -96,6 +93,11 @@ const CONFIG = { | |||
96 | USERNAME: config.get<string>('database.username'), | 93 | USERNAME: config.get<string>('database.username'), |
97 | PASSWORD: config.get<string>('database.password') | 94 | PASSWORD: config.get<string>('database.password') |
98 | }, | 95 | }, |
96 | REDIS: { | ||
97 | HOSTNAME: config.get<string>('redis.hostname'), | ||
98 | PORT: config.get<string>('redis.port'), | ||
99 | AUTH: config.get<string>('redis.auth') | ||
100 | }, | ||
99 | STORAGE: { | 101 | STORAGE: { |
100 | AVATARS_DIR: buildPath(config.get<string>('storage.avatars')), | 102 | AVATARS_DIR: buildPath(config.get<string>('storage.avatars')), |
101 | LOG_DIR: buildPath(config.get<string>('storage.logs')), | 103 | LOG_DIR: buildPath(config.get<string>('storage.logs')), |
@@ -284,7 +286,6 @@ const ACTIVITY_PUB = { | |||
284 | PUBLIC: 'https://www.w3.org/ns/activitystreams#Public', | 286 | PUBLIC: 'https://www.w3.org/ns/activitystreams#Public', |
285 | COLLECTION_ITEMS_PER_PAGE: 10, | 287 | COLLECTION_ITEMS_PER_PAGE: 10, |
286 | FETCH_PAGE_LIMIT: 100, | 288 | FETCH_PAGE_LIMIT: 100, |
287 | MAX_HTTP_ATTEMPT: 5, | ||
288 | URL_MIME_TYPES: { | 289 | URL_MIME_TYPES: { |
289 | VIDEO: Object.keys(VIDEO_MIMETYPE_EXT), | 290 | VIDEO: Object.keys(VIDEO_MIMETYPE_EXT), |
290 | TORRENT: [ 'application/x-bittorrent' ], | 291 | TORRENT: [ 'application/x-bittorrent' ], |
@@ -358,7 +359,6 @@ const OPENGRAPH_AND_OEMBED_COMMENT = '<!-- open graph and oembed tags -->' | |||
358 | // Special constants for a test instance | 359 | // Special constants for a test instance |
359 | if (isTestInstance() === true) { | 360 | if (isTestInstance() === true) { |
360 | ACTOR_FOLLOW_SCORE.BASE = 20 | 361 | ACTOR_FOLLOW_SCORE.BASE = 20 |
361 | JOBS_FETCHING_INTERVAL = 1000 | ||
362 | REMOTE_SCHEME.HTTP = 'http' | 362 | REMOTE_SCHEME.HTTP = 'http' |
363 | REMOTE_SCHEME.WS = 'ws' | 363 | REMOTE_SCHEME.WS = 'ws' |
364 | STATIC_MAX_AGE = '0' | 364 | STATIC_MAX_AGE = '0' |
@@ -381,10 +381,8 @@ export { | |||
381 | CONFIG, | 381 | CONFIG, |
382 | CONSTRAINTS_FIELDS, | 382 | CONSTRAINTS_FIELDS, |
383 | EMBED_SIZE, | 383 | EMBED_SIZE, |
384 | JOB_STATES, | 384 | JOB_CONCURRENCY, |
385 | JOBS_FETCH_LIMIT_PER_CYCLE, | 385 | JOB_ATTEMPTS, |
386 | JOBS_FETCHING_INTERVAL, | ||
387 | JOB_CATEGORIES, | ||
388 | LAST_MIGRATION_VERSION, | 386 | LAST_MIGRATION_VERSION, |
389 | OAUTH_LIFETIME, | 387 | OAUTH_LIFETIME, |
390 | OPENGRAPH_AND_OEMBED_COMMENT, | 388 | OPENGRAPH_AND_OEMBED_COMMENT, |
@@ -408,7 +406,8 @@ export { | |||
408 | VIDEO_RATE_TYPES, | 406 | VIDEO_RATE_TYPES, |
409 | VIDEO_MIMETYPE_EXT, | 407 | VIDEO_MIMETYPE_EXT, |
410 | AVATAR_MIMETYPE_EXT, | 408 | AVATAR_MIMETYPE_EXT, |
411 | SCHEDULER_INTERVAL | 409 | SCHEDULER_INTERVAL, |
410 | JOB_COMPLETED_LIFETIME | ||
412 | } | 411 | } |
413 | 412 | ||
414 | // --------------------------------------------------------------------------- | 413 | // --------------------------------------------------------------------------- |
diff --git a/server/initializers/database.ts b/server/initializers/database.ts index 852db68a0..b537ee59a 100644 --- a/server/initializers/database.ts +++ b/server/initializers/database.ts | |||
@@ -9,7 +9,6 @@ import { ActorModel } from '../models/activitypub/actor' | |||
9 | import { ActorFollowModel } from '../models/activitypub/actor-follow' | 9 | import { ActorFollowModel } from '../models/activitypub/actor-follow' |
10 | import { ApplicationModel } from '../models/application/application' | 10 | import { ApplicationModel } from '../models/application/application' |
11 | import { AvatarModel } from '../models/avatar/avatar' | 11 | import { AvatarModel } from '../models/avatar/avatar' |
12 | import { JobModel } from '../models/job/job' | ||
13 | import { OAuthClientModel } from '../models/oauth/oauth-client' | 12 | import { OAuthClientModel } from '../models/oauth/oauth-client' |
14 | import { OAuthTokenModel } from '../models/oauth/oauth-token' | 13 | import { OAuthTokenModel } from '../models/oauth/oauth-token' |
15 | import { ServerModel } from '../models/server/server' | 14 | import { ServerModel } from '../models/server/server' |
@@ -61,7 +60,6 @@ async function initDatabaseModels (silent: boolean) { | |||
61 | ActorFollowModel, | 60 | ActorFollowModel, |
62 | AvatarModel, | 61 | AvatarModel, |
63 | AccountModel, | 62 | AccountModel, |
64 | JobModel, | ||
65 | OAuthClientModel, | 63 | OAuthClientModel, |
66 | OAuthTokenModel, | 64 | OAuthTokenModel, |
67 | ServerModel, | 65 | ServerModel, |
diff --git a/server/initializers/migrations/0100-activitypub.ts b/server/initializers/migrations/0100-activitypub.ts index 8c5198f85..a7ebd804c 100644 --- a/server/initializers/migrations/0100-activitypub.ts +++ b/server/initializers/migrations/0100-activitypub.ts | |||
@@ -1,11 +1,10 @@ | |||
1 | import { values } from 'lodash' | ||
2 | import * as Sequelize from 'sequelize' | 1 | import * as Sequelize from 'sequelize' |
3 | import { createPrivateAndPublicKeys } from '../../helpers/peertube-crypto' | 2 | import { createPrivateAndPublicKeys } from '../../helpers/peertube-crypto' |
4 | import { shareVideoByServerAndChannel } from '../../lib/activitypub/share' | 3 | import { shareVideoByServerAndChannel } from '../../lib/activitypub/share' |
5 | import { getVideoActivityPubUrl, getVideoChannelActivityPubUrl } from '../../lib/activitypub/url' | 4 | import { getVideoActivityPubUrl, getVideoChannelActivityPubUrl } from '../../lib/activitypub/url' |
6 | import { createLocalAccountWithoutKeys } from '../../lib/user' | 5 | import { createLocalAccountWithoutKeys } from '../../lib/user' |
7 | import { ApplicationModel } from '../../models/application/application' | 6 | import { ApplicationModel } from '../../models/application/application' |
8 | import { JOB_CATEGORIES, SERVER_ACTOR_NAME } from '../constants' | 7 | import { SERVER_ACTOR_NAME } from '../constants' |
9 | 8 | ||
10 | async function up (utils: { | 9 | async function up (utils: { |
11 | transaction: Sequelize.Transaction, | 10 | transaction: Sequelize.Transaction, |
@@ -161,7 +160,7 @@ async function up (utils: { | |||
161 | 160 | ||
162 | { | 161 | { |
163 | const data = { | 162 | const data = { |
164 | type: Sequelize.ENUM(values(JOB_CATEGORIES)), | 163 | type: Sequelize.ENUM('transcoding', 'activitypub-http'), |
165 | defaultValue: 'transcoding', | 164 | defaultValue: 'transcoding', |
166 | allowNull: false | 165 | allowNull: false |
167 | } | 166 | } |
diff --git a/server/initializers/migrations/0180-job-table-delete.ts b/server/initializers/migrations/0180-job-table-delete.ts new file mode 100644 index 000000000..df29145d0 --- /dev/null +++ b/server/initializers/migrations/0180-job-table-delete.ts | |||
@@ -0,0 +1,18 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | |||
3 | async function up (utils: { | ||
4 | transaction: Sequelize.Transaction, | ||
5 | queryInterface: Sequelize.QueryInterface, | ||
6 | sequelize: Sequelize.Sequelize | ||
7 | }): Promise<void> { | ||
8 | await utils.queryInterface.dropTable('job') | ||
9 | } | ||
10 | |||
11 | function down (options) { | ||
12 | throw new Error('Not implemented.') | ||
13 | } | ||
14 | |||
15 | export { | ||
16 | up, | ||
17 | down | ||
18 | } | ||
diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts index c708b38ba..712de7d0d 100644 --- a/server/lib/activitypub/actor.ts +++ b/server/lib/activitypub/actor.ts | |||
@@ -64,7 +64,11 @@ async function getOrCreateActorAndServerAndModel (actorUrl: string, recurseIfNee | |||
64 | actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options) | 64 | actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options) |
65 | } | 65 | } |
66 | 66 | ||
67 | return refreshActorIfNeeded(actor) | 67 | const options = { |
68 | arguments: [ actor ], | ||
69 | errorMessage: 'Cannot refresh actor if needed with many retries.' | ||
70 | } | ||
71 | return retryTransactionWrapper(refreshActorIfNeeded, options) | ||
68 | } | 72 | } |
69 | 73 | ||
70 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) { | 74 | function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) { |
@@ -325,38 +329,43 @@ async function saveVideoChannel (actor: ActorModel, result: FetchRemoteActorResu | |||
325 | async function refreshActorIfNeeded (actor: ActorModel) { | 329 | async function refreshActorIfNeeded (actor: ActorModel) { |
326 | if (!actor.isOutdated()) return actor | 330 | if (!actor.isOutdated()) return actor |
327 | 331 | ||
328 | const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) | 332 | try { |
329 | const result = await fetchRemoteActor(actorUrl) | 333 | const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) |
330 | if (result === undefined) { | 334 | const result = await fetchRemoteActor(actorUrl) |
331 | logger.warn('Cannot fetch remote actor in refresh actor.') | 335 | if (result === undefined) { |
332 | return actor | 336 | logger.warn('Cannot fetch remote actor in refresh actor.') |
333 | } | 337 | return actor |
334 | |||
335 | return sequelizeTypescript.transaction(async t => { | ||
336 | updateInstanceWithAnother(actor, result.actor) | ||
337 | |||
338 | if (result.avatarName !== undefined) { | ||
339 | await updateActorAvatarInstance(actor, result.avatarName, t) | ||
340 | } | 338 | } |
341 | 339 | ||
342 | // Force update | 340 | return sequelizeTypescript.transaction(async t => { |
343 | actor.setDataValue('updatedAt', new Date()) | 341 | updateInstanceWithAnother(actor, result.actor) |
344 | await actor.save({ transaction: t }) | ||
345 | 342 | ||
346 | if (actor.Account) { | 343 | if (result.avatarName !== undefined) { |
347 | await actor.save({ transaction: t }) | 344 | await updateActorAvatarInstance(actor, result.avatarName, t) |
345 | } | ||
348 | 346 | ||
349 | actor.Account.set('name', result.name) | 347 | // Force update |
350 | await actor.Account.save({ transaction: t }) | 348 | actor.setDataValue('updatedAt', new Date()) |
351 | } else if (actor.VideoChannel) { | ||
352 | await actor.save({ transaction: t }) | 349 | await actor.save({ transaction: t }) |
353 | 350 | ||
354 | actor.VideoChannel.set('name', result.name) | 351 | if (actor.Account) { |
355 | await actor.VideoChannel.save({ transaction: t }) | 352 | await actor.save({ transaction: t }) |
356 | } | 353 | |
354 | actor.Account.set('name', result.name) | ||
355 | await actor.Account.save({ transaction: t }) | ||
356 | } else if (actor.VideoChannel) { | ||
357 | await actor.save({ transaction: t }) | ||
358 | |||
359 | actor.VideoChannel.set('name', result.name) | ||
360 | await actor.VideoChannel.save({ transaction: t }) | ||
361 | } | ||
357 | 362 | ||
363 | return actor | ||
364 | }) | ||
365 | } catch (err) { | ||
366 | logger.warn('Cannot refresh actor.', err) | ||
358 | return actor | 367 | return actor |
359 | }) | 368 | } |
360 | } | 369 | } |
361 | 370 | ||
362 | function normalizeActor (actor: any) { | 371 | function normalizeActor (actor: any) { |
diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts index 4fc97cc38..b1b370a1a 100644 --- a/server/lib/activitypub/fetch.ts +++ b/server/lib/activitypub/fetch.ts | |||
@@ -1,13 +1,12 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { ActorModel } from '../../models/activitypub/actor' | 1 | import { ActorModel } from '../../models/activitypub/actor' |
3 | import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler' | 2 | import { JobQueue } from '../job-queue' |
4 | 3 | ||
5 | async function addFetchOutboxJob (actor: ActorModel, t: Transaction) { | 4 | async function addFetchOutboxJob (actor: ActorModel) { |
6 | const jobPayload: ActivityPubHttpPayload = { | 5 | const payload = { |
7 | uris: [ actor.outboxUrl ] | 6 | uris: [ actor.outboxUrl ] |
8 | } | 7 | } |
9 | 8 | ||
10 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) | 9 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
11 | } | 10 | } |
12 | 11 | ||
13 | export { | 12 | export { |
diff --git a/server/lib/activitypub/process/process-accept.ts b/server/lib/activitypub/process/process-accept.ts index 551f09ea7..7db2f8ff0 100644 --- a/server/lib/activitypub/process/process-accept.ts +++ b/server/lib/activitypub/process/process-accept.ts | |||
@@ -26,6 +26,6 @@ async function processAccept (actor: ActorModel, targetActor: ActorModel) { | |||
26 | if (follow.state !== 'accepted') { | 26 | if (follow.state !== 'accepted') { |
27 | follow.set('state', 'accepted') | 27 | follow.set('state', 'accepted') |
28 | await follow.save() | 28 | await follow.save() |
29 | await addFetchOutboxJob(targetActor, undefined) | 29 | await addFetchOutboxJob(targetActor) |
30 | } | 30 | } |
31 | } | 31 | } |
diff --git a/server/lib/activitypub/process/process-follow.ts b/server/lib/activitypub/process/process-follow.ts index 69f5c51b5..dc1d542b5 100644 --- a/server/lib/activitypub/process/process-follow.ts +++ b/server/lib/activitypub/process/process-follow.ts | |||
@@ -63,7 +63,7 @@ async function follow (actor: ActorModel, targetActorURL: string) { | |||
63 | actorFollow.ActorFollowing = targetActor | 63 | actorFollow.ActorFollowing = targetActor |
64 | 64 | ||
65 | // Target sends to actor he accepted the follow request | 65 | // Target sends to actor he accepted the follow request |
66 | return sendAccept(actorFollow, t) | 66 | return sendAccept(actorFollow) |
67 | }) | 67 | }) |
68 | 68 | ||
69 | logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url) | 69 | logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url) |
diff --git a/server/lib/activitypub/send/misc.ts b/server/lib/activitypub/send/misc.ts index dc0d3de57..7a21f0c94 100644 --- a/server/lib/activitypub/send/misc.ts +++ b/server/lib/activitypub/send/misc.ts | |||
@@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | |||
7 | import { VideoModel } from '../../../models/video/video' | 7 | import { VideoModel } from '../../../models/video/video' |
8 | import { VideoCommentModel } from '../../../models/video/video-comment' | 8 | import { VideoCommentModel } from '../../../models/video/video-comment' |
9 | import { VideoShareModel } from '../../../models/video/video-share' | 9 | import { VideoShareModel } from '../../../models/video/video-share' |
10 | import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler' | 10 | import { JobQueue } from '../../job-queue' |
11 | 11 | ||
12 | async function forwardActivity ( | 12 | async function forwardActivity ( |
13 | activity: Activity, | 13 | activity: Activity, |
@@ -35,12 +35,11 @@ async function forwardActivity ( | |||
35 | 35 | ||
36 | logger.debug('Creating forwarding job.', { uris }) | 36 | logger.debug('Creating forwarding job.', { uris }) |
37 | 37 | ||
38 | const jobPayload: ActivityPubHttpPayload = { | 38 | const payload = { |
39 | uris, | 39 | uris, |
40 | body: activity | 40 | body: activity |
41 | } | 41 | } |
42 | 42 | return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) | |
43 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) | ||
44 | } | 43 | } |
45 | 44 | ||
46 | async function broadcastToFollowers ( | 45 | async function broadcastToFollowers ( |
@@ -51,44 +50,43 @@ async function broadcastToFollowers ( | |||
51 | actorsException: ActorModel[] = [] | 50 | actorsException: ActorModel[] = [] |
52 | ) { | 51 | ) { |
53 | const uris = await computeFollowerUris(toActorFollowers, actorsException, t) | 52 | const uris = await computeFollowerUris(toActorFollowers, actorsException, t) |
54 | return broadcastTo(uris, data, byActor, t) | 53 | return broadcastTo(uris, data, byActor) |
55 | } | 54 | } |
56 | 55 | ||
57 | async function broadcastToActors ( | 56 | async function broadcastToActors ( |
58 | data: any, | 57 | data: any, |
59 | byActor: ActorModel, | 58 | byActor: ActorModel, |
60 | toActors: ActorModel[], | 59 | toActors: ActorModel[], |
61 | t: Transaction, | ||
62 | actorsException: ActorModel[] = [] | 60 | actorsException: ActorModel[] = [] |
63 | ) { | 61 | ) { |
64 | const uris = await computeUris(toActors, actorsException) | 62 | const uris = await computeUris(toActors, actorsException) |
65 | return broadcastTo(uris, data, byActor, t) | 63 | return broadcastTo(uris, data, byActor) |
66 | } | 64 | } |
67 | 65 | ||
68 | async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) { | 66 | async function broadcastTo (uris: string[], data: any, byActor: ActorModel) { |
69 | if (uris.length === 0) return undefined | 67 | if (uris.length === 0) return undefined |
70 | 68 | ||
71 | logger.debug('Creating broadcast job.', { uris }) | 69 | logger.debug('Creating broadcast job.', { uris }) |
72 | 70 | ||
73 | const jobPayload: ActivityPubHttpPayload = { | 71 | const payload = { |
74 | uris, | 72 | uris, |
75 | signatureActorId: byActor.id, | 73 | signatureActorId: byActor.id, |
76 | body: data | 74 | body: data |
77 | } | 75 | } |
78 | 76 | ||
79 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) | 77 | return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) |
80 | } | 78 | } |
81 | 79 | ||
82 | async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) { | 80 | async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) { |
83 | logger.debug('Creating unicast job.', { uri: toActorUrl }) | 81 | logger.debug('Creating unicast job.', { uri: toActorUrl }) |
84 | 82 | ||
85 | const jobPayload: ActivityPubHttpPayload = { | 83 | const payload = { |
86 | uris: [ toActorUrl ], | 84 | uri: toActorUrl, |
87 | signatureActorId: byActor.id, | 85 | signatureActorId: byActor.id, |
88 | body: data | 86 | body: data |
89 | } | 87 | } |
90 | 88 | ||
91 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) | 89 | return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) |
92 | } | 90 | } |
93 | 91 | ||
94 | function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { | 92 | function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { |
diff --git a/server/lib/activitypub/send/send-accept.ts b/server/lib/activitypub/send/send-accept.ts index 4eaa329d9..064fd88d2 100644 --- a/server/lib/activitypub/send/send-accept.ts +++ b/server/lib/activitypub/send/send-accept.ts | |||
@@ -1,4 +1,3 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' | 1 | import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' |
3 | import { ActorModel } from '../../../models/activitypub/actor' | 2 | import { ActorModel } from '../../../models/activitypub/actor' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
@@ -6,7 +5,7 @@ import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from | |||
6 | import { unicastTo } from './misc' | 5 | import { unicastTo } from './misc' |
7 | import { followActivityData } from './send-follow' | 6 | import { followActivityData } from './send-follow' |
8 | 7 | ||
9 | async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { | 8 | async function sendAccept (actorFollow: ActorFollowModel) { |
10 | const follower = actorFollow.ActorFollower | 9 | const follower = actorFollow.ActorFollower |
11 | const me = actorFollow.ActorFollowing | 10 | const me = actorFollow.ActorFollowing |
12 | 11 | ||
@@ -16,7 +15,7 @@ async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { | |||
16 | const url = getActorFollowAcceptActivityPubUrl(actorFollow) | 15 | const url = getActorFollowAcceptActivityPubUrl(actorFollow) |
17 | const data = acceptActivityData(url, me, followData) | 16 | const data = acceptActivityData(url, me, followData) |
18 | 17 | ||
19 | return unicastTo(data, me, follower.inboxUrl, t) | 18 | return unicastTo(data, me, follower.inboxUrl) |
20 | } | 19 | } |
21 | 20 | ||
22 | // --------------------------------------------------------------------------- | 21 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/activitypub/send/send-announce.ts b/server/lib/activitypub/send/send-announce.ts index 578fbc630..93b5668d2 100644 --- a/server/lib/activitypub/send/send-announce.ts +++ b/server/lib/activitypub/send/send-announce.ts | |||
@@ -42,7 +42,7 @@ async function sendVideoAnnounceToOrigin (byActor: ActorModel, video: VideoModel | |||
42 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) | 42 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) |
43 | const data = await createActivityData(url, byActor, announcedActivity, t, audience) | 43 | const data = await createActivityData(url, byActor, announcedActivity, t, audience) |
44 | 44 | ||
45 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 45 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
46 | } | 46 | } |
47 | 47 | ||
48 | async function announceActivityData ( | 48 | async function announceActivityData ( |
diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 9db663be1..b92615e9b 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts | |||
@@ -8,8 +8,14 @@ import { VideoAbuseModel } from '../../../models/video/video-abuse' | |||
8 | import { VideoCommentModel } from '../../../models/video/video-comment' | 8 | import { VideoCommentModel } from '../../../models/video/video-comment' |
9 | import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' | 9 | import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' |
10 | import { | 10 | import { |
11 | audiencify, broadcastToActors, broadcastToFollowers, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience, | 11 | audiencify, |
12 | getOriginVideoAudience, getOriginVideoCommentAudience, | 12 | broadcastToActors, |
13 | broadcastToFollowers, | ||
14 | getActorsInvolvedInVideo, | ||
15 | getAudience, | ||
16 | getObjectFollowersAudience, | ||
17 | getOriginVideoAudience, | ||
18 | getOriginVideoCommentAudience, | ||
13 | unicastTo | 19 | unicastTo |
14 | } from './misc' | 20 | } from './misc' |
15 | 21 | ||
@@ -31,7 +37,7 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel, | |||
31 | const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } | 37 | const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } |
32 | const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) | 38 | const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) |
33 | 39 | ||
34 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 40 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
35 | } | 41 | } |
36 | 42 | ||
37 | async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { | 43 | async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { |
@@ -47,13 +53,13 @@ async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Tr | |||
47 | 53 | ||
48 | // This was a reply, send it to the parent actors | 54 | // This was a reply, send it to the parent actors |
49 | const actorsException = [ byActor ] | 55 | const actorsException = [ byActor ] |
50 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) | 56 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) |
51 | 57 | ||
52 | // Broadcast to our followers | 58 | // Broadcast to our followers |
53 | await broadcastToFollowers(data, byActor, [ byActor ], t) | 59 | await broadcastToFollowers(data, byActor, [ byActor ], t) |
54 | 60 | ||
55 | // Send to origin | 61 | // Send to origin |
56 | return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 62 | return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl) |
57 | } | 63 | } |
58 | 64 | ||
59 | async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { | 65 | async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { |
@@ -69,7 +75,7 @@ async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentMode | |||
69 | 75 | ||
70 | // This was a reply, send it to the parent actors | 76 | // This was a reply, send it to the parent actors |
71 | const actorsException = [ byActor ] | 77 | const actorsException = [ byActor ] |
72 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) | 78 | await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) |
73 | 79 | ||
74 | // Broadcast to our followers | 80 | // Broadcast to our followers |
75 | await broadcastToFollowers(data, byActor, [ byActor ], t) | 81 | await broadcastToFollowers(data, byActor, [ byActor ], t) |
@@ -86,7 +92,7 @@ async function sendCreateViewToOrigin (byActor: ActorModel, video: VideoModel, t | |||
86 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) | 92 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) |
87 | const data = await createActivityData(url, byActor, viewActivityData, t, audience) | 93 | const data = await createActivityData(url, byActor, viewActivityData, t, audience) |
88 | 94 | ||
89 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 95 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
90 | } | 96 | } |
91 | 97 | ||
92 | async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 98 | async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
@@ -111,7 +117,7 @@ async function sendCreateDislikeToOrigin (byActor: ActorModel, video: VideoModel | |||
111 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) | 117 | const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) |
112 | const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) | 118 | const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) |
113 | 119 | ||
114 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 120 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
115 | } | 121 | } |
116 | 122 | ||
117 | async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 123 | async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
diff --git a/server/lib/activitypub/send/send-follow.ts b/server/lib/activitypub/send/send-follow.ts index eac60e94f..4e9865af4 100644 --- a/server/lib/activitypub/send/send-follow.ts +++ b/server/lib/activitypub/send/send-follow.ts | |||
@@ -1,18 +1,17 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { ActivityFollow } from '../../../../shared/models/activitypub' | 1 | import { ActivityFollow } from '../../../../shared/models/activitypub' |
3 | import { ActorModel } from '../../../models/activitypub/actor' | 2 | import { ActorModel } from '../../../models/activitypub/actor' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
5 | import { getActorFollowActivityPubUrl } from '../url' | 4 | import { getActorFollowActivityPubUrl } from '../url' |
6 | import { unicastTo } from './misc' | 5 | import { unicastTo } from './misc' |
7 | 6 | ||
8 | function sendFollow (actorFollow: ActorFollowModel, t: Transaction) { | 7 | function sendFollow (actorFollow: ActorFollowModel) { |
9 | const me = actorFollow.ActorFollower | 8 | const me = actorFollow.ActorFollower |
10 | const following = actorFollow.ActorFollowing | 9 | const following = actorFollow.ActorFollowing |
11 | 10 | ||
12 | const url = getActorFollowActivityPubUrl(actorFollow) | 11 | const url = getActorFollowActivityPubUrl(actorFollow) |
13 | const data = followActivityData(url, me, following) | 12 | const data = followActivityData(url, me, following) |
14 | 13 | ||
15 | return unicastTo(data, me, following.inboxUrl, t) | 14 | return unicastTo(data, me, following.inboxUrl) |
16 | } | 15 | } |
17 | 16 | ||
18 | function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { | 17 | function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { |
diff --git a/server/lib/activitypub/send/send-like.ts b/server/lib/activitypub/send/send-like.ts index 743646455..78ed1aaf2 100644 --- a/server/lib/activitypub/send/send-like.ts +++ b/server/lib/activitypub/send/send-like.ts | |||
@@ -20,7 +20,7 @@ async function sendLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Tran | |||
20 | const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) | 20 | const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) |
21 | const data = await likeActivityData(url, byActor, video, t, audience) | 21 | const data = await likeActivityData(url, byActor, video, t, audience) |
22 | 22 | ||
23 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 23 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
24 | } | 24 | } |
25 | 25 | ||
26 | async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 26 | async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
diff --git a/server/lib/activitypub/send/send-undo.ts b/server/lib/activitypub/send/send-undo.ts index 3a0597fba..4a08b5ca1 100644 --- a/server/lib/activitypub/send/send-undo.ts +++ b/server/lib/activitypub/send/send-undo.ts | |||
@@ -1,11 +1,5 @@ | |||
1 | import { Transaction } from 'sequelize' | 1 | import { Transaction } from 'sequelize' |
2 | import { | 2 | import { ActivityAudience, ActivityCreate, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub' |
3 | ActivityAudience, | ||
4 | ActivityCreate, | ||
5 | ActivityFollow, | ||
6 | ActivityLike, | ||
7 | ActivityUndo | ||
8 | } from '../../../../shared/models/activitypub' | ||
9 | import { ActorModel } from '../../../models/activitypub/actor' | 3 | import { ActorModel } from '../../../models/activitypub/actor' |
10 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
11 | import { VideoModel } from '../../../models/video/video' | 5 | import { VideoModel } from '../../../models/video/video' |
@@ -33,7 +27,7 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) { | |||
33 | const object = followActivityData(followUrl, me, following) | 27 | const object = followActivityData(followUrl, me, following) |
34 | const data = await undoActivityData(undoUrl, me, object, t) | 28 | const data = await undoActivityData(undoUrl, me, object, t) |
35 | 29 | ||
36 | return unicastTo(data, me, following.inboxUrl, t) | 30 | return unicastTo(data, me, following.inboxUrl) |
37 | } | 31 | } |
38 | 32 | ||
39 | async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { | 33 | async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { |
@@ -45,7 +39,7 @@ async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: | |||
45 | const object = await likeActivityData(likeUrl, byActor, video, t) | 39 | const object = await likeActivityData(likeUrl, byActor, video, t) |
46 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) | 40 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) |
47 | 41 | ||
48 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 42 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
49 | } | 43 | } |
50 | 44 | ||
51 | async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 45 | async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
@@ -72,7 +66,7 @@ async function sendUndoDislikeToOrigin (byActor: ActorModel, video: VideoModel, | |||
72 | 66 | ||
73 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) | 67 | const data = await undoActivityData(undoUrl, byActor, object, t, audience) |
74 | 68 | ||
75 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) | 69 | return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) |
76 | } | 70 | } |
77 | 71 | ||
78 | async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { | 72 | async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 3f780e319..159856cda 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,10 +1,19 @@ | |||
1 | import * as kue from 'kue' | ||
1 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
2 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
4 | import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' | 5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
5 | 6 | ||
6 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 7 | export type ActivitypubHttpBroadcastPayload = { |
7 | logger.info('Processing ActivityPub broadcast in job %d.', jobId) | 8 | uris: string[] |
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpBroadcast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
8 | 17 | ||
9 | const body = await computeBody(payload) | 18 | const body = await computeBody(payload) |
10 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | 19 | const httpSignatureOptions = await buildSignedRequestOptions(payload) |
@@ -26,28 +35,15 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { | |||
26 | await doRequest(options) | 35 | await doRequest(options) |
27 | goodUrls.push(uri) | 36 | goodUrls.push(uri) |
28 | } catch (err) { | 37 | } catch (err) { |
29 | const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) | 38 | badUrls.push(uri) |
30 | if (isRetryingLater === false) badUrls.push(uri) | ||
31 | } | 39 | } |
32 | } | 40 | } |
33 | 41 | ||
34 | return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) | 42 | return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) |
35 | } | 43 | } |
36 | 44 | ||
37 | function onError (err: Error, jobId: number) { | ||
38 | logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) | ||
39 | return Promise.resolve() | ||
40 | } | ||
41 | |||
42 | function onSuccess (jobId: number) { | ||
43 | logger.info('Job %d is a success.', jobId) | ||
44 | return Promise.resolve() | ||
45 | } | ||
46 | |||
47 | // --------------------------------------------------------------------------- | 45 | // --------------------------------------------------------------------------- |
48 | 46 | ||
49 | export { | 47 | export { |
50 | process, | 48 | processActivityPubHttpBroadcast |
51 | onError, | ||
52 | onSuccess | ||
53 | } | 49 | } |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index a7b5aabd0..062211c85 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,11 +1,18 @@ | |||
1 | import * as kue from 'kue' | ||
1 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
2 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
3 | import { ACTIVITY_PUB } from '../../../initializers' | 4 | import { ACTIVITY_PUB } from '../../../initializers' |
4 | import { processActivities } from '../../activitypub/process' | 5 | import { processActivities } from '../../activitypub/process' |
5 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | 6 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' |
6 | 7 | ||
7 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 8 | export type ActivitypubHttpFetcherPayload = { |
8 | logger.info('Processing ActivityPub fetcher in job %d.', jobId) | 9 | uris: string[] |
10 | } | ||
11 | |||
12 | async function processActivityPubHttpFetcher (job: kue.Job) { | ||
13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | ||
14 | |||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
9 | 16 | ||
10 | const options = { | 17 | const options = { |
11 | method: 'GET', | 18 | method: 'GET', |
@@ -49,20 +56,8 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { | |||
49 | } | 56 | } |
50 | } | 57 | } |
51 | 58 | ||
52 | function onError (err: Error, jobId: number) { | ||
53 | logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err) | ||
54 | return Promise.resolve() | ||
55 | } | ||
56 | |||
57 | function onSuccess (jobId: number) { | ||
58 | logger.info('Job %d is a success.', jobId) | ||
59 | return Promise.resolve() | ||
60 | } | ||
61 | |||
62 | // --------------------------------------------------------------------------- | 59 | // --------------------------------------------------------------------------- |
63 | 60 | ||
64 | export { | 61 | export { |
65 | process, | 62 | processActivityPubHttpFetcher |
66 | onError, | ||
67 | onSuccess | ||
68 | } | 63 | } |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts new file mode 100644 index 000000000..9b4188c50 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -0,0 +1,43 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | ||
6 | |||
7 | export type ActivitypubHttpUnicastPayload = { | ||
8 | uri: string | ||
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpUnicast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpUnicastPayload | ||
17 | const uri = payload.uri | ||
18 | |||
19 | const body = await computeBody(payload) | ||
20 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
21 | |||
22 | const options = { | ||
23 | method: 'POST', | ||
24 | uri, | ||
25 | json: body, | ||
26 | httpSignature: httpSignatureOptions | ||
27 | } | ||
28 | |||
29 | try { | ||
30 | await doRequest(options) | ||
31 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) | ||
32 | } catch (err) { | ||
33 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) | ||
34 | |||
35 | throw err | ||
36 | } | ||
37 | } | ||
38 | |||
39 | // --------------------------------------------------------------------------- | ||
40 | |||
41 | export { | ||
42 | processActivityPubHttpUnicast | ||
43 | } | ||
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts new file mode 100644 index 000000000..c087371c6 --- /dev/null +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -0,0 +1,39 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | ||
2 | import { getServerActor } from '../../../../helpers/utils' | ||
3 | import { ActorModel } from '../../../../models/activitypub/actor' | ||
4 | |||
5 | async function computeBody (payload: { body: any, signatureActorId?: number }) { | ||
6 | let body = payload.body | ||
7 | |||
8 | if (payload.signatureActorId) { | ||
9 | const actorSignature = await ActorModel.load(payload.signatureActorId) | ||
10 | if (!actorSignature) throw new Error('Unknown signature actor id.') | ||
11 | body = await buildSignedActivity(actorSignature, payload.body) | ||
12 | } | ||
13 | |||
14 | return body | ||
15 | } | ||
16 | |||
17 | async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { | ||
18 | let actor: ActorModel | ||
19 | if (payload.signatureActorId) { | ||
20 | actor = await ActorModel.load(payload.signatureActorId) | ||
21 | if (!actor) throw new Error('Unknown signature actor id.') | ||
22 | } else { | ||
23 | // We need to sign the request, so use the server | ||
24 | actor = await getServerActor() | ||
25 | } | ||
26 | |||
27 | const keyId = actor.getWebfingerUrl() | ||
28 | return { | ||
29 | algorithm: 'rsa-sha256', | ||
30 | authorizationHeaderName: 'Signature', | ||
31 | keyId, | ||
32 | key: actor.privateKey | ||
33 | } | ||
34 | } | ||
35 | |||
36 | export { | ||
37 | computeBody, | ||
38 | buildSignedRequestOptions | ||
39 | } | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/job-queue/handlers/video-file.ts index f224a31b4..5294483bd 100644 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -1,38 +1,60 @@ | |||
1 | import * as Bluebird from 'bluebird' | 1 | import * as kue from 'kue' |
2 | import { VideoResolution } from '../../../../shared' | ||
2 | import { VideoPrivacy } from '../../../../shared/models/videos' | 3 | import { VideoPrivacy } from '../../../../shared/models/videos' |
3 | import { logger } from '../../../helpers/logger' | 4 | import { logger } from '../../../helpers/logger' |
4 | import { computeResolutionsToTranscode } from '../../../helpers/utils' | 5 | import { computeResolutionsToTranscode } from '../../../helpers/utils' |
5 | import { sequelizeTypescript } from '../../../initializers' | 6 | import { sequelizeTypescript } from '../../../initializers' |
6 | import { JobModel } from '../../../models/job/job' | ||
7 | import { VideoModel } from '../../../models/video/video' | 7 | import { VideoModel } from '../../../models/video/video' |
8 | import { shareVideoByServerAndChannel } from '../../activitypub' | 8 | import { shareVideoByServerAndChannel } from '../../activitypub' |
9 | import { sendCreateVideo } from '../../activitypub/send' | 9 | import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' |
10 | import { JobScheduler } from '../job-scheduler' | 10 | import { JobQueue } from '../job-queue' |
11 | import { TranscodingJobPayload } from './transcoding-job-scheduler' | ||
12 | 11 | ||
13 | async function process (data: TranscodingJobPayload, jobId: number) { | 12 | export type VideoFilePayload = { |
14 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) | 13 | videoUUID: string |
14 | resolution?: VideoResolution | ||
15 | } | ||
16 | |||
17 | async function processVideoFile (job: kue.Job) { | ||
18 | const payload = job.data as VideoFilePayload | ||
19 | logger.info('Processing video file in job %d.', job.id) | ||
20 | |||
21 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID) | ||
15 | // No video, maybe deleted? | 22 | // No video, maybe deleted? |
16 | if (!video) { | 23 | if (!video) { |
17 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 24 | logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid }) |
18 | return undefined | 25 | return undefined |
19 | } | 26 | } |
20 | 27 | ||
21 | await video.optimizeOriginalVideofile() | 28 | // Transcoding in other resolution |
29 | if (payload.resolution) { | ||
30 | await video.transcodeOriginalVideofile(payload.resolution) | ||
31 | await onVideoFileTranscoderSuccess(video) | ||
32 | } else { | ||
33 | await video.optimizeOriginalVideofile() | ||
34 | await onVideoFileOptimizerSuccess(video) | ||
35 | } | ||
22 | 36 | ||
23 | return video | 37 | return video |
24 | } | 38 | } |
25 | 39 | ||
26 | function onError (err: Error, jobId: number) { | 40 | async function onVideoFileTranscoderSuccess (video: VideoModel) { |
27 | logger.error('Error when optimized video file in job %d.', jobId, err) | 41 | if (video === undefined) return undefined |
28 | return Promise.resolve() | 42 | |
43 | // Maybe the video changed in database, refresh it | ||
44 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
45 | // Video does not exist anymore | ||
46 | if (!videoDatabase) return undefined | ||
47 | |||
48 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
49 | await sendUpdateVideo(video, undefined) | ||
50 | } | ||
51 | |||
52 | return undefined | ||
29 | } | 53 | } |
30 | 54 | ||
31 | async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) { | 55 | async function onVideoFileOptimizerSuccess (video: VideoModel) { |
32 | if (video === undefined) return undefined | 56 | if (video === undefined) return undefined |
33 | 57 | ||
34 | logger.info('Job %d is a success.', jobId) | ||
35 | |||
36 | // Maybe the video changed in database, refresh it | 58 | // Maybe the video changed in database, refresh it |
37 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | 59 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) |
38 | // Video does not exist anymore | 60 | // Video does not exist anymore |
@@ -56,7 +78,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch | |||
56 | if (resolutionsEnabled.length !== 0) { | 78 | if (resolutionsEnabled.length !== 0) { |
57 | try { | 79 | try { |
58 | await sequelizeTypescript.transaction(async t => { | 80 | await sequelizeTypescript.transaction(async t => { |
59 | const tasks: Bluebird<JobModel>[] = [] | 81 | const tasks: Promise<any>[] = [] |
60 | 82 | ||
61 | for (const resolution of resolutionsEnabled) { | 83 | for (const resolution of resolutionsEnabled) { |
62 | const dataInput = { | 84 | const dataInput = { |
@@ -64,7 +86,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch | |||
64 | resolution | 86 | resolution |
65 | } | 87 | } |
66 | 88 | ||
67 | const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput) | 89 | const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) |
68 | tasks.push(p) | 90 | tasks.push(p) |
69 | } | 91 | } |
70 | 92 | ||
@@ -84,7 +106,5 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch | |||
84 | // --------------------------------------------------------------------------- | 106 | // --------------------------------------------------------------------------- |
85 | 107 | ||
86 | export { | 108 | export { |
87 | process, | 109 | processVideoFile |
88 | onError, | ||
89 | onSuccess | ||
90 | } | 110 | } |
diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts new file mode 100644 index 000000000..57231e649 --- /dev/null +++ b/server/lib/job-queue/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './job-queue' | |||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts new file mode 100644 index 000000000..7a2b6c78d --- /dev/null +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -0,0 +1,124 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { JobType, JobState } from '../../../shared/models' | ||
3 | import { logger } from '../../helpers/logger' | ||
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' | ||
5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | ||
6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | ||
7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | ||
8 | import { processVideoFile, VideoFilePayload } from './handlers/video-file' | ||
9 | |||
10 | type CreateJobArgument = | ||
11 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | ||
12 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | ||
13 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | ||
14 | { type: 'video-file', payload: VideoFilePayload } | ||
15 | |||
16 | const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | ||
17 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | ||
18 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
19 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
20 | 'video-file': processVideoFile | ||
21 | } | ||
22 | |||
23 | class JobQueue { | ||
24 | |||
25 | private static instance: JobQueue | ||
26 | |||
27 | private jobQueue: kue.Queue | ||
28 | private initialized = false | ||
29 | |||
30 | private constructor () {} | ||
31 | |||
32 | init () { | ||
33 | // Already initialized | ||
34 | if (this.initialized === true) return | ||
35 | this.initialized = true | ||
36 | |||
37 | this.jobQueue = kue.createQueue({ | ||
38 | prefix: 'q-' + CONFIG.WEBSERVER.HOST, | ||
39 | redis: { | ||
40 | host: CONFIG.REDIS.HOSTNAME, | ||
41 | port: CONFIG.REDIS.PORT, | ||
42 | auth: CONFIG.REDIS.AUTH | ||
43 | } | ||
44 | }) | ||
45 | |||
46 | this.jobQueue.on('error', err => { | ||
47 | logger.error('Error in job queue.', err) | ||
48 | process.exit(-1) | ||
49 | }) | ||
50 | this.jobQueue.watchStuckJobs(5000) | ||
51 | |||
52 | for (const handlerName of Object.keys(handlers)) { | ||
53 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | ||
54 | try { | ||
55 | const res = await handlers[ handlerName ](job) | ||
56 | return done(null, res) | ||
57 | } catch (err) { | ||
58 | return done(err) | ||
59 | } | ||
60 | }) | ||
61 | } | ||
62 | } | ||
63 | |||
64 | createJob (obj: CreateJobArgument, priority = 'normal') { | ||
65 | return new Promise((res, rej) => { | ||
66 | this.jobQueue | ||
67 | .create(obj.type, obj.payload) | ||
68 | .priority(priority) | ||
69 | .attempts(JOB_ATTEMPTS[obj.type]) | ||
70 | .backoff({ type: 'exponential' }) | ||
71 | .save(err => { | ||
72 | if (err) return rej(err) | ||
73 | |||
74 | return res() | ||
75 | }) | ||
76 | }) | ||
77 | } | ||
78 | |||
79 | listForApi (state: JobState, start: number, count: number, sort: string) { | ||
80 | return new Promise<kue.Job[]>((res, rej) => { | ||
81 | kue.Job.rangeByState(state, start, count, sort, (err, jobs) => { | ||
82 | if (err) return rej(err) | ||
83 | |||
84 | return res(jobs) | ||
85 | }) | ||
86 | }) | ||
87 | } | ||
88 | |||
89 | count (state: JobState) { | ||
90 | return new Promise<number>((res, rej) => { | ||
91 | this.jobQueue[state + 'Count']((err, total) => { | ||
92 | if (err) return rej(err) | ||
93 | |||
94 | return res(total) | ||
95 | }) | ||
96 | }) | ||
97 | } | ||
98 | |||
99 | removeOldJobs () { | ||
100 | const now = new Date().getTime() | ||
101 | kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { | ||
102 | if (err) { | ||
103 | logger.error('Cannot get jobs when removing old jobs.', err) | ||
104 | return | ||
105 | } | ||
106 | |||
107 | for (const job of jobs) { | ||
108 | if (now - job.created_at > JOB_COMPLETED_LIFETIME) { | ||
109 | job.remove() | ||
110 | } | ||
111 | } | ||
112 | }) | ||
113 | } | ||
114 | |||
115 | static get Instance () { | ||
116 | return this.instance || (this.instance = new this()) | ||
117 | } | ||
118 | } | ||
119 | |||
120 | // --------------------------------------------------------------------------- | ||
121 | |||
122 | export { | ||
123 | JobQueue | ||
124 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts deleted file mode 100644 index 4459152db..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ /dev/null | |||
@@ -1,94 +0,0 @@ | |||
1 | import { JobCategory } from '../../../../shared' | ||
2 | import { buildSignedActivity } from '../../../helpers/activitypub' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { getServerActor } from '../../../helpers/utils' | ||
5 | import { ACTIVITY_PUB } from '../../../initializers' | ||
6 | import { ActorModel } from '../../../models/activitypub/actor' | ||
7 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
8 | import { JobHandler, JobScheduler } from '../job-scheduler' | ||
9 | |||
10 | import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' | ||
11 | import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' | ||
12 | import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' | ||
13 | |||
14 | type ActivityPubHttpPayload = { | ||
15 | uris: string[] | ||
16 | signatureActorId?: number | ||
17 | body?: any | ||
18 | attemptNumber?: number | ||
19 | } | ||
20 | |||
21 | const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { | ||
22 | activitypubHttpBroadcastHandler, | ||
23 | activitypubHttpUnicastHandler, | ||
24 | activitypubHttpFetcherHandler | ||
25 | } | ||
26 | const jobCategory: JobCategory = 'activitypub-http' | ||
27 | |||
28 | const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
29 | |||
30 | async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) { | ||
31 | logger.warn('Cannot make request to %s.', uri, err) | ||
32 | |||
33 | let attemptNumber = payload.attemptNumber || 1 | ||
34 | attemptNumber += 1 | ||
35 | |||
36 | if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) { | ||
37 | logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err) | ||
38 | |||
39 | const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined) | ||
40 | if (!actor) { | ||
41 | logger.debug('Actor %s is not a follower, do not retry the request.', uri) | ||
42 | return false | ||
43 | } | ||
44 | |||
45 | const newPayload = Object.assign(payload, { | ||
46 | uris: [ uri ], | ||
47 | attemptNumber | ||
48 | }) | ||
49 | await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload) | ||
50 | |||
51 | return true | ||
52 | } | ||
53 | |||
54 | return false | ||
55 | } | ||
56 | |||
57 | async function computeBody (payload: ActivityPubHttpPayload) { | ||
58 | let body = payload.body | ||
59 | |||
60 | if (payload.signatureActorId) { | ||
61 | const actorSignature = await ActorModel.load(payload.signatureActorId) | ||
62 | if (!actorSignature) throw new Error('Unknown signature actor id.') | ||
63 | body = await buildSignedActivity(actorSignature, payload.body) | ||
64 | } | ||
65 | |||
66 | return body | ||
67 | } | ||
68 | |||
69 | async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) { | ||
70 | let actor: ActorModel | ||
71 | if (payload.signatureActorId) { | ||
72 | actor = await ActorModel.load(payload.signatureActorId) | ||
73 | if (!actor) throw new Error('Unknown signature actor id.') | ||
74 | } else { | ||
75 | // We need to sign the request, so use the server | ||
76 | actor = await getServerActor() | ||
77 | } | ||
78 | |||
79 | const keyId = actor.getWebfingerUrl() | ||
80 | return { | ||
81 | algorithm: 'rsa-sha256', | ||
82 | authorizationHeaderName: 'Signature', | ||
83 | keyId, | ||
84 | key: actor.privateKey | ||
85 | } | ||
86 | } | ||
87 | |||
88 | export { | ||
89 | ActivityPubHttpPayload, | ||
90 | activitypubHttpJobScheduler, | ||
91 | maybeRetryRequestLater, | ||
92 | computeBody, | ||
93 | buildSignedRequestOptions | ||
94 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts deleted file mode 100644 index 54a7504e8..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts +++ /dev/null | |||
@@ -1,50 +0,0 @@ | |||
1 | import { logger } from '../../../helpers/logger' | ||
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
4 | import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' | ||
5 | |||
6 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | ||
7 | logger.info('Processing ActivityPub unicast in job %d.', jobId) | ||
8 | |||
9 | const uri = payload.uris[0] | ||
10 | |||
11 | const body = await computeBody(payload) | ||
12 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
13 | |||
14 | const options = { | ||
15 | method: 'POST', | ||
16 | uri, | ||
17 | json: body, | ||
18 | httpSignature: httpSignatureOptions | ||
19 | } | ||
20 | |||
21 | try { | ||
22 | await doRequest(options) | ||
23 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) | ||
24 | } catch (err) { | ||
25 | const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) | ||
26 | if (isRetryingLater === false) { | ||
27 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) | ||
28 | } | ||
29 | |||
30 | throw err | ||
31 | } | ||
32 | } | ||
33 | |||
34 | function onError (err: Error, jobId: number) { | ||
35 | logger.error('Error when sending ActivityPub request in job %d.', jobId, err) | ||
36 | return Promise.resolve() | ||
37 | } | ||
38 | |||
39 | function onSuccess (jobId: number) { | ||
40 | logger.info('Job %d is a success.', jobId) | ||
41 | return Promise.resolve() | ||
42 | } | ||
43 | |||
44 | // --------------------------------------------------------------------------- | ||
45 | |||
46 | export { | ||
47 | process, | ||
48 | onError, | ||
49 | onSuccess | ||
50 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts deleted file mode 100644 index ad8f527b4..000000000 --- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts +++ /dev/null | |||
@@ -1 +0,0 @@ | |||
1 | export * from './activitypub-http-job-scheduler' | ||
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts deleted file mode 100644 index 394264ec1..000000000 --- a/server/lib/jobs/index.ts +++ /dev/null | |||
@@ -1,2 +0,0 @@ | |||
1 | export * from './activitypub-http-job-scheduler' | ||
2 | export * from './transcoding-job-scheduler' | ||
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts deleted file mode 100644 index 9d55880e6..000000000 --- a/server/lib/jobs/job-scheduler.ts +++ /dev/null | |||
@@ -1,144 +0,0 @@ | |||
1 | import { AsyncQueue, forever, queue } from 'async' | ||
2 | import * as Sequelize from 'sequelize' | ||
3 | import { JobCategory } from '../../../shared' | ||
4 | import { logger } from '../../helpers/logger' | ||
5 | import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' | ||
6 | import { JobModel } from '../../models/job/job' | ||
7 | |||
8 | export interface JobHandler<P, T> { | ||
9 | process (data: object, jobId: number): Promise<T> | ||
10 | onError (err: Error, jobId: number) | ||
11 | onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>): Promise<any> | ||
12 | } | ||
13 | type JobQueueCallback = (err: Error) => void | ||
14 | |||
15 | class JobScheduler<P, T> { | ||
16 | |||
17 | constructor ( | ||
18 | private jobCategory: JobCategory, | ||
19 | private jobHandlers: { [ id: string ]: JobHandler<P, T> } | ||
20 | ) {} | ||
21 | |||
22 | async activate () { | ||
23 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory] | ||
24 | |||
25 | logger.info('Jobs scheduler %s activated.', this.jobCategory) | ||
26 | |||
27 | const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this)) | ||
28 | |||
29 | // Finish processing jobs from a previous start | ||
30 | const state = JOB_STATES.PROCESSING | ||
31 | try { | ||
32 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) | ||
33 | |||
34 | this.enqueueJobs(jobsQueue, jobs) | ||
35 | } catch (err) { | ||
36 | logger.error('Cannot list pending jobs.', err) | ||
37 | } | ||
38 | |||
39 | forever( | ||
40 | async next => { | ||
41 | if (jobsQueue.length() !== 0) { | ||
42 | // Finish processing the queue first | ||
43 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
44 | } | ||
45 | |||
46 | const state = JOB_STATES.PENDING | ||
47 | try { | ||
48 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) | ||
49 | |||
50 | this.enqueueJobs(jobsQueue, jobs) | ||
51 | } catch (err) { | ||
52 | logger.error('Cannot list pending jobs.', err) | ||
53 | } | ||
54 | |||
55 | // Optimization: we could use "drain" from queue object | ||
56 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
57 | }, | ||
58 | |||
59 | err => logger.error('Error in job scheduler queue.', err) | ||
60 | ) | ||
61 | } | ||
62 | |||
63 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) { | ||
64 | const createQuery = { | ||
65 | state: JOB_STATES.PENDING, | ||
66 | category: this.jobCategory, | ||
67 | handlerName, | ||
68 | handlerInputData | ||
69 | } | ||
70 | |||
71 | const options = { transaction } | ||
72 | |||
73 | return JobModel.create(createQuery, options) | ||
74 | } | ||
75 | |||
76 | private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) { | ||
77 | jobs.forEach(job => jobsQueue.push(job)) | ||
78 | } | ||
79 | |||
80 | private async processJob (job: JobModel, callback: (err: Error) => void) { | ||
81 | const jobHandler = this.jobHandlers[job.handlerName] | ||
82 | if (jobHandler === undefined) { | ||
83 | const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id | ||
84 | logger.error(errorString) | ||
85 | |||
86 | const error = new Error(errorString) | ||
87 | await this.onJobError(jobHandler, job, error) | ||
88 | return callback(error) | ||
89 | } | ||
90 | |||
91 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | ||
92 | |||
93 | job.state = JOB_STATES.PROCESSING | ||
94 | await job.save() | ||
95 | |||
96 | try { | ||
97 | const result: T = await jobHandler.process(job.handlerInputData, job.id) | ||
98 | await this.onJobSuccess(jobHandler, job, result) | ||
99 | } catch (err) { | ||
100 | logger.error('Error in job handler %s.', job.handlerName, err) | ||
101 | |||
102 | try { | ||
103 | await this.onJobError(jobHandler, job, err) | ||
104 | } catch (innerErr) { | ||
105 | this.cannotSaveJobError(innerErr) | ||
106 | return callback(innerErr) | ||
107 | } | ||
108 | } | ||
109 | |||
110 | return callback(null) | ||
111 | } | ||
112 | |||
113 | private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) { | ||
114 | job.state = JOB_STATES.ERROR | ||
115 | |||
116 | try { | ||
117 | await job.save() | ||
118 | if (jobHandler) await jobHandler.onError(err, job.id) | ||
119 | } catch (err) { | ||
120 | this.cannotSaveJobError(err) | ||
121 | } | ||
122 | } | ||
123 | |||
124 | private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) { | ||
125 | job.state = JOB_STATES.SUCCESS | ||
126 | |||
127 | try { | ||
128 | await job.save() | ||
129 | await jobHandler.onSuccess(job.id, jobResult, this) | ||
130 | } catch (err) { | ||
131 | this.cannotSaveJobError(err) | ||
132 | } | ||
133 | } | ||
134 | |||
135 | private cannotSaveJobError (err: Error) { | ||
136 | logger.error('Cannot save new job state.', err) | ||
137 | } | ||
138 | } | ||
139 | |||
140 | // --------------------------------------------------------------------------- | ||
141 | |||
142 | export { | ||
143 | JobScheduler | ||
144 | } | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts deleted file mode 100644 index 73152a1be..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/index.ts +++ /dev/null | |||
@@ -1 +0,0 @@ | |||
1 | export * from './transcoding-job-scheduler' | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts deleted file mode 100644 index e5530a73c..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts +++ /dev/null | |||
@@ -1,23 +0,0 @@ | |||
1 | import { JobCategory } from '../../../../shared' | ||
2 | import { VideoModel } from '../../../models/video/video' | ||
3 | import { JobHandler, JobScheduler } from '../job-scheduler' | ||
4 | |||
5 | import * as videoFileOptimizer from './video-file-optimizer-handler' | ||
6 | import * as videoFileTranscoder from './video-file-transcoder-handler' | ||
7 | |||
8 | type TranscodingJobPayload = { | ||
9 | videoUUID: string | ||
10 | resolution?: number | ||
11 | } | ||
12 | const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = { | ||
13 | videoFileOptimizer, | ||
14 | videoFileTranscoder | ||
15 | } | ||
16 | const jobCategory: JobCategory = 'transcoding' | ||
17 | |||
18 | const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
19 | |||
20 | export { | ||
21 | TranscodingJobPayload, | ||
22 | transcodingJobScheduler | ||
23 | } | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts deleted file mode 100644 index 883d3eba8..000000000 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts +++ /dev/null | |||
@@ -1,48 +0,0 @@ | |||
1 | import { VideoResolution } from '../../../../shared' | ||
2 | import { VideoPrivacy } from '../../../../shared/models/videos' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { VideoModel } from '../../../models/video/video' | ||
5 | import { sendUpdateVideo } from '../../activitypub/send' | ||
6 | |||
7 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { | ||
8 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) | ||
9 | // No video, maybe deleted? | ||
10 | if (!video) { | ||
11 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | ||
12 | return undefined | ||
13 | } | ||
14 | |||
15 | await video.transcodeOriginalVideofile(data.resolution) | ||
16 | |||
17 | return video | ||
18 | } | ||
19 | |||
20 | function onError (err: Error, jobId: number) { | ||
21 | logger.error('Error when transcoding video file in job %d.', jobId, err) | ||
22 | return Promise.resolve() | ||
23 | } | ||
24 | |||
25 | async function onSuccess (jobId: number, video: VideoModel) { | ||
26 | if (video === undefined) return undefined | ||
27 | |||
28 | logger.info('Job %d is a success.', jobId) | ||
29 | |||
30 | // Maybe the video changed in database, refresh it | ||
31 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
32 | // Video does not exist anymore | ||
33 | if (!videoDatabase) return undefined | ||
34 | |||
35 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
36 | await sendUpdateVideo(video, undefined) | ||
37 | } | ||
38 | |||
39 | return undefined | ||
40 | } | ||
41 | |||
42 | // --------------------------------------------------------------------------- | ||
43 | |||
44 | export { | ||
45 | process, | ||
46 | onError, | ||
47 | onSuccess | ||
48 | } | ||
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts new file mode 100644 index 000000000..add5677ac --- /dev/null +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts | |||
@@ -0,0 +1,19 @@ | |||
1 | import { JobQueue } from '../job-queue' | ||
2 | import { AbstractScheduler } from './abstract-scheduler' | ||
3 | |||
4 | export class RemoveOldJobsScheduler extends AbstractScheduler { | ||
5 | |||
6 | private static instance: AbstractScheduler | ||
7 | |||
8 | private constructor () { | ||
9 | super() | ||
10 | } | ||
11 | |||
12 | async execute () { | ||
13 | JobQueue.Instance.removeOldJobs() | ||
14 | } | ||
15 | |||
16 | static get Instance () { | ||
17 | return this.instance || (this.instance = new this()) | ||
18 | } | ||
19 | } | ||
diff --git a/server/middlewares/validators/jobs.ts b/server/middlewares/validators/jobs.ts new file mode 100644 index 000000000..2f8b1738c --- /dev/null +++ b/server/middlewares/validators/jobs.ts | |||
@@ -0,0 +1,23 @@ | |||
1 | import * as express from 'express' | ||
2 | import { param } from 'express-validator/check' | ||
3 | import { isValidJobState } from '../../helpers/custom-validators/jobs' | ||
4 | import { logger } from '../../helpers/logger' | ||
5 | import { areValidationErrors } from './utils' | ||
6 | |||
7 | const listJobsValidator = [ | ||
8 | param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'), | ||
9 | |||
10 | async (req: express.Request, res: express.Response, next: express.NextFunction) => { | ||
11 | logger.debug('Checking listJobsValidator parameters.', { parameters: req.params }) | ||
12 | |||
13 | if (areValidationErrors(req, res)) return | ||
14 | |||
15 | return next() | ||
16 | } | ||
17 | ] | ||
18 | |||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | export { | ||
22 | listJobsValidator | ||
23 | } | ||
diff --git a/server/models/job/job.ts b/server/models/job/job.ts deleted file mode 100644 index ba1c6737e..000000000 --- a/server/models/job/job.ts +++ /dev/null | |||
@@ -1,80 +0,0 @@ | |||
1 | import { values } from 'lodash' | ||
2 | import { AllowNull, Column, CreatedAt, DataType, Model, Table, UpdatedAt } from 'sequelize-typescript' | ||
3 | import { JobCategory, JobState } from '../../../shared/models' | ||
4 | import { JOB_CATEGORIES, JOB_STATES } from '../../initializers' | ||
5 | import { getSort } from '../utils' | ||
6 | |||
7 | @Table({ | ||
8 | tableName: 'job', | ||
9 | indexes: [ | ||
10 | { | ||
11 | fields: [ 'state', 'category' ] | ||
12 | } | ||
13 | ] | ||
14 | }) | ||
15 | export class JobModel extends Model<JobModel> { | ||
16 | @AllowNull(false) | ||
17 | @Column(DataType.ENUM(values(JOB_STATES))) | ||
18 | state: JobState | ||
19 | |||
20 | @AllowNull(false) | ||
21 | @Column(DataType.ENUM(values(JOB_CATEGORIES))) | ||
22 | category: JobCategory | ||
23 | |||
24 | @AllowNull(false) | ||
25 | @Column | ||
26 | handlerName: string | ||
27 | |||
28 | @AllowNull(true) | ||
29 | @Column(DataType.JSON) | ||
30 | handlerInputData: any | ||
31 | |||
32 | @CreatedAt | ||
33 | createdAt: Date | ||
34 | |||
35 | @UpdatedAt | ||
36 | updatedAt: Date | ||
37 | |||
38 | static listWithLimitByCategory (limit: number, state: JobState, jobCategory: JobCategory) { | ||
39 | const query = { | ||
40 | order: [ | ||
41 | [ 'id', 'ASC' ] | ||
42 | ], | ||
43 | limit: limit, | ||
44 | where: { | ||
45 | state, | ||
46 | category: jobCategory | ||
47 | }, | ||
48 | logging: false | ||
49 | } | ||
50 | |||
51 | return JobModel.findAll(query) | ||
52 | } | ||
53 | |||
54 | static listForApi (start: number, count: number, sort: string) { | ||
55 | const query = { | ||
56 | offset: start, | ||
57 | limit: count, | ||
58 | order: [ getSort(sort) ] | ||
59 | } | ||
60 | |||
61 | return JobModel.findAndCountAll(query).then(({ rows, count }) => { | ||
62 | return { | ||
63 | data: rows, | ||
64 | total: count | ||
65 | } | ||
66 | }) | ||
67 | } | ||
68 | |||
69 | toFormattedJSON () { | ||
70 | return { | ||
71 | id: this.id, | ||
72 | state: this.state, | ||
73 | category: this.category, | ||
74 | handlerName: this.handlerName, | ||
75 | handlerInputData: this.handlerInputData, | ||
76 | createdAt: this.createdAt, | ||
77 | updatedAt: this.updatedAt | ||
78 | } | ||
79 | } | ||
80 | } | ||
diff --git a/server/tests/api/check-params/jobs.ts b/server/tests/api/check-params/jobs.ts index b12818bb1..ce3ac8809 100644 --- a/server/tests/api/check-params/jobs.ts +++ b/server/tests/api/check-params/jobs.ts | |||
@@ -7,7 +7,7 @@ import { checkBadCountPagination, checkBadSortPagination, checkBadStartPaginatio | |||
7 | import { makeGetRequest } from '../../utils/requests/requests' | 7 | import { makeGetRequest } from '../../utils/requests/requests' |
8 | 8 | ||
9 | describe('Test jobs API validators', function () { | 9 | describe('Test jobs API validators', function () { |
10 | const path = '/api/v1/jobs/' | 10 | const path = '/api/v1/jobs/failed' |
11 | let server: ServerInfo | 11 | let server: ServerInfo |
12 | let userAccessToken = '' | 12 | let userAccessToken = '' |
13 | 13 | ||
@@ -31,6 +31,15 @@ describe('Test jobs API validators', function () { | |||
31 | }) | 31 | }) |
32 | 32 | ||
33 | describe('When listing jobs', function () { | 33 | describe('When listing jobs', function () { |
34 | |||
35 | it('Should fail with a bad state', async function () { | ||
36 | await makeGetRequest({ | ||
37 | url: server.url, | ||
38 | token: server.accessToken, | ||
39 | path: path + 'ade' | ||
40 | }) | ||
41 | }) | ||
42 | |||
34 | it('Should fail with a bad start pagination', async function () { | 43 | it('Should fail with a bad start pagination', async function () { |
35 | await checkBadStartPagination(server.url, path, server.accessToken) | 44 | await checkBadStartPagination(server.url, path, server.accessToken) |
36 | }) | 45 | }) |
diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts index de4e77b2f..4cedeb89e 100644 --- a/server/tests/api/server/handle-down.ts +++ b/server/tests/api/server/handle-down.ts | |||
@@ -2,6 +2,7 @@ | |||
2 | 2 | ||
3 | import * as chai from 'chai' | 3 | import * as chai from 'chai' |
4 | import 'mocha' | 4 | import 'mocha' |
5 | import { JobState } from '../../../../shared/models' | ||
5 | import { VideoPrivacy } from '../../../../shared/models/videos' | 6 | import { VideoPrivacy } from '../../../../shared/models/videos' |
6 | import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' | 7 | import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' |
7 | import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' | 8 | import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' |
@@ -139,12 +140,11 @@ describe('Test handle downs', function () { | |||
139 | }) | 140 | }) |
140 | 141 | ||
141 | it('Should not have pending/processing jobs anymore', async function () { | 142 | it('Should not have pending/processing jobs anymore', async function () { |
142 | const res = await getJobsListPaginationAndSort(servers[0].url, servers[0].accessToken, 0, 50, '-createdAt') | 143 | const states: JobState[] = [ 'inactive', 'active' ] |
143 | const jobs = res.body.data | ||
144 | 144 | ||
145 | for (const job of jobs) { | 145 | for (const state of states) { |
146 | expect(job.state).not.to.equal('pending') | 146 | const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') |
147 | expect(job.state).not.to.equal('processing') | 147 | expect(res.body.data).to.have.length(0) |
148 | } | 148 | } |
149 | }) | 149 | }) |
150 | 150 | ||
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 2e17e71a4..671498769 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts | |||
@@ -35,20 +35,20 @@ describe('Test jobs', function () { | |||
35 | }) | 35 | }) |
36 | 36 | ||
37 | it('Should list jobs', async function () { | 37 | it('Should list jobs', async function () { |
38 | const res = await getJobsList(servers[1].url, servers[1].accessToken) | 38 | const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete') |
39 | expect(res.body.total).to.be.above(2) | 39 | expect(res.body.total).to.be.above(2) |
40 | expect(res.body.data).to.have.length.above(2) | 40 | expect(res.body.data).to.have.length.above(2) |
41 | }) | 41 | }) |
42 | 42 | ||
43 | it('Should list jobs with sort and pagination', async function () { | 43 | it('Should list jobs with sort and pagination', async function () { |
44 | const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 4, 1, 'createdAt') | 44 | const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt') |
45 | expect(res.body.total).to.be.above(2) | 45 | expect(res.body.total).to.be.above(2) |
46 | expect(res.body.data).to.have.lengthOf(1) | 46 | expect(res.body.data).to.have.lengthOf(1) |
47 | 47 | ||
48 | const job = res.body.data[0] | 48 | const job = res.body.data[0] |
49 | expect(job.state).to.equal('success') | 49 | |
50 | expect(job.category).to.equal('transcoding') | 50 | expect(job.state).to.equal('complete') |
51 | expect(job.handlerName).to.have.length.above(3) | 51 | expect(job.type).to.equal('activitypub-http-unicast') |
52 | expect(dateIsValid(job.createdAt)).to.be.true | 52 | expect(dateIsValid(job.createdAt)).to.be.true |
53 | expect(dateIsValid(job.updatedAt)).to.be.true | 53 | expect(dateIsValid(job.updatedAt)).to.be.true |
54 | }) | 54 | }) |
diff --git a/server/tests/api/videos/multiple-servers.ts b/server/tests/api/videos/multiple-servers.ts index 4c4b5123d..0215b3011 100644 --- a/server/tests/api/videos/multiple-servers.ts +++ b/server/tests/api/videos/multiple-servers.ts | |||
@@ -475,16 +475,17 @@ describe('Test multiple servers', function () { | |||
475 | it('Should like and dislikes videos on different services', async function () { | 475 | it('Should like and dislikes videos on different services', async function () { |
476 | this.timeout(20000) | 476 | this.timeout(20000) |
477 | 477 | ||
478 | const tasks: Promise<any>[] = [] | 478 | await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like') |
479 | tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')) | 479 | await wait(200) |
480 | tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike')) | 480 | await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike') |
481 | tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')) | 481 | await wait(200) |
482 | tasks.push(rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like')) | 482 | await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like') |
483 | tasks.push(rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike')) | 483 | await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like') |
484 | tasks.push(rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike')) | 484 | await wait(200) |
485 | tasks.push(rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like')) | 485 | await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike') |
486 | 486 | await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike') | |
487 | await Promise.all(tasks) | 487 | await wait(200) |
488 | await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like') | ||
488 | 489 | ||
489 | await wait(10000) | 490 | await wait(10000) |
490 | 491 | ||
diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts index e41203351..7f67525ed 100644 --- a/server/tests/real-world/real-world.ts +++ b/server/tests/real-world/real-world.ts | |||
@@ -3,6 +3,7 @@ process.env.NODE_ENV = 'test' | |||
3 | 3 | ||
4 | import * as program from 'commander' | 4 | import * as program from 'commander' |
5 | import { Video, VideoFile, VideoRateType } from '../../../shared' | 5 | import { Video, VideoFile, VideoRateType } from '../../../shared' |
6 | import { JobState } from '../../../shared/models' | ||
6 | import { | 7 | import { |
7 | flushAndRunMultipleServers, | 8 | flushAndRunMultipleServers, |
8 | flushTests, follow, | 9 | flushTests, follow, |
@@ -346,23 +347,19 @@ function goodbye () { | |||
346 | } | 347 | } |
347 | 348 | ||
348 | async function isTherePendingRequests (servers: ServerInfo[]) { | 349 | async function isTherePendingRequests (servers: ServerInfo[]) { |
350 | const states: JobState[] = [ 'inactive', 'active' ] | ||
349 | const tasks: Promise<any>[] = [] | 351 | const tasks: Promise<any>[] = [] |
350 | let pendingRequests = false | 352 | let pendingRequests = false |
351 | 353 | ||
352 | // Check if each server has pending request | 354 | // Check if each server has pending request |
353 | for (const server of servers) { | 355 | for (const server of servers) { |
354 | const p = getJobsListPaginationAndSort(server.url, server.accessToken, 0, 10, '-createdAt') | 356 | for (const state of states) { |
355 | .then(res => { | 357 | const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt') |
356 | const jobs = res.body.data | 358 | .then(res => { |
357 | 359 | if (res.body.total > 0) pendingRequests = true | |
358 | for (const job of jobs) { | 360 | }) |
359 | if (job.state === 'pending' || job.state === 'processing') { | 361 | tasks.push(p) |
360 | pendingRequests = true | 362 | } |
361 | } | ||
362 | } | ||
363 | }) | ||
364 | |||
365 | tasks.push(p) | ||
366 | } | 363 | } |
367 | 364 | ||
368 | await Promise.all(tasks) | 365 | await Promise.all(tasks) |
diff --git a/server/tests/utils/server/jobs.ts b/server/tests/utils/server/jobs.ts index 0a8c51575..4053dd40b 100644 --- a/server/tests/utils/server/jobs.ts +++ b/server/tests/utils/server/jobs.ts | |||
@@ -1,7 +1,8 @@ | |||
1 | import * as request from 'supertest' | 1 | import * as request from 'supertest' |
2 | import { JobState } from '../../../../shared/models' | ||
2 | 3 | ||
3 | function getJobsList (url: string, accessToken: string) { | 4 | function getJobsList (url: string, accessToken: string, state: JobState) { |
4 | const path = '/api/v1/jobs' | 5 | const path = '/api/v1/jobs/' + state |
5 | 6 | ||
6 | return request(url) | 7 | return request(url) |
7 | .get(path) | 8 | .get(path) |
@@ -11,8 +12,8 @@ function getJobsList (url: string, accessToken: string) { | |||
11 | .expect('Content-Type', /json/) | 12 | .expect('Content-Type', /json/) |
12 | } | 13 | } |
13 | 14 | ||
14 | function getJobsListPaginationAndSort (url: string, accessToken: string, start: number, count: number, sort: string) { | 15 | function getJobsListPaginationAndSort (url: string, accessToken: string, state: JobState, start: number, count: number, sort: string) { |
15 | const path = '/api/v1/jobs' | 16 | const path = '/api/v1/jobs/' + state |
16 | 17 | ||
17 | return request(url) | 18 | return request(url) |
18 | .get(path) | 19 | .get(path) |