aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts3
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts7
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts9
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts3
-rw-r--r--server/lib/jobs/job-scheduler.ts20
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts5
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts16
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts11
8 files changed, 35 insertions, 39 deletions
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
index 49d4bf5c6..8040dde2a 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
+++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
@@ -1,5 +1,4 @@
1import { logger } from '../../../helpers' 1import { doRequest, logger } from '../../../helpers'
2import { doRequest } from '../../../helpers/requests'
3import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' 2import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
4 3
5async function process (payload: ActivityPubHttpPayload, jobId: number) { 4async function process (payload: ActivityPubHttpPayload, jobId: number) {
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
index 9adceab84..638150202 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
+++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
@@ -1,7 +1,6 @@
1import { logger } from '../../../helpers' 1import { doRequest, logger } from '../../../helpers'
2import { doRequest } from '../../../helpers/requests' 2import { ACTIVITY_PUB } from '../../../initializers'
3import { ACTIVITY_PUB } from '../../../initializers/constants' 3import { processActivities } from '../../activitypub/process'
4import { processActivities } from '../../activitypub/process/process'
5import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' 4import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler'
6 5
7async function process (payload: ActivityPubHttpPayload, jobId: number) { 6async function process (payload: ActivityPubHttpPayload, jobId: number) {
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
index fcc81eb16..76da5b724 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
+++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
@@ -1,8 +1,7 @@
1import { JobCategory } from '../../../../shared' 1import { JobCategory } from '../../../../shared'
2import { buildSignedActivity } from '../../../helpers/activitypub' 2import { buildSignedActivity, logger } from '../../../helpers'
3import { logger } from '../../../helpers/logger' 3import { ACTIVITY_PUB } from '../../../initializers'
4import { ACTIVITY_PUB } from '../../../initializers/constants' 4import { AccountModel } from '../../../models/account/account'
5import { database as db } from '../../../initializers/database'
6import { JobHandler, JobScheduler } from '../job-scheduler' 5import { JobHandler, JobScheduler } from '../job-scheduler'
7 6
8import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' 7import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler'
@@ -46,7 +45,7 @@ async function computeBody (payload: ActivityPubHttpPayload) {
46 let body = payload.body 45 let body = payload.body
47 46
48 if (payload.signatureAccountId) { 47 if (payload.signatureAccountId) {
49 const accountSignature = await db.Account.load(payload.signatureAccountId) 48 const accountSignature = await AccountModel.load(payload.signatureAccountId)
50 if (!accountSignature) throw new Error('Unknown signature account id.') 49 if (!accountSignature) throw new Error('Unknown signature account id.')
51 body = await buildSignedActivity(accountSignature, payload.body) 50 body = await buildSignedActivity(accountSignature, payload.body)
52 } 51 }
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
index 4c95197c4..f16cfcec3 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
+++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
@@ -1,5 +1,4 @@
1import { logger } from '../../../helpers' 1import { doRequest, logger } from '../../../helpers'
2import { doRequest } from '../../../helpers/requests'
3import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' 2import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
4 3
5async function process (payload: ActivityPubHttpPayload, jobId: number) { 4async function process (payload: ActivityPubHttpPayload, jobId: number) {
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 62ce6927e..88fe8a4a3 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -2,8 +2,8 @@ import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3import { JobCategory } from '../../../shared' 3import { JobCategory } from '../../../shared'
4import { logger } from '../../helpers' 4import { logger } from '../../helpers'
5import { database as db, JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' 5import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
6import { JobInstance } from '../../models' 6import { JobModel } from '../../models/job/job'
7 7
8export interface JobHandler<P, T> { 8export interface JobHandler<P, T> {
9 process (data: object, jobId: number): Promise<T> 9 process (data: object, jobId: number): Promise<T>
@@ -24,12 +24,12 @@ class JobScheduler<P, T> {
24 24
25 logger.info('Jobs scheduler %s activated.', this.jobCategory) 25 logger.info('Jobs scheduler %s activated.', this.jobCategory)
26 26
27 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) 27 const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this))
28 28
29 // Finish processing jobs from a previous start 29 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING 30 const state = JOB_STATES.PROCESSING
31 try { 31 try {
32 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) 32 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
33 33
34 this.enqueueJobs(jobsQueue, jobs) 34 this.enqueueJobs(jobsQueue, jobs)
35 } catch (err) { 35 } catch (err) {
@@ -45,7 +45,7 @@ class JobScheduler<P, T> {
45 45
46 const state = JOB_STATES.PENDING 46 const state = JOB_STATES.PENDING
47 try { 47 try {
48 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) 48 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
49 49
50 this.enqueueJobs(jobsQueue, jobs) 50 this.enqueueJobs(jobsQueue, jobs)
51 } catch (err) { 51 } catch (err) {
@@ -70,14 +70,14 @@ class JobScheduler<P, T> {
70 70
71 const options = { transaction } 71 const options = { transaction }
72 72
73 return db.Job.create(createQuery, options) 73 return JobModel.create(createQuery, options)
74 } 74 }
75 75
76 private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { 76 private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) {
77 jobs.forEach(job => jobsQueue.push(job)) 77 jobs.forEach(job => jobsQueue.push(job))
78 } 78 }
79 79
80 private async processJob (job: JobInstance, callback: (err: Error) => void) { 80 private async processJob (job: JobModel, callback: (err: Error) => void) {
81 const jobHandler = this.jobHandlers[job.handlerName] 81 const jobHandler = this.jobHandlers[job.handlerName]
82 if (jobHandler === undefined) { 82 if (jobHandler === undefined) {
83 const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id 83 const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id
@@ -110,7 +110,7 @@ class JobScheduler<P, T> {
110 return callback(null) 110 return callback(null)
111 } 111 }
112 112
113 private async onJobError (jobHandler: JobHandler<P, T>, job: JobInstance, err: Error) { 113 private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) {
114 job.state = JOB_STATES.ERROR 114 job.state = JOB_STATES.ERROR
115 115
116 try { 116 try {
@@ -121,7 +121,7 @@ class JobScheduler<P, T> {
121 } 121 }
122 } 122 }
123 123
124 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobInstance, jobResult: T) { 124 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) {
125 job.state = JOB_STATES.SUCCESS 125 job.state = JOB_STATES.SUCCESS
126 126
127 try { 127 try {
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
index c5efe8eeb..e5530a73c 100644
--- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
@@ -1,14 +1,15 @@
1import { JobCategory } from '../../../../shared' 1import { JobCategory } from '../../../../shared'
2import { VideoModel } from '../../../models/video/video'
2import { JobHandler, JobScheduler } from '../job-scheduler' 3import { JobHandler, JobScheduler } from '../job-scheduler'
4
3import * as videoFileOptimizer from './video-file-optimizer-handler' 5import * as videoFileOptimizer from './video-file-optimizer-handler'
4import * as videoFileTranscoder from './video-file-transcoder-handler' 6import * as videoFileTranscoder from './video-file-transcoder-handler'
5import { VideoInstance } from '../../../models/video/video-interface'
6 7
7type TranscodingJobPayload = { 8type TranscodingJobPayload = {
8 videoUUID: string 9 videoUUID: string
9 resolution?: number 10 resolution?: number
10} 11}
11const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoInstance> } = { 12const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = {
12 videoFileOptimizer, 13 videoFileOptimizer,
13 videoFileTranscoder 14 videoFileTranscoder
14} 15}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
index e65ab3ee1..1786ce971 100644
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
@@ -1,14 +1,14 @@
1import * as Bluebird from 'bluebird' 1import * as Bluebird from 'bluebird'
2import { computeResolutionsToTranscode, logger } from '../../../helpers' 2import { computeResolutionsToTranscode, logger } from '../../../helpers'
3import { database as db } from '../../../initializers/database' 3import { sequelizeTypescript } from '../../../initializers'
4import { VideoInstance } from '../../../models' 4import { VideoModel } from '../../../models/video/video'
5import { sendAddVideo } from '../../activitypub/send/send-add' 5import { shareVideoByServer } from '../../activitypub'
6import { sendAddVideo } from '../../activitypub/send'
6import { JobScheduler } from '../job-scheduler' 7import { JobScheduler } from '../job-scheduler'
7import { TranscodingJobPayload } from './transcoding-job-scheduler' 8import { TranscodingJobPayload } from './transcoding-job-scheduler'
8import { shareVideoByServer } from '../../activitypub/share'
9 9
10async function process (data: TranscodingJobPayload, jobId: number) { 10async function process (data: TranscodingJobPayload, jobId: number) {
11 const video = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) 11 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
12 // No video, maybe deleted? 12 // No video, maybe deleted?
13 if (!video) { 13 if (!video) {
14 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 14 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
@@ -25,13 +25,13 @@ function onError (err: Error, jobId: number) {
25 return Promise.resolve() 25 return Promise.resolve()
26} 26}
27 27
28async function onSuccess (jobId: number, video: VideoInstance, jobScheduler: JobScheduler<TranscodingJobPayload, VideoInstance>) { 28async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) {
29 if (video === undefined) return undefined 29 if (video === undefined) return undefined
30 30
31 logger.info('Job %d is a success.', jobId) 31 logger.info('Job %d is a success.', jobId)
32 32
33 // Maybe the video changed in database, refresh it 33 // Maybe the video changed in database, refresh it
34 const videoDatabase = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) 34 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
35 // Video does not exist anymore 35 // Video does not exist anymore
36 if (!videoDatabase) return undefined 36 if (!videoDatabase) return undefined
37 37
@@ -50,7 +50,7 @@ async function onSuccess (jobId: number, video: VideoInstance, jobScheduler: Job
50 50
51 if (resolutionsEnabled.length !== 0) { 51 if (resolutionsEnabled.length !== 0) {
52 try { 52 try {
53 await db.sequelize.transaction(async t => { 53 await sequelizeTypescript.transaction(async t => {
54 const tasks: Bluebird<any>[] = [] 54 const tasks: Bluebird<any>[] = []
55 55
56 for (const resolution of resolutionsEnabled) { 56 for (const resolution of resolutionsEnabled) {
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
index 867580200..8957b4565 100644
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
@@ -1,11 +1,10 @@
1import { VideoResolution } from '../../../../shared' 1import { VideoResolution } from '../../../../shared'
2import { logger } from '../../../helpers' 2import { logger } from '../../../helpers'
3import { database as db } from '../../../initializers/database' 3import { VideoModel } from '../../../models/video/video'
4import { VideoInstance } from '../../../models' 4import { sendUpdateVideo } from '../../activitypub/send'
5import { sendUpdateVideo } from '../../activitypub/send/send-update'
6 5
7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { 6async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
8 const video = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) 7 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
9 // No video, maybe deleted? 8 // No video, maybe deleted?
10 if (!video) { 9 if (!video) {
11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) 10 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
@@ -22,13 +21,13 @@ function onError (err: Error, jobId: number) {
22 return Promise.resolve() 21 return Promise.resolve()
23} 22}
24 23
25async function onSuccess (jobId: number, video: VideoInstance) { 24async function onSuccess (jobId: number, video: VideoModel) {
26 if (video === undefined) return undefined 25 if (video === undefined) return undefined
27 26
28 logger.info('Job %d is a success.', jobId) 27 logger.info('Job %d is a success.', jobId)
29 28
30 // Maybe the video changed in database, refresh it 29 // Maybe the video changed in database, refresh it
31 const videoDatabase = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) 30 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
32 // Video does not exist anymore 31 // Video does not exist anymore
33 if (!videoDatabase) return undefined 32 if (!videoDatabase) return undefined
34 33