aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/controllers/api/jobs.ts38
-rw-r--r--server/controllers/api/server/follows.ts2
-rw-r--r--server/controllers/api/videos/index.ts24
-rw-r--r--server/helpers/custom-validators/jobs.ts14
-rw-r--r--server/helpers/database-utils.ts1
-rw-r--r--server/initializers/constants.ts49
-rw-r--r--server/initializers/database.ts2
-rw-r--r--server/initializers/migrations/0100-activitypub.ts5
-rw-r--r--server/initializers/migrations/0180-job-table-delete.ts18
-rw-r--r--server/lib/activitypub/actor.ts59
-rw-r--r--server/lib/activitypub/fetch.ts9
-rw-r--r--server/lib/activitypub/process/process-accept.ts2
-rw-r--r--server/lib/activitypub/process/process-follow.ts2
-rw-r--r--server/lib/activitypub/send/misc.ts26
-rw-r--r--server/lib/activitypub/send/send-accept.ts5
-rw-r--r--server/lib/activitypub/send/send-announce.ts2
-rw-r--r--server/lib/activitypub/send/send-create.ts22
-rw-r--r--server/lib/activitypub/send/send-follow.ts5
-rw-r--r--server/lib/activitypub/send/send-like.ts2
-rw-r--r--server/lib/activitypub/send/send-undo.ts14
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts (renamed from server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts)32
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts (renamed from server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts)27
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts43
-rw-r--r--server/lib/job-queue/handlers/utils/activitypub-http-utils.ts39
-rw-r--r--server/lib/job-queue/handlers/video-file.ts (renamed from server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts)60
-rw-r--r--server/lib/job-queue/index.ts1
-rw-r--r--server/lib/job-queue/job-queue.ts124
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts94
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts50
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/index.ts2
-rw-r--r--server/lib/jobs/job-scheduler.ts144
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts23
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts48
-rw-r--r--server/lib/schedulers/remove-old-jobs-scheduler.ts19
-rw-r--r--server/middlewares/validators/jobs.ts23
-rw-r--r--server/models/job/job.ts80
-rw-r--r--server/tests/api/check-params/jobs.ts11
-rw-r--r--server/tests/api/server/handle-down.ts10
-rw-r--r--server/tests/api/server/jobs.ts10
-rw-r--r--server/tests/api/videos/multiple-servers.ts21
-rw-r--r--server/tests/real-world/real-world.ts21
-rw-r--r--server/tests/utils/server/jobs.ts9
44 files changed, 540 insertions, 654 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index de37dea39..132d110ad 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -1,22 +1,29 @@
1import * as express from 'express' 1import * as express from 'express'
2import { ResultList } from '../../../shared'
3import { Job, JobType, JobState } from '../../../shared/models'
2import { UserRight } from '../../../shared/models/users' 4import { UserRight } from '../../../shared/models/users'
3import { getFormattedObjects } from '../../helpers/utils' 5import { JobQueue } from '../../lib/job-queue'
4import { 6import {
5 asyncMiddleware, authenticate, ensureUserHasRight, jobsSortValidator, setDefaultPagination, 7 asyncMiddleware,
8 authenticate,
9 ensureUserHasRight,
10 jobsSortValidator,
11 setDefaultPagination,
6 setDefaultSort 12 setDefaultSort
7} from '../../middlewares' 13} from '../../middlewares'
8import { paginationValidator } from '../../middlewares/validators' 14import { paginationValidator } from '../../middlewares/validators'
9import { JobModel } from '../../models/job/job' 15import { listJobsValidator } from '../../middlewares/validators/jobs'
10 16
11const jobsRouter = express.Router() 17const jobsRouter = express.Router()
12 18
13jobsRouter.get('/', 19jobsRouter.get('/:state',
14 authenticate, 20 authenticate,
15 ensureUserHasRight(UserRight.MANAGE_JOBS), 21 ensureUserHasRight(UserRight.MANAGE_JOBS),
16 paginationValidator, 22 paginationValidator,
17 jobsSortValidator, 23 jobsSortValidator,
18 setDefaultSort, 24 setDefaultSort,
19 setDefaultPagination, 25 setDefaultPagination,
26 asyncMiddleware(listJobsValidator),
20 asyncMiddleware(listJobs) 27 asyncMiddleware(listJobs)
21) 28)
22 29
@@ -29,7 +36,26 @@ export {
29// --------------------------------------------------------------------------- 36// ---------------------------------------------------------------------------
30 37
31async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { 38async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) {
32 const resultList = await JobModel.listForApi(req.query.start, req.query.count, req.query.sort) 39 const sort = req.query.sort === 'createdAt' ? 'asc' : 'desc'
40
41 const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort)
42 const total = await JobQueue.Instance.count(req.params.state)
43
44 const result: ResultList<any> = {
45 total,
46 data: jobs.map(j => formatJob(j.toJSON()))
47 }
48 return res.json(result)
49}
33 50
34 return res.json(getFormattedObjects(resultList.data, resultList.total)) 51function formatJob (job: any): Job {
52 return {
53 id: job.id,
54 state: job.state as JobState,
55 type: job.type as JobType,
56 data: job.data,
57 error: job.error,
58 createdAt: new Date(parseInt(job.created_at, 10)),
59 updatedAt: new Date(parseInt(job.updated_at, 10))
60 }
35} 61}
diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts
index 506b9668e..bb8713e7a 100644
--- a/server/controllers/api/server/follows.ts
+++ b/server/controllers/api/server/follows.ts
@@ -123,7 +123,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) {
123 actorFollow.ActorFollower = fromActor 123 actorFollow.ActorFollower = fromActor
124 124
125 // Send a notification to remote server 125 // Send a notification to remote server
126 await sendFollow(actorFollow, t) 126 await sendFollow(actorFollow)
127 }) 127 })
128} 128}
129 129
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index c2fdb4f95..459795141 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -12,7 +12,7 @@ import {
12} from '../../../initializers' 12} from '../../../initializers'
13import { fetchRemoteVideoDescription, getVideoActivityPubUrl, shareVideoByServerAndChannel } from '../../../lib/activitypub' 13import { fetchRemoteVideoDescription, getVideoActivityPubUrl, shareVideoByServerAndChannel } from '../../../lib/activitypub'
14import { sendCreateVideo, sendCreateViewToOrigin, sendCreateViewToVideoFollowers, sendUpdateVideo } from '../../../lib/activitypub/send' 14import { sendCreateVideo, sendCreateViewToOrigin, sendCreateViewToVideoFollowers, sendUpdateVideo } from '../../../lib/activitypub/send'
15import { transcodingJobScheduler } from '../../../lib/jobs/transcoding-job-scheduler' 15import { JobQueue } from '../../../lib/job-queue'
16import { 16import {
17 asyncMiddleware, authenticate, paginationValidator, setDefaultSort, setDefaultPagination, videosAddValidator, videosGetValidator, 17 asyncMiddleware, authenticate, paginationValidator, setDefaultSort, setDefaultPagination, videosAddValidator, videosGetValidator,
18 videosRemoveValidator, videosSearchValidator, videosSortValidator, videosUpdateValidator 18 videosRemoveValidator, videosSearchValidator, videosSortValidator, videosUpdateValidator
@@ -176,18 +176,9 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi
176 ) 176 )
177 await Promise.all(tasks) 177 await Promise.all(tasks)
178 178
179 return sequelizeTypescript.transaction(async t => { 179 const videoCreated = await sequelizeTypescript.transaction(async t => {
180 const sequelizeOptions = { transaction: t } 180 const sequelizeOptions = { transaction: t }
181 181
182 if (CONFIG.TRANSCODING.ENABLED === true) {
183 // Put uuid because we don't have id auto incremented for now
184 const dataInput = {
185 videoUUID: video.uuid
186 }
187
188 await transcodingJobScheduler.createJob(t, 'videoFileOptimizer', dataInput)
189 }
190
191 const videoCreated = await video.save(sequelizeOptions) 182 const videoCreated = await video.save(sequelizeOptions)
192 // Do not forget to add video channel information to the created video 183 // Do not forget to add video channel information to the created video
193 videoCreated.VideoChannel = res.locals.videoChannel 184 videoCreated.VideoChannel = res.locals.videoChannel
@@ -216,6 +207,17 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi
216 207
217 return videoCreated 208 return videoCreated
218 }) 209 })
210
211 if (CONFIG.TRANSCODING.ENABLED === true) {
212 // Put uuid because we don't have id auto incremented for now
213 const dataInput = {
214 videoUUID: videoCreated.uuid
215 }
216
217 await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
218 }
219
220 return videoCreated
219} 221}
220 222
221async function updateVideoRetryWrapper (req: express.Request, res: express.Response, next: express.NextFunction) { 223async function updateVideoRetryWrapper (req: express.Request, res: express.Response, next: express.NextFunction) {
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts
new file mode 100644
index 000000000..9700fbd12
--- /dev/null
+++ b/server/helpers/custom-validators/jobs.ts
@@ -0,0 +1,14 @@
1import { JobState } from '../../../shared/models'
2import { exists } from './misc'
3
4const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ]
5
6function isValidJobState (value: JobState) {
7 return exists(value) && jobStates.indexOf(value) !== -1
8}
9
10// ---------------------------------------------------------------------------
11
12export {
13 isValidJobState
14}
diff --git a/server/helpers/database-utils.ts b/server/helpers/database-utils.ts
index 78ca768b9..b4adaf9cc 100644
--- a/server/helpers/database-utils.ts
+++ b/server/helpers/database-utils.ts
@@ -16,6 +16,7 @@ function retryTransactionWrapper <T> (
16 .catch(err => callback(err)) 16 .catch(err => callback(err))
17 }) 17 })
18 .catch(err => { 18 .catch(err => {
19 console.error(err)
19 logger.error(options.errorMessage, err) 20 logger.error(options.errorMessage, err)
20 throw err 21 throw err
21 }) 22 })
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index cb043251a..329d0ffe8 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -1,6 +1,6 @@
1import { IConfig } from 'config' 1import { IConfig } from 'config'
2import { dirname, join } from 'path' 2import { dirname, join } from 'path'
3import { JobCategory, JobState, VideoRateType } from '../../shared/models' 3import { JobType, VideoRateType } from '../../shared/models'
4import { ActivityPubActorType } from '../../shared/models/activitypub' 4import { ActivityPubActorType } from '../../shared/models/activitypub'
5import { FollowState } from '../../shared/models/actors' 5import { FollowState } from '../../shared/models/actors'
6import { VideoPrivacy } from '../../shared/models/videos' 6import { VideoPrivacy } from '../../shared/models/videos'
@@ -12,7 +12,7 @@ let config: IConfig = require('config')
12 12
13// --------------------------------------------------------------------------- 13// ---------------------------------------------------------------------------
14 14
15const LAST_MIGRATION_VERSION = 175 15const LAST_MIGRATION_VERSION = 180
16 16
17// --------------------------------------------------------------------------- 17// ---------------------------------------------------------------------------
18 18
@@ -26,7 +26,7 @@ const PAGINATION_COUNT_DEFAULT = 15
26const SORTABLE_COLUMNS = { 26const SORTABLE_COLUMNS = {
27 USERS: [ 'id', 'username', 'createdAt' ], 27 USERS: [ 'id', 'username', 'createdAt' ],
28 ACCOUNTS: [ 'createdAt' ], 28 ACCOUNTS: [ 'createdAt' ],
29 JOBS: [ 'id', 'createdAt' ], 29 JOBS: [ 'createdAt' ],
30 VIDEO_ABUSES: [ 'id', 'createdAt' ], 30 VIDEO_ABUSES: [ 'id', 'createdAt' ],
31 VIDEO_CHANNELS: [ 'id', 'name', 'updatedAt', 'createdAt' ], 31 VIDEO_CHANNELS: [ 'id', 'name', 'updatedAt', 'createdAt' ],
32 VIDEOS: [ 'name', 'duration', 'createdAt', 'views', 'likes' ], 32 VIDEOS: [ 'name', 'duration', 'createdAt', 'views', 'likes' ],
@@ -61,23 +61,20 @@ const REMOTE_SCHEME = {
61 WS: 'wss' 61 WS: 'wss'
62} 62}
63 63
64const JOB_STATES: { [ id: string ]: JobState } = { 64const JOB_ATTEMPTS: { [ id in JobType ]: number } = {
65 PENDING: 'pending', 65 'activitypub-http-broadcast': 5,
66 PROCESSING: 'processing', 66 'activitypub-http-unicast': 5,
67 ERROR: 'error', 67 'activitypub-http-fetcher': 5,
68 SUCCESS: 'success' 68 'video-file': 1
69}
70const JOB_CATEGORIES: { [ id: string ]: JobCategory } = {
71 TRANSCODING: 'transcoding',
72 ACTIVITYPUB_HTTP: 'activitypub-http'
73} 69}
74// How many maximum jobs we fetch from the database per cycle 70const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
75const JOBS_FETCH_LIMIT_PER_CYCLE = { 71 'activitypub-http-broadcast': 1,
76 transcoding: 10, 72 'activitypub-http-unicast': 5,
77 httpRequest: 20 73 'activitypub-http-fetcher': 1,
74 'video-file': 1
78} 75}
79// 1 minutes 76// 2 days
80let JOBS_FETCHING_INTERVAL = 60000 77const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2
81 78
82// 1 hour 79// 1 hour
83let SCHEDULER_INTERVAL = 60000 * 60 80let SCHEDULER_INTERVAL = 60000 * 60
@@ -96,6 +93,11 @@ const CONFIG = {
96 USERNAME: config.get<string>('database.username'), 93 USERNAME: config.get<string>('database.username'),
97 PASSWORD: config.get<string>('database.password') 94 PASSWORD: config.get<string>('database.password')
98 }, 95 },
96 REDIS: {
97 HOSTNAME: config.get<string>('redis.hostname'),
98 PORT: config.get<string>('redis.port'),
99 AUTH: config.get<string>('redis.auth')
100 },
99 STORAGE: { 101 STORAGE: {
100 AVATARS_DIR: buildPath(config.get<string>('storage.avatars')), 102 AVATARS_DIR: buildPath(config.get<string>('storage.avatars')),
101 LOG_DIR: buildPath(config.get<string>('storage.logs')), 103 LOG_DIR: buildPath(config.get<string>('storage.logs')),
@@ -284,7 +286,6 @@ const ACTIVITY_PUB = {
284 PUBLIC: 'https://www.w3.org/ns/activitystreams#Public', 286 PUBLIC: 'https://www.w3.org/ns/activitystreams#Public',
285 COLLECTION_ITEMS_PER_PAGE: 10, 287 COLLECTION_ITEMS_PER_PAGE: 10,
286 FETCH_PAGE_LIMIT: 100, 288 FETCH_PAGE_LIMIT: 100,
287 MAX_HTTP_ATTEMPT: 5,
288 URL_MIME_TYPES: { 289 URL_MIME_TYPES: {
289 VIDEO: Object.keys(VIDEO_MIMETYPE_EXT), 290 VIDEO: Object.keys(VIDEO_MIMETYPE_EXT),
290 TORRENT: [ 'application/x-bittorrent' ], 291 TORRENT: [ 'application/x-bittorrent' ],
@@ -358,7 +359,6 @@ const OPENGRAPH_AND_OEMBED_COMMENT = '<!-- open graph and oembed tags -->'
358// Special constants for a test instance 359// Special constants for a test instance
359if (isTestInstance() === true) { 360if (isTestInstance() === true) {
360 ACTOR_FOLLOW_SCORE.BASE = 20 361 ACTOR_FOLLOW_SCORE.BASE = 20
361 JOBS_FETCHING_INTERVAL = 1000
362 REMOTE_SCHEME.HTTP = 'http' 362 REMOTE_SCHEME.HTTP = 'http'
363 REMOTE_SCHEME.WS = 'ws' 363 REMOTE_SCHEME.WS = 'ws'
364 STATIC_MAX_AGE = '0' 364 STATIC_MAX_AGE = '0'
@@ -381,10 +381,8 @@ export {
381 CONFIG, 381 CONFIG,
382 CONSTRAINTS_FIELDS, 382 CONSTRAINTS_FIELDS,
383 EMBED_SIZE, 383 EMBED_SIZE,
384 JOB_STATES, 384 JOB_CONCURRENCY,
385 JOBS_FETCH_LIMIT_PER_CYCLE, 385 JOB_ATTEMPTS,
386 JOBS_FETCHING_INTERVAL,
387 JOB_CATEGORIES,
388 LAST_MIGRATION_VERSION, 386 LAST_MIGRATION_VERSION,
389 OAUTH_LIFETIME, 387 OAUTH_LIFETIME,
390 OPENGRAPH_AND_OEMBED_COMMENT, 388 OPENGRAPH_AND_OEMBED_COMMENT,
@@ -408,7 +406,8 @@ export {
408 VIDEO_RATE_TYPES, 406 VIDEO_RATE_TYPES,
409 VIDEO_MIMETYPE_EXT, 407 VIDEO_MIMETYPE_EXT,
410 AVATAR_MIMETYPE_EXT, 408 AVATAR_MIMETYPE_EXT,
411 SCHEDULER_INTERVAL 409 SCHEDULER_INTERVAL,
410 JOB_COMPLETED_LIFETIME
412} 411}
413 412
414// --------------------------------------------------------------------------- 413// ---------------------------------------------------------------------------
diff --git a/server/initializers/database.ts b/server/initializers/database.ts
index 852db68a0..b537ee59a 100644
--- a/server/initializers/database.ts
+++ b/server/initializers/database.ts
@@ -9,7 +9,6 @@ import { ActorModel } from '../models/activitypub/actor'
9import { ActorFollowModel } from '../models/activitypub/actor-follow' 9import { ActorFollowModel } from '../models/activitypub/actor-follow'
10import { ApplicationModel } from '../models/application/application' 10import { ApplicationModel } from '../models/application/application'
11import { AvatarModel } from '../models/avatar/avatar' 11import { AvatarModel } from '../models/avatar/avatar'
12import { JobModel } from '../models/job/job'
13import { OAuthClientModel } from '../models/oauth/oauth-client' 12import { OAuthClientModel } from '../models/oauth/oauth-client'
14import { OAuthTokenModel } from '../models/oauth/oauth-token' 13import { OAuthTokenModel } from '../models/oauth/oauth-token'
15import { ServerModel } from '../models/server/server' 14import { ServerModel } from '../models/server/server'
@@ -61,7 +60,6 @@ async function initDatabaseModels (silent: boolean) {
61 ActorFollowModel, 60 ActorFollowModel,
62 AvatarModel, 61 AvatarModel,
63 AccountModel, 62 AccountModel,
64 JobModel,
65 OAuthClientModel, 63 OAuthClientModel,
66 OAuthTokenModel, 64 OAuthTokenModel,
67 ServerModel, 65 ServerModel,
diff --git a/server/initializers/migrations/0100-activitypub.ts b/server/initializers/migrations/0100-activitypub.ts
index 8c5198f85..a7ebd804c 100644
--- a/server/initializers/migrations/0100-activitypub.ts
+++ b/server/initializers/migrations/0100-activitypub.ts
@@ -1,11 +1,10 @@
1import { values } from 'lodash'
2import * as Sequelize from 'sequelize' 1import * as Sequelize from 'sequelize'
3import { createPrivateAndPublicKeys } from '../../helpers/peertube-crypto' 2import { createPrivateAndPublicKeys } from '../../helpers/peertube-crypto'
4import { shareVideoByServerAndChannel } from '../../lib/activitypub/share' 3import { shareVideoByServerAndChannel } from '../../lib/activitypub/share'
5import { getVideoActivityPubUrl, getVideoChannelActivityPubUrl } from '../../lib/activitypub/url' 4import { getVideoActivityPubUrl, getVideoChannelActivityPubUrl } from '../../lib/activitypub/url'
6import { createLocalAccountWithoutKeys } from '../../lib/user' 5import { createLocalAccountWithoutKeys } from '../../lib/user'
7import { ApplicationModel } from '../../models/application/application' 6import { ApplicationModel } from '../../models/application/application'
8import { JOB_CATEGORIES, SERVER_ACTOR_NAME } from '../constants' 7import { SERVER_ACTOR_NAME } from '../constants'
9 8
10async function up (utils: { 9async function up (utils: {
11 transaction: Sequelize.Transaction, 10 transaction: Sequelize.Transaction,
@@ -161,7 +160,7 @@ async function up (utils: {
161 160
162 { 161 {
163 const data = { 162 const data = {
164 type: Sequelize.ENUM(values(JOB_CATEGORIES)), 163 type: Sequelize.ENUM('transcoding', 'activitypub-http'),
165 defaultValue: 'transcoding', 164 defaultValue: 'transcoding',
166 allowNull: false 165 allowNull: false
167 } 166 }
diff --git a/server/initializers/migrations/0180-job-table-delete.ts b/server/initializers/migrations/0180-job-table-delete.ts
new file mode 100644
index 000000000..df29145d0
--- /dev/null
+++ b/server/initializers/migrations/0180-job-table-delete.ts
@@ -0,0 +1,18 @@
1import * as Sequelize from 'sequelize'
2
3async function up (utils: {
4 transaction: Sequelize.Transaction,
5 queryInterface: Sequelize.QueryInterface,
6 sequelize: Sequelize.Sequelize
7}): Promise<void> {
8 await utils.queryInterface.dropTable('job')
9}
10
11function down (options) {
12 throw new Error('Not implemented.')
13}
14
15export {
16 up,
17 down
18}
diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts
index c708b38ba..712de7d0d 100644
--- a/server/lib/activitypub/actor.ts
+++ b/server/lib/activitypub/actor.ts
@@ -64,7 +64,11 @@ async function getOrCreateActorAndServerAndModel (actorUrl: string, recurseIfNee
64 actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options) 64 actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options)
65 } 65 }
66 66
67 return refreshActorIfNeeded(actor) 67 const options = {
68 arguments: [ actor ],
69 errorMessage: 'Cannot refresh actor if needed with many retries.'
70 }
71 return retryTransactionWrapper(refreshActorIfNeeded, options)
68} 72}
69 73
70function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) { 74function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) {
@@ -325,38 +329,43 @@ async function saveVideoChannel (actor: ActorModel, result: FetchRemoteActorResu
325async function refreshActorIfNeeded (actor: ActorModel) { 329async function refreshActorIfNeeded (actor: ActorModel) {
326 if (!actor.isOutdated()) return actor 330 if (!actor.isOutdated()) return actor
327 331
328 const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost()) 332 try {
329 const result = await fetchRemoteActor(actorUrl) 333 const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost())
330 if (result === undefined) { 334 const result = await fetchRemoteActor(actorUrl)
331 logger.warn('Cannot fetch remote actor in refresh actor.') 335 if (result === undefined) {
332 return actor 336 logger.warn('Cannot fetch remote actor in refresh actor.')
333 } 337 return actor
334
335 return sequelizeTypescript.transaction(async t => {
336 updateInstanceWithAnother(actor, result.actor)
337
338 if (result.avatarName !== undefined) {
339 await updateActorAvatarInstance(actor, result.avatarName, t)
340 } 338 }
341 339
342 // Force update 340 return sequelizeTypescript.transaction(async t => {
343 actor.setDataValue('updatedAt', new Date()) 341 updateInstanceWithAnother(actor, result.actor)
344 await actor.save({ transaction: t })
345 342
346 if (actor.Account) { 343 if (result.avatarName !== undefined) {
347 await actor.save({ transaction: t }) 344 await updateActorAvatarInstance(actor, result.avatarName, t)
345 }
348 346
349 actor.Account.set('name', result.name) 347 // Force update
350 await actor.Account.save({ transaction: t }) 348 actor.setDataValue('updatedAt', new Date())
351 } else if (actor.VideoChannel) {
352 await actor.save({ transaction: t }) 349 await actor.save({ transaction: t })
353 350
354 actor.VideoChannel.set('name', result.name) 351 if (actor.Account) {
355 await actor.VideoChannel.save({ transaction: t }) 352 await actor.save({ transaction: t })
356 } 353
354 actor.Account.set('name', result.name)
355 await actor.Account.save({ transaction: t })
356 } else if (actor.VideoChannel) {
357 await actor.save({ transaction: t })
358
359 actor.VideoChannel.set('name', result.name)
360 await actor.VideoChannel.save({ transaction: t })
361 }
357 362
363 return actor
364 })
365 } catch (err) {
366 logger.warn('Cannot refresh actor.', err)
358 return actor 367 return actor
359 }) 368 }
360} 369}
361 370
362function normalizeActor (actor: any) { 371function normalizeActor (actor: any) {
diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts
index 4fc97cc38..b1b370a1a 100644
--- a/server/lib/activitypub/fetch.ts
+++ b/server/lib/activitypub/fetch.ts
@@ -1,13 +1,12 @@
1import { Transaction } from 'sequelize'
2import { ActorModel } from '../../models/activitypub/actor' 1import { ActorModel } from '../../models/activitypub/actor'
3import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler' 2import { JobQueue } from '../job-queue'
4 3
5async function addFetchOutboxJob (actor: ActorModel, t: Transaction) { 4async function addFetchOutboxJob (actor: ActorModel) {
6 const jobPayload: ActivityPubHttpPayload = { 5 const payload = {
7 uris: [ actor.outboxUrl ] 6 uris: [ actor.outboxUrl ]
8 } 7 }
9 8
10 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) 9 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
11} 10}
12 11
13export { 12export {
diff --git a/server/lib/activitypub/process/process-accept.ts b/server/lib/activitypub/process/process-accept.ts
index 551f09ea7..7db2f8ff0 100644
--- a/server/lib/activitypub/process/process-accept.ts
+++ b/server/lib/activitypub/process/process-accept.ts
@@ -26,6 +26,6 @@ async function processAccept (actor: ActorModel, targetActor: ActorModel) {
26 if (follow.state !== 'accepted') { 26 if (follow.state !== 'accepted') {
27 follow.set('state', 'accepted') 27 follow.set('state', 'accepted')
28 await follow.save() 28 await follow.save()
29 await addFetchOutboxJob(targetActor, undefined) 29 await addFetchOutboxJob(targetActor)
30 } 30 }
31} 31}
diff --git a/server/lib/activitypub/process/process-follow.ts b/server/lib/activitypub/process/process-follow.ts
index 69f5c51b5..dc1d542b5 100644
--- a/server/lib/activitypub/process/process-follow.ts
+++ b/server/lib/activitypub/process/process-follow.ts
@@ -63,7 +63,7 @@ async function follow (actor: ActorModel, targetActorURL: string) {
63 actorFollow.ActorFollowing = targetActor 63 actorFollow.ActorFollowing = targetActor
64 64
65 // Target sends to actor he accepted the follow request 65 // Target sends to actor he accepted the follow request
66 return sendAccept(actorFollow, t) 66 return sendAccept(actorFollow)
67 }) 67 })
68 68
69 logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url) 69 logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url)
diff --git a/server/lib/activitypub/send/misc.ts b/server/lib/activitypub/send/misc.ts
index dc0d3de57..7a21f0c94 100644
--- a/server/lib/activitypub/send/misc.ts
+++ b/server/lib/activitypub/send/misc.ts
@@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
7import { VideoModel } from '../../../models/video/video' 7import { VideoModel } from '../../../models/video/video'
8import { VideoCommentModel } from '../../../models/video/video-comment' 8import { VideoCommentModel } from '../../../models/video/video-comment'
9import { VideoShareModel } from '../../../models/video/video-share' 9import { VideoShareModel } from '../../../models/video/video-share'
10import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler' 10import { JobQueue } from '../../job-queue'
11 11
12async function forwardActivity ( 12async function forwardActivity (
13 activity: Activity, 13 activity: Activity,
@@ -35,12 +35,11 @@ async function forwardActivity (
35 35
36 logger.debug('Creating forwarding job.', { uris }) 36 logger.debug('Creating forwarding job.', { uris })
37 37
38 const jobPayload: ActivityPubHttpPayload = { 38 const payload = {
39 uris, 39 uris,
40 body: activity 40 body: activity
41 } 41 }
42 42 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
43 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload)
44} 43}
45 44
46async function broadcastToFollowers ( 45async function broadcastToFollowers (
@@ -51,44 +50,43 @@ async function broadcastToFollowers (
51 actorsException: ActorModel[] = [] 50 actorsException: ActorModel[] = []
52) { 51) {
53 const uris = await computeFollowerUris(toActorFollowers, actorsException, t) 52 const uris = await computeFollowerUris(toActorFollowers, actorsException, t)
54 return broadcastTo(uris, data, byActor, t) 53 return broadcastTo(uris, data, byActor)
55} 54}
56 55
57async function broadcastToActors ( 56async function broadcastToActors (
58 data: any, 57 data: any,
59 byActor: ActorModel, 58 byActor: ActorModel,
60 toActors: ActorModel[], 59 toActors: ActorModel[],
61 t: Transaction,
62 actorsException: ActorModel[] = [] 60 actorsException: ActorModel[] = []
63) { 61) {
64 const uris = await computeUris(toActors, actorsException) 62 const uris = await computeUris(toActors, actorsException)
65 return broadcastTo(uris, data, byActor, t) 63 return broadcastTo(uris, data, byActor)
66} 64}
67 65
68async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) { 66async function broadcastTo (uris: string[], data: any, byActor: ActorModel) {
69 if (uris.length === 0) return undefined 67 if (uris.length === 0) return undefined
70 68
71 logger.debug('Creating broadcast job.', { uris }) 69 logger.debug('Creating broadcast job.', { uris })
72 70
73 const jobPayload: ActivityPubHttpPayload = { 71 const payload = {
74 uris, 72 uris,
75 signatureActorId: byActor.id, 73 signatureActorId: byActor.id,
76 body: data 74 body: data
77 } 75 }
78 76
79 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload) 77 return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
80} 78}
81 79
82async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) { 80async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) {
83 logger.debug('Creating unicast job.', { uri: toActorUrl }) 81 logger.debug('Creating unicast job.', { uri: toActorUrl })
84 82
85 const jobPayload: ActivityPubHttpPayload = { 83 const payload = {
86 uris: [ toActorUrl ], 84 uri: toActorUrl,
87 signatureActorId: byActor.id, 85 signatureActorId: byActor.id,
88 body: data 86 body: data
89 } 87 }
90 88
91 return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload) 89 return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
92} 90}
93 91
94function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) { 92function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) {
diff --git a/server/lib/activitypub/send/send-accept.ts b/server/lib/activitypub/send/send-accept.ts
index 4eaa329d9..064fd88d2 100644
--- a/server/lib/activitypub/send/send-accept.ts
+++ b/server/lib/activitypub/send/send-accept.ts
@@ -1,4 +1,3 @@
1import { Transaction } from 'sequelize'
2import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub' 1import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub'
3import { ActorModel } from '../../../models/activitypub/actor' 2import { ActorModel } from '../../../models/activitypub/actor'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
@@ -6,7 +5,7 @@ import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from
6import { unicastTo } from './misc' 5import { unicastTo } from './misc'
7import { followActivityData } from './send-follow' 6import { followActivityData } from './send-follow'
8 7
9async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) { 8async function sendAccept (actorFollow: ActorFollowModel) {
10 const follower = actorFollow.ActorFollower 9 const follower = actorFollow.ActorFollower
11 const me = actorFollow.ActorFollowing 10 const me = actorFollow.ActorFollowing
12 11
@@ -16,7 +15,7 @@ async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) {
16 const url = getActorFollowAcceptActivityPubUrl(actorFollow) 15 const url = getActorFollowAcceptActivityPubUrl(actorFollow)
17 const data = acceptActivityData(url, me, followData) 16 const data = acceptActivityData(url, me, followData)
18 17
19 return unicastTo(data, me, follower.inboxUrl, t) 18 return unicastTo(data, me, follower.inboxUrl)
20} 19}
21 20
22// --------------------------------------------------------------------------- 21// ---------------------------------------------------------------------------
diff --git a/server/lib/activitypub/send/send-announce.ts b/server/lib/activitypub/send/send-announce.ts
index 578fbc630..93b5668d2 100644
--- a/server/lib/activitypub/send/send-announce.ts
+++ b/server/lib/activitypub/send/send-announce.ts
@@ -42,7 +42,7 @@ async function sendVideoAnnounceToOrigin (byActor: ActorModel, video: VideoModel
42 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) 42 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
43 const data = await createActivityData(url, byActor, announcedActivity, t, audience) 43 const data = await createActivityData(url, byActor, announcedActivity, t, audience)
44 44
45 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 45 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
46} 46}
47 47
48async function announceActivityData ( 48async function announceActivityData (
diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts
index 9db663be1..b92615e9b 100644
--- a/server/lib/activitypub/send/send-create.ts
+++ b/server/lib/activitypub/send/send-create.ts
@@ -8,8 +8,14 @@ import { VideoAbuseModel } from '../../../models/video/video-abuse'
8import { VideoCommentModel } from '../../../models/video/video-comment' 8import { VideoCommentModel } from '../../../models/video/video-comment'
9import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url' 9import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url'
10import { 10import {
11 audiencify, broadcastToActors, broadcastToFollowers, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience, 11 audiencify,
12 getOriginVideoAudience, getOriginVideoCommentAudience, 12 broadcastToActors,
13 broadcastToFollowers,
14 getActorsInvolvedInVideo,
15 getAudience,
16 getObjectFollowersAudience,
17 getOriginVideoAudience,
18 getOriginVideoCommentAudience,
13 unicastTo 19 unicastTo
14} from './misc' 20} from './misc'
15 21
@@ -31,7 +37,7 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel,
31 const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } 37 const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] }
32 const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience) 38 const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience)
33 39
34 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 40 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
35} 41}
36 42
37async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) { 43async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) {
@@ -47,13 +53,13 @@ async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Tr
47 53
48 // This was a reply, send it to the parent actors 54 // This was a reply, send it to the parent actors
49 const actorsException = [ byActor ] 55 const actorsException = [ byActor ]
50 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) 56 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException)
51 57
52 // Broadcast to our followers 58 // Broadcast to our followers
53 await broadcastToFollowers(data, byActor, [ byActor ], t) 59 await broadcastToFollowers(data, byActor, [ byActor ], t)
54 60
55 // Send to origin 61 // Send to origin
56 return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl, t) 62 return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl)
57} 63}
58 64
59async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) { 65async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) {
@@ -69,7 +75,7 @@ async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentMode
69 75
70 // This was a reply, send it to the parent actors 76 // This was a reply, send it to the parent actors
71 const actorsException = [ byActor ] 77 const actorsException = [ byActor ]
72 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException) 78 await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException)
73 79
74 // Broadcast to our followers 80 // Broadcast to our followers
75 await broadcastToFollowers(data, byActor, [ byActor ], t) 81 await broadcastToFollowers(data, byActor, [ byActor ], t)
@@ -86,7 +92,7 @@ async function sendCreateViewToOrigin (byActor: ActorModel, video: VideoModel, t
86 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) 92 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
87 const data = await createActivityData(url, byActor, viewActivityData, t, audience) 93 const data = await createActivityData(url, byActor, viewActivityData, t, audience)
88 94
89 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 95 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
90} 96}
91 97
92async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 98async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -111,7 +117,7 @@ async function sendCreateDislikeToOrigin (byActor: ActorModel, video: VideoModel
111 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo) 117 const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
112 const data = await createActivityData(url, byActor, dislikeActivityData, t, audience) 118 const data = await createActivityData(url, byActor, dislikeActivityData, t, audience)
113 119
114 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 120 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
115} 121}
116 122
117async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 123async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
diff --git a/server/lib/activitypub/send/send-follow.ts b/server/lib/activitypub/send/send-follow.ts
index eac60e94f..4e9865af4 100644
--- a/server/lib/activitypub/send/send-follow.ts
+++ b/server/lib/activitypub/send/send-follow.ts
@@ -1,18 +1,17 @@
1import { Transaction } from 'sequelize'
2import { ActivityFollow } from '../../../../shared/models/activitypub' 1import { ActivityFollow } from '../../../../shared/models/activitypub'
3import { ActorModel } from '../../../models/activitypub/actor' 2import { ActorModel } from '../../../models/activitypub/actor'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { getActorFollowActivityPubUrl } from '../url' 4import { getActorFollowActivityPubUrl } from '../url'
6import { unicastTo } from './misc' 5import { unicastTo } from './misc'
7 6
8function sendFollow (actorFollow: ActorFollowModel, t: Transaction) { 7function sendFollow (actorFollow: ActorFollowModel) {
9 const me = actorFollow.ActorFollower 8 const me = actorFollow.ActorFollower
10 const following = actorFollow.ActorFollowing 9 const following = actorFollow.ActorFollowing
11 10
12 const url = getActorFollowActivityPubUrl(actorFollow) 11 const url = getActorFollowActivityPubUrl(actorFollow)
13 const data = followActivityData(url, me, following) 12 const data = followActivityData(url, me, following)
14 13
15 return unicastTo(data, me, following.inboxUrl, t) 14 return unicastTo(data, me, following.inboxUrl)
16} 15}
17 16
18function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { 17function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow {
diff --git a/server/lib/activitypub/send/send-like.ts b/server/lib/activitypub/send/send-like.ts
index 743646455..78ed1aaf2 100644
--- a/server/lib/activitypub/send/send-like.ts
+++ b/server/lib/activitypub/send/send-like.ts
@@ -20,7 +20,7 @@ async function sendLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Tran
20 const audience = getOriginVideoAudience(video, accountsInvolvedInVideo) 20 const audience = getOriginVideoAudience(video, accountsInvolvedInVideo)
21 const data = await likeActivityData(url, byActor, video, t, audience) 21 const data = await likeActivityData(url, byActor, video, t, audience)
22 22
23 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 23 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
24} 24}
25 25
26async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 26async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
diff --git a/server/lib/activitypub/send/send-undo.ts b/server/lib/activitypub/send/send-undo.ts
index 3a0597fba..4a08b5ca1 100644
--- a/server/lib/activitypub/send/send-undo.ts
+++ b/server/lib/activitypub/send/send-undo.ts
@@ -1,11 +1,5 @@
1import { Transaction } from 'sequelize' 1import { Transaction } from 'sequelize'
2import { 2import { ActivityAudience, ActivityCreate, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub'
3 ActivityAudience,
4 ActivityCreate,
5 ActivityFollow,
6 ActivityLike,
7 ActivityUndo
8} from '../../../../shared/models/activitypub'
9import { ActorModel } from '../../../models/activitypub/actor' 3import { ActorModel } from '../../../models/activitypub/actor'
10import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
11import { VideoModel } from '../../../models/video/video' 5import { VideoModel } from '../../../models/video/video'
@@ -33,7 +27,7 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) {
33 const object = followActivityData(followUrl, me, following) 27 const object = followActivityData(followUrl, me, following)
34 const data = await undoActivityData(undoUrl, me, object, t) 28 const data = await undoActivityData(undoUrl, me, object, t)
35 29
36 return unicastTo(data, me, following.inboxUrl, t) 30 return unicastTo(data, me, following.inboxUrl)
37} 31}
38 32
39async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) { 33async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -45,7 +39,7 @@ async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t:
45 const object = await likeActivityData(likeUrl, byActor, video, t) 39 const object = await likeActivityData(likeUrl, byActor, video, t)
46 const data = await undoActivityData(undoUrl, byActor, object, t, audience) 40 const data = await undoActivityData(undoUrl, byActor, object, t, audience)
47 41
48 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 42 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
49} 43}
50 44
51async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 45async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -72,7 +66,7 @@ async function sendUndoDislikeToOrigin (byActor: ActorModel, video: VideoModel,
72 66
73 const data = await undoActivityData(undoUrl, byActor, object, t, audience) 67 const data = await undoActivityData(undoUrl, byActor, object, t, audience)
74 68
75 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t) 69 return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
76} 70}
77 71
78async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) { 72async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 3f780e319..159856cda 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,10 +1,19 @@
1import * as kue from 'kue'
1import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' 5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
5 6
6async function process (payload: ActivityPubHttpPayload, jobId: number) { 7export type ActivitypubHttpBroadcastPayload = {
7 logger.info('Processing ActivityPub broadcast in job %d.', jobId) 8 uris: string[]
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpBroadcast (job: kue.Job) {
14 logger.info('Processing ActivityPub broadcast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpBroadcastPayload
8 17
9 const body = await computeBody(payload) 18 const body = await computeBody(payload)
10 const httpSignatureOptions = await buildSignedRequestOptions(payload) 19 const httpSignatureOptions = await buildSignedRequestOptions(payload)
@@ -26,28 +35,15 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
26 await doRequest(options) 35 await doRequest(options)
27 goodUrls.push(uri) 36 goodUrls.push(uri)
28 } catch (err) { 37 } catch (err) {
29 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri) 38 badUrls.push(uri)
30 if (isRetryingLater === false) badUrls.push(uri)
31 } 39 }
32 } 40 }
33 41
34 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) 42 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined)
35} 43}
36 44
37function onError (err: Error, jobId: number) {
38 logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err)
39 return Promise.resolve()
40}
41
42function onSuccess (jobId: number) {
43 logger.info('Job %d is a success.', jobId)
44 return Promise.resolve()
45}
46
47// --------------------------------------------------------------------------- 45// ---------------------------------------------------------------------------
48 46
49export { 47export {
50 process, 48 processActivityPubHttpBroadcast
51 onError,
52 onSuccess
53} 49}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index a7b5aabd0..062211c85 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,11 +1,18 @@
1import * as kue from 'kue'
1import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
3import { ACTIVITY_PUB } from '../../../initializers' 4import { ACTIVITY_PUB } from '../../../initializers'
4import { processActivities } from '../../activitypub/process' 5import { processActivities } from '../../activitypub/process'
5import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' 6import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
6 7
7async function process (payload: ActivityPubHttpPayload, jobId: number) { 8export type ActivitypubHttpFetcherPayload = {
8 logger.info('Processing ActivityPub fetcher in job %d.', jobId) 9 uris: string[]
10}
11
12async function processActivityPubHttpFetcher (job: kue.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
14
15 const payload = job.data as ActivitypubHttpBroadcastPayload
9 16
10 const options = { 17 const options = {
11 method: 'GET', 18 method: 'GET',
@@ -49,20 +56,8 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
49 } 56 }
50} 57}
51 58
52function onError (err: Error, jobId: number) {
53 logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err)
54 return Promise.resolve()
55}
56
57function onSuccess (jobId: number) {
58 logger.info('Job %d is a success.', jobId)
59 return Promise.resolve()
60}
61
62// --------------------------------------------------------------------------- 59// ---------------------------------------------------------------------------
63 60
64export { 61export {
65 process, 62 processActivityPubHttpFetcher
66 onError,
67 onSuccess
68} 63}
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
new file mode 100644
index 000000000..9b4188c50
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -0,0 +1,43 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6
7export type ActivitypubHttpUnicastPayload = {
8 uri: string
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpUnicast (job: kue.Job) {
14 logger.info('Processing ActivityPub unicast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpUnicastPayload
17 const uri = payload.uri
18
19 const body = await computeBody(payload)
20 const httpSignatureOptions = await buildSignedRequestOptions(payload)
21
22 const options = {
23 method: 'POST',
24 uri,
25 json: body,
26 httpSignature: httpSignatureOptions
27 }
28
29 try {
30 await doRequest(options)
31 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
32 } catch (err) {
33 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
34
35 throw err
36 }
37}
38
39// ---------------------------------------------------------------------------
40
41export {
42 processActivityPubHttpUnicast
43}
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
new file mode 100644
index 000000000..c087371c6
--- /dev/null
+++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
@@ -0,0 +1,39 @@
1import { buildSignedActivity } from '../../../../helpers/activitypub'
2import { getServerActor } from '../../../../helpers/utils'
3import { ActorModel } from '../../../../models/activitypub/actor'
4
5async function computeBody (payload: { body: any, signatureActorId?: number }) {
6 let body = payload.body
7
8 if (payload.signatureActorId) {
9 const actorSignature = await ActorModel.load(payload.signatureActorId)
10 if (!actorSignature) throw new Error('Unknown signature actor id.')
11 body = await buildSignedActivity(actorSignature, payload.body)
12 }
13
14 return body
15}
16
17async function buildSignedRequestOptions (payload: { signatureActorId?: number }) {
18 let actor: ActorModel
19 if (payload.signatureActorId) {
20 actor = await ActorModel.load(payload.signatureActorId)
21 if (!actor) throw new Error('Unknown signature actor id.')
22 } else {
23 // We need to sign the request, so use the server
24 actor = await getServerActor()
25 }
26
27 const keyId = actor.getWebfingerUrl()
28 return {
29 algorithm: 'rsa-sha256',
30 authorizationHeaderName: 'Signature',
31 keyId,
32 key: actor.privateKey
33 }
34}
35
36export {
37 computeBody,
38 buildSignedRequestOptions
39}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/job-queue/handlers/video-file.ts
index f224a31b4..5294483bd 100644
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -1,38 +1,60 @@
1import * as Bluebird from 'bluebird' 1import * as kue from 'kue'
2import { VideoResolution } from '../../../../shared'
2import { VideoPrivacy } from '../../../../shared/models/videos' 3import { VideoPrivacy } from '../../../../shared/models/videos'
3import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
4import { computeResolutionsToTranscode } from '../../../helpers/utils' 5import { computeResolutionsToTranscode } from '../../../helpers/utils'
5import { sequelizeTypescript } from '../../../initializers' 6import { sequelizeTypescript } from '../../../initializers'
6import { JobModel } from '../../../models/job/job'
7import { VideoModel } from '../../../models/video/video' 7import { VideoModel } from '../../../models/video/video'
8import { shareVideoByServerAndChannel } from '../../activitypub' 8import { shareVideoByServerAndChannel } from '../../activitypub'
9import { sendCreateVideo } from '../../activitypub/send' 9import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send'
10import { JobScheduler } from '../job-scheduler' 10import { JobQueue } from '../job-queue'
11import { TranscodingJobPayload } from './transcoding-job-scheduler'
12 11
13async function process (data: TranscodingJobPayload, jobId: number) { 12export type VideoFilePayload = {
14 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) 13 videoUUID: string
14 resolution?: VideoResolution
15}
16
17async function processVideoFile (job: kue.Job) {
18 const payload = job.data as VideoFilePayload
19 logger.info('Processing video file in job %d.', job.id)
20
21 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID)
15 // No video, maybe deleted? 22 // No video, maybe deleted?
16 if (!video) { 23 if (!video) {
17 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 24 logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid })
18 return undefined 25 return undefined
19 } 26 }
20 27
21 await video.optimizeOriginalVideofile() 28 // Transcoding in other resolution
29 if (payload.resolution) {
30 await video.transcodeOriginalVideofile(payload.resolution)
31 await onVideoFileTranscoderSuccess(video)
32 } else {
33 await video.optimizeOriginalVideofile()
34 await onVideoFileOptimizerSuccess(video)
35 }
22 36
23 return video 37 return video
24} 38}
25 39
26function onError (err: Error, jobId: number) { 40async function onVideoFileTranscoderSuccess (video: VideoModel) {
27 logger.error('Error when optimized video file in job %d.', jobId, err) 41 if (video === undefined) return undefined
28 return Promise.resolve() 42
43 // Maybe the video changed in database, refresh it
44 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
45 // Video does not exist anymore
46 if (!videoDatabase) return undefined
47
48 if (video.privacy !== VideoPrivacy.PRIVATE) {
49 await sendUpdateVideo(video, undefined)
50 }
51
52 return undefined
29} 53}
30 54
31async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) { 55async function onVideoFileOptimizerSuccess (video: VideoModel) {
32 if (video === undefined) return undefined 56 if (video === undefined) return undefined
33 57
34 logger.info('Job %d is a success.', jobId)
35
36 // Maybe the video changed in database, refresh it 58 // Maybe the video changed in database, refresh it
37 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) 59 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
38 // Video does not exist anymore 60 // Video does not exist anymore
@@ -56,7 +78,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
56 if (resolutionsEnabled.length !== 0) { 78 if (resolutionsEnabled.length !== 0) {
57 try { 79 try {
58 await sequelizeTypescript.transaction(async t => { 80 await sequelizeTypescript.transaction(async t => {
59 const tasks: Bluebird<JobModel>[] = [] 81 const tasks: Promise<any>[] = []
60 82
61 for (const resolution of resolutionsEnabled) { 83 for (const resolution of resolutionsEnabled) {
62 const dataInput = { 84 const dataInput = {
@@ -64,7 +86,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
64 resolution 86 resolution
65 } 87 }
66 88
67 const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput) 89 const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
68 tasks.push(p) 90 tasks.push(p)
69 } 91 }
70 92
@@ -84,7 +106,5 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
84// --------------------------------------------------------------------------- 106// ---------------------------------------------------------------------------
85 107
86export { 108export {
87 process, 109 processVideoFile
88 onError,
89 onSuccess
90} 110}
diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts
new file mode 100644
index 000000000..57231e649
--- /dev/null
+++ b/server/lib/job-queue/index.ts
@@ -0,0 +1 @@
export * from './job-queue'
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
new file mode 100644
index 000000000..7a2b6c78d
--- /dev/null
+++ b/server/lib/job-queue/job-queue.ts
@@ -0,0 +1,124 @@
1import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
8import { processVideoFile, VideoFilePayload } from './handlers/video-file'
9
10type CreateJobArgument =
11 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
12 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
13 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
14 { type: 'video-file', payload: VideoFilePayload }
15
16const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
17 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
18 'activitypub-http-unicast': processActivityPubHttpUnicast,
19 'activitypub-http-fetcher': processActivityPubHttpFetcher,
20 'video-file': processVideoFile
21}
22
23class JobQueue {
24
25 private static instance: JobQueue
26
27 private jobQueue: kue.Queue
28 private initialized = false
29
30 private constructor () {}
31
32 init () {
33 // Already initialized
34 if (this.initialized === true) return
35 this.initialized = true
36
37 this.jobQueue = kue.createQueue({
38 prefix: 'q-' + CONFIG.WEBSERVER.HOST,
39 redis: {
40 host: CONFIG.REDIS.HOSTNAME,
41 port: CONFIG.REDIS.PORT,
42 auth: CONFIG.REDIS.AUTH
43 }
44 })
45
46 this.jobQueue.on('error', err => {
47 logger.error('Error in job queue.', err)
48 process.exit(-1)
49 })
50 this.jobQueue.watchStuckJobs(5000)
51
52 for (const handlerName of Object.keys(handlers)) {
53 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
54 try {
55 const res = await handlers[ handlerName ](job)
56 return done(null, res)
57 } catch (err) {
58 return done(err)
59 }
60 })
61 }
62 }
63
64 createJob (obj: CreateJobArgument, priority = 'normal') {
65 return new Promise((res, rej) => {
66 this.jobQueue
67 .create(obj.type, obj.payload)
68 .priority(priority)
69 .attempts(JOB_ATTEMPTS[obj.type])
70 .backoff({ type: 'exponential' })
71 .save(err => {
72 if (err) return rej(err)
73
74 return res()
75 })
76 })
77 }
78
79 listForApi (state: JobState, start: number, count: number, sort: string) {
80 return new Promise<kue.Job[]>((res, rej) => {
81 kue.Job.rangeByState(state, start, count, sort, (err, jobs) => {
82 if (err) return rej(err)
83
84 return res(jobs)
85 })
86 })
87 }
88
89 count (state: JobState) {
90 return new Promise<number>((res, rej) => {
91 this.jobQueue[state + 'Count']((err, total) => {
92 if (err) return rej(err)
93
94 return res(total)
95 })
96 })
97 }
98
99 removeOldJobs () {
100 const now = new Date().getTime()
101 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
102 if (err) {
103 logger.error('Cannot get jobs when removing old jobs.', err)
104 return
105 }
106
107 for (const job of jobs) {
108 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
109 job.remove()
110 }
111 }
112 })
113 }
114
115 static get Instance () {
116 return this.instance || (this.instance = new this())
117 }
118}
119
120// ---------------------------------------------------------------------------
121
122export {
123 JobQueue
124}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
deleted file mode 100644
index 4459152db..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
+++ /dev/null
@@ -1,94 +0,0 @@
1import { JobCategory } from '../../../../shared'
2import { buildSignedActivity } from '../../../helpers/activitypub'
3import { logger } from '../../../helpers/logger'
4import { getServerActor } from '../../../helpers/utils'
5import { ACTIVITY_PUB } from '../../../initializers'
6import { ActorModel } from '../../../models/activitypub/actor'
7import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
8import { JobHandler, JobScheduler } from '../job-scheduler'
9
10import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler'
11import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler'
12import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler'
13
14type ActivityPubHttpPayload = {
15 uris: string[]
16 signatureActorId?: number
17 body?: any
18 attemptNumber?: number
19}
20
21const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = {
22 activitypubHttpBroadcastHandler,
23 activitypubHttpUnicastHandler,
24 activitypubHttpFetcherHandler
25}
26const jobCategory: JobCategory = 'activitypub-http'
27
28const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers)
29
30async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) {
31 logger.warn('Cannot make request to %s.', uri, err)
32
33 let attemptNumber = payload.attemptNumber || 1
34 attemptNumber += 1
35
36 if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) {
37 logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err)
38
39 const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined)
40 if (!actor) {
41 logger.debug('Actor %s is not a follower, do not retry the request.', uri)
42 return false
43 }
44
45 const newPayload = Object.assign(payload, {
46 uris: [ uri ],
47 attemptNumber
48 })
49 await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload)
50
51 return true
52 }
53
54 return false
55}
56
57async function computeBody (payload: ActivityPubHttpPayload) {
58 let body = payload.body
59
60 if (payload.signatureActorId) {
61 const actorSignature = await ActorModel.load(payload.signatureActorId)
62 if (!actorSignature) throw new Error('Unknown signature actor id.')
63 body = await buildSignedActivity(actorSignature, payload.body)
64 }
65
66 return body
67}
68
69async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) {
70 let actor: ActorModel
71 if (payload.signatureActorId) {
72 actor = await ActorModel.load(payload.signatureActorId)
73 if (!actor) throw new Error('Unknown signature actor id.')
74 } else {
75 // We need to sign the request, so use the server
76 actor = await getServerActor()
77 }
78
79 const keyId = actor.getWebfingerUrl()
80 return {
81 algorithm: 'rsa-sha256',
82 authorizationHeaderName: 'Signature',
83 keyId,
84 key: actor.privateKey
85 }
86}
87
88export {
89 ActivityPubHttpPayload,
90 activitypubHttpJobScheduler,
91 maybeRetryRequestLater,
92 computeBody,
93 buildSignedRequestOptions
94}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
deleted file mode 100644
index 54a7504e8..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
+++ /dev/null
@@ -1,50 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
5
6async function process (payload: ActivityPubHttpPayload, jobId: number) {
7 logger.info('Processing ActivityPub unicast in job %d.', jobId)
8
9 const uri = payload.uris[0]
10
11 const body = await computeBody(payload)
12 const httpSignatureOptions = await buildSignedRequestOptions(payload)
13
14 const options = {
15 method: 'POST',
16 uri,
17 json: body,
18 httpSignature: httpSignatureOptions
19 }
20
21 try {
22 await doRequest(options)
23 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
24 } catch (err) {
25 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
26 if (isRetryingLater === false) {
27 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
28 }
29
30 throw err
31 }
32}
33
34function onError (err: Error, jobId: number) {
35 logger.error('Error when sending ActivityPub request in job %d.', jobId, err)
36 return Promise.resolve()
37}
38
39function onSuccess (jobId: number) {
40 logger.info('Job %d is a success.', jobId)
41 return Promise.resolve()
42}
43
44// ---------------------------------------------------------------------------
45
46export {
47 process,
48 onError,
49 onSuccess
50}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts
deleted file mode 100644
index ad8f527b4..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './activitypub-http-job-scheduler'
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts
deleted file mode 100644
index 394264ec1..000000000
--- a/server/lib/jobs/index.ts
+++ /dev/null
@@ -1,2 +0,0 @@
1export * from './activitypub-http-job-scheduler'
2export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
deleted file mode 100644
index 9d55880e6..000000000
--- a/server/lib/jobs/job-scheduler.ts
+++ /dev/null
@@ -1,144 +0,0 @@
1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize'
3import { JobCategory } from '../../../shared'
4import { logger } from '../../helpers/logger'
5import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
6import { JobModel } from '../../models/job/job'
7
8export interface JobHandler<P, T> {
9 process (data: object, jobId: number): Promise<T>
10 onError (err: Error, jobId: number)
11 onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>): Promise<any>
12}
13type JobQueueCallback = (err: Error) => void
14
15class JobScheduler<P, T> {
16
17 constructor (
18 private jobCategory: JobCategory,
19 private jobHandlers: { [ id: string ]: JobHandler<P, T> }
20 ) {}
21
22 async activate () {
23 const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]
24
25 logger.info('Jobs scheduler %s activated.', this.jobCategory)
26
27 const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this))
28
29 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING
31 try {
32 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
33
34 this.enqueueJobs(jobsQueue, jobs)
35 } catch (err) {
36 logger.error('Cannot list pending jobs.', err)
37 }
38
39 forever(
40 async next => {
41 if (jobsQueue.length() !== 0) {
42 // Finish processing the queue first
43 return setTimeout(next, JOBS_FETCHING_INTERVAL)
44 }
45
46 const state = JOB_STATES.PENDING
47 try {
48 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
49
50 this.enqueueJobs(jobsQueue, jobs)
51 } catch (err) {
52 logger.error('Cannot list pending jobs.', err)
53 }
54
55 // Optimization: we could use "drain" from queue object
56 return setTimeout(next, JOBS_FETCHING_INTERVAL)
57 },
58
59 err => logger.error('Error in job scheduler queue.', err)
60 )
61 }
62
63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) {
64 const createQuery = {
65 state: JOB_STATES.PENDING,
66 category: this.jobCategory,
67 handlerName,
68 handlerInputData
69 }
70
71 const options = { transaction }
72
73 return JobModel.create(createQuery, options)
74 }
75
76 private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) {
77 jobs.forEach(job => jobsQueue.push(job))
78 }
79
80 private async processJob (job: JobModel, callback: (err: Error) => void) {
81 const jobHandler = this.jobHandlers[job.handlerName]
82 if (jobHandler === undefined) {
83 const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id
84 logger.error(errorString)
85
86 const error = new Error(errorString)
87 await this.onJobError(jobHandler, job, error)
88 return callback(error)
89 }
90
91 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
92
93 job.state = JOB_STATES.PROCESSING
94 await job.save()
95
96 try {
97 const result: T = await jobHandler.process(job.handlerInputData, job.id)
98 await this.onJobSuccess(jobHandler, job, result)
99 } catch (err) {
100 logger.error('Error in job handler %s.', job.handlerName, err)
101
102 try {
103 await this.onJobError(jobHandler, job, err)
104 } catch (innerErr) {
105 this.cannotSaveJobError(innerErr)
106 return callback(innerErr)
107 }
108 }
109
110 return callback(null)
111 }
112
113 private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) {
114 job.state = JOB_STATES.ERROR
115
116 try {
117 await job.save()
118 if (jobHandler) await jobHandler.onError(err, job.id)
119 } catch (err) {
120 this.cannotSaveJobError(err)
121 }
122 }
123
124 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) {
125 job.state = JOB_STATES.SUCCESS
126
127 try {
128 await job.save()
129 await jobHandler.onSuccess(job.id, jobResult, this)
130 } catch (err) {
131 this.cannotSaveJobError(err)
132 }
133 }
134
135 private cannotSaveJobError (err: Error) {
136 logger.error('Cannot save new job state.', err)
137 }
138}
139
140// ---------------------------------------------------------------------------
141
142export {
143 JobScheduler
144}
diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts
deleted file mode 100644
index 73152a1be..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
deleted file mode 100644
index e5530a73c..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
+++ /dev/null
@@ -1,23 +0,0 @@
1import { JobCategory } from '../../../../shared'
2import { VideoModel } from '../../../models/video/video'
3import { JobHandler, JobScheduler } from '../job-scheduler'
4
5import * as videoFileOptimizer from './video-file-optimizer-handler'
6import * as videoFileTranscoder from './video-file-transcoder-handler'
7
8type TranscodingJobPayload = {
9 videoUUID: string
10 resolution?: number
11}
12const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = {
13 videoFileOptimizer,
14 videoFileTranscoder
15}
16const jobCategory: JobCategory = 'transcoding'
17
18const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers)
19
20export {
21 TranscodingJobPayload,
22 transcodingJobScheduler
23}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
deleted file mode 100644
index 883d3eba8..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
+++ /dev/null
@@ -1,48 +0,0 @@
1import { VideoResolution } from '../../../../shared'
2import { VideoPrivacy } from '../../../../shared/models/videos'
3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video'
5import { sendUpdateVideo } from '../../activitypub/send'
6
7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
8 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
9 // No video, maybe deleted?
10 if (!video) {
11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
12 return undefined
13 }
14
15 await video.transcodeOriginalVideofile(data.resolution)
16
17 return video
18}
19
20function onError (err: Error, jobId: number) {
21 logger.error('Error when transcoding video file in job %d.', jobId, err)
22 return Promise.resolve()
23}
24
25async function onSuccess (jobId: number, video: VideoModel) {
26 if (video === undefined) return undefined
27
28 logger.info('Job %d is a success.', jobId)
29
30 // Maybe the video changed in database, refresh it
31 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
32 // Video does not exist anymore
33 if (!videoDatabase) return undefined
34
35 if (video.privacy !== VideoPrivacy.PRIVATE) {
36 await sendUpdateVideo(video, undefined)
37 }
38
39 return undefined
40}
41
42// ---------------------------------------------------------------------------
43
44export {
45 process,
46 onError,
47 onSuccess
48}
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts
new file mode 100644
index 000000000..add5677ac
--- /dev/null
+++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts
@@ -0,0 +1,19 @@
1import { JobQueue } from '../job-queue'
2import { AbstractScheduler } from './abstract-scheduler'
3
4export class RemoveOldJobsScheduler extends AbstractScheduler {
5
6 private static instance: AbstractScheduler
7
8 private constructor () {
9 super()
10 }
11
12 async execute () {
13 JobQueue.Instance.removeOldJobs()
14 }
15
16 static get Instance () {
17 return this.instance || (this.instance = new this())
18 }
19}
diff --git a/server/middlewares/validators/jobs.ts b/server/middlewares/validators/jobs.ts
new file mode 100644
index 000000000..2f8b1738c
--- /dev/null
+++ b/server/middlewares/validators/jobs.ts
@@ -0,0 +1,23 @@
1import * as express from 'express'
2import { param } from 'express-validator/check'
3import { isValidJobState } from '../../helpers/custom-validators/jobs'
4import { logger } from '../../helpers/logger'
5import { areValidationErrors } from './utils'
6
7const listJobsValidator = [
8 param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
9
10 async (req: express.Request, res: express.Response, next: express.NextFunction) => {
11 logger.debug('Checking listJobsValidator parameters.', { parameters: req.params })
12
13 if (areValidationErrors(req, res)) return
14
15 return next()
16 }
17]
18
19// ---------------------------------------------------------------------------
20
21export {
22 listJobsValidator
23}
diff --git a/server/models/job/job.ts b/server/models/job/job.ts
deleted file mode 100644
index ba1c6737e..000000000
--- a/server/models/job/job.ts
+++ /dev/null
@@ -1,80 +0,0 @@
1import { values } from 'lodash'
2import { AllowNull, Column, CreatedAt, DataType, Model, Table, UpdatedAt } from 'sequelize-typescript'
3import { JobCategory, JobState } from '../../../shared/models'
4import { JOB_CATEGORIES, JOB_STATES } from '../../initializers'
5import { getSort } from '../utils'
6
7@Table({
8 tableName: 'job',
9 indexes: [
10 {
11 fields: [ 'state', 'category' ]
12 }
13 ]
14})
15export class JobModel extends Model<JobModel> {
16 @AllowNull(false)
17 @Column(DataType.ENUM(values(JOB_STATES)))
18 state: JobState
19
20 @AllowNull(false)
21 @Column(DataType.ENUM(values(JOB_CATEGORIES)))
22 category: JobCategory
23
24 @AllowNull(false)
25 @Column
26 handlerName: string
27
28 @AllowNull(true)
29 @Column(DataType.JSON)
30 handlerInputData: any
31
32 @CreatedAt
33 createdAt: Date
34
35 @UpdatedAt
36 updatedAt: Date
37
38 static listWithLimitByCategory (limit: number, state: JobState, jobCategory: JobCategory) {
39 const query = {
40 order: [
41 [ 'id', 'ASC' ]
42 ],
43 limit: limit,
44 where: {
45 state,
46 category: jobCategory
47 },
48 logging: false
49 }
50
51 return JobModel.findAll(query)
52 }
53
54 static listForApi (start: number, count: number, sort: string) {
55 const query = {
56 offset: start,
57 limit: count,
58 order: [ getSort(sort) ]
59 }
60
61 return JobModel.findAndCountAll(query).then(({ rows, count }) => {
62 return {
63 data: rows,
64 total: count
65 }
66 })
67 }
68
69 toFormattedJSON () {
70 return {
71 id: this.id,
72 state: this.state,
73 category: this.category,
74 handlerName: this.handlerName,
75 handlerInputData: this.handlerInputData,
76 createdAt: this.createdAt,
77 updatedAt: this.updatedAt
78 }
79 }
80}
diff --git a/server/tests/api/check-params/jobs.ts b/server/tests/api/check-params/jobs.ts
index b12818bb1..ce3ac8809 100644
--- a/server/tests/api/check-params/jobs.ts
+++ b/server/tests/api/check-params/jobs.ts
@@ -7,7 +7,7 @@ import { checkBadCountPagination, checkBadSortPagination, checkBadStartPaginatio
7import { makeGetRequest } from '../../utils/requests/requests' 7import { makeGetRequest } from '../../utils/requests/requests'
8 8
9describe('Test jobs API validators', function () { 9describe('Test jobs API validators', function () {
10 const path = '/api/v1/jobs/' 10 const path = '/api/v1/jobs/failed'
11 let server: ServerInfo 11 let server: ServerInfo
12 let userAccessToken = '' 12 let userAccessToken = ''
13 13
@@ -31,6 +31,15 @@ describe('Test jobs API validators', function () {
31 }) 31 })
32 32
33 describe('When listing jobs', function () { 33 describe('When listing jobs', function () {
34
35 it('Should fail with a bad state', async function () {
36 await makeGetRequest({
37 url: server.url,
38 token: server.accessToken,
39 path: path + 'ade'
40 })
41 })
42
34 it('Should fail with a bad start pagination', async function () { 43 it('Should fail with a bad start pagination', async function () {
35 await checkBadStartPagination(server.url, path, server.accessToken) 44 await checkBadStartPagination(server.url, path, server.accessToken)
36 }) 45 })
diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts
index de4e77b2f..4cedeb89e 100644
--- a/server/tests/api/server/handle-down.ts
+++ b/server/tests/api/server/handle-down.ts
@@ -2,6 +2,7 @@
2 2
3import * as chai from 'chai' 3import * as chai from 'chai'
4import 'mocha' 4import 'mocha'
5import { JobState } from '../../../../shared/models'
5import { VideoPrivacy } from '../../../../shared/models/videos' 6import { VideoPrivacy } from '../../../../shared/models/videos'
6import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' 7import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model'
7import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' 8import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils'
@@ -139,12 +140,11 @@ describe('Test handle downs', function () {
139 }) 140 })
140 141
141 it('Should not have pending/processing jobs anymore', async function () { 142 it('Should not have pending/processing jobs anymore', async function () {
142 const res = await getJobsListPaginationAndSort(servers[0].url, servers[0].accessToken, 0, 50, '-createdAt') 143 const states: JobState[] = [ 'inactive', 'active' ]
143 const jobs = res.body.data
144 144
145 for (const job of jobs) { 145 for (const state of states) {
146 expect(job.state).not.to.equal('pending') 146 const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')
147 expect(job.state).not.to.equal('processing') 147 expect(res.body.data).to.have.length(0)
148 } 148 }
149 }) 149 })
150 150
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts
index 2e17e71a4..671498769 100644
--- a/server/tests/api/server/jobs.ts
+++ b/server/tests/api/server/jobs.ts
@@ -35,20 +35,20 @@ describe('Test jobs', function () {
35 }) 35 })
36 36
37 it('Should list jobs', async function () { 37 it('Should list jobs', async function () {
38 const res = await getJobsList(servers[1].url, servers[1].accessToken) 38 const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete')
39 expect(res.body.total).to.be.above(2) 39 expect(res.body.total).to.be.above(2)
40 expect(res.body.data).to.have.length.above(2) 40 expect(res.body.data).to.have.length.above(2)
41 }) 41 })
42 42
43 it('Should list jobs with sort and pagination', async function () { 43 it('Should list jobs with sort and pagination', async function () {
44 const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 4, 1, 'createdAt') 44 const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt')
45 expect(res.body.total).to.be.above(2) 45 expect(res.body.total).to.be.above(2)
46 expect(res.body.data).to.have.lengthOf(1) 46 expect(res.body.data).to.have.lengthOf(1)
47 47
48 const job = res.body.data[0] 48 const job = res.body.data[0]
49 expect(job.state).to.equal('success') 49
50 expect(job.category).to.equal('transcoding') 50 expect(job.state).to.equal('complete')
51 expect(job.handlerName).to.have.length.above(3) 51 expect(job.type).to.equal('activitypub-http-unicast')
52 expect(dateIsValid(job.createdAt)).to.be.true 52 expect(dateIsValid(job.createdAt)).to.be.true
53 expect(dateIsValid(job.updatedAt)).to.be.true 53 expect(dateIsValid(job.updatedAt)).to.be.true
54 }) 54 })
diff --git a/server/tests/api/videos/multiple-servers.ts b/server/tests/api/videos/multiple-servers.ts
index 4c4b5123d..0215b3011 100644
--- a/server/tests/api/videos/multiple-servers.ts
+++ b/server/tests/api/videos/multiple-servers.ts
@@ -475,16 +475,17 @@ describe('Test multiple servers', function () {
475 it('Should like and dislikes videos on different services', async function () { 475 it('Should like and dislikes videos on different services', async function () {
476 this.timeout(20000) 476 this.timeout(20000)
477 477
478 const tasks: Promise<any>[] = [] 478 await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')
479 tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')) 479 await wait(200)
480 tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike')) 480 await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike')
481 tasks.push(rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')) 481 await wait(200)
482 tasks.push(rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like')) 482 await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')
483 tasks.push(rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike')) 483 await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like')
484 tasks.push(rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike')) 484 await wait(200)
485 tasks.push(rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like')) 485 await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike')
486 486 await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike')
487 await Promise.all(tasks) 487 await wait(200)
488 await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like')
488 489
489 await wait(10000) 490 await wait(10000)
490 491
diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts
index e41203351..7f67525ed 100644
--- a/server/tests/real-world/real-world.ts
+++ b/server/tests/real-world/real-world.ts
@@ -3,6 +3,7 @@ process.env.NODE_ENV = 'test'
3 3
4import * as program from 'commander' 4import * as program from 'commander'
5import { Video, VideoFile, VideoRateType } from '../../../shared' 5import { Video, VideoFile, VideoRateType } from '../../../shared'
6import { JobState } from '../../../shared/models'
6import { 7import {
7 flushAndRunMultipleServers, 8 flushAndRunMultipleServers,
8 flushTests, follow, 9 flushTests, follow,
@@ -346,23 +347,19 @@ function goodbye () {
346} 347}
347 348
348async function isTherePendingRequests (servers: ServerInfo[]) { 349async function isTherePendingRequests (servers: ServerInfo[]) {
350 const states: JobState[] = [ 'inactive', 'active' ]
349 const tasks: Promise<any>[] = [] 351 const tasks: Promise<any>[] = []
350 let pendingRequests = false 352 let pendingRequests = false
351 353
352 // Check if each server has pending request 354 // Check if each server has pending request
353 for (const server of servers) { 355 for (const server of servers) {
354 const p = getJobsListPaginationAndSort(server.url, server.accessToken, 0, 10, '-createdAt') 356 for (const state of states) {
355 .then(res => { 357 const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
356 const jobs = res.body.data 358 .then(res => {
357 359 if (res.body.total > 0) pendingRequests = true
358 for (const job of jobs) { 360 })
359 if (job.state === 'pending' || job.state === 'processing') { 361 tasks.push(p)
360 pendingRequests = true 362 }
361 }
362 }
363 })
364
365 tasks.push(p)
366 } 363 }
367 364
368 await Promise.all(tasks) 365 await Promise.all(tasks)
diff --git a/server/tests/utils/server/jobs.ts b/server/tests/utils/server/jobs.ts
index 0a8c51575..4053dd40b 100644
--- a/server/tests/utils/server/jobs.ts
+++ b/server/tests/utils/server/jobs.ts
@@ -1,7 +1,8 @@
1import * as request from 'supertest' 1import * as request from 'supertest'
2import { JobState } from '../../../../shared/models'
2 3
3function getJobsList (url: string, accessToken: string) { 4function getJobsList (url: string, accessToken: string, state: JobState) {
4 const path = '/api/v1/jobs' 5 const path = '/api/v1/jobs/' + state
5 6
6 return request(url) 7 return request(url)
7 .get(path) 8 .get(path)
@@ -11,8 +12,8 @@ function getJobsList (url: string, accessToken: string) {
11 .expect('Content-Type', /json/) 12 .expect('Content-Type', /json/)
12} 13}
13 14
14function getJobsListPaginationAndSort (url: string, accessToken: string, start: number, count: number, sort: string) { 15function getJobsListPaginationAndSort (url: string, accessToken: string, state: JobState, start: number, count: number, sort: string) {
15 const path = '/api/v1/jobs' 16 const path = '/api/v1/jobs/' + state
16 17
17 return request(url) 18 return request(url)
18 .get(path) 19 .get(path)