diff options
Diffstat (limited to 'server/lib')
20 files changed, 134 insertions, 633 deletions
diff --git a/server/lib/activitypub/index.ts b/server/lib/activitypub/index.ts index 740800606..f8d56528a 100644 --- a/server/lib/activitypub/index.ts +++ b/server/lib/activitypub/index.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | export * from './process-create' | 1 | export * from './process-create' |
2 | export * from './process-flag' | 2 | export * from './process-flag' |
3 | export * from './process-update' | 3 | export * from './process-update' |
4 | export * from './send-request' | ||
diff --git a/server/lib/activitypub/misc.ts b/server/lib/activitypub/misc.ts index 05e77ebc3..2cf0c4fd1 100644 --- a/server/lib/activitypub/misc.ts +++ b/server/lib/activitypub/misc.ts | |||
@@ -8,7 +8,11 @@ import { VideoChannelInstance } from '../../models/video/video-channel-interface | |||
8 | import { VideoFileAttributes } from '../../models/video/video-file-interface' | 8 | import { VideoFileAttributes } from '../../models/video/video-file-interface' |
9 | import { VideoAttributes, VideoInstance } from '../../models/video/video-interface' | 9 | import { VideoAttributes, VideoInstance } from '../../models/video/video-interface' |
10 | 10 | ||
11 | async function videoActivityObjectToDBAttributes (videoChannel: VideoChannelInstance, videoObject: VideoTorrentObject, t: Sequelize.Transaction) { | 11 | async function videoActivityObjectToDBAttributes ( |
12 | videoChannel: VideoChannelInstance, | ||
13 | videoObject: VideoTorrentObject, | ||
14 | t: Sequelize.Transaction | ||
15 | ) { | ||
12 | const videoFromDatabase = await db.Video.loadByUUIDOrURL(videoObject.uuid, videoObject.id, t) | 16 | const videoFromDatabase = await db.Video.loadByUUIDOrURL(videoObject.uuid, videoObject.id, t) |
13 | if (videoFromDatabase) throw new Error('Video with this UUID/Url already exists.') | 17 | if (videoFromDatabase) throw new Error('Video with this UUID/Url already exists.') |
14 | 18 | ||
diff --git a/server/lib/activitypub/process-flag.ts b/server/lib/activitypub/process-flag.ts index 6fa862ee9..b562dce4d 100644 --- a/server/lib/activitypub/process-flag.ts +++ b/server/lib/activitypub/process-flag.ts | |||
@@ -5,7 +5,7 @@ import { | |||
5 | } from '../../../shared' | 5 | } from '../../../shared' |
6 | 6 | ||
7 | function processFlagActivity (activity: ActivityCreate) { | 7 | function processFlagActivity (activity: ActivityCreate) { |
8 | // empty | 8 | return Promise.resolve(undefined) |
9 | } | 9 | } |
10 | 10 | ||
11 | // --------------------------------------------------------------------------- | 11 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/activitypub/send-request.ts b/server/lib/activitypub/send-request.ts index 6a31c226d..91101f5ad 100644 --- a/server/lib/activitypub/send-request.ts +++ b/server/lib/activitypub/send-request.ts | |||
@@ -1,5 +1,6 @@ | |||
1 | import * as Sequelize from 'sequelize' | 1 | import * as Sequelize from 'sequelize' |
2 | 2 | ||
3 | import { database as db } from '../../initializers' | ||
3 | import { | 4 | import { |
4 | AccountInstance, | 5 | AccountInstance, |
5 | VideoInstance, | 6 | VideoInstance, |
@@ -13,54 +14,66 @@ function sendCreateVideoChannel (videoChannel: VideoChannelInstance, t: Sequeliz | |||
13 | const videoChannelObject = videoChannel.toActivityPubObject() | 14 | const videoChannelObject = videoChannel.toActivityPubObject() |
14 | const data = createActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) | 15 | const data = createActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) |
15 | 16 | ||
16 | return broadcastToFollowers(data, t) | 17 | return broadcastToFollowers(data, videoChannel.Account, t) |
17 | } | 18 | } |
18 | 19 | ||
19 | function sendUpdateVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { | 20 | function sendUpdateVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { |
20 | const videoChannelObject = videoChannel.toActivityPubObject() | 21 | const videoChannelObject = videoChannel.toActivityPubObject() |
21 | const data = updateActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) | 22 | const data = updateActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) |
22 | 23 | ||
23 | return broadcastToFollowers(data, t) | 24 | return broadcastToFollowers(data, videoChannel.Account, t) |
24 | } | 25 | } |
25 | 26 | ||
26 | function sendDeleteVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { | 27 | function sendDeleteVideoChannel (videoChannel: VideoChannelInstance, t: Sequelize.Transaction) { |
27 | const videoChannelObject = videoChannel.toActivityPubObject() | 28 | const videoChannelObject = videoChannel.toActivityPubObject() |
28 | const data = deleteActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) | 29 | const data = deleteActivityData(videoChannel.url, videoChannel.Account, videoChannelObject) |
29 | 30 | ||
30 | return broadcastToFollowers(data, t) | 31 | return broadcastToFollowers(data, videoChannel.Account, t) |
31 | } | 32 | } |
32 | 33 | ||
33 | function sendAddVideo (video: VideoInstance, t: Sequelize.Transaction) { | 34 | function sendAddVideo (video: VideoInstance, t: Sequelize.Transaction) { |
34 | const videoObject = video.toActivityPubObject() | 35 | const videoObject = video.toActivityPubObject() |
35 | const data = addActivityData(video.url, video.VideoChannel.Account, video.VideoChannel.url, videoObject) | 36 | const data = addActivityData(video.url, video.VideoChannel.Account, video.VideoChannel.url, videoObject) |
36 | 37 | ||
37 | return broadcastToFollowers(data, t) | 38 | return broadcastToFollowers(data, video.VideoChannel.Account, t) |
38 | } | 39 | } |
39 | 40 | ||
40 | function sendUpdateVideo (video: VideoInstance, t: Sequelize.Transaction) { | 41 | function sendUpdateVideo (video: VideoInstance, t: Sequelize.Transaction) { |
41 | const videoObject = video.toActivityPubObject() | 42 | const videoObject = video.toActivityPubObject() |
42 | const data = updateActivityData(video.url, video.VideoChannel.Account, videoObject) | 43 | const data = updateActivityData(video.url, video.VideoChannel.Account, videoObject) |
43 | 44 | ||
44 | return broadcastToFollowers(data, t) | 45 | return broadcastToFollowers(data, video.VideoChannel.Account, t) |
45 | } | 46 | } |
46 | 47 | ||
47 | function sendDeleteVideo (video: VideoInstance, t: Sequelize.Transaction) { | 48 | function sendDeleteVideo (video: VideoInstance, t: Sequelize.Transaction) { |
48 | const videoObject = video.toActivityPubObject() | 49 | const videoObject = video.toActivityPubObject() |
49 | const data = deleteActivityData(video.url, video.VideoChannel.Account, videoObject) | 50 | const data = deleteActivityData(video.url, video.VideoChannel.Account, videoObject) |
50 | 51 | ||
51 | return broadcastToFollowers(data, t) | 52 | return broadcastToFollowers(data, video.VideoChannel.Account, t) |
52 | } | 53 | } |
53 | 54 | ||
54 | // --------------------------------------------------------------------------- | 55 | // --------------------------------------------------------------------------- |
55 | 56 | ||
56 | export { | 57 | export { |
57 | 58 | sendCreateVideoChannel, | |
59 | sendUpdateVideoChannel, | ||
60 | sendDeleteVideoChannel, | ||
61 | sendAddVideo, | ||
62 | sendUpdateVideo, | ||
63 | sendDeleteVideo | ||
58 | } | 64 | } |
59 | 65 | ||
60 | // --------------------------------------------------------------------------- | 66 | // --------------------------------------------------------------------------- |
61 | 67 | ||
62 | function broadcastToFollowers (data: any, t: Sequelize.Transaction) { | 68 | async function broadcastToFollowers (data: any, fromAccount: AccountInstance, t: Sequelize.Transaction) { |
63 | return httpRequestJobScheduler.createJob(t, 'http-request', 'httpRequestBroadcastHandler', data) | 69 | const result = await db.Account.listFollowerUrlsForApi(fromAccount.name, 0) |
70 | |||
71 | const jobPayload = { | ||
72 | uris: result.data, | ||
73 | body: data | ||
74 | } | ||
75 | |||
76 | return httpRequestJobScheduler.createJob(t, 'httpRequestBroadcastHandler', jobPayload) | ||
64 | } | 77 | } |
65 | 78 | ||
66 | function buildSignedActivity (byAccount: AccountInstance, data: Object) { | 79 | function buildSignedActivity (byAccount: AccountInstance, data: Object) { |
diff --git a/server/lib/cache/videos-preview-cache.ts b/server/lib/cache/videos-preview-cache.ts index 791ad1cbf..776f647a0 100644 --- a/server/lib/cache/videos-preview-cache.ts +++ b/server/lib/cache/videos-preview-cache.ts | |||
@@ -3,9 +3,8 @@ import { join } from 'path' | |||
3 | import { createWriteStream } from 'fs' | 3 | import { createWriteStream } from 'fs' |
4 | 4 | ||
5 | import { database as db, CONFIG, CACHE } from '../../initializers' | 5 | import { database as db, CONFIG, CACHE } from '../../initializers' |
6 | import { logger, unlinkPromise } from '../../helpers' | 6 | import { logger, unlinkPromise, fetchRemoteVideoPreview } from '../../helpers' |
7 | import { VideoInstance } from '../../models' | 7 | import { VideoInstance } from '../../models' |
8 | import { fetchRemotePreview } from '../../lib' | ||
9 | 8 | ||
10 | class VideosPreviewCache { | 9 | class VideosPreviewCache { |
11 | 10 | ||
@@ -54,7 +53,7 @@ class VideosPreviewCache { | |||
54 | } | 53 | } |
55 | 54 | ||
56 | private saveRemotePreviewAndReturnPath (video: VideoInstance) { | 55 | private saveRemotePreviewAndReturnPath (video: VideoInstance) { |
57 | const req = fetchRemotePreview(video) | 56 | const req = fetchRemoteVideoPreview(video) |
58 | 57 | ||
59 | return new Promise<string>((res, rej) => { | 58 | return new Promise<string>((res, rej) => { |
60 | const path = join(CACHE.DIRECTORIES.PREVIEWS, video.getPreviewName()) | 59 | const path = join(CACHE.DIRECTORIES.PREVIEWS, video.getPreviewName()) |
diff --git a/server/lib/index.ts b/server/lib/index.ts index bfb415ad2..d22ecb665 100644 --- a/server/lib/index.ts +++ b/server/lib/index.ts | |||
@@ -1,8 +1,6 @@ | |||
1 | export * from './activitypub' | 1 | export * from './activitypub' |
2 | export * from './cache' | 2 | export * from './cache' |
3 | export * from './jobs' | 3 | export * from './jobs' |
4 | export * from './request' | ||
5 | export * from './friends' | ||
6 | export * from './oauth-model' | 4 | export * from './oauth-model' |
7 | export * from './user' | 5 | export * from './user' |
8 | export * from './video-channel' | 6 | export * from './video-channel' |
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts index 6b6946d02..799b86e1c 100644 --- a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts +++ b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts | |||
@@ -1,19 +1,28 @@ | |||
1 | import * as Bluebird from 'bluebird' | ||
2 | |||
3 | import { database as db } from '../../../initializers/database' | ||
4 | import { logger } from '../../../helpers' | 1 | import { logger } from '../../../helpers' |
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { HTTPRequestPayload } from './http-request-job-scheduler' | ||
4 | |||
5 | async function process (payload: HTTPRequestPayload, jobId: number) { | ||
6 | logger.info('Processing broadcast in job %d.', jobId) | ||
5 | 7 | ||
6 | async function process (data: { videoUUID: string }, jobId: number) { | 8 | const options = { |
9 | uri: '', | ||
10 | json: payload.body | ||
11 | } | ||
7 | 12 | ||
13 | for (const uri of payload.uris) { | ||
14 | options.uri = uri | ||
15 | await doRequest(options) | ||
16 | } | ||
8 | } | 17 | } |
9 | 18 | ||
10 | function onError (err: Error, jobId: number) { | 19 | function onError (err: Error, jobId: number) { |
11 | logger.error('Error when optimized video file in job %d.', jobId, err) | 20 | logger.error('Error when broadcasting request in job %d.', jobId, err) |
12 | return Promise.resolve() | 21 | return Promise.resolve() |
13 | } | 22 | } |
14 | 23 | ||
15 | async function onSuccess (jobId: number) { | 24 | async function onSuccess (jobId: number) { |
16 | 25 | logger.info('Job %d is a success.', jobId) | |
17 | } | 26 | } |
18 | 27 | ||
19 | // --------------------------------------------------------------------------- | 28 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts index 42cb9139c..ad3349866 100644 --- a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts +++ b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts | |||
@@ -4,7 +4,11 @@ import * as httpRequestBroadcastHandler from './http-request-broadcast-handler' | |||
4 | import * as httpRequestUnicastHandler from './http-request-unicast-handler' | 4 | import * as httpRequestUnicastHandler from './http-request-unicast-handler' |
5 | import { JobCategory } from '../../../../shared' | 5 | import { JobCategory } from '../../../../shared' |
6 | 6 | ||
7 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | 7 | type HTTPRequestPayload = { |
8 | uris: string[] | ||
9 | body: any | ||
10 | } | ||
11 | const jobHandlers: { [ handlerName: string ]: JobHandler<HTTPRequestPayload, void> } = { | ||
8 | httpRequestBroadcastHandler, | 12 | httpRequestBroadcastHandler, |
9 | httpRequestUnicastHandler | 13 | httpRequestUnicastHandler |
10 | } | 14 | } |
@@ -13,5 +17,6 @@ const jobCategory: JobCategory = 'http-request' | |||
13 | const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers) | 17 | const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers) |
14 | 18 | ||
15 | export { | 19 | export { |
20 | HTTPRequestPayload, | ||
16 | httpRequestJobScheduler | 21 | httpRequestJobScheduler |
17 | } | 22 | } |
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts index 6b6946d02..13451f042 100644 --- a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts +++ b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts | |||
@@ -1,19 +1,26 @@ | |||
1 | import * as Bluebird from 'bluebird' | ||
2 | |||
3 | import { database as db } from '../../../initializers/database' | ||
4 | import { logger } from '../../../helpers' | 1 | import { logger } from '../../../helpers' |
2 | import { doRequest } from '../../../helpers/requests' | ||
3 | import { HTTPRequestPayload } from './http-request-job-scheduler' | ||
4 | |||
5 | async function process (payload: HTTPRequestPayload, jobId: number) { | ||
6 | logger.info('Processing unicast in job %d.', jobId) | ||
5 | 7 | ||
6 | async function process (data: { videoUUID: string }, jobId: number) { | 8 | const uri = payload.uris[0] |
9 | const options = { | ||
10 | uri, | ||
11 | json: payload.body | ||
12 | } | ||
7 | 13 | ||
14 | await doRequest(options) | ||
8 | } | 15 | } |
9 | 16 | ||
10 | function onError (err: Error, jobId: number) { | 17 | function onError (err: Error, jobId: number) { |
11 | logger.error('Error when optimized video file in job %d.', jobId, err) | 18 | logger.error('Error when sending request in job %d.', jobId, err) |
12 | return Promise.resolve() | 19 | return Promise.resolve() |
13 | } | 20 | } |
14 | 21 | ||
15 | async function onSuccess (jobId: number) { | 22 | async function onSuccess (jobId: number) { |
16 | 23 | logger.info('Job %d is a success.', jobId) | |
17 | } | 24 | } |
18 | 25 | ||
19 | // --------------------------------------------------------------------------- | 26 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 89a4bca88..f10f745b3 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -1,28 +1,22 @@ | |||
1 | import { AsyncQueue, forever, queue } from 'async' | 1 | import { AsyncQueue, forever, queue } from 'async' |
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | 3 | import { JobCategory } from '../../../shared' | |
4 | import { | ||
5 | database as db, | ||
6 | JOBS_FETCHING_INTERVAL, | ||
7 | JOBS_FETCH_LIMIT_PER_CYCLE, | ||
8 | JOB_STATES | ||
9 | } from '../../initializers' | ||
10 | import { logger } from '../../helpers' | 4 | import { logger } from '../../helpers' |
5 | import { database as db, JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers' | ||
11 | import { JobInstance } from '../../models' | 6 | import { JobInstance } from '../../models' |
12 | import { JobCategory } from '../../../shared' | ||
13 | 7 | ||
14 | export interface JobHandler<T> { | 8 | export interface JobHandler<P, T> { |
15 | process (data: object, jobId: number): T | 9 | process (data: object, jobId: number): Promise<T> |
16 | onError (err: Error, jobId: number) | 10 | onError (err: Error, jobId: number) |
17 | onSuccess (jobId: number, jobResult: T) | 11 | onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>) |
18 | } | 12 | } |
19 | type JobQueueCallback = (err: Error) => void | 13 | type JobQueueCallback = (err: Error) => void |
20 | 14 | ||
21 | class JobScheduler<T> { | 15 | class JobScheduler<P, T> { |
22 | 16 | ||
23 | constructor ( | 17 | constructor ( |
24 | private jobCategory: JobCategory, | 18 | private jobCategory: JobCategory, |
25 | private jobHandlers: { [ id: string ]: JobHandler<T> } | 19 | private jobHandlers: { [ id: string ]: JobHandler<P, T> } |
26 | ) {} | 20 | ) {} |
27 | 21 | ||
28 | async activate () { | 22 | async activate () { |
@@ -66,13 +60,14 @@ class JobScheduler<T> { | |||
66 | ) | 60 | ) |
67 | } | 61 | } |
68 | 62 | ||
69 | createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) { | 63 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) { |
70 | const createQuery = { | 64 | const createQuery = { |
71 | state: JOB_STATES.PENDING, | 65 | state: JOB_STATES.PENDING, |
72 | category, | 66 | category: this.jobCategory, |
73 | handlerName, | 67 | handlerName, |
74 | handlerInputData | 68 | handlerInputData |
75 | } | 69 | } |
70 | |||
76 | const options = { transaction } | 71 | const options = { transaction } |
77 | 72 | ||
78 | return db.Job.create(createQuery, options) | 73 | return db.Job.create(createQuery, options) |
@@ -95,7 +90,7 @@ class JobScheduler<T> { | |||
95 | await job.save() | 90 | await job.save() |
96 | 91 | ||
97 | try { | 92 | try { |
98 | const result = await jobHandler.process(job.handlerInputData, job.id) | 93 | const result: T = await jobHandler.process(job.handlerInputData, job.id) |
99 | await this.onJobSuccess(jobHandler, job, result) | 94 | await this.onJobSuccess(jobHandler, job, result) |
100 | } catch (err) { | 95 | } catch (err) { |
101 | logger.error('Error in job handler %s.', job.handlerName, err) | 96 | logger.error('Error in job handler %s.', job.handlerName, err) |
@@ -111,7 +106,7 @@ class JobScheduler<T> { | |||
111 | callback(null) | 106 | callback(null) |
112 | } | 107 | } |
113 | 108 | ||
114 | private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { | 109 | private async onJobError (jobHandler: JobHandler<P, T>, job: JobInstance, err: Error) { |
115 | job.state = JOB_STATES.ERROR | 110 | job.state = JOB_STATES.ERROR |
116 | 111 | ||
117 | try { | 112 | try { |
@@ -122,12 +117,12 @@ class JobScheduler<T> { | |||
122 | } | 117 | } |
123 | } | 118 | } |
124 | 119 | ||
125 | private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { | 120 | private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobInstance, jobResult: T) { |
126 | job.state = JOB_STATES.SUCCESS | 121 | job.state = JOB_STATES.SUCCESS |
127 | 122 | ||
128 | try { | 123 | try { |
129 | await job.save() | 124 | await job.save() |
130 | jobHandler.onSuccess(job.id, jobResult) | 125 | jobHandler.onSuccess(job.id, jobResult, this) |
131 | } catch (err) { | 126 | } catch (err) { |
132 | this.cannotSaveJobError(err) | 127 | this.cannotSaveJobError(err) |
133 | } | 128 | } |
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts index d7c614fb8..c5efe8eeb 100644 --- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts +++ b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts | |||
@@ -1,10 +1,14 @@ | |||
1 | import { JobScheduler, JobHandler } from '../job-scheduler' | 1 | import { JobCategory } from '../../../../shared' |
2 | 2 | import { JobHandler, JobScheduler } from '../job-scheduler' | |
3 | import * as videoFileOptimizer from './video-file-optimizer-handler' | 3 | import * as videoFileOptimizer from './video-file-optimizer-handler' |
4 | import * as videoFileTranscoder from './video-file-transcoder-handler' | 4 | import * as videoFileTranscoder from './video-file-transcoder-handler' |
5 | import { JobCategory } from '../../../../shared' | 5 | import { VideoInstance } from '../../../models/video/video-interface' |
6 | 6 | ||
7 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | 7 | type TranscodingJobPayload = { |
8 | videoUUID: string | ||
9 | resolution?: number | ||
10 | } | ||
11 | const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoInstance> } = { | ||
8 | videoFileOptimizer, | 12 | videoFileOptimizer, |
9 | videoFileTranscoder | 13 | videoFileTranscoder |
10 | } | 14 | } |
@@ -13,5 +17,6 @@ const jobCategory: JobCategory = 'transcoding' | |||
13 | const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) | 17 | const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) |
14 | 18 | ||
15 | export { | 19 | export { |
20 | TranscodingJobPayload, | ||
16 | transcodingJobScheduler | 21 | transcodingJobScheduler |
17 | } | 22 | } |
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts index f019c28bc..47603a66c 100644 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts | |||
@@ -1,12 +1,13 @@ | |||
1 | import * as Bluebird from 'bluebird' | 1 | import * as Bluebird from 'bluebird' |
2 | import { computeResolutionsToTranscode, logger } from '../../../helpers' | ||
2 | 3 | ||
3 | import { database as db } from '../../../initializers/database' | 4 | import { database as db } from '../../../initializers/database' |
4 | import { logger, computeResolutionsToTranscode } from '../../../helpers' | ||
5 | import { VideoInstance } from '../../../models' | 5 | import { VideoInstance } from '../../../models' |
6 | import { addVideoToFriends } from '../../friends' | 6 | import { sendAddVideo } from '../../activitypub/send-request' |
7 | import { JobScheduler } from '../job-scheduler' | 7 | import { JobScheduler } from '../job-scheduler' |
8 | import { TranscodingJobPayload } from './transcoding-job-scheduler' | ||
8 | 9 | ||
9 | async function process (data: { videoUUID: string }, jobId: number) { | 10 | async function process (data: TranscodingJobPayload, jobId: number) { |
10 | const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID) | 11 | const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID) |
11 | // No video, maybe deleted? | 12 | // No video, maybe deleted? |
12 | if (!video) { | 13 | if (!video) { |
@@ -24,7 +25,7 @@ function onError (err: Error, jobId: number) { | |||
24 | return Promise.resolve() | 25 | return Promise.resolve() |
25 | } | 26 | } |
26 | 27 | ||
27 | async function onSuccess (jobId: number, video: VideoInstance) { | 28 | async function onSuccess (jobId: number, video: VideoInstance, jobScheduler: JobScheduler<TranscodingJobPayload, VideoInstance>) { |
28 | if (video === undefined) return undefined | 29 | if (video === undefined) return undefined |
29 | 30 | ||
30 | logger.info('Job %d is a success.', jobId) | 31 | logger.info('Job %d is a success.', jobId) |
@@ -34,10 +35,8 @@ async function onSuccess (jobId: number, video: VideoInstance) { | |||
34 | // Video does not exist anymore | 35 | // Video does not exist anymore |
35 | if (!videoDatabase) return undefined | 36 | if (!videoDatabase) return undefined |
36 | 37 | ||
37 | const remoteVideo = await videoDatabase.toAddRemoteJSON() | 38 | // Now we'll add the video's meta data to our followers |
38 | 39 | await sendAddVideo(video, undefined) | |
39 | // Now we'll add the video's meta data to our friends | ||
40 | await addVideoToFriends(remoteVideo, null) | ||
41 | 40 | ||
42 | const originalFileHeight = await videoDatabase.getOriginalFileHeight() | 41 | const originalFileHeight = await videoDatabase.getOriginalFileHeight() |
43 | // Create transcoding jobs if there are enabled resolutions | 42 | // Create transcoding jobs if there are enabled resolutions |
@@ -59,7 +58,7 @@ async function onSuccess (jobId: number, video: VideoInstance) { | |||
59 | resolution | 58 | resolution |
60 | } | 59 | } |
61 | 60 | ||
62 | const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) | 61 | const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput) |
63 | tasks.push(p) | 62 | tasks.push(p) |
64 | } | 63 | } |
65 | 64 | ||
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts index 397b95795..77e5d9f7f 100644 --- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts | |||
@@ -1,8 +1,8 @@ | |||
1 | import { database as db } from '../../../initializers/database' | 1 | import { VideoResolution } from '../../../../shared' |
2 | import { updateVideoToFriends } from '../../friends' | ||
3 | import { logger } from '../../../helpers' | 2 | import { logger } from '../../../helpers' |
3 | import { database as db } from '../../../initializers/database' | ||
4 | import { VideoInstance } from '../../../models' | 4 | import { VideoInstance } from '../../../models' |
5 | import { VideoResolution } from '../../../../shared' | 5 | import { sendUpdateVideo } from '../../activitypub/send-request' |
6 | 6 | ||
7 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { | 7 | async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { |
8 | const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID) | 8 | const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID) |
@@ -32,10 +32,7 @@ async function onSuccess (jobId: number, video: VideoInstance) { | |||
32 | // Video does not exist anymore | 32 | // Video does not exist anymore |
33 | if (!videoDatabase) return undefined | 33 | if (!videoDatabase) return undefined |
34 | 34 | ||
35 | const remoteVideo = videoDatabase.toUpdateRemoteJSON() | 35 | await sendUpdateVideo(video, undefined) |
36 | |||
37 | // Now we'll add the video's meta data to our friends | ||
38 | await updateVideoToFriends(remoteVideo, null) | ||
39 | 36 | ||
40 | return undefined | 37 | return undefined |
41 | } | 38 | } |
diff --git a/server/lib/request/abstract-request-scheduler.ts b/server/lib/request/abstract-request-scheduler.ts deleted file mode 100644 index f838c47f2..000000000 --- a/server/lib/request/abstract-request-scheduler.ts +++ /dev/null | |||
@@ -1,168 +0,0 @@ | |||
1 | import { isEmpty } from 'lodash' | ||
2 | import * as Bluebird from 'bluebird' | ||
3 | |||
4 | import { database as db } from '../../initializers/database' | ||
5 | import { logger, makeSecureRequest } from '../../helpers' | ||
6 | import { AbstractRequestClass, AbstractRequestToPodClass, PodInstance } from '../../models' | ||
7 | import { | ||
8 | API_VERSION, | ||
9 | REQUESTS_IN_PARALLEL, | ||
10 | REQUESTS_INTERVAL | ||
11 | } from '../../initializers' | ||
12 | |||
13 | interface RequestsObjects<U> { | ||
14 | [ id: string ]: { | ||
15 | toPod: PodInstance | ||
16 | endpoint: string | ||
17 | ids: number[] // ids | ||
18 | datas: U[] | ||
19 | } | ||
20 | } | ||
21 | |||
22 | abstract class AbstractRequestScheduler <T> { | ||
23 | requestInterval: number | ||
24 | limitPods: number | ||
25 | limitPerPod: number | ||
26 | |||
27 | protected lastRequestTimestamp: number | ||
28 | protected timer: NodeJS.Timer | ||
29 | protected description: string | ||
30 | |||
31 | constructor () { | ||
32 | this.lastRequestTimestamp = 0 | ||
33 | this.timer = null | ||
34 | this.requestInterval = REQUESTS_INTERVAL | ||
35 | } | ||
36 | |||
37 | abstract getRequestModel (): AbstractRequestClass<T> | ||
38 | abstract getRequestToPodModel (): AbstractRequestToPodClass | ||
39 | abstract buildRequestsObjects (requestsGrouped: T): RequestsObjects<any> | ||
40 | |||
41 | activate () { | ||
42 | logger.info('Requests scheduler activated.') | ||
43 | this.lastRequestTimestamp = Date.now() | ||
44 | |||
45 | this.timer = setInterval(() => { | ||
46 | this.lastRequestTimestamp = Date.now() | ||
47 | this.makeRequests() | ||
48 | }, this.requestInterval) | ||
49 | } | ||
50 | |||
51 | deactivate () { | ||
52 | logger.info('Requests scheduler deactivated.') | ||
53 | clearInterval(this.timer) | ||
54 | this.timer = null | ||
55 | } | ||
56 | |||
57 | forceSend () { | ||
58 | logger.info('Force requests scheduler sending.') | ||
59 | this.makeRequests() | ||
60 | } | ||
61 | |||
62 | remainingMilliSeconds () { | ||
63 | if (this.timer === null) return -1 | ||
64 | |||
65 | return REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp) | ||
66 | } | ||
67 | |||
68 | remainingRequestsCount () { | ||
69 | return this.getRequestModel().countTotalRequests() | ||
70 | } | ||
71 | |||
72 | flush () { | ||
73 | return this.getRequestModel().removeAll() | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | // Make a requests to friends of a certain type | ||
79 | protected async makeRequest (toPod: PodInstance, requestEndpoint: string, requestsToMake: any) { | ||
80 | const params = { | ||
81 | toPod: toPod, | ||
82 | method: 'POST' as 'POST', | ||
83 | path: '/api/' + API_VERSION + '/remote/' + requestEndpoint, | ||
84 | data: requestsToMake // Requests we need to make | ||
85 | } | ||
86 | |||
87 | // Make multiple retry requests to all of pods | ||
88 | // The function fire some useful callbacks | ||
89 | try { | ||
90 | const { response } = await makeSecureRequest(params) | ||
91 | |||
92 | // 400 because if the other pod is not up to date, it may not understand our request | ||
93 | if ([ 200, 201, 204, 400 ].indexOf(response.statusCode) === -1) { | ||
94 | throw new Error('Status code not 20x or 400 : ' + response.statusCode) | ||
95 | } | ||
96 | } catch (err) { | ||
97 | logger.error('Error sending secure request to %s pod.', toPod.host, err) | ||
98 | |||
99 | throw err | ||
100 | } | ||
101 | } | ||
102 | |||
103 | // Make all the requests of the scheduler | ||
104 | protected async makeRequests () { | ||
105 | let requestsGrouped: T | ||
106 | |||
107 | try { | ||
108 | requestsGrouped = await this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod) | ||
109 | } catch (err) { | ||
110 | logger.error('Cannot get the list of "%s".', this.description, { error: err.stack }) | ||
111 | throw err | ||
112 | } | ||
113 | |||
114 | // We want to group requests by destinations pod and endpoint | ||
115 | const requestsToMake = this.buildRequestsObjects(requestsGrouped) | ||
116 | |||
117 | // If there are no requests, abort | ||
118 | if (isEmpty(requestsToMake) === true) { | ||
119 | logger.info('No "%s" to make.', this.description) | ||
120 | return { goodPods: [], badPods: [] } | ||
121 | } | ||
122 | |||
123 | logger.info('Making "%s" to friends.', this.description) | ||
124 | |||
125 | const goodPods: number[] = [] | ||
126 | const badPods: number[] = [] | ||
127 | |||
128 | await Bluebird.map(Object.keys(requestsToMake), async hashKey => { | ||
129 | const requestToMake = requestsToMake[hashKey] | ||
130 | const toPod: PodInstance = requestToMake.toPod | ||
131 | |||
132 | try { | ||
133 | await this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas) | ||
134 | logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids }) | ||
135 | goodPods.push(requestToMake.toPod.id) | ||
136 | |||
137 | this.afterRequestHook() | ||
138 | |||
139 | // Remove the pod id of these request ids | ||
140 | await this.getRequestToPodModel() | ||
141 | .removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id) | ||
142 | } catch (err) { | ||
143 | badPods.push(requestToMake.toPod.id) | ||
144 | logger.info('Cannot make request to %s.', toPod.host, err) | ||
145 | } | ||
146 | }, { concurrency: REQUESTS_IN_PARALLEL }) | ||
147 | |||
148 | this.afterRequestsHook() | ||
149 | |||
150 | // All the requests were made, we update the pods score | ||
151 | db.Pod.updatePodsScore(goodPods, badPods) | ||
152 | } | ||
153 | |||
154 | protected afterRequestHook () { | ||
155 | // Nothing to do, let children re-implement it | ||
156 | } | ||
157 | |||
158 | protected afterRequestsHook () { | ||
159 | // Nothing to do, let children re-implement it | ||
160 | } | ||
161 | } | ||
162 | |||
163 | // --------------------------------------------------------------------------- | ||
164 | |||
165 | export { | ||
166 | AbstractRequestScheduler, | ||
167 | RequestsObjects | ||
168 | } | ||
diff --git a/server/lib/request/index.ts b/server/lib/request/index.ts deleted file mode 100644 index 47d60e5b4..000000000 --- a/server/lib/request/index.ts +++ /dev/null | |||
@@ -1,4 +0,0 @@ | |||
1 | export * from './abstract-request-scheduler' | ||
2 | export * from './request-scheduler' | ||
3 | export * from './request-video-event-scheduler' | ||
4 | export * from './request-video-qadu-scheduler' | ||
diff --git a/server/lib/request/request-scheduler.ts b/server/lib/request/request-scheduler.ts deleted file mode 100644 index c3f7f6429..000000000 --- a/server/lib/request/request-scheduler.ts +++ /dev/null | |||
@@ -1,96 +0,0 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | |||
3 | import { database as db } from '../../initializers/database' | ||
4 | import { AbstractRequestScheduler, RequestsObjects } from './abstract-request-scheduler' | ||
5 | import { logger } from '../../helpers' | ||
6 | import { REQUESTS_LIMIT_PODS, REQUESTS_LIMIT_PER_POD } from '../../initializers' | ||
7 | import { RequestsGrouped } from '../../models' | ||
8 | import { RequestEndpoint, RemoteVideoRequest } from '../../../shared' | ||
9 | |||
10 | export type RequestSchedulerOptions = { | ||
11 | type: string | ||
12 | endpoint: RequestEndpoint | ||
13 | data: Object | ||
14 | toIds: number[] | ||
15 | transaction: Sequelize.Transaction | ||
16 | } | ||
17 | |||
18 | class RequestScheduler extends AbstractRequestScheduler<RequestsGrouped> { | ||
19 | constructor () { | ||
20 | super() | ||
21 | |||
22 | // We limit the size of the requests | ||
23 | this.limitPods = REQUESTS_LIMIT_PODS | ||
24 | this.limitPerPod = REQUESTS_LIMIT_PER_POD | ||
25 | |||
26 | this.description = 'requests' | ||
27 | } | ||
28 | |||
29 | getRequestModel () { | ||
30 | return db.Request | ||
31 | } | ||
32 | |||
33 | getRequestToPodModel () { | ||
34 | return db.RequestToPod | ||
35 | } | ||
36 | |||
37 | buildRequestsObjects (requestsGrouped: RequestsGrouped) { | ||
38 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoRequest> = {} | ||
39 | |||
40 | for (const toPodId of Object.keys(requestsGrouped)) { | ||
41 | for (const data of requestsGrouped[toPodId]) { | ||
42 | const request = data.request | ||
43 | const pod = data.pod | ||
44 | const hashKey = toPodId + request.endpoint | ||
45 | |||
46 | if (!requestsToMakeGrouped[hashKey]) { | ||
47 | requestsToMakeGrouped[hashKey] = { | ||
48 | toPod: pod, | ||
49 | endpoint: request.endpoint, | ||
50 | ids: [], // request ids, to delete them from the DB in the future | ||
51 | datas: [] // requests data, | ||
52 | } | ||
53 | } | ||
54 | |||
55 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
56 | requestsToMakeGrouped[hashKey].datas.push(request.request) | ||
57 | } | ||
58 | } | ||
59 | |||
60 | return requestsToMakeGrouped | ||
61 | } | ||
62 | |||
63 | async createRequest ({ type, endpoint, data, toIds, transaction }: RequestSchedulerOptions) { | ||
64 | // If there are no destination pods abort | ||
65 | if (toIds.length === 0) return undefined | ||
66 | |||
67 | const createQuery = { | ||
68 | endpoint, | ||
69 | request: { | ||
70 | type: type, | ||
71 | data: data | ||
72 | } | ||
73 | } | ||
74 | |||
75 | const dbRequestOptions: Sequelize.CreateOptions = { | ||
76 | transaction | ||
77 | } | ||
78 | |||
79 | const request = await db.Request.create(createQuery, dbRequestOptions) | ||
80 | await request.setPods(toIds, dbRequestOptions) | ||
81 | } | ||
82 | |||
83 | // --------------------------------------------------------------------------- | ||
84 | |||
85 | afterRequestsHook () { | ||
86 | // Flush requests with no pod | ||
87 | this.getRequestModel().removeWithEmptyTo() | ||
88 | .catch(err => logger.error('Error when removing requests with no pods.', err)) | ||
89 | } | ||
90 | } | ||
91 | |||
92 | // --------------------------------------------------------------------------- | ||
93 | |||
94 | export { | ||
95 | RequestScheduler | ||
96 | } | ||
diff --git a/server/lib/request/request-video-event-scheduler.ts b/server/lib/request/request-video-event-scheduler.ts deleted file mode 100644 index 5f21287f0..000000000 --- a/server/lib/request/request-video-event-scheduler.ts +++ /dev/null | |||
@@ -1,129 +0,0 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | |||
3 | import { database as db } from '../../initializers/database' | ||
4 | import { AbstractRequestScheduler, RequestsObjects } from './abstract-request-scheduler' | ||
5 | import { | ||
6 | REQUESTS_VIDEO_EVENT_LIMIT_PODS, | ||
7 | REQUESTS_VIDEO_EVENT_LIMIT_PER_POD, | ||
8 | REQUEST_VIDEO_EVENT_ENDPOINT | ||
9 | } from '../../initializers' | ||
10 | import { RequestsVideoEventGrouped } from '../../models' | ||
11 | import { RequestVideoEventType, RemoteVideoEventRequest, RemoteVideoEventType } from '../../../shared' | ||
12 | |||
13 | export type RequestVideoEventSchedulerOptions = { | ||
14 | type: RequestVideoEventType | ||
15 | videoId: number | ||
16 | count?: number | ||
17 | transaction?: Sequelize.Transaction | ||
18 | } | ||
19 | |||
20 | class RequestVideoEventScheduler extends AbstractRequestScheduler<RequestsVideoEventGrouped> { | ||
21 | constructor () { | ||
22 | super() | ||
23 | |||
24 | // We limit the size of the requests | ||
25 | this.limitPods = REQUESTS_VIDEO_EVENT_LIMIT_PODS | ||
26 | this.limitPerPod = REQUESTS_VIDEO_EVENT_LIMIT_PER_POD | ||
27 | |||
28 | this.description = 'video event requests' | ||
29 | } | ||
30 | |||
31 | getRequestModel () { | ||
32 | return db.RequestVideoEvent | ||
33 | } | ||
34 | |||
35 | getRequestToPodModel () { | ||
36 | return db.RequestVideoEvent | ||
37 | } | ||
38 | |||
39 | buildRequestsObjects (eventRequests: RequestsVideoEventGrouped) { | ||
40 | const requestsToMakeGrouped: RequestsObjects<RemoteVideoEventRequest> = {} | ||
41 | |||
42 | /* Example: | ||
43 | { | ||
44 | pod1: { | ||
45 | video1: { views: 4, likes: 5 }, | ||
46 | video2: { likes: 5 } | ||
47 | } | ||
48 | } | ||
49 | */ | ||
50 | const eventsPerVideoPerPod: { | ||
51 | [ podId: string ]: { | ||
52 | [ videoUUID: string ]: { | ||
53 | views?: number | ||
54 | likes?: number | ||
55 | dislikes?: number | ||
56 | } | ||
57 | } | ||
58 | } = {} | ||
59 | |||
60 | // We group video events per video and per pod | ||
61 | // We add the counts of the same event types | ||
62 | for (const toPodId of Object.keys(eventRequests)) { | ||
63 | for (const eventToProcess of eventRequests[toPodId]) { | ||
64 | if (!eventsPerVideoPerPod[toPodId]) eventsPerVideoPerPod[toPodId] = {} | ||
65 | |||
66 | if (!requestsToMakeGrouped[toPodId]) { | ||
67 | requestsToMakeGrouped[toPodId] = { | ||
68 | toPod: eventToProcess.pod, | ||
69 | endpoint: REQUEST_VIDEO_EVENT_ENDPOINT, | ||
70 | ids: [], // request ids, to delete them from the DB in the future | ||
71 | datas: [] // requests data | ||
72 | } | ||
73 | } | ||
74 | requestsToMakeGrouped[toPodId].ids.push(eventToProcess.id) | ||
75 | |||
76 | const eventsPerVideo = eventsPerVideoPerPod[toPodId] | ||
77 | const uuid = eventToProcess.video.uuid | ||
78 | if (!eventsPerVideo[uuid]) eventsPerVideo[uuid] = {} | ||
79 | |||
80 | const events = eventsPerVideo[uuid] | ||
81 | if (!events[eventToProcess.type]) events[eventToProcess.type] = 0 | ||
82 | |||
83 | events[eventToProcess.type] += eventToProcess.count | ||
84 | } | ||
85 | } | ||
86 | |||
87 | // Now we build our requests array per pod | ||
88 | for (const toPodId of Object.keys(eventsPerVideoPerPod)) { | ||
89 | const eventsForPod = eventsPerVideoPerPod[toPodId] | ||
90 | |||
91 | for (const uuid of Object.keys(eventsForPod)) { | ||
92 | const eventsForVideo = eventsForPod[uuid] | ||
93 | |||
94 | for (const eventType of Object.keys(eventsForVideo)) { | ||
95 | requestsToMakeGrouped[toPodId].datas.push({ | ||
96 | data: { | ||
97 | uuid, | ||
98 | eventType: eventType as RemoteVideoEventType, | ||
99 | count: +eventsForVideo[eventType] | ||
100 | } | ||
101 | }) | ||
102 | } | ||
103 | } | ||
104 | } | ||
105 | |||
106 | return requestsToMakeGrouped | ||
107 | } | ||
108 | |||
109 | createRequest ({ type, videoId, count, transaction }: RequestVideoEventSchedulerOptions) { | ||
110 | if (count === undefined) count = 1 | ||
111 | |||
112 | const dbRequestOptions: Sequelize.CreateOptions = {} | ||
113 | if (transaction) dbRequestOptions.transaction = transaction | ||
114 | |||
115 | const createQuery = { | ||
116 | type, | ||
117 | count, | ||
118 | videoId | ||
119 | } | ||
120 | |||
121 | return db.RequestVideoEvent.create(createQuery, dbRequestOptions) | ||
122 | } | ||
123 | } | ||
124 | |||
125 | // --------------------------------------------------------------------------- | ||
126 | |||
127 | export { | ||
128 | RequestVideoEventScheduler | ||
129 | } | ||
diff --git a/server/lib/request/request-video-qadu-scheduler.ts b/server/lib/request/request-video-qadu-scheduler.ts deleted file mode 100644 index 24ee59d29..000000000 --- a/server/lib/request/request-video-qadu-scheduler.ts +++ /dev/null | |||
@@ -1,148 +0,0 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | |||
3 | import { database as db } from '../../initializers/database' | ||
4 | import { AbstractRequestScheduler, RequestsObjects } from './abstract-request-scheduler' | ||
5 | import { logger } from '../../helpers' | ||
6 | import { | ||
7 | REQUESTS_VIDEO_QADU_LIMIT_PODS, | ||
8 | REQUESTS_VIDEO_QADU_LIMIT_PER_POD, | ||
9 | REQUEST_VIDEO_QADU_ENDPOINT, | ||
10 | REQUEST_VIDEO_QADU_TYPES | ||
11 | } from '../../initializers' | ||
12 | import { RequestsVideoQaduGrouped, PodInstance } from '../../models' | ||
13 | import { RemoteQaduVideoRequest, RequestVideoQaduType } from '../../../shared' | ||
14 | |||
15 | // We create a custom interface because we need "videos" attribute for our computations | ||
16 | interface RequestsObjectsCustom<U> extends RequestsObjects<U> { | ||
17 | [ id: string ]: { | ||
18 | toPod: PodInstance | ||
19 | endpoint: string | ||
20 | ids: number[] // ids | ||
21 | datas: U[] | ||
22 | |||
23 | videos: { | ||
24 | [ uuid: string ]: { | ||
25 | uuid: string | ||
26 | likes?: number | ||
27 | dislikes?: number | ||
28 | views?: number | ||
29 | } | ||
30 | } | ||
31 | } | ||
32 | } | ||
33 | |||
34 | export type RequestVideoQaduSchedulerOptions = { | ||
35 | type: RequestVideoQaduType | ||
36 | videoId: number | ||
37 | transaction?: Sequelize.Transaction | ||
38 | } | ||
39 | |||
40 | class RequestVideoQaduScheduler extends AbstractRequestScheduler<RequestsVideoQaduGrouped> { | ||
41 | constructor () { | ||
42 | super() | ||
43 | |||
44 | // We limit the size of the requests | ||
45 | this.limitPods = REQUESTS_VIDEO_QADU_LIMIT_PODS | ||
46 | this.limitPerPod = REQUESTS_VIDEO_QADU_LIMIT_PER_POD | ||
47 | |||
48 | this.description = 'video QADU requests' | ||
49 | } | ||
50 | |||
51 | getRequestModel () { | ||
52 | return db.RequestVideoQadu | ||
53 | } | ||
54 | |||
55 | getRequestToPodModel () { | ||
56 | return db.RequestVideoQadu | ||
57 | } | ||
58 | |||
59 | buildRequestsObjects (requests: RequestsVideoQaduGrouped) { | ||
60 | const requestsToMakeGrouped: RequestsObjectsCustom<RemoteQaduVideoRequest> = {} | ||
61 | |||
62 | for (const toPodId of Object.keys(requests)) { | ||
63 | for (const data of requests[toPodId]) { | ||
64 | const request = data.request | ||
65 | const video = data.video | ||
66 | const pod = data.pod | ||
67 | const hashKey = toPodId | ||
68 | |||
69 | if (!requestsToMakeGrouped[hashKey]) { | ||
70 | requestsToMakeGrouped[hashKey] = { | ||
71 | toPod: pod, | ||
72 | endpoint: REQUEST_VIDEO_QADU_ENDPOINT, | ||
73 | ids: [], // request ids, to delete them from the DB in the future | ||
74 | datas: [], // requests data | ||
75 | videos: {} | ||
76 | } | ||
77 | } | ||
78 | |||
79 | // Maybe another attribute was filled for this video | ||
80 | let videoData = requestsToMakeGrouped[hashKey].videos[video.id] | ||
81 | if (!videoData) videoData = { uuid: null } | ||
82 | |||
83 | switch (request.type) { | ||
84 | case REQUEST_VIDEO_QADU_TYPES.LIKES: | ||
85 | videoData.likes = video.likes | ||
86 | break | ||
87 | |||
88 | case REQUEST_VIDEO_QADU_TYPES.DISLIKES: | ||
89 | videoData.dislikes = video.dislikes | ||
90 | break | ||
91 | |||
92 | case REQUEST_VIDEO_QADU_TYPES.VIEWS: | ||
93 | videoData.views = video.views | ||
94 | break | ||
95 | |||
96 | default: | ||
97 | logger.error('Unknown request video QADU type %s.', request.type) | ||
98 | return undefined | ||
99 | } | ||
100 | |||
101 | // Do not forget the uuid so the remote pod can identify the video | ||
102 | videoData.uuid = video.uuid | ||
103 | requestsToMakeGrouped[hashKey].ids.push(request.id) | ||
104 | |||
105 | // Maybe there are multiple quick and dirty update for the same video | ||
106 | // We use this hash map to dedupe them | ||
107 | requestsToMakeGrouped[hashKey].videos[video.id] = videoData | ||
108 | } | ||
109 | } | ||
110 | |||
111 | // Now we deduped similar quick and dirty updates, we can build our requests data | ||
112 | for (const hashKey of Object.keys(requestsToMakeGrouped)) { | ||
113 | for (const videoUUID of Object.keys(requestsToMakeGrouped[hashKey].videos)) { | ||
114 | const videoData = requestsToMakeGrouped[hashKey].videos[videoUUID] | ||
115 | |||
116 | requestsToMakeGrouped[hashKey].datas.push({ | ||
117 | data: videoData | ||
118 | }) | ||
119 | } | ||
120 | |||
121 | // We don't need it anymore, it was just to build our data array | ||
122 | delete requestsToMakeGrouped[hashKey].videos | ||
123 | } | ||
124 | |||
125 | return requestsToMakeGrouped | ||
126 | } | ||
127 | |||
128 | async createRequest ({ type, videoId, transaction }: RequestVideoQaduSchedulerOptions) { | ||
129 | const dbRequestOptions: Sequelize.BulkCreateOptions = {} | ||
130 | if (transaction) dbRequestOptions.transaction = transaction | ||
131 | |||
132 | // Send the update to all our friends | ||
133 | const podIds = await db.Pod.listAllIds(transaction) | ||
134 | const queries = [] | ||
135 | for (const podId of podIds) { | ||
136 | queries.push({ type, videoId, podId }) | ||
137 | } | ||
138 | |||
139 | await db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions) | ||
140 | return undefined | ||
141 | } | ||
142 | } | ||
143 | |||
144 | // --------------------------------------------------------------------------- | ||
145 | |||
146 | export { | ||
147 | RequestVideoQaduScheduler | ||
148 | } | ||
diff --git a/server/lib/user.ts b/server/lib/user.ts index 57c653e55..1094c2401 100644 --- a/server/lib/user.ts +++ b/server/lib/user.ts | |||
@@ -1,6 +1,9 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | import { getActivityPubUrl } from '../helpers/activitypub' | ||
3 | import { createPrivateAndPublicKeys } from '../helpers/peertube-crypto' | ||
1 | import { database as db } from '../initializers' | 4 | import { database as db } from '../initializers' |
5 | import { CONFIG } from '../initializers/constants' | ||
2 | import { UserInstance } from '../models' | 6 | import { UserInstance } from '../models' |
3 | import { addVideoAccountToFriends } from './friends' | ||
4 | import { createVideoChannel } from './video-channel' | 7 | import { createVideoChannel } from './video-channel' |
5 | 8 | ||
6 | async function createUserAccountAndChannel (user: UserInstance, validateUser = true) { | 9 | async function createUserAccountAndChannel (user: UserInstance, validateUser = true) { |
@@ -11,32 +14,46 @@ async function createUserAccountAndChannel (user: UserInstance, validateUser = t | |||
11 | } | 14 | } |
12 | 15 | ||
13 | const userCreated = await user.save(userOptions) | 16 | const userCreated = await user.save(userOptions) |
14 | const accountInstance = db.Account.build({ | 17 | const accountCreated = await createLocalAccount(user.username, user.id, null, t) |
15 | name: userCreated.username, | ||
16 | podId: null, // It is our pod | ||
17 | userId: userCreated.id | ||
18 | }) | ||
19 | |||
20 | const accountCreated = await accountInstance.save({ transaction: t }) | ||
21 | |||
22 | const remoteVideoAccount = accountCreated.toAddRemoteJSON() | ||
23 | |||
24 | // Now we'll add the video channel's meta data to our friends | ||
25 | const account = await addVideoAccountToFriends(remoteVideoAccount, t) | ||
26 | 18 | ||
27 | const videoChannelInfo = { | 19 | const videoChannelInfo = { |
28 | name: `Default ${userCreated.username} channel` | 20 | name: `Default ${userCreated.username} channel` |
29 | } | 21 | } |
30 | const videoChannel = await createVideoChannel(videoChannelInfo, accountCreated, t) | 22 | const videoChannel = await createVideoChannel(videoChannelInfo, accountCreated, t) |
31 | 23 | ||
32 | return { account, videoChannel } | 24 | return { account: accountCreated, videoChannel } |
33 | }) | 25 | }) |
34 | 26 | ||
35 | return res | 27 | return res |
36 | } | 28 | } |
37 | 29 | ||
30 | async function createLocalAccount (name: string, userId: number, applicationId: number, t: Sequelize.Transaction) { | ||
31 | const { publicKey, privateKey } = await createPrivateAndPublicKeys() | ||
32 | const url = getActivityPubUrl('account', name) | ||
33 | |||
34 | const accountInstance = db.Account.build({ | ||
35 | name, | ||
36 | url, | ||
37 | publicKey, | ||
38 | privateKey, | ||
39 | followersCount: 0, | ||
40 | followingCount: 0, | ||
41 | inboxUrl: url + '/inbox', | ||
42 | outboxUrl: url + '/outbox', | ||
43 | sharedInboxUrl: CONFIG.WEBSERVER.URL + '/inbox', | ||
44 | followersUrl: url + '/followers', | ||
45 | followingUrl: url + '/following', | ||
46 | userId, | ||
47 | applicationId, | ||
48 | podId: null // It is our pod | ||
49 | }) | ||
50 | |||
51 | return accountInstance.save({ transaction: t }) | ||
52 | } | ||
53 | |||
38 | // --------------------------------------------------------------------------- | 54 | // --------------------------------------------------------------------------- |
39 | 55 | ||
40 | export { | 56 | export { |
41 | createUserAccountAndChannel | 57 | createUserAccountAndChannel, |
58 | createLocalAccount | ||
42 | } | 59 | } |
diff --git a/server/lib/video-channel.ts b/server/lib/video-channel.ts index f81383ce8..459d9d4a8 100644 --- a/server/lib/video-channel.ts +++ b/server/lib/video-channel.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import * as Sequelize from 'sequelize' | 1 | import * as Sequelize from 'sequelize' |
2 | 2 | ||
3 | import { addVideoChannelToFriends } from './friends' | ||
4 | import { database as db } from '../initializers' | 3 | import { database as db } from '../initializers' |
5 | import { logger } from '../helpers' | 4 | import { logger } from '../helpers' |
6 | import { AccountInstance } from '../models' | 5 | import { AccountInstance } from '../models' |
7 | import { VideoChannelCreate } from '../../shared/models' | 6 | import { VideoChannelCreate } from '../../shared/models' |
7 | import { sendCreateVideoChannel } from './activitypub/send-request' | ||
8 | 8 | ||
9 | async function createVideoChannel (videoChannelInfo: VideoChannelCreate, account: AccountInstance, t: Sequelize.Transaction) { | 9 | async function createVideoChannel (videoChannelInfo: VideoChannelCreate, account: AccountInstance, t: Sequelize.Transaction) { |
10 | const videoChannelData = { | 10 | const videoChannelData = { |
@@ -22,10 +22,7 @@ async function createVideoChannel (videoChannelInfo: VideoChannelCreate, account | |||
22 | // Do not forget to add Account information to the created video channel | 22 | // Do not forget to add Account information to the created video channel |
23 | videoChannelCreated.Account = account | 23 | videoChannelCreated.Account = account |
24 | 24 | ||
25 | const remoteVideoChannel = videoChannelCreated.toAddRemoteJSON() | 25 | sendCreateVideoChannel(videoChannelCreated, t) |
26 | |||
27 | // Now we'll add the video channel's meta data to our friends | ||
28 | await addVideoChannelToFriends(remoteVideoChannel, t) | ||
29 | 26 | ||
30 | return videoChannelCreated | 27 | return videoChannelCreated |
31 | } | 28 | } |