aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--client/src/app/+admin/jobs/jobs-list/jobs-list.component.html12
-rw-r--r--client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts4
-rw-r--r--client/src/app/+admin/jobs/shared/job.service.ts9
-rwxr-xr-xscripts/clean/server/test.sh2
-rw-r--r--server/controllers/api/jobs.ts25
-rw-r--r--server/helpers/custom-validators/jobs.ts2
-rw-r--r--server/initializers/constants.ts2
-rw-r--r--server/initializers/migrations/0230-kue-to-bull.ts63
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts4
-rw-r--r--server/lib/job-queue/handlers/email.ts4
-rw-r--r--server/lib/job-queue/handlers/video-file.ts9
-rw-r--r--server/lib/job-queue/job-queue.ts178
-rw-r--r--server/lib/redis.ts10
-rw-r--r--server/tests/api/server/handle-down.ts14
-rw-r--r--server/tests/api/server/jobs.ts11
-rw-r--r--server/tests/real-world/real-world.ts2
-rw-r--r--server/tests/utils/server/jobs.ts2
-rw-r--r--shared/models/server/job.model.ts5
21 files changed, 214 insertions, 156 deletions
diff --git a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html
index 20c35cb5b..b52d026a7 100644
--- a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html
+++ b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html
@@ -9,7 +9,7 @@
9</div> 9</div>
10 10
11<p-table 11<p-table
12 [value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="id" 12 [value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="uniqId"
13 [sortField]="sort.field" [sortOrder]="sort.order" (onLazyLoad)="loadLazy($event)" 13 [sortField]="sort.field" [sortOrder]="sort.order" (onLazyLoad)="loadLazy($event)"
14> 14>
15 <ng-template pTemplate="header"> 15 <ng-template pTemplate="header">
@@ -19,7 +19,8 @@
19 <th i18n style="width: 210px">Type</th> 19 <th i18n style="width: 210px">Type</th>
20 <th i18n style="width: 130px">State</th> 20 <th i18n style="width: 130px">State</th>
21 <th i18n style="width: 250px" pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th> 21 <th i18n style="width: 250px" pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th>
22 <th i18n style="width: 250px">Updated</th> 22 <th i18n style="width: 250px">Processed on</th>
23 <th i18n style="width: 250px">Finished on</th>
23 </tr> 24 </tr>
24 </ng-template> 25 </ng-template>
25 26
@@ -34,18 +35,19 @@
34 <td>{{ job.type }}</td> 35 <td>{{ job.type }}</td>
35 <td>{{ job.state }}</td> 36 <td>{{ job.state }}</td>
36 <td>{{ job.createdAt }}</td> 37 <td>{{ job.createdAt }}</td>
37 <td>{{ job.updatedAt }}</td> 38 <td>{{ job.processedOn }}</td>
39 <td>{{ job.finishedOn }}</td>
38 </tr> 40 </tr>
39 </ng-template> 41 </ng-template>
40 42
41 <ng-template pTemplate="rowexpansion" let-job> 43 <ng-template pTemplate="rowexpansion" let-job>
42 <tr> 44 <tr>
43 <td colspan="6"> 45 <td colspan="7">
44 <pre>{{ job.data }}</pre> 46 <pre>{{ job.data }}</pre>
45 </td> 47 </td>
46 </tr> 48 </tr>
47 <tr class="job-error" *ngIf="job.error"> 49 <tr class="job-error" *ngIf="job.error">
48 <td colspan="6"> 50 <td colspan="7">
49 <pre>{{ job.error }}</pre> 51 <pre>{{ job.error }}</pre>
50 </td> 52 </td>
51 </tr> 53 </tr>
diff --git a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts
index 29dd9f31c..a77f4a4a1 100644
--- a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts
+++ b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts
@@ -17,8 +17,8 @@ import { I18n } from '@ngx-translate/i18n-polyfill'
17export class JobsListComponent extends RestTable implements OnInit { 17export class JobsListComponent extends RestTable implements OnInit {
18 private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state' 18 private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state'
19 19
20 jobState: JobState = 'inactive' 20 jobState: JobState = 'waiting'
21 jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] 21 jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
22 jobs: Job[] = [] 22 jobs: Job[] = []
23 totalRecords: number 23 totalRecords: number
24 rowsPerPage = 10 24 rowsPerPage = 10
diff --git a/client/src/app/+admin/jobs/shared/job.service.ts b/client/src/app/+admin/jobs/shared/job.service.ts
index 6441eaac1..b96dc3359 100644
--- a/client/src/app/+admin/jobs/shared/job.service.ts
+++ b/client/src/app/+admin/jobs/shared/job.service.ts
@@ -25,8 +25,11 @@ export class JobService {
25 25
26 return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params }) 26 return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
27 .pipe( 27 .pipe(
28 map(res => this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'updatedAt' ])), 28 map(res => {
29 return this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'processedOn', 'finishedOn' ])
30 }),
29 map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData)), 31 map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData)),
32 map(res => this.restExtractor.applyToResultListData(res, this.buildUniqId)),
30 catchError(err => this.restExtractor.handleError(err)) 33 catchError(err => this.restExtractor.handleError(err))
31 ) 34 )
32 } 35 }
@@ -36,4 +39,8 @@ export class JobService {
36 39
37 return Object.assign(obj, { data }) 40 return Object.assign(obj, { data })
38 } 41 }
42
43 private buildUniqId (obj: Job) {
44 return Object.assign(obj, { uniqId: `${obj.id}-${obj.type}` })
45 }
39} 46}
diff --git a/scripts/clean/server/test.sh b/scripts/clean/server/test.sh
index 303806fe2..753b8c67e 100755
--- a/scripts/clean/server/test.sh
+++ b/scripts/clean/server/test.sh
@@ -8,5 +8,5 @@ for i in $(seq 1 6); do
8 rm -f "./config/local-test.json" 8 rm -f "./config/local-test.json"
9 rm -f "./config/local-test-$i.json" 9 rm -f "./config/local-test-$i.json"
10 createdb -O peertube "peertube_test$i" 10 createdb -O peertube "peertube_test$i"
11 redis-cli KEYS "q-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL 11 redis-cli KEYS "bull-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL
12done 12done
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index aa58a9144..c19596dde 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -13,6 +13,7 @@ import {
13} from '../../middlewares' 13} from '../../middlewares'
14import { paginationValidator } from '../../middlewares/validators' 14import { paginationValidator } from '../../middlewares/validators'
15import { listJobsValidator } from '../../middlewares/validators/jobs' 15import { listJobsValidator } from '../../middlewares/validators/jobs'
16import { isArray } from '../../helpers/custom-validators/misc'
16 17
17const jobsRouter = express.Router() 18const jobsRouter = express.Router()
18 19
@@ -36,26 +37,30 @@ export {
36// --------------------------------------------------------------------------- 37// ---------------------------------------------------------------------------
37 38
38async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { 39async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) {
39 const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC' 40 const state: JobState = req.params.state
41 const asc = req.query.sort === 'createdAt'
40 42
41 const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) 43 const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc)
42 const total = await JobQueue.Instance.count(req.params.state) 44 const total = await JobQueue.Instance.count(state)
43 45
44 const result: ResultList<any> = { 46 const result: ResultList<any> = {
45 total, 47 total,
46 data: jobs.map(j => formatJob(j.toJSON())) 48 data: jobs.map(j => formatJob(j, state))
47 } 49 }
48 return res.json(result) 50 return res.json(result)
49} 51}
50 52
51function formatJob (job: any): Job { 53function formatJob (job: any, state: JobState): Job {
54 const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null
55
52 return { 56 return {
53 id: job.id, 57 id: job.id,
54 state: job.state as JobState, 58 state: state,
55 type: job.type as JobType, 59 type: job.queue.name as JobType,
56 data: job.data, 60 data: job.data,
57 error: job.error, 61 error,
58 createdAt: new Date(parseInt(job.created_at, 10)), 62 createdAt: new Date(job.timestamp),
59 updatedAt: new Date(parseInt(job.updated_at, 10)) 63 finishedOn: new Date(job.finishedOn),
64 processedOn: new Date(job.processedOn)
60 } 65 }
61} 66}
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts
index 9700fbd12..1cc6e6912 100644
--- a/server/helpers/custom-validators/jobs.ts
+++ b/server/helpers/custom-validators/jobs.ts
@@ -1,7 +1,7 @@
1import { JobState } from '../../../shared/models' 1import { JobState } from '../../../shared/models'
2import { exists } from './misc' 2import { exists } from './misc'
3 3
4const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] 4const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
5 5
6function isValidJobState (value: JobState) { 6function isValidJobState (value: JobState) {
7 return exists(value) && jobStates.indexOf(value) !== -1 7 return exists(value) && jobStates.indexOf(value) !== -1
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 24b7e2655..6173e1298 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -14,7 +14,7 @@ let config: IConfig = require('config')
14 14
15// --------------------------------------------------------------------------- 15// ---------------------------------------------------------------------------
16 16
17const LAST_MIGRATION_VERSION = 225 17const LAST_MIGRATION_VERSION = 230
18 18
19// --------------------------------------------------------------------------- 19// ---------------------------------------------------------------------------
20 20
diff --git a/server/initializers/migrations/0230-kue-to-bull.ts b/server/initializers/migrations/0230-kue-to-bull.ts
new file mode 100644
index 000000000..5fad87a61
--- /dev/null
+++ b/server/initializers/migrations/0230-kue-to-bull.ts
@@ -0,0 +1,63 @@
1import * as Sequelize from 'sequelize'
2import { createClient } from 'redis'
3import { CONFIG } from '../constants'
4import { JobQueue } from '../../lib/job-queue'
5import { initDatabaseModels } from '../database'
6
7async function up (utils: {
8 transaction: Sequelize.Transaction
9 queryInterface: Sequelize.QueryInterface
10 sequelize: Sequelize.Sequelize
11}): Promise<any> {
12 await initDatabaseModels(false)
13
14 return new Promise((res, rej) => {
15 const client = createClient({
16 host: CONFIG.REDIS.HOSTNAME,
17 port: CONFIG.REDIS.PORT,
18 db: CONFIG.REDIS.DB
19 })
20
21 const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST
22
23 client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => {
24 if (err) return rej(err)
25
26 const jobPromises = jobStrings
27 .map(s => s.split('|'))
28 .map(([ , jobId ]) => {
29 return new Promise((res, rej) => {
30 client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => {
31 if (err) return rej(err)
32
33 try {
34 const parsedData = JSON.parse(job.data)
35
36 return res({ type: job.type, payload: parsedData })
37 } catch (err) {
38 console.error('Cannot parse data %s.', job.data)
39 return res(null)
40 }
41 })
42 })
43 })
44
45 JobQueue.Instance.init()
46 .then(() => Promise.all(jobPromises))
47 .then((jobs: any) => {
48 const createJobPromises = jobs
49 .filter(job => job !== null)
50 .map(job => JobQueue.Instance.createJob(job))
51
52 return Promise.all(createJobPromises)
53 })
54 .then(() => res())
55 })
56 })
57}
58
59function down (options) {
60 throw new Error('Not implemented.')
61}
62
63export { up, down }
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 286e343f2..2c1b4f49d 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { getServerActor } from '../../../helpers/utils' 3import { getServerActor } from '../../../helpers/utils'
4import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' 4import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers'
@@ -14,7 +14,7 @@ export type ActivitypubFollowPayload = {
14 host: string 14 host: string
15} 15}
16 16
17async function processActivityPubFollow (job: kue.Job) { 17async function processActivityPubFollow (job: Bull.Job) {
18 const payload = job.data as ActivitypubFollowPayload 18 const payload = job.data as ActivitypubFollowPayload
19 const host = payload.host 19 const host = payload.host
20 20
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index d8b8ec222..03a9e12a4 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import * as Bluebird from 'bluebird' 2import * as Bluebird from 'bluebird'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { doRequest } from '../../../helpers/requests' 4import { doRequest } from '../../../helpers/requests'
@@ -12,7 +12,7 @@ export type ActivitypubHttpBroadcastPayload = {
12 body: any 12 body: any
13} 13}
14 14
15async function processActivityPubHttpBroadcast (job: kue.Job) { 15async function processActivityPubHttpBroadcast (job: Bull.Job) {
16 logger.info('Processing ActivityPub broadcast in job %d.', job.id) 16 logger.info('Processing ActivityPub broadcast in job %d.', job.id)
17 17
18 const payload = job.data as ActivitypubHttpBroadcastPayload 18 const payload = job.data as ActivitypubHttpBroadcastPayload
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 10c0e606f..f21da087e 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { processActivities } from '../../activitypub/process' 3import { processActivities } from '../../activitypub/process'
4import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' 4import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
@@ -9,7 +9,7 @@ export type ActivitypubHttpFetcherPayload = {
9 uris: string[] 9 uris: string[]
10} 10}
11 11
12async function processActivityPubHttpFetcher (job: kue.Job) { 12async function processActivityPubHttpFetcher (job: Bull.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id) 13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
14 14
15 const payload = job.data as ActivitypubHttpBroadcastPayload 15 const payload = job.data as ActivitypubHttpBroadcastPayload
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 173f3bb52..c90d735f6 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
@@ -11,7 +11,7 @@ export type ActivitypubHttpUnicastPayload = {
11 body: any 11 body: any
12} 12}
13 13
14async function processActivityPubHttpUnicast (job: kue.Job) { 14async function processActivityPubHttpUnicast (job: Bull.Job) {
15 logger.info('Processing ActivityPub unicast in job %d.', job.id) 15 logger.info('Processing ActivityPub unicast in job %d.', job.id)
16 16
17 const payload = job.data as ActivitypubHttpUnicastPayload 17 const payload = job.data as ActivitypubHttpUnicastPayload
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 9d7686116..73d98ae54 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { Emailer } from '../../emailer' 3import { Emailer } from '../../emailer'
4 4
@@ -8,7 +8,7 @@ export type EmailPayload = {
8 text: string 8 text: string
9} 9}
10 10
11async function processEmail (job: kue.Job) { 11async function processEmail (job: Bull.Job) {
12 const payload = job.data as EmailPayload 12 const payload = job.data as EmailPayload
13 logger.info('Processing email in job %d.', job.id) 13 logger.info('Processing email in job %d.', job.id)
14 14
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
index fc40527c7..bd68dd78b 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { VideoResolution, VideoState } from '../../../../shared' 2import { VideoResolution, VideoState } from '../../../../shared'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { computeResolutionsToTranscode } from '../../../helpers/utils' 4import { computeResolutionsToTranscode } from '../../../helpers/utils'
@@ -7,6 +7,7 @@ import { JobQueue } from '../job-queue'
7import { federateVideoIfNeeded } from '../../activitypub' 7import { federateVideoIfNeeded } from '../../activitypub'
8import { retryTransactionWrapper } from '../../../helpers/database-utils' 8import { retryTransactionWrapper } from '../../../helpers/database-utils'
9import { sequelizeTypescript } from '../../../initializers' 9import { sequelizeTypescript } from '../../../initializers'
10import * as Bluebird from 'bluebird'
10 11
11export type VideoFilePayload = { 12export type VideoFilePayload = {
12 videoUUID: string 13 videoUUID: string
@@ -20,7 +21,7 @@ export type VideoFileImportPayload = {
20 filePath: string 21 filePath: string
21} 22}
22 23
23async function processVideoFileImport (job: kue.Job) { 24async function processVideoFileImport (job: Bull.Job) {
24 const payload = job.data as VideoFileImportPayload 25 const payload = job.data as VideoFileImportPayload
25 logger.info('Processing video file import in job %d.', job.id) 26 logger.info('Processing video file import in job %d.', job.id)
26 27
@@ -37,7 +38,7 @@ async function processVideoFileImport (job: kue.Job) {
37 return video 38 return video
38} 39}
39 40
40async function processVideoFile (job: kue.Job) { 41async function processVideoFile (job: Bull.Job) {
41 const payload = job.data as VideoFilePayload 42 const payload = job.data as VideoFilePayload
42 logger.info('Processing video file in job %d.', job.id) 43 logger.info('Processing video file in job %d.', job.id)
43 44
@@ -109,7 +110,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
109 ) 110 )
110 111
111 if (resolutionsEnabled.length !== 0) { 112 if (resolutionsEnabled.length !== 0) {
112 const tasks: Promise<any>[] = [] 113 const tasks: Bluebird<any>[] = []
113 114
114 for (const resolution of resolutionsEnabled) { 115 for (const resolution of resolutionsEnabled) {
115 const dataInput = { 116 const dataInput = {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 695fe0eea..77aaa7fa8 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,13 +1,12 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' 4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
5import { Redis } from '../redis'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
9import { EmailPayload, processEmail } from './handlers/email' 8import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' 9import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 10import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12 11
13type CreateJobArgument = 12type CreateJobArgument =
@@ -19,7 +18,7 @@ type CreateJobArgument =
19 { type: 'video-file', payload: VideoFilePayload } | 18 { type: 'video-file', payload: VideoFilePayload } |
20 { type: 'email', payload: EmailPayload } 19 { type: 'email', payload: EmailPayload }
21 20
22const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { 21const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
23 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 22 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
24 'activitypub-http-unicast': processActivityPubHttpUnicast, 23 'activitypub-http-unicast': processActivityPubHttpUnicast,
25 'activitypub-http-fetcher': processActivityPubHttpFetcher, 24 'activitypub-http-fetcher': processActivityPubHttpFetcher,
@@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
29 'email': processEmail 28 'email': processEmail
30} 29}
31 30
32const jobsWithTLL: JobType[] = [ 31const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
32 'activitypub-http-broadcast': true,
33 'activitypub-http-unicast': true,
34 'activitypub-http-fetcher': true,
35 'activitypub-follow': true
36}
37
38const jobTypes: JobType[] = [
39 'activitypub-follow',
33 'activitypub-http-broadcast', 40 'activitypub-http-broadcast',
34 'activitypub-http-unicast',
35 'activitypub-http-fetcher', 41 'activitypub-http-fetcher',
36 'activitypub-follow' 42 'activitypub-http-unicast',
43 'email',
44 'video-file',
45 'video-file-import'
37] 46]
38 47
39class JobQueue { 48class JobQueue {
40 49
41 private static instance: JobQueue 50 private static instance: JobQueue
42 51
43 private jobQueue: kue.Queue 52 private queues: { [ id in JobType ]?: Bull.Queue } = {}
44 private initialized = false 53 private initialized = false
45 private jobRedisPrefix: string 54 private jobRedisPrefix: string
46 55
@@ -51,9 +60,8 @@ class JobQueue {
51 if (this.initialized === true) return 60 if (this.initialized === true) return
52 this.initialized = true 61 this.initialized = true
53 62
54 this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST 63 this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST
55 64 const queueOptions = {
56 this.jobQueue = kue.createQueue({
57 prefix: this.jobRedisPrefix, 65 prefix: this.jobRedisPrefix,
58 redis: { 66 redis: {
59 host: CONFIG.REDIS.HOSTNAME, 67 host: CONFIG.REDIS.HOSTNAME,
@@ -61,120 +69,94 @@ class JobQueue {
61 auth: CONFIG.REDIS.AUTH, 69 auth: CONFIG.REDIS.AUTH,
62 db: CONFIG.REDIS.DB 70 db: CONFIG.REDIS.DB
63 } 71 }
64 }) 72 }
65
66 this.jobQueue.setMaxListeners(20)
67 73
68 this.jobQueue.on('error', err => { 74 for (const handlerName of Object.keys(handlers)) {
69 logger.error('Error in job queue.', { err }) 75 const queue = new Bull(handlerName, queueOptions)
70 process.exit(-1) 76 const handler = handlers[handlerName]
71 })
72 this.jobQueue.watchStuckJobs(5000)
73 77
74 await this.reactiveStuckJobs() 78 queue.process(JOB_CONCURRENCY[handlerName], handler)
79 .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err }))
75 80
76 for (const handlerName of Object.keys(handlers)) { 81 queue.on('error', err => {
77 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { 82 logger.error('Error in job queue %s.', handlerName, { err })
78 try { 83 process.exit(-1)
79 const res = await handlers[ handlerName ](job)
80 return done(null, res)
81 } catch (err) {
82 logger.error('Cannot execute job %d.', job.id, { err })
83 return done(err)
84 }
85 }) 84 })
85
86 this.queues[handlerName] = queue
86 } 87 }
87 } 88 }
88 89
89 createJob (obj: CreateJobArgument, priority = 'normal') { 90 createJob (obj: CreateJobArgument) {
90 return new Promise((res, rej) => { 91 const queue = this.queues[obj.type]
91 let job = this.jobQueue 92 if (queue === undefined) {
92 .create(obj.type, obj.payload) 93 logger.error('Unknown queue %s: cannot create job.', obj.type)
93 .priority(priority) 94 return
94 .attempts(JOB_ATTEMPTS[obj.type]) 95 }
95 .backoff({ delay: 60 * 1000, type: 'exponential' })
96 96
97 if (jobsWithTLL.indexOf(obj.type) !== -1) { 97 const jobArgs: Bull.JobOptions = {
98 job = job.ttl(JOB_REQUEST_TTL) 98 backoff: { delay: 60 * 1000, type: 'exponential' },
99 } 99 attempts: JOB_ATTEMPTS[obj.type]
100 }
100 101
101 return job.save(err => { 102 if (jobsWithRequestTimeout[obj.type] === true) {
102 if (err) return rej(err) 103 jobArgs.timeout = JOB_REQUEST_TTL
104 }
103 105
104 return res() 106 return queue.add(obj.payload, jobArgs)
105 })
106 })
107 } 107 }
108 108
109 async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> { 109 async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
110 const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) 110 let results: Bull.Job[] = []
111 111
112 const jobPromises = jobStrings 112 // TODO: optimize
113 .map(s => s.split('|')) 113 for (const jobType of jobTypes) {
114 .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) 114 const queue = this.queues[ jobType ]
115 if (queue === undefined) {
116 logger.error('Unknown queue %s to list jobs.', jobType)
117 continue
118 }
115 119
116 return Promise.all(jobPromises) 120 // FIXME: Bull queue typings does not have getJobs method
117 } 121 const jobs = await (queue as any).getJobs(state, 0, start + count, asc)
122 results = results.concat(jobs)
123 }
118 124
119 count (state: JobState) { 125 results.sort((j1: any, j2: any) => {
120 return new Promise<number>((res, rej) => { 126 if (j1.timestamp < j2.timestamp) return -1
121 this.jobQueue[state + 'Count']((err, total) => { 127 else if (j1.timestamp === j2.timestamp) return 0
122 if (err) return rej(err)
123 128
124 return res(total) 129 return 1
125 })
126 }) 130 })
127 }
128 131
129 removeOldJobs () { 132 if (asc === false) results.reverse()
130 const now = new Date().getTime()
131 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
132 if (err) {
133 logger.error('Cannot get jobs when removing old jobs.', { err })
134 return
135 }
136 133
137 for (const job of jobs) { 134 return results.slice(start, start + count)
138 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
139 job.remove()
140 }
141 }
142 })
143 } 135 }
144 136
145 private reactiveStuckJobs () { 137 async count (state: JobState): Promise<number> {
146 const promises: Promise<any>[] = [] 138 let total = 0
147
148 this.jobQueue.active((err, ids) => {
149 if (err) throw err
150 139
151 for (const id of ids) { 140 for (const type of jobTypes) {
152 kue.Job.get(id, (err, job) => { 141 const queue = this.queues[ type ]
153 if (err) throw err 142 if (queue === undefined) {
143 logger.error('Unknown queue %s to count jobs.', type)
144 continue
145 }
154 146
155 const p = new Promise((res, rej) => { 147 const counts = await queue.getJobCounts()
156 job.inactive(err => {
157 if (err) return rej(err)
158 return res()
159 })
160 })
161 148
162 promises.push(p) 149 total += counts[ state ]
163 }) 150 }
164 }
165 })
166 151
167 return Promise.all(promises) 152 return total
168 } 153 }
169 154
170 private getJob (id: number) { 155 removeOldJobs () {
171 return new Promise<kue.Job>((res, rej) => { 156 for (const key of Object.keys(this.queues)) {
172 kue.Job.get(id, (err, job) => { 157 const queue = this.queues[key]
173 if (err) return rej(err) 158 queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
174 159 }
175 return res(job)
176 })
177 })
178 } 160 }
179 161
180 static get Instance () { 162 static get Instance () {
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index 5bd55109c..78b28986a 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -78,16 +78,6 @@ class Redis {
78 return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) 78 return this.setObject(this.buildCachedRouteKey(req), cached, lifetime)
79 } 79 }
80 80
81 listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) {
82 return new Promise<string[]>((res, rej) => {
83 this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => {
84 if (err) return rej(err)
85
86 return res(values)
87 })
88 })
89 }
90
91 generateResetPasswordKey (userId: number) { 81 generateResetPasswordKey (userId: number) {
92 return 'reset-password-' + userId 82 return 'reset-password-' + userId
93 } 83 }
diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts
index 69609b4fc..84d310ae6 100644
--- a/server/tests/api/server/handle-down.ts
+++ b/server/tests/api/server/handle-down.ts
@@ -6,15 +6,21 @@ import { JobState } from '../../../../shared/models'
6import { VideoPrivacy } from '../../../../shared/models/videos' 6import { VideoPrivacy } from '../../../../shared/models/videos'
7import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' 7import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model'
8import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' 8import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils'
9
10import { 9import {
11 flushAndRunMultipleServers, flushTests, getVideosList, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, 10 flushAndRunMultipleServers,
11 getVideosList,
12 killallServers,
13 ServerInfo,
14 setAccessTokensToServers,
15 uploadVideo,
12 wait 16 wait
13} from '../../utils/index' 17} from '../../utils/index'
14import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows' 18import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows'
15import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' 19import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
16import { 20import {
17 addVideoCommentReply, addVideoCommentThread, getVideoCommentThreads, 21 addVideoCommentReply,
22 addVideoCommentThread,
23 getVideoCommentThreads,
18 getVideoThreadComments 24 getVideoThreadComments
19} from '../../utils/videos/video-comments' 25} from '../../utils/videos/video-comments'
20 26
@@ -146,7 +152,7 @@ describe('Test handle downs', function () {
146 }) 152 })
147 153
148 it('Should not have pending/processing jobs anymore', async function () { 154 it('Should not have pending/processing jobs anymore', async function () {
149 const states: JobState[] = [ 'inactive', 'active' ] 155 const states: JobState[] = [ 'waiting', 'active' ]
150 156
151 for (const state of states) { 157 for (const state of states) {
152 const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') 158 const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts
index 81e389de6..f248c5521 100644
--- a/server/tests/api/server/jobs.ts
+++ b/server/tests/api/server/jobs.ts
@@ -2,7 +2,7 @@
2 2
3import * as chai from 'chai' 3import * as chai from 'chai'
4import 'mocha' 4import 'mocha'
5import { flushTests, killallServers, ServerInfo, setAccessTokensToServers, wait } from '../../utils/index' 5import { killallServers, ServerInfo, setAccessTokensToServers } from '../../utils/index'
6import { doubleFollow } from '../../utils/server/follows' 6import { doubleFollow } from '../../utils/server/follows'
7import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' 7import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
8import { flushAndRunMultipleServers } from '../../utils/server/servers' 8import { flushAndRunMultipleServers } from '../../utils/server/servers'
@@ -35,22 +35,23 @@ describe('Test jobs', function () {
35 }) 35 })
36 36
37 it('Should list jobs', async function () { 37 it('Should list jobs', async function () {
38 const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete') 38 const res = await getJobsList(servers[1].url, servers[1].accessToken, 'completed')
39 expect(res.body.total).to.be.above(2) 39 expect(res.body.total).to.be.above(2)
40 expect(res.body.data).to.have.length.above(2) 40 expect(res.body.data).to.have.length.above(2)
41 }) 41 })
42 42
43 it('Should list jobs with sort and pagination', async function () { 43 it('Should list jobs with sort and pagination', async function () {
44 const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt') 44 const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 1, 'createdAt')
45 expect(res.body.total).to.be.above(2) 45 expect(res.body.total).to.be.above(2)
46 expect(res.body.data).to.have.lengthOf(1) 46 expect(res.body.data).to.have.lengthOf(1)
47 47
48 const job = res.body.data[0] 48 const job = res.body.data[0]
49 49
50 expect(job.state).to.equal('complete') 50 expect(job.state).to.equal('completed')
51 expect(job.type).to.equal('activitypub-http-unicast') 51 expect(job.type).to.equal('activitypub-http-unicast')
52 expect(dateIsValid(job.createdAt)).to.be.true 52 expect(dateIsValid(job.createdAt)).to.be.true
53 expect(dateIsValid(job.updatedAt)).to.be.true 53 expect(dateIsValid(job.processedOn)).to.be.true
54 expect(dateIsValid(job.finishedOn)).to.be.true
54 }) 55 })
55 56
56 after(async function () { 57 after(async function () {
diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts
index b7375f778..a96469b11 100644
--- a/server/tests/real-world/real-world.ts
+++ b/server/tests/real-world/real-world.ts
@@ -347,7 +347,7 @@ function goodbye () {
347} 347}
348 348
349async function isTherePendingRequests (servers: ServerInfo[]) { 349async function isTherePendingRequests (servers: ServerInfo[]) {
350 const states: JobState[] = [ 'inactive', 'active', 'delayed' ] 350 const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
351 const tasks: Promise<any>[] = [] 351 const tasks: Promise<any>[] = []
352 let pendingRequests = false 352 let pendingRequests = false
353 353
diff --git a/server/tests/utils/server/jobs.ts b/server/tests/utils/server/jobs.ts
index 375e76f93..c9cb8d3a3 100644
--- a/server/tests/utils/server/jobs.ts
+++ b/server/tests/utils/server/jobs.ts
@@ -33,7 +33,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
33 if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ] 33 if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ]
34 else servers = serversArg as ServerInfo[] 34 else servers = serversArg as ServerInfo[]
35 35
36 const states: JobState[] = [ 'inactive', 'active', 'delayed' ] 36 const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
37 const tasks: Promise<any>[] = [] 37 const tasks: Promise<any>[] = []
38 let pendingRequests: boolean 38 let pendingRequests: boolean
39 39
diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts
index 7d8d39a19..a38a8aa3b 100644
--- a/shared/models/server/job.model.ts
+++ b/shared/models/server/job.model.ts
@@ -1,4 +1,4 @@
1export type JobState = 'active' | 'complete' | 'failed' | 'inactive' | 'delayed' 1export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
2 2
3export type JobType = 'activitypub-http-unicast' | 3export type JobType = 'activitypub-http-unicast' |
4 'activitypub-http-broadcast' | 4 'activitypub-http-broadcast' |
@@ -15,5 +15,6 @@ export interface Job {
15 data: any, 15 data: any,
16 error: any, 16 error: any,
17 createdAt: Date 17 createdAt: Date
18 updatedAt: Date 18 finishedOn: Date
19 processedOn: Date
19} 20}