diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-follow.ts | 9 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 8 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-fetcher.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 11 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-refresher.ts | 54 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/email.ts | 3 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | 29 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file.ts | 106 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 27 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-views.ts | 21 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 15 |
11 files changed, 220 insertions, 65 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 36d0f237b..b4d381062 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -8,6 +8,7 @@ import { getOrCreateActorAndServerAndModel } from '../../activitypub/actor' | |||
8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
9 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 9 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
10 | import { ActorModel } from '../../../models/activitypub/actor' | 10 | import { ActorModel } from '../../../models/activitypub/actor' |
11 | import { Notifier } from '../../notifier' | ||
11 | 12 | ||
12 | export type ActivitypubFollowPayload = { | 13 | export type ActivitypubFollowPayload = { |
13 | followerActorId: number | 14 | followerActorId: number |
@@ -42,7 +43,7 @@ export { | |||
42 | 43 | ||
43 | // --------------------------------------------------------------------------- | 44 | // --------------------------------------------------------------------------- |
44 | 45 | ||
45 | function follow (fromActor: ActorModel, targetActor: ActorModel) { | 46 | async function follow (fromActor: ActorModel, targetActor: ActorModel) { |
46 | if (fromActor.id === targetActor.id) { | 47 | if (fromActor.id === targetActor.id) { |
47 | throw new Error('Follower is the same than target actor.') | 48 | throw new Error('Follower is the same than target actor.') |
48 | } | 49 | } |
@@ -50,7 +51,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { | |||
50 | // Same server, direct accept | 51 | // Same server, direct accept |
51 | const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' | 52 | const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' |
52 | 53 | ||
53 | return sequelizeTypescript.transaction(async t => { | 54 | const actorFollow = await sequelizeTypescript.transaction(async t => { |
54 | const [ actorFollow ] = await ActorFollowModel.findOrCreate({ | 55 | const [ actorFollow ] = await ActorFollowModel.findOrCreate({ |
55 | where: { | 56 | where: { |
56 | actorId: fromActor.id, | 57 | actorId: fromActor.id, |
@@ -68,5 +69,9 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { | |||
68 | 69 | ||
69 | // Send a notification to remote server if our follow is not already accepted | 70 | // Send a notification to remote server if our follow is not already accepted |
70 | if (actorFollow.state !== 'accepted') await sendFollow(actorFollow) | 71 | if (actorFollow.state !== 'accepted') await sendFollow(actorFollow) |
72 | |||
73 | return actorFollow | ||
71 | }) | 74 | }) |
75 | |||
76 | if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewFollow(actorFollow) | ||
72 | } | 77 | } |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 03a9e12a4..9493945ff 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -3,8 +3,9 @@ import * as Bluebird from 'bluebird' | |||
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { doRequest } from '../../../helpers/requests' | 4 | import { doRequest } from '../../../helpers/requests' |
5 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 5 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
6 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
7 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' | 7 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' |
8 | import { ActorFollowScoreCache } from '../../cache' | ||
8 | 9 | ||
9 | export type ActivitypubHttpBroadcastPayload = { | 10 | export type ActivitypubHttpBroadcastPayload = { |
10 | uris: string[] | 11 | uris: string[] |
@@ -25,7 +26,8 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { | |||
25 | uri: '', | 26 | uri: '', |
26 | json: body, | 27 | json: body, |
27 | httpSignature: httpSignatureOptions, | 28 | httpSignature: httpSignatureOptions, |
28 | timeout: JOB_REQUEST_TIMEOUT | 29 | timeout: JOB_REQUEST_TIMEOUT, |
30 | headers: buildGlobalHeaders(body) | ||
29 | } | 31 | } |
30 | 32 | ||
31 | const badUrls: string[] = [] | 33 | const badUrls: string[] = [] |
@@ -37,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { | |||
37 | .catch(() => badUrls.push(uri)) | 39 | .catch(() => badUrls.push(uri)) |
38 | }, { concurrency: BROADCAST_CONCURRENCY }) | 40 | }, { concurrency: BROADCAST_CONCURRENCY }) |
39 | 41 | ||
40 | return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) | 42 | return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) |
41 | } | 43 | } |
42 | 44 | ||
43 | // --------------------------------------------------------------------------- | 45 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 42217c27c..67ccfa995 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -23,7 +23,7 @@ async function processActivityPubHttpFetcher (job: Bull.Job) { | |||
23 | if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) | 23 | if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) |
24 | 24 | ||
25 | const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = { | 25 | const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = { |
26 | 'activity': items => processActivities(items), | 26 | 'activity': items => processActivities(items, { outboxUrl: payload.uri }), |
27 | 'video-likes': items => createRates(items, video, 'like'), | 27 | 'video-likes': items => createRates(items, video, 'like'), |
28 | 'video-dislikes': items => createRates(items, video, 'dislike'), | 28 | 'video-dislikes': items => createRates(items, video, 'dislike'), |
29 | 'video-shares': items => addVideoShares(items, video), | 29 | 'video-shares': items => addVideoShares(items, video), |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index c90d735f6..3973dcdc8 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -1,9 +1,9 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | ||
6 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers' | 5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers' |
6 | import { ActorFollowScoreCache } from '../../cache' | ||
7 | 7 | ||
8 | export type ActivitypubHttpUnicastPayload = { | 8 | export type ActivitypubHttpUnicastPayload = { |
9 | uri: string | 9 | uri: string |
@@ -25,14 +25,15 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { | |||
25 | uri, | 25 | uri, |
26 | json: body, | 26 | json: body, |
27 | httpSignature: httpSignatureOptions, | 27 | httpSignature: httpSignatureOptions, |
28 | timeout: JOB_REQUEST_TIMEOUT | 28 | timeout: JOB_REQUEST_TIMEOUT, |
29 | headers: buildGlobalHeaders(body) | ||
29 | } | 30 | } |
30 | 31 | ||
31 | try { | 32 | try { |
32 | await doRequest(options) | 33 | await doRequest(options) |
33 | ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) | 34 | ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) |
34 | } catch (err) { | 35 | } catch (err) { |
35 | ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) | 36 | ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) |
36 | 37 | ||
37 | throw err | 38 | throw err |
38 | } | 39 | } |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts new file mode 100644 index 000000000..454b975fe --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -0,0 +1,54 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { fetchVideoByUrl } from '../../../helpers/video' | ||
4 | import { refreshVideoIfNeeded, refreshActorIfNeeded } from '../../activitypub' | ||
5 | import { ActorModel } from '../../../models/activitypub/actor' | ||
6 | |||
7 | export type RefreshPayload = { | ||
8 | type: 'video' | 'actor' | ||
9 | url: string | ||
10 | } | ||
11 | |||
12 | async function refreshAPObject (job: Bull.Job) { | ||
13 | const payload = job.data as RefreshPayload | ||
14 | |||
15 | logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) | ||
16 | |||
17 | if (payload.type === 'video') return refreshVideo(payload.url) | ||
18 | if (payload.type === 'actor') return refreshActor(payload.url) | ||
19 | } | ||
20 | |||
21 | // --------------------------------------------------------------------------- | ||
22 | |||
23 | export { | ||
24 | refreshActor, | ||
25 | refreshAPObject | ||
26 | } | ||
27 | |||
28 | // --------------------------------------------------------------------------- | ||
29 | |||
30 | async function refreshVideo (videoUrl: string) { | ||
31 | const fetchType = 'all' as 'all' | ||
32 | const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } | ||
33 | |||
34 | const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType) | ||
35 | if (videoFromDatabase) { | ||
36 | const refreshOptions = { | ||
37 | video: videoFromDatabase, | ||
38 | fetchedType: fetchType, | ||
39 | syncParam | ||
40 | } | ||
41 | |||
42 | await refreshVideoIfNeeded(refreshOptions) | ||
43 | } | ||
44 | } | ||
45 | |||
46 | async function refreshActor (actorUrl: string) { | ||
47 | const fetchType = 'all' as 'all' | ||
48 | const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl) | ||
49 | |||
50 | if (actor) { | ||
51 | await refreshActorIfNeeded(actor, fetchType) | ||
52 | } | ||
53 | |||
54 | } | ||
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 73d98ae54..220d0af32 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -6,13 +6,14 @@ export type EmailPayload = { | |||
6 | to: string[] | 6 | to: string[] |
7 | subject: string | 7 | subject: string |
8 | text: string | 8 | text: string |
9 | from?: string | ||
9 | } | 10 | } |
10 | 11 | ||
11 | async function processEmail (job: Bull.Job) { | 12 | async function processEmail (job: Bull.Job) { |
12 | const payload = job.data as EmailPayload | 13 | const payload = job.data as EmailPayload |
13 | logger.info('Processing email in job %d.', job.id) | 14 | logger.info('Processing email in job %d.', job.id) |
14 | 15 | ||
15 | return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text) | 16 | return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text, payload.from) |
16 | } | 17 | } |
17 | 18 | ||
18 | // --------------------------------------------------------------------------- | 19 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts index 36092665e..4961d4502 100644 --- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -1,8 +1,12 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | 1 | import { buildSignedActivity } from '../../../../helpers/activitypub' |
2 | import { getServerActor } from '../../../../helpers/utils' | 2 | import { getServerActor } from '../../../../helpers/utils' |
3 | import { ActorModel } from '../../../../models/activitypub/actor' | 3 | import { ActorModel } from '../../../../models/activitypub/actor' |
4 | import { sha256 } from '../../../../helpers/core-utils' | ||
5 | import { HTTP_SIGNATURE } from '../../../../initializers' | ||
4 | 6 | ||
5 | async function computeBody (payload: { body: any, signatureActorId?: number }) { | 7 | type Payload = { body: any, signatureActorId?: number } |
8 | |||
9 | async function computeBody (payload: Payload) { | ||
6 | let body = payload.body | 10 | let body = payload.body |
7 | 11 | ||
8 | if (payload.signatureActorId) { | 12 | if (payload.signatureActorId) { |
@@ -14,7 +18,7 @@ async function computeBody (payload: { body: any, signatureActorId?: number }) { | |||
14 | return body | 18 | return body |
15 | } | 19 | } |
16 | 20 | ||
17 | async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { | 21 | async function buildSignedRequestOptions (payload: Payload) { |
18 | let actor: ActorModel | null | 22 | let actor: ActorModel | null |
19 | if (payload.signatureActorId) { | 23 | if (payload.signatureActorId) { |
20 | actor = await ActorModel.load(payload.signatureActorId) | 24 | actor = await ActorModel.load(payload.signatureActorId) |
@@ -26,14 +30,29 @@ async function buildSignedRequestOptions (payload: { signatureActorId?: number } | |||
26 | 30 | ||
27 | const keyId = actor.getWebfingerUrl() | 31 | const keyId = actor.getWebfingerUrl() |
28 | return { | 32 | return { |
29 | algorithm: 'rsa-sha256', | 33 | algorithm: HTTP_SIGNATURE.ALGORITHM, |
30 | authorizationHeaderName: 'Signature', | 34 | authorizationHeaderName: HTTP_SIGNATURE.HEADER_NAME, |
31 | keyId, | 35 | keyId, |
32 | key: actor.privateKey | 36 | key: actor.privateKey, |
37 | headers: HTTP_SIGNATURE.HEADERS_TO_SIGN | ||
38 | } | ||
39 | } | ||
40 | |||
41 | function buildGlobalHeaders (body: any) { | ||
42 | return { | ||
43 | 'Digest': buildDigest(body) | ||
33 | } | 44 | } |
34 | } | 45 | } |
35 | 46 | ||
47 | function buildDigest (body: any) { | ||
48 | const rawBody = typeof body === 'string' ? body : JSON.stringify(body) | ||
49 | |||
50 | return 'SHA-256=' + sha256(rawBody, 'base64') | ||
51 | } | ||
52 | |||
36 | export { | 53 | export { |
54 | buildDigest, | ||
55 | buildGlobalHeaders, | ||
37 | computeBody, | 56 | computeBody, |
38 | buildSignedRequestOptions | 57 | buildSignedRequestOptions |
39 | } | 58 | } |
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index 1463c93fc..04983155c 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -5,16 +5,18 @@ import { VideoModel } from '../../../models/video/video' | |||
5 | import { JobQueue } from '../job-queue' | 5 | import { JobQueue } from '../job-queue' |
6 | import { federateVideoIfNeeded } from '../../activitypub' | 6 | import { federateVideoIfNeeded } from '../../activitypub' |
7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
8 | import { sequelizeTypescript } from '../../../initializers' | 8 | import { sequelizeTypescript, CONFIG } from '../../../initializers' |
9 | import * as Bluebird from 'bluebird' | 9 | import * as Bluebird from 'bluebird' |
10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' | 10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' |
11 | import { importVideoFile, transcodeOriginalVideofile, optimizeOriginalVideofile } from '../../video-transcoding' | 11 | import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' |
12 | import { Notifier } from '../../notifier' | ||
12 | 13 | ||
13 | export type VideoFilePayload = { | 14 | export type VideoFilePayload = { |
14 | videoUUID: string | 15 | videoUUID: string |
15 | isNewVideo?: boolean | ||
16 | resolution?: VideoResolution | 16 | resolution?: VideoResolution |
17 | isNewVideo?: boolean | ||
17 | isPortraitMode?: boolean | 18 | isPortraitMode?: boolean |
19 | generateHlsPlaylist?: boolean | ||
18 | } | 20 | } |
19 | 21 | ||
20 | export type VideoFileImportPayload = { | 22 | export type VideoFileImportPayload = { |
@@ -50,34 +52,51 @@ async function processVideoFile (job: Bull.Job) { | |||
50 | return undefined | 52 | return undefined |
51 | } | 53 | } |
52 | 54 | ||
53 | // Transcoding in other resolution | 55 | if (payload.generateHlsPlaylist) { |
54 | if (payload.resolution) { | 56 | await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false) |
57 | |||
58 | await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) | ||
59 | } else if (payload.resolution) { // Transcoding in other resolution | ||
55 | await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) | 60 | await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) |
56 | 61 | ||
57 | await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video) | 62 | await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload) |
58 | } else { | 63 | } else { |
59 | await optimizeOriginalVideofile(video) | 64 | await optimizeVideofile(video) |
60 | 65 | ||
61 | await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload.isNewVideo) | 66 | await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload) |
62 | } | 67 | } |
63 | 68 | ||
64 | return video | 69 | return video |
65 | } | 70 | } |
66 | 71 | ||
67 | async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { | 72 | async function onHlsPlaylistGenerationSuccess (video: VideoModel) { |
73 | if (video === undefined) return undefined | ||
74 | |||
75 | await sequelizeTypescript.transaction(async t => { | ||
76 | // Maybe the video changed in database, refresh it | ||
77 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | ||
78 | // Video does not exist anymore | ||
79 | if (!videoDatabase) return undefined | ||
80 | |||
81 | // If the video was not published, we consider it is a new one for other instances | ||
82 | await federateVideoIfNeeded(videoDatabase, false, t) | ||
83 | }) | ||
84 | } | ||
85 | |||
86 | async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoFilePayload) { | ||
68 | if (video === undefined) return undefined | 87 | if (video === undefined) return undefined |
69 | 88 | ||
70 | return sequelizeTypescript.transaction(async t => { | 89 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
71 | // Maybe the video changed in database, refresh it | 90 | // Maybe the video changed in database, refresh it |
72 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | 91 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) |
73 | // Video does not exist anymore | 92 | // Video does not exist anymore |
74 | if (!videoDatabase) return undefined | 93 | if (!videoDatabase) return undefined |
75 | 94 | ||
76 | let isNewVideo = false | 95 | let videoPublished = false |
77 | 96 | ||
78 | // We transcoded the video file in another format, now we can publish it | 97 | // We transcoded the video file in another format, now we can publish it |
79 | if (videoDatabase.state !== VideoState.PUBLISHED) { | 98 | if (videoDatabase.state !== VideoState.PUBLISHED) { |
80 | isNewVideo = true | 99 | videoPublished = true |
81 | 100 | ||
82 | videoDatabase.state = VideoState.PUBLISHED | 101 | videoDatabase.state = VideoState.PUBLISHED |
83 | videoDatabase.publishedAt = new Date() | 102 | videoDatabase.publishedAt = new Date() |
@@ -85,21 +104,29 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { | |||
85 | } | 104 | } |
86 | 105 | ||
87 | // If the video was not published, we consider it is a new one for other instances | 106 | // If the video was not published, we consider it is a new one for other instances |
88 | await federateVideoIfNeeded(videoDatabase, isNewVideo, t) | 107 | await federateVideoIfNeeded(videoDatabase, videoPublished, t) |
89 | 108 | ||
90 | return undefined | 109 | return { videoDatabase, videoPublished } |
91 | }) | 110 | }) |
111 | |||
112 | // don't notify prior to scheduled video update | ||
113 | if (videoPublished && !videoDatabase.ScheduleVideoUpdate) { | ||
114 | Notifier.Instance.notifyOnNewVideo(videoDatabase) | ||
115 | Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) | ||
116 | } | ||
117 | |||
118 | await createHlsJobIfEnabled(payload) | ||
92 | } | 119 | } |
93 | 120 | ||
94 | async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { | 121 | async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoFilePayload) { |
95 | if (video === undefined) return undefined | 122 | if (videoArg === undefined) return undefined |
96 | 123 | ||
97 | // Outside the transaction (IO on disk) | 124 | // Outside the transaction (IO on disk) |
98 | const { videoFileResolution } = await video.getOriginalFileResolution() | 125 | const { videoFileResolution } = await videoArg.getOriginalFileResolution() |
99 | 126 | ||
100 | return sequelizeTypescript.transaction(async t => { | 127 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
101 | // Maybe the video changed in database, refresh it | 128 | // Maybe the video changed in database, refresh it |
102 | const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | 129 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) |
103 | // Video does not exist anymore | 130 | // Video does not exist anymore |
104 | if (!videoDatabase) return undefined | 131 | if (!videoDatabase) return undefined |
105 | 132 | ||
@@ -110,8 +137,10 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole | |||
110 | { resolutions: resolutionsEnabled } | 137 | { resolutions: resolutionsEnabled } |
111 | ) | 138 | ) |
112 | 139 | ||
140 | let videoPublished = false | ||
141 | |||
113 | if (resolutionsEnabled.length !== 0) { | 142 | if (resolutionsEnabled.length !== 0) { |
114 | const tasks: Bluebird<any>[] = [] | 143 | const tasks: Bluebird<Bull.Job<any>>[] = [] |
115 | 144 | ||
116 | for (const resolution of resolutionsEnabled) { | 145 | for (const resolution of resolutionsEnabled) { |
117 | const dataInput = { | 146 | const dataInput = { |
@@ -127,15 +156,27 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole | |||
127 | 156 | ||
128 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | 157 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) |
129 | } else { | 158 | } else { |
159 | videoPublished = true | ||
160 | |||
130 | // No transcoding to do, it's now published | 161 | // No transcoding to do, it's now published |
131 | video.state = VideoState.PUBLISHED | 162 | videoDatabase.state = VideoState.PUBLISHED |
132 | video = await video.save({ transaction: t }) | 163 | videoDatabase = await videoDatabase.save({ transaction: t }) |
133 | 164 | ||
134 | logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) | 165 | logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) |
135 | } | 166 | } |
136 | 167 | ||
137 | return federateVideoIfNeeded(video, isNewVideo, t) | 168 | await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) |
169 | |||
170 | return { videoDatabase, videoPublished } | ||
138 | }) | 171 | }) |
172 | |||
173 | // don't notify prior to scheduled video update | ||
174 | if (!videoDatabase.ScheduleVideoUpdate) { | ||
175 | if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) | ||
176 | if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) | ||
177 | } | ||
178 | |||
179 | await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution })) | ||
139 | } | 180 | } |
140 | 181 | ||
141 | // --------------------------------------------------------------------------- | 182 | // --------------------------------------------------------------------------- |
@@ -144,3 +185,20 @@ export { | |||
144 | processVideoFile, | 185 | processVideoFile, |
145 | processVideoFileImport | 186 | processVideoFileImport |
146 | } | 187 | } |
188 | |||
189 | // --------------------------------------------------------------------------- | ||
190 | |||
191 | function createHlsJobIfEnabled (payload?: VideoFilePayload) { | ||
192 | // Generate HLS playlist? | ||
193 | if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { | ||
194 | const hlsTranscodingPayload = { | ||
195 | videoUUID: payload.videoUUID, | ||
196 | resolution: payload.resolution, | ||
197 | isPortraitMode: payload.isPortraitMode, | ||
198 | |||
199 | generateHlsPlaylist: true | ||
200 | } | ||
201 | |||
202 | return JobQueue.Instance.createJob({ type: 'video-file', payload: hlsTranscodingPayload }) | ||
203 | } | ||
204 | } | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index e3f2a276c..12004dcd7 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -6,15 +6,16 @@ import { VideoImportState } from '../../../../shared/models/videos' | |||
6 | import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils' | 6 | import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils' |
7 | import { extname, join } from 'path' | 7 | import { extname, join } from 'path' |
8 | import { VideoFileModel } from '../../../models/video/video-file' | 8 | import { VideoFileModel } from '../../../models/video/video-file' |
9 | import { CONFIG, sequelizeTypescript, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' | 9 | import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' |
10 | import { doRequestAndSaveToFile } from '../../../helpers/requests' | 10 | import { downloadImage } from '../../../helpers/requests' |
11 | import { VideoState } from '../../../../shared' | 11 | import { VideoState } from '../../../../shared' |
12 | import { JobQueue } from '../index' | 12 | import { JobQueue } from '../index' |
13 | import { federateVideoIfNeeded } from '../../activitypub' | 13 | import { federateVideoIfNeeded } from '../../activitypub' |
14 | import { VideoModel } from '../../../models/video/video' | 14 | import { VideoModel } from '../../../models/video/video' |
15 | import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' | 15 | import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' |
16 | import { getSecureTorrentName } from '../../../helpers/utils' | 16 | import { getSecureTorrentName } from '../../../helpers/utils' |
17 | import { remove, rename, stat } from 'fs-extra' | 17 | import { remove, move, stat } from 'fs-extra' |
18 | import { Notifier } from '../../notifier' | ||
18 | 19 | ||
19 | type VideoImportYoutubeDLPayload = { | 20 | type VideoImportYoutubeDLPayload = { |
20 | type: 'youtube-dl' | 21 | type: 'youtube-dl' |
@@ -109,6 +110,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
109 | let tempVideoPath: string | 110 | let tempVideoPath: string |
110 | let videoDestFile: string | 111 | let videoDestFile: string |
111 | let videoFile: VideoFileModel | 112 | let videoFile: VideoFileModel |
113 | |||
112 | try { | 114 | try { |
113 | // Download video from youtubeDL | 115 | // Download video from youtubeDL |
114 | tempVideoPath = await downloader() | 116 | tempVideoPath = await downloader() |
@@ -133,19 +135,18 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
133 | videoId: videoImport.videoId | 135 | videoId: videoImport.videoId |
134 | } | 136 | } |
135 | videoFile = new VideoFileModel(videoFileData) | 137 | videoFile = new VideoFileModel(videoFileData) |
136 | // Import if the import fails, to clean files | 138 | // To clean files if the import fails |
137 | videoImport.Video.VideoFiles = [ videoFile ] | 139 | videoImport.Video.VideoFiles = [ videoFile ] |
138 | 140 | ||
139 | // Move file | 141 | // Move file |
140 | videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile)) | 142 | videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile)) |
141 | await rename(tempVideoPath, videoDestFile) | 143 | await move(tempVideoPath, videoDestFile) |
142 | tempVideoPath = null // This path is not used anymore | 144 | tempVideoPath = null // This path is not used anymore |
143 | 145 | ||
144 | // Process thumbnail | 146 | // Process thumbnail |
145 | if (options.downloadThumbnail) { | 147 | if (options.downloadThumbnail) { |
146 | if (options.thumbnailUrl) { | 148 | if (options.thumbnailUrl) { |
147 | const destThumbnailPath = join(CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName()) | 149 | await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE) |
148 | await doRequestAndSaveToFile({ method: 'GET', uri: options.thumbnailUrl }, destThumbnailPath) | ||
149 | } else { | 150 | } else { |
150 | await videoImport.Video.createThumbnail(videoFile) | 151 | await videoImport.Video.createThumbnail(videoFile) |
151 | } | 152 | } |
@@ -156,8 +157,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
156 | // Process preview | 157 | // Process preview |
157 | if (options.downloadPreview) { | 158 | if (options.downloadPreview) { |
158 | if (options.thumbnailUrl) { | 159 | if (options.thumbnailUrl) { |
159 | const destPreviewPath = join(CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName()) | 160 | await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE) |
160 | await doRequestAndSaveToFile({ method: 'GET', uri: options.thumbnailUrl }, destPreviewPath) | ||
161 | } else { | 161 | } else { |
162 | await videoImport.Video.createPreview(videoFile) | 162 | await videoImport.Video.createPreview(videoFile) |
163 | } | 163 | } |
@@ -180,7 +180,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
180 | // Update video DB object | 180 | // Update video DB object |
181 | video.duration = duration | 181 | video.duration = duration |
182 | video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED | 182 | video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED |
183 | const videoUpdated = await video.save({ transaction: t }) | 183 | await video.save({ transaction: t }) |
184 | 184 | ||
185 | // Now we can federate the video (reload from database, we need more attributes) | 185 | // Now we can federate the video (reload from database, we need more attributes) |
186 | const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | 186 | const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) |
@@ -192,10 +192,13 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
192 | 192 | ||
193 | logger.info('Video %s imported.', video.uuid) | 193 | logger.info('Video %s imported.', video.uuid) |
194 | 194 | ||
195 | videoImportUpdated.Video = videoUpdated | 195 | videoImportUpdated.Video = videoForFederation |
196 | return videoImportUpdated | 196 | return videoImportUpdated |
197 | }) | 197 | }) |
198 | 198 | ||
199 | Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video) | ||
200 | Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true) | ||
201 | |||
199 | // Create transcoding jobs? | 202 | // Create transcoding jobs? |
200 | if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { | 203 | if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { |
201 | // Put uuid because we don't have id auto incremented for now | 204 | // Put uuid because we don't have id auto incremented for now |
@@ -218,6 +221,8 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
218 | videoImport.state = VideoImportState.FAILED | 221 | videoImport.state = VideoImportState.FAILED |
219 | await videoImport.save() | 222 | await videoImport.save() |
220 | 223 | ||
224 | Notifier.Instance.notifyOnFinishedVideoImport(videoImport, false) | ||
225 | |||
221 | throw err | 226 | throw err |
222 | } | 227 | } |
223 | } | 228 | } |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts index cf180a11a..fa1fd13b3 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -3,8 +3,9 @@ import { logger } from '../../../helpers/logger' | |||
3 | import { VideoModel } from '../../../models/video/video' | 3 | import { VideoModel } from '../../../models/video/video' |
4 | import { VideoViewModel } from '../../../models/video/video-views' | 4 | import { VideoViewModel } from '../../../models/video/video-views' |
5 | import { isTestInstance } from '../../../helpers/core-utils' | 5 | import { isTestInstance } from '../../../helpers/core-utils' |
6 | import { federateVideoIfNeeded } from '../../activitypub' | ||
6 | 7 | ||
7 | async function processVideosViewsViews () { | 8 | async function processVideosViews () { |
8 | const lastHour = new Date() | 9 | const lastHour = new Date() |
9 | 10 | ||
10 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour | 11 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour |
@@ -22,13 +23,9 @@ async function processVideosViewsViews () { | |||
22 | for (const videoId of videoIds) { | 23 | for (const videoId of videoIds) { |
23 | try { | 24 | try { |
24 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 25 | const views = await Redis.Instance.getVideoViews(videoId, hour) |
25 | if (isNaN(views)) { | 26 | if (views) { |
26 | logger.error('Cannot process videos views of video %d in hour %d: views number is NaN.', videoId, hour) | ||
27 | } else { | ||
28 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 27 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) |
29 | 28 | ||
30 | await VideoModel.incrementViews(videoId, views) | ||
31 | |||
32 | try { | 29 | try { |
33 | await VideoViewModel.create({ | 30 | await VideoViewModel.create({ |
34 | startDate, | 31 | startDate, |
@@ -36,6 +33,16 @@ async function processVideosViewsViews () { | |||
36 | views, | 33 | views, |
37 | videoId | 34 | videoId |
38 | }) | 35 | }) |
36 | |||
37 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
38 | if (video.isOwned()) { | ||
39 | // If this is a remote video, the origin instance will send us an update | ||
40 | await VideoModel.incrementViews(videoId, views) | ||
41 | |||
42 | // Send video update | ||
43 | video.views += views | ||
44 | await federateVideoIfNeeded(video, false) | ||
45 | } | ||
39 | } catch (err) { | 46 | } catch (err) { |
40 | logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour) | 47 | logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour) |
41 | } | 48 | } |
@@ -51,5 +58,5 @@ async function processVideosViewsViews () { | |||
51 | // --------------------------------------------------------------------------- | 58 | // --------------------------------------------------------------------------- |
52 | 59 | ||
53 | export { | 60 | export { |
54 | processVideosViewsViews | 61 | processVideosViews |
55 | } | 62 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0696ba43c..ba9cbe0d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -10,7 +10,8 @@ import { EmailPayload, processEmail } from './handlers/email' | |||
10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' | 10 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' |
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' | 12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' |
13 | import { processVideosViewsViews } from './handlers/video-views' | 13 | import { processVideosViews } from './handlers/video-views' |
14 | import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' | ||
14 | 15 | ||
15 | type CreateJobArgument = | 16 | type CreateJobArgument = |
16 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 17 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -21,6 +22,7 @@ type CreateJobArgument = | |||
21 | { type: 'video-file', payload: VideoFilePayload } | | 22 | { type: 'video-file', payload: VideoFilePayload } | |
22 | { type: 'email', payload: EmailPayload } | | 23 | { type: 'email', payload: EmailPayload } | |
23 | { type: 'video-import', payload: VideoImportPayload } | | 24 | { type: 'video-import', payload: VideoImportPayload } | |
25 | { type: 'activitypub-refresher', payload: RefreshPayload } | | ||
24 | { type: 'videos-views', payload: {} } | 26 | { type: 'videos-views', payload: {} } |
25 | 27 | ||
26 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | 28 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
@@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
32 | 'video-file': processVideoFile, | 34 | 'video-file': processVideoFile, |
33 | 'email': processEmail, | 35 | 'email': processEmail, |
34 | 'video-import': processVideoImport, | 36 | 'video-import': processVideoImport, |
35 | 'videos-views': processVideosViewsViews | 37 | 'videos-views': processVideosViews, |
38 | 'activitypub-refresher': refreshAPObject | ||
36 | } | 39 | } |
37 | 40 | ||
38 | const jobTypes: JobType[] = [ | 41 | const jobTypes: JobType[] = [ |
@@ -44,7 +47,8 @@ const jobTypes: JobType[] = [ | |||
44 | 'video-file', | 47 | 'video-file', |
45 | 'video-file-import', | 48 | 'video-file-import', |
46 | 'video-import', | 49 | 'video-import', |
47 | 'videos-views' | 50 | 'videos-views', |
51 | 'activitypub-refresher' | ||
48 | ] | 52 | ] |
49 | 53 | ||
50 | class JobQueue { | 54 | class JobQueue { |
@@ -84,7 +88,6 @@ class JobQueue { | |||
84 | 88 | ||
85 | queue.on('error', err => { | 89 | queue.on('error', err => { |
86 | logger.error('Error in job queue %s.', handlerName, { err }) | 90 | logger.error('Error in job queue %s.', handlerName, { err }) |
87 | process.exit(-1) | ||
88 | }) | 91 | }) |
89 | 92 | ||
90 | this.queues[handlerName] = queue | 93 | this.queues[handlerName] = queue |
@@ -162,10 +165,10 @@ class JobQueue { | |||
162 | return total | 165 | return total |
163 | } | 166 | } |
164 | 167 | ||
165 | removeOldJobs () { | 168 | async removeOldJobs () { |
166 | for (const key of Object.keys(this.queues)) { | 169 | for (const key of Object.keys(this.queues)) { |
167 | const queue = this.queues[key] | 170 | const queue = this.queues[key] |
168 | queue.clean(JOB_COMPLETED_LIFETIME, 'completed') | 171 | await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') |
169 | } | 172 | } |
170 | } | 173 | } |
171 | 174 | ||