diff options
Diffstat (limited to 'server/lib/jobs')
8 files changed, 35 insertions, 39 deletions
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts index 49d4bf5c6..8040dde2a 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts | |||
@@ -1,5 +1,4 @@ | |||
1 | import { logger } from '../../../helpers' | 1 | import { doRequest, logger } from '../../../helpers' |
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' | 2 | import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' |
4 | 3 | ||
5 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 4 | async function process (payload: ActivityPubHttpPayload, jobId: number) { |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts index 9adceab84..638150202 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts | |||
@@ -1,7 +1,6 @@ | |||
1 | import { logger } from '../../../helpers' | 1 | import { doRequest, logger } from '../../../helpers' |
2 | import { doRequest } from '../../../helpers/requests' | 2 | import { ACTIVITY_PUB } from '../../../initializers' |
3 | import { ACTIVITY_PUB } from '../../../initializers/constants' | 3 | import { processActivities } from '../../activitypub/process' |
4 | import { processActivities } from '../../activitypub/process/process' | ||
5 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | 4 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' |
6 | 5 | ||
7 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 6 | async function process (payload: ActivityPubHttpPayload, jobId: number) { |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts index fcc81eb16..76da5b724 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts | |||
@@ -1,8 +1,7 @@ | |||
1 | import { JobCategory } from '../../../../shared' | 1 | import { JobCategory } from '../../../../shared' |
2 | import { buildSignedActivity } from '../../../helpers/activitypub' | 2 | import { buildSignedActivity, logger } from '../../../helpers' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { ACTIVITY_PUB } from '../../../initializers' |
4 | import { ACTIVITY_PUB } from '../../../initializers/constants' | 4 | import { AccountModel } from '../../../models/account/account' |
5 | import { database as db } from '../../../initializers/database' | ||
6 | import { JobHandler, JobScheduler } from '../job-scheduler' | 5 | import { JobHandler, JobScheduler } from '../job-scheduler' |
7 | 6 | ||
8 | import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' | 7 | import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' |
@@ -46,7 +45,7 @@ async function computeBody (payload: ActivityPubHttpPayload) { | |||
46 | let body = payload.body | 45 | let body = payload.body |
47 | 46 | ||
48 | if (payload.signatureAccountId) { | 47 | if (payload.signatureAccountId) { |
49 | const accountSignature = await db.Account.load(payload.signatureAccountId) | 48 | const accountSignature = await AccountModel.load(payload.signatureAccountId) |
50 | if (!accountSignature) throw new Error('Unknown signature account id.') | 49 | if (!accountSignature) throw new Error('Unknown signature account id.') |
51 | body = await buildSignedActivity(accountSignature, payload.body) | 50 | body = await buildSignedActivity(accountSignature, payload.body) |
52 | } | 51 | } |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts index 4c95197c4..f16cfcec3 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts | |||
@@ -1,5 +1,4 @@ | |||
1 | import { logger } from '../../../helpers' | 1 | import { doRequest, logger } from '../../../helpers' |
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' | 2 | import { ActivityPubHttpPayload, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler' |
4 | 3 | ||
5 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 4 | async function process (payload: ActivityPubHttpPayload, jobId: number) { |
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 62ce6927e..88fe8a4a3 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -2,8 +2,8 @@ import { AsyncQueue, forever, queue } from 'async' | |||
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | import { JobCategory } from '../../../shared' | 3 | import { JobCategory } from '../../../shared' |
4 | import { logger } from '../../helpers' | 4 | import { logger } from '../../helpers' |
5 | import { database as db, JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' | 5 | import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' |
6 | import { JobInstance } from '../../models' | 6 | import { JobModel } from '../../models/job/job' |
7 | 7 | ||
8 | export interface JobHandler<P, T> { | 8 | export interface JobHandler<P, T> { |
9 | process (data: object, jobId: number): Promise<T> | 9 | process (data: object, jobId: number): Promise<T> |
@@ -24,12 +24,12 @@ class JobScheduler<P, T> { | |||
24 | 24 | ||
25 | logger.info('Jobs scheduler %s activated.', this.jobCategory) | 25 | logger.info('Jobs scheduler %s activated.', this.jobCategory) |
26 | 26 | ||
27 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) | 27 | const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this)) |
28 | 28 | ||
29 | // Finish processing jobs from a previous start | 29 | // Finish processing jobs from a previous start |
30 | const state = JOB_STATES.PROCESSING | 30 | const state = JOB_STATES.PROCESSING |
31 | try { | 31 | try { |
32 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) | 32 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) |
33 | 33 | ||
34 | this.enqueueJobs(jobsQueue, jobs) | 34 | this.enqueueJobs(jobsQueue, jobs) |
35 | } catch (err) { | 35 | } catch (err) { |
@@ -45,7 +45,7 @@ class JobScheduler<P, T> { | |||
45 | 45 | ||
46 | const state = JOB_STATES.PENDING | 46 | const state = JOB_STATES.PENDING |
47 | try { | 47 | try { |
48 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) | 48 | const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory) |
49 | 49 | ||
50 | this.enqueueJobs(jobsQueue, jobs) | 50 | this.enqueueJobs(jobsQueue, jobs) |
51 | } catch (err) { | 51 | } catch (err) { |
@@ -70,14 +70,14 @@ class JobScheduler<P, T> { | |||
70 | 70 | ||
71 | const options = { transaction } | 71 | const options = { transaction } |
72 | 72 | ||
73 | return db.Job.create(createQuery, options) | 73 | return JobModel.create(createQuery, options) |
74 | } | 74 | } |
75 | 75 | ||
76 | private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { | 76 | private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) { |
77 | jobs.forEach(job => jobsQueue.push(job)) | 77 | jobs.forEach(job => jobsQueue.push(job)) |
78 | } | 78 | } |
79 | 79 | ||
80 | private async processJob (job: JobInstance, callback: (err: Error) => void) { | 80 | private async processJob (job: JobModel, callback: (err: Error) => void) { |
81 | const jobHandler = this.jobHandlers[job.handlerName] | 81 | const jobHandler = this.jobHandlers[job.handlerName] |
82 | if (jobHandler === undefined) { | 82 | if (jobHandler === undefined) { |
83 | const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id | 83 | const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id |
@@ -110,7 +110,7 @@ class JobScheduler<P, T> { | |||
110 | return callback(null) | 110 | return callback(null) |
111 | } | 111 | } |
112 | 112 | ||
113 | private async onJobError (jobHandler: JobHandler<P, T>, job: JobInstance, err: Error) { | 113 | private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) { |
114 | job.state = JOB_STATES.ERROR | 114 | job.state = JOB_STATES.ERROR |
115 | 115 | ||
116 | try { | 116 | try { |
@@ -121,7 +121,7 @@ class JobScheduler<P, T> { | |||
121 | } | 121 | } |
122 | } | 122 | } |
123 | 123 | ||
124 | private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobInstance, jobResult: T) { | 124 | private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) { |
125 | job.state = JOB_STATES.SUCCESS | 125 | job.state = JOB_STATES.SUCCESS |
126 | 126 | ||
127 | try { | 127 | try { |
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts index c5efe8eeb..e5530a73c 100644 --- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts +++ b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts | |||
@@ -1,14 +1,15 @@ | |||
1 | import { JobCategory } from '../../../../shared' | 1 | import { JobCategory } from '../../../../shared' |
2 | import { VideoModel } from '../../../models/video/video' | ||
2 | import { JobHandler, JobScheduler } from '../job-scheduler' | 3 | import { JobHandler, JobScheduler } from '../job-scheduler' |
4 | |||
3 | import * as videoFileOptimizer from './video-file-optimizer-handler' | 5 | import * as videoFileOptimizer from './video-file-optimizer-handler' |
4 | import * as videoFileTranscoder from './video-file-transcoder-handler' | 6 | import * as videoFileTranscoder from './video-file-transcoder-handler' |
5 | import { VideoInstance } from '../../../models/video/video-interface' | ||
6 | 7 | ||
7 | type TranscodingJobPayload = { | 8 | type TranscodingJobPayload = { |
8 | videoUUID: string | 9 | videoUUID: string |
9 | resolution?: number | 10 | resolution?: number |
10 | } | 11 | } |
11 | const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoInstance> } = { | 12 | const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = { |
12 | videoFileOptimizer, | 13 | videoFileOptimizer, |
13 | videoFileTranscoder | 14 | videoFileTranscoder |
14 | } | 15 | } |
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts index e65ab3ee1..1786ce971 100644 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts | |||
@@ -1,14 +1,14 @@ | |||
1 | import * as Bluebird from 'bluebird' | 1 | import * as Bluebird from 'bluebird' |
2 | import { computeResolutionsToTranscode, logger } from '../../../helpers' | 2 | import { computeResolutionsToTranscode, logger } from '../../../helpers' |
3 | import { database as db } from '../../../initializers/database' | 3 | import { sequelizeTypescript } from '../../../initializers' |
4 | import { VideoInstance } from '../../../models' | 4 | import { VideoModel } from '../../../models/video/video' |
5 | import { sendAddVideo } from '../../activitypub/send/send-add' | 5 | import { shareVideoByServer } from '../../activitypub' |
6 | import { sendAddVideo } from '../../activitypub/send' | ||
6 | import { JobScheduler } from '../job-scheduler' | 7 | import { JobScheduler } from '../job-scheduler' |
7 | import { TranscodingJobPayload } from './transcoding-job-scheduler' | 8 | import { TranscodingJobPayload } from './transcoding-job-scheduler' |
8 | import { shareVideoByServer } from '../../activitypub/share' | ||
9 | 9 | ||
10 | async function process (data: TranscodingJobPayload, jobId: number) { | 10 | async function process (data: TranscodingJobPayload, jobId: number) { |
11 | const video = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) | 11 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) |
12 | // No video, maybe deleted? | 12 | // No video, maybe deleted? |
13 | if (!video) { | 13 | if (!video) { |
14 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 14 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) |
@@ -25,13 +25,13 @@ function onError (err: Error, jobId: number) { | |||
25 | return Promise.resolve() | 25 | return Promise.resolve() |
26 | } | 26 | } |
27 | 27 | ||
28 | async function onSuccess (jobId: number, video: VideoInstance, jobScheduler: JobScheduler<TranscodingJobPayload, VideoInstance>) { | 28 | async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) { |
29 | if (video === undefined) return undefined | 29 | if (video === undefined) return undefined |
30 | 30 | ||
31 | logger.info('Job %d is a success.', jobId) | 31 | logger.info('Job %d is a success.', jobId) |
32 | 32 | ||
33 | // Maybe the video changed in database, refresh it | 33 | // Maybe the video changed in database, refresh it |
34 | const videoDatabase = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | 34 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) |
35 | // Video does not exist anymore | 35 | // Video does not exist anymore |
36 | if (!videoDatabase) return undefined | 36 | if (!videoDatabase) return undefined |
37 | 37 | ||
@@ -50,7 +50,7 @@ async function onSuccess (jobId: number, video: VideoInstance, jobScheduler: Job | |||
50 | 50 | ||
51 | if (resolutionsEnabled.length !== 0) { | 51 | if (resolutionsEnabled.length !== 0) { |
52 | try { | 52 | try { |
53 | await db.sequelize.transaction(async t => { | 53 | await sequelizeTypescript.transaction(async t => { |
54 | const tasks: Bluebird<any>[] = [] | 54 | const tasks: Bluebird<any>[] = [] |
55 | 55 | ||
56 | for (const resolution of resolutionsEnabled) { | 56 | for (const resolution of resolutionsEnabled) { |
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts index 867580200..8957b4565 100644 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts | |||
@@ -1,11 +1,10 @@ | |||
1 | import { VideoResolution } from '../../../../shared' | 1 | import { VideoResolution } from '../../../../shared' |
2 | import { logger } from '../../../helpers' | 2 | import { logger } from '../../../helpers' |
3 | import { database as db } from '../../../initializers/database' | 3 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoInstance } from '../../../models' | 4 | import { sendUpdateVideo } from '../../activitypub/send' |
5 | import { sendUpdateVideo } from '../../activitypub/send/send-update' | ||
6 | 5 | ||
7 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { | 6 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { |
8 | const video = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) | 7 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID) |
9 | // No video, maybe deleted? | 8 | // No video, maybe deleted? |
10 | if (!video) { | 9 | if (!video) { |
11 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) | 10 | logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid }) |
@@ -22,13 +21,13 @@ function onError (err: Error, jobId: number) { | |||
22 | return Promise.resolve() | 21 | return Promise.resolve() |
23 | } | 22 | } |
24 | 23 | ||
25 | async function onSuccess (jobId: number, video: VideoInstance) { | 24 | async function onSuccess (jobId: number, video: VideoModel) { |
26 | if (video === undefined) return undefined | 25 | if (video === undefined) return undefined |
27 | 26 | ||
28 | logger.info('Job %d is a success.', jobId) | 27 | logger.info('Job %d is a success.', jobId) |
29 | 28 | ||
30 | // Maybe the video changed in database, refresh it | 29 | // Maybe the video changed in database, refresh it |
31 | const videoDatabase = await db.Video.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | 30 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) |
32 | // Video does not exist anymore | 31 | // Video does not exist anymore |
33 | if (!videoDatabase) return undefined | 32 | if (!videoDatabase) return undefined |
34 | 33 | ||