aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts9
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts8
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts11
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts54
-rw-r--r--server/lib/job-queue/handlers/email.ts3
-rw-r--r--server/lib/job-queue/handlers/utils/activitypub-http-utils.ts29
-rw-r--r--server/lib/job-queue/handlers/video-file.ts106
-rw-r--r--server/lib/job-queue/handlers/video-import.ts27
-rw-r--r--server/lib/job-queue/handlers/video-views.ts21
-rw-r--r--server/lib/job-queue/job-queue.ts15
11 files changed, 220 insertions, 65 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 36d0f237b..b4d381062 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -8,6 +8,7 @@ import { getOrCreateActorAndServerAndModel } from '../../activitypub/actor'
8import { retryTransactionWrapper } from '../../../helpers/database-utils' 8import { retryTransactionWrapper } from '../../../helpers/database-utils'
9import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 9import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
10import { ActorModel } from '../../../models/activitypub/actor' 10import { ActorModel } from '../../../models/activitypub/actor'
11import { Notifier } from '../../notifier'
11 12
12export type ActivitypubFollowPayload = { 13export type ActivitypubFollowPayload = {
13 followerActorId: number 14 followerActorId: number
@@ -42,7 +43,7 @@ export {
42 43
43// --------------------------------------------------------------------------- 44// ---------------------------------------------------------------------------
44 45
45function follow (fromActor: ActorModel, targetActor: ActorModel) { 46async function follow (fromActor: ActorModel, targetActor: ActorModel) {
46 if (fromActor.id === targetActor.id) { 47 if (fromActor.id === targetActor.id) {
47 throw new Error('Follower is the same than target actor.') 48 throw new Error('Follower is the same than target actor.')
48 } 49 }
@@ -50,7 +51,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) {
50 // Same server, direct accept 51 // Same server, direct accept
51 const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' 52 const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending'
52 53
53 return sequelizeTypescript.transaction(async t => { 54 const actorFollow = await sequelizeTypescript.transaction(async t => {
54 const [ actorFollow ] = await ActorFollowModel.findOrCreate({ 55 const [ actorFollow ] = await ActorFollowModel.findOrCreate({
55 where: { 56 where: {
56 actorId: fromActor.id, 57 actorId: fromActor.id,
@@ -68,5 +69,9 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) {
68 69
69 // Send a notification to remote server if our follow is not already accepted 70 // Send a notification to remote server if our follow is not already accepted
70 if (actorFollow.state !== 'accepted') await sendFollow(actorFollow) 71 if (actorFollow.state !== 'accepted') await sendFollow(actorFollow)
72
73 return actorFollow
71 }) 74 })
75
76 if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewFollow(actorFollow)
72} 77}
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 03a9e12a4..9493945ff 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -3,8 +3,9 @@ import * as Bluebird from 'bluebird'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { doRequest } from '../../../helpers/requests' 4import { doRequest } from '../../../helpers/requests'
5import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 5import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
6import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
7import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' 7import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
8import { ActorFollowScoreCache } from '../../cache'
8 9
9export type ActivitypubHttpBroadcastPayload = { 10export type ActivitypubHttpBroadcastPayload = {
10 uris: string[] 11 uris: string[]
@@ -25,7 +26,8 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) {
25 uri: '', 26 uri: '',
26 json: body, 27 json: body,
27 httpSignature: httpSignatureOptions, 28 httpSignature: httpSignatureOptions,
28 timeout: JOB_REQUEST_TIMEOUT 29 timeout: JOB_REQUEST_TIMEOUT,
30 headers: buildGlobalHeaders(body)
29 } 31 }
30 32
31 const badUrls: string[] = [] 33 const badUrls: string[] = []
@@ -37,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) {
37 .catch(() => badUrls.push(uri)) 39 .catch(() => badUrls.push(uri))
38 }, { concurrency: BROADCAST_CONCURRENCY }) 40 }, { concurrency: BROADCAST_CONCURRENCY })
39 41
40 return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) 42 return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
41} 43}
42 44
43// --------------------------------------------------------------------------- 45// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 42217c27c..67ccfa995 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -23,7 +23,7 @@ async function processActivityPubHttpFetcher (job: Bull.Job) {
23 if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) 23 if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId)
24 24
25 const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = { 25 const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = {
26 'activity': items => processActivities(items), 26 'activity': items => processActivities(items, { outboxUrl: payload.uri }),
27 'video-likes': items => createRates(items, video, 'like'), 27 'video-likes': items => createRates(items, video, 'like'),
28 'video-dislikes': items => createRates(items, video, 'dislike'), 28 'video-dislikes': items => createRates(items, video, 'dislike'),
29 'video-shares': items => addVideoShares(items, video), 29 'video-shares': items => addVideoShares(items, video),
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index c90d735f6..3973dcdc8 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,9 +1,9 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 4import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6import { JOB_REQUEST_TIMEOUT } from '../../../initializers' 5import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
6import { ActorFollowScoreCache } from '../../cache'
7 7
8export type ActivitypubHttpUnicastPayload = { 8export type ActivitypubHttpUnicastPayload = {
9 uri: string 9 uri: string
@@ -25,14 +25,15 @@ async function processActivityPubHttpUnicast (job: Bull.Job) {
25 uri, 25 uri,
26 json: body, 26 json: body,
27 httpSignature: httpSignatureOptions, 27 httpSignature: httpSignatureOptions,
28 timeout: JOB_REQUEST_TIMEOUT 28 timeout: JOB_REQUEST_TIMEOUT,
29 headers: buildGlobalHeaders(body)
29 } 30 }
30 31
31 try { 32 try {
32 await doRequest(options) 33 await doRequest(options)
33 ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) 34 ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
34 } catch (err) { 35 } catch (err) {
35 ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) 36 ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
36 37
37 throw err 38 throw err
38 } 39 }
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
new file mode 100644
index 000000000..454b975fe
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -0,0 +1,54 @@
1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger'
3import { fetchVideoByUrl } from '../../../helpers/video'
4import { refreshVideoIfNeeded, refreshActorIfNeeded } from '../../activitypub'
5import { ActorModel } from '../../../models/activitypub/actor'
6
7export type RefreshPayload = {
8 type: 'video' | 'actor'
9 url: string
10}
11
12async function refreshAPObject (job: Bull.Job) {
13 const payload = job.data as RefreshPayload
14
15 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url)
16
17 if (payload.type === 'video') return refreshVideo(payload.url)
18 if (payload.type === 'actor') return refreshActor(payload.url)
19}
20
21// ---------------------------------------------------------------------------
22
23export {
24 refreshActor,
25 refreshAPObject
26}
27
28// ---------------------------------------------------------------------------
29
30async function refreshVideo (videoUrl: string) {
31 const fetchType = 'all' as 'all'
32 const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
33
34 const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
35 if (videoFromDatabase) {
36 const refreshOptions = {
37 video: videoFromDatabase,
38 fetchedType: fetchType,
39 syncParam
40 }
41
42 await refreshVideoIfNeeded(refreshOptions)
43 }
44}
45
46async function refreshActor (actorUrl: string) {
47 const fetchType = 'all' as 'all'
48 const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl)
49
50 if (actor) {
51 await refreshActorIfNeeded(actor, fetchType)
52 }
53
54}
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 73d98ae54..220d0af32 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -6,13 +6,14 @@ export type EmailPayload = {
6 to: string[] 6 to: string[]
7 subject: string 7 subject: string
8 text: string 8 text: string
9 from?: string
9} 10}
10 11
11async function processEmail (job: Bull.Job) { 12async function processEmail (job: Bull.Job) {
12 const payload = job.data as EmailPayload 13 const payload = job.data as EmailPayload
13 logger.info('Processing email in job %d.', job.id) 14 logger.info('Processing email in job %d.', job.id)
14 15
15 return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text) 16 return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text, payload.from)
16} 17}
17 18
18// --------------------------------------------------------------------------- 19// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
index 36092665e..4961d4502 100644
--- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
+++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
@@ -1,8 +1,12 @@
1import { buildSignedActivity } from '../../../../helpers/activitypub' 1import { buildSignedActivity } from '../../../../helpers/activitypub'
2import { getServerActor } from '../../../../helpers/utils' 2import { getServerActor } from '../../../../helpers/utils'
3import { ActorModel } from '../../../../models/activitypub/actor' 3import { ActorModel } from '../../../../models/activitypub/actor'
4import { sha256 } from '../../../../helpers/core-utils'
5import { HTTP_SIGNATURE } from '../../../../initializers'
4 6
5async function computeBody (payload: { body: any, signatureActorId?: number }) { 7type Payload = { body: any, signatureActorId?: number }
8
9async function computeBody (payload: Payload) {
6 let body = payload.body 10 let body = payload.body
7 11
8 if (payload.signatureActorId) { 12 if (payload.signatureActorId) {
@@ -14,7 +18,7 @@ async function computeBody (payload: { body: any, signatureActorId?: number }) {
14 return body 18 return body
15} 19}
16 20
17async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { 21async function buildSignedRequestOptions (payload: Payload) {
18 let actor: ActorModel | null 22 let actor: ActorModel | null
19 if (payload.signatureActorId) { 23 if (payload.signatureActorId) {
20 actor = await ActorModel.load(payload.signatureActorId) 24 actor = await ActorModel.load(payload.signatureActorId)
@@ -26,14 +30,29 @@ async function buildSignedRequestOptions (payload: { signatureActorId?: number }
26 30
27 const keyId = actor.getWebfingerUrl() 31 const keyId = actor.getWebfingerUrl()
28 return { 32 return {
29 algorithm: 'rsa-sha256', 33 algorithm: HTTP_SIGNATURE.ALGORITHM,
30 authorizationHeaderName: 'Signature', 34 authorizationHeaderName: HTTP_SIGNATURE.HEADER_NAME,
31 keyId, 35 keyId,
32 key: actor.privateKey 36 key: actor.privateKey,
37 headers: HTTP_SIGNATURE.HEADERS_TO_SIGN
38 }
39}
40
41function buildGlobalHeaders (body: any) {
42 return {
43 'Digest': buildDigest(body)
33 } 44 }
34} 45}
35 46
47function buildDigest (body: any) {
48 const rawBody = typeof body === 'string' ? body : JSON.stringify(body)
49
50 return 'SHA-256=' + sha256(rawBody, 'base64')
51}
52
36export { 53export {
54 buildDigest,
55 buildGlobalHeaders,
37 computeBody, 56 computeBody,
38 buildSignedRequestOptions 57 buildSignedRequestOptions
39} 58}
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
index 1463c93fc..04983155c 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -5,16 +5,18 @@ import { VideoModel } from '../../../models/video/video'
5import { JobQueue } from '../job-queue' 5import { JobQueue } from '../job-queue'
6import { federateVideoIfNeeded } from '../../activitypub' 6import { federateVideoIfNeeded } from '../../activitypub'
7import { retryTransactionWrapper } from '../../../helpers/database-utils' 7import { retryTransactionWrapper } from '../../../helpers/database-utils'
8import { sequelizeTypescript } from '../../../initializers' 8import { sequelizeTypescript, CONFIG } from '../../../initializers'
9import * as Bluebird from 'bluebird' 9import * as Bluebird from 'bluebird'
10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' 10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
11import { importVideoFile, transcodeOriginalVideofile, optimizeOriginalVideofile } from '../../video-transcoding' 11import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding'
12import { Notifier } from '../../notifier'
12 13
13export type VideoFilePayload = { 14export type VideoFilePayload = {
14 videoUUID: string 15 videoUUID: string
15 isNewVideo?: boolean
16 resolution?: VideoResolution 16 resolution?: VideoResolution
17 isNewVideo?: boolean
17 isPortraitMode?: boolean 18 isPortraitMode?: boolean
19 generateHlsPlaylist?: boolean
18} 20}
19 21
20export type VideoFileImportPayload = { 22export type VideoFileImportPayload = {
@@ -50,34 +52,51 @@ async function processVideoFile (job: Bull.Job) {
50 return undefined 52 return undefined
51 } 53 }
52 54
53 // Transcoding in other resolution 55 if (payload.generateHlsPlaylist) {
54 if (payload.resolution) { 56 await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false)
57
58 await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
59 } else if (payload.resolution) { // Transcoding in other resolution
55 await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) 60 await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false)
56 61
57 await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video) 62 await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload)
58 } else { 63 } else {
59 await optimizeOriginalVideofile(video) 64 await optimizeVideofile(video)
60 65
61 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload.isNewVideo) 66 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
62 } 67 }
63 68
64 return video 69 return video
65} 70}
66 71
67async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { 72async function onHlsPlaylistGenerationSuccess (video: VideoModel) {
73 if (video === undefined) return undefined
74
75 await sequelizeTypescript.transaction(async t => {
76 // Maybe the video changed in database, refresh it
77 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
78 // Video does not exist anymore
79 if (!videoDatabase) return undefined
80
81 // If the video was not published, we consider it is a new one for other instances
82 await federateVideoIfNeeded(videoDatabase, false, t)
83 })
84}
85
86async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoFilePayload) {
68 if (video === undefined) return undefined 87 if (video === undefined) return undefined
69 88
70 return sequelizeTypescript.transaction(async t => { 89 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
71 // Maybe the video changed in database, refresh it 90 // Maybe the video changed in database, refresh it
72 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 91 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
73 // Video does not exist anymore 92 // Video does not exist anymore
74 if (!videoDatabase) return undefined 93 if (!videoDatabase) return undefined
75 94
76 let isNewVideo = false 95 let videoPublished = false
77 96
78 // We transcoded the video file in another format, now we can publish it 97 // We transcoded the video file in another format, now we can publish it
79 if (videoDatabase.state !== VideoState.PUBLISHED) { 98 if (videoDatabase.state !== VideoState.PUBLISHED) {
80 isNewVideo = true 99 videoPublished = true
81 100
82 videoDatabase.state = VideoState.PUBLISHED 101 videoDatabase.state = VideoState.PUBLISHED
83 videoDatabase.publishedAt = new Date() 102 videoDatabase.publishedAt = new Date()
@@ -85,21 +104,29 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) {
85 } 104 }
86 105
87 // If the video was not published, we consider it is a new one for other instances 106 // If the video was not published, we consider it is a new one for other instances
88 await federateVideoIfNeeded(videoDatabase, isNewVideo, t) 107 await federateVideoIfNeeded(videoDatabase, videoPublished, t)
89 108
90 return undefined 109 return { videoDatabase, videoPublished }
91 }) 110 })
111
112 // don't notify prior to scheduled video update
113 if (videoPublished && !videoDatabase.ScheduleVideoUpdate) {
114 Notifier.Instance.notifyOnNewVideo(videoDatabase)
115 Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
116 }
117
118 await createHlsJobIfEnabled(payload)
92} 119}
93 120
94async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { 121async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoFilePayload) {
95 if (video === undefined) return undefined 122 if (videoArg === undefined) return undefined
96 123
97 // Outside the transaction (IO on disk) 124 // Outside the transaction (IO on disk)
98 const { videoFileResolution } = await video.getOriginalFileResolution() 125 const { videoFileResolution } = await videoArg.getOriginalFileResolution()
99 126
100 return sequelizeTypescript.transaction(async t => { 127 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
101 // Maybe the video changed in database, refresh it 128 // Maybe the video changed in database, refresh it
102 const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 129 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
103 // Video does not exist anymore 130 // Video does not exist anymore
104 if (!videoDatabase) return undefined 131 if (!videoDatabase) return undefined
105 132
@@ -110,8 +137,10 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
110 { resolutions: resolutionsEnabled } 137 { resolutions: resolutionsEnabled }
111 ) 138 )
112 139
140 let videoPublished = false
141
113 if (resolutionsEnabled.length !== 0) { 142 if (resolutionsEnabled.length !== 0) {
114 const tasks: Bluebird<any>[] = [] 143 const tasks: Bluebird<Bull.Job<any>>[] = []
115 144
116 for (const resolution of resolutionsEnabled) { 145 for (const resolution of resolutionsEnabled) {
117 const dataInput = { 146 const dataInput = {
@@ -127,15 +156,27 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
127 156
128 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) 157 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
129 } else { 158 } else {
159 videoPublished = true
160
130 // No transcoding to do, it's now published 161 // No transcoding to do, it's now published
131 video.state = VideoState.PUBLISHED 162 videoDatabase.state = VideoState.PUBLISHED
132 video = await video.save({ transaction: t }) 163 videoDatabase = await videoDatabase.save({ transaction: t })
133 164
134 logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) 165 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
135 } 166 }
136 167
137 return federateVideoIfNeeded(video, isNewVideo, t) 168 await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
169
170 return { videoDatabase, videoPublished }
138 }) 171 })
172
173 // don't notify prior to scheduled video update
174 if (!videoDatabase.ScheduleVideoUpdate) {
175 if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
176 if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
177 }
178
179 await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution }))
139} 180}
140 181
141// --------------------------------------------------------------------------- 182// ---------------------------------------------------------------------------
@@ -144,3 +185,20 @@ export {
144 processVideoFile, 185 processVideoFile,
145 processVideoFileImport 186 processVideoFileImport
146} 187}
188
189// ---------------------------------------------------------------------------
190
191function createHlsJobIfEnabled (payload?: VideoFilePayload) {
192 // Generate HLS playlist?
193 if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
194 const hlsTranscodingPayload = {
195 videoUUID: payload.videoUUID,
196 resolution: payload.resolution,
197 isPortraitMode: payload.isPortraitMode,
198
199 generateHlsPlaylist: true
200 }
201
202 return JobQueue.Instance.createJob({ type: 'video-file', payload: hlsTranscodingPayload })
203 }
204}
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index e3f2a276c..12004dcd7 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -6,15 +6,16 @@ import { VideoImportState } from '../../../../shared/models/videos'
6import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils' 6import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffmpeg-utils'
7import { extname, join } from 'path' 7import { extname, join } from 'path'
8import { VideoFileModel } from '../../../models/video/video-file' 8import { VideoFileModel } from '../../../models/video/video-file'
9import { CONFIG, sequelizeTypescript, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' 9import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers'
10import { doRequestAndSaveToFile } from '../../../helpers/requests' 10import { downloadImage } from '../../../helpers/requests'
11import { VideoState } from '../../../../shared' 11import { VideoState } from '../../../../shared'
12import { JobQueue } from '../index' 12import { JobQueue } from '../index'
13import { federateVideoIfNeeded } from '../../activitypub' 13import { federateVideoIfNeeded } from '../../activitypub'
14import { VideoModel } from '../../../models/video/video' 14import { VideoModel } from '../../../models/video/video'
15import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' 15import { downloadWebTorrentVideo } from '../../../helpers/webtorrent'
16import { getSecureTorrentName } from '../../../helpers/utils' 16import { getSecureTorrentName } from '../../../helpers/utils'
17import { remove, rename, stat } from 'fs-extra' 17import { remove, move, stat } from 'fs-extra'
18import { Notifier } from '../../notifier'
18 19
19type VideoImportYoutubeDLPayload = { 20type VideoImportYoutubeDLPayload = {
20 type: 'youtube-dl' 21 type: 'youtube-dl'
@@ -109,6 +110,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
109 let tempVideoPath: string 110 let tempVideoPath: string
110 let videoDestFile: string 111 let videoDestFile: string
111 let videoFile: VideoFileModel 112 let videoFile: VideoFileModel
113
112 try { 114 try {
113 // Download video from youtubeDL 115 // Download video from youtubeDL
114 tempVideoPath = await downloader() 116 tempVideoPath = await downloader()
@@ -133,19 +135,18 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
133 videoId: videoImport.videoId 135 videoId: videoImport.videoId
134 } 136 }
135 videoFile = new VideoFileModel(videoFileData) 137 videoFile = new VideoFileModel(videoFileData)
136 // Import if the import fails, to clean files 138 // To clean files if the import fails
137 videoImport.Video.VideoFiles = [ videoFile ] 139 videoImport.Video.VideoFiles = [ videoFile ]
138 140
139 // Move file 141 // Move file
140 videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile)) 142 videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile))
141 await rename(tempVideoPath, videoDestFile) 143 await move(tempVideoPath, videoDestFile)
142 tempVideoPath = null // This path is not used anymore 144 tempVideoPath = null // This path is not used anymore
143 145
144 // Process thumbnail 146 // Process thumbnail
145 if (options.downloadThumbnail) { 147 if (options.downloadThumbnail) {
146 if (options.thumbnailUrl) { 148 if (options.thumbnailUrl) {
147 const destThumbnailPath = join(CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName()) 149 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE)
148 await doRequestAndSaveToFile({ method: 'GET', uri: options.thumbnailUrl }, destThumbnailPath)
149 } else { 150 } else {
150 await videoImport.Video.createThumbnail(videoFile) 151 await videoImport.Video.createThumbnail(videoFile)
151 } 152 }
@@ -156,8 +157,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
156 // Process preview 157 // Process preview
157 if (options.downloadPreview) { 158 if (options.downloadPreview) {
158 if (options.thumbnailUrl) { 159 if (options.thumbnailUrl) {
159 const destPreviewPath = join(CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName()) 160 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE)
160 await doRequestAndSaveToFile({ method: 'GET', uri: options.thumbnailUrl }, destPreviewPath)
161 } else { 161 } else {
162 await videoImport.Video.createPreview(videoFile) 162 await videoImport.Video.createPreview(videoFile)
163 } 163 }
@@ -180,7 +180,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
180 // Update video DB object 180 // Update video DB object
181 video.duration = duration 181 video.duration = duration
182 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED 182 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED
183 const videoUpdated = await video.save({ transaction: t }) 183 await video.save({ transaction: t })
184 184
185 // Now we can federate the video (reload from database, we need more attributes) 185 // Now we can federate the video (reload from database, we need more attributes)
186 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 186 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
@@ -192,10 +192,13 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
192 192
193 logger.info('Video %s imported.', video.uuid) 193 logger.info('Video %s imported.', video.uuid)
194 194
195 videoImportUpdated.Video = videoUpdated 195 videoImportUpdated.Video = videoForFederation
196 return videoImportUpdated 196 return videoImportUpdated
197 }) 197 })
198 198
199 Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video)
200 Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true)
201
199 // Create transcoding jobs? 202 // Create transcoding jobs?
200 if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { 203 if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) {
201 // Put uuid because we don't have id auto incremented for now 204 // Put uuid because we don't have id auto incremented for now
@@ -218,6 +221,8 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
218 videoImport.state = VideoImportState.FAILED 221 videoImport.state = VideoImportState.FAILED
219 await videoImport.save() 222 await videoImport.save()
220 223
224 Notifier.Instance.notifyOnFinishedVideoImport(videoImport, false)
225
221 throw err 226 throw err
222 } 227 }
223} 228}
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts
index cf180a11a..fa1fd13b3 100644
--- a/server/lib/job-queue/handlers/video-views.ts
+++ b/server/lib/job-queue/handlers/video-views.ts
@@ -3,8 +3,9 @@ import { logger } from '../../../helpers/logger'
3import { VideoModel } from '../../../models/video/video' 3import { VideoModel } from '../../../models/video/video'
4import { VideoViewModel } from '../../../models/video/video-views' 4import { VideoViewModel } from '../../../models/video/video-views'
5import { isTestInstance } from '../../../helpers/core-utils' 5import { isTestInstance } from '../../../helpers/core-utils'
6import { federateVideoIfNeeded } from '../../activitypub'
6 7
7async function processVideosViewsViews () { 8async function processVideosViews () {
8 const lastHour = new Date() 9 const lastHour = new Date()
9 10
10 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour 11 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
@@ -22,13 +23,9 @@ async function processVideosViewsViews () {
22 for (const videoId of videoIds) { 23 for (const videoId of videoIds) {
23 try { 24 try {
24 const views = await Redis.Instance.getVideoViews(videoId, hour) 25 const views = await Redis.Instance.getVideoViews(videoId, hour)
25 if (isNaN(views)) { 26 if (views) {
26 logger.error('Cannot process videos views of video %d in hour %d: views number is NaN.', videoId, hour)
27 } else {
28 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) 27 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
29 28
30 await VideoModel.incrementViews(videoId, views)
31
32 try { 29 try {
33 await VideoViewModel.create({ 30 await VideoViewModel.create({
34 startDate, 31 startDate,
@@ -36,6 +33,16 @@ async function processVideosViewsViews () {
36 views, 33 views,
37 videoId 34 videoId
38 }) 35 })
36
37 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
38 if (video.isOwned()) {
39 // If this is a remote video, the origin instance will send us an update
40 await VideoModel.incrementViews(videoId, views)
41
42 // Send video update
43 video.views += views
44 await federateVideoIfNeeded(video, false)
45 }
39 } catch (err) { 46 } catch (err) {
40 logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour) 47 logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour)
41 } 48 }
@@ -51,5 +58,5 @@ async function processVideosViewsViews () {
51// --------------------------------------------------------------------------- 58// ---------------------------------------------------------------------------
52 59
53export { 60export {
54 processVideosViewsViews 61 processVideosViews
55} 62}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0696ba43c..ba9cbe0d9 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -10,7 +10,8 @@ import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' 10import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12import { processVideoImport, VideoImportPayload } from './handlers/video-import' 12import { processVideoImport, VideoImportPayload } from './handlers/video-import'
13import { processVideosViewsViews } from './handlers/video-views' 13import { processVideosViews } from './handlers/video-views'
14import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
14 15
15type CreateJobArgument = 16type CreateJobArgument =
16 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 17 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -21,6 +22,7 @@ type CreateJobArgument =
21 { type: 'video-file', payload: VideoFilePayload } | 22 { type: 'video-file', payload: VideoFilePayload } |
22 { type: 'email', payload: EmailPayload } | 23 { type: 'email', payload: EmailPayload } |
23 { type: 'video-import', payload: VideoImportPayload } | 24 { type: 'video-import', payload: VideoImportPayload } |
25 { type: 'activitypub-refresher', payload: RefreshPayload } |
24 { type: 'videos-views', payload: {} } 26 { type: 'videos-views', payload: {} }
25 27
26const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { 28const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
@@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
32 'video-file': processVideoFile, 34 'video-file': processVideoFile,
33 'email': processEmail, 35 'email': processEmail,
34 'video-import': processVideoImport, 36 'video-import': processVideoImport,
35 'videos-views': processVideosViewsViews 37 'videos-views': processVideosViews,
38 'activitypub-refresher': refreshAPObject
36} 39}
37 40
38const jobTypes: JobType[] = [ 41const jobTypes: JobType[] = [
@@ -44,7 +47,8 @@ const jobTypes: JobType[] = [
44 'video-file', 47 'video-file',
45 'video-file-import', 48 'video-file-import',
46 'video-import', 49 'video-import',
47 'videos-views' 50 'videos-views',
51 'activitypub-refresher'
48] 52]
49 53
50class JobQueue { 54class JobQueue {
@@ -84,7 +88,6 @@ class JobQueue {
84 88
85 queue.on('error', err => { 89 queue.on('error', err => {
86 logger.error('Error in job queue %s.', handlerName, { err }) 90 logger.error('Error in job queue %s.', handlerName, { err })
87 process.exit(-1)
88 }) 91 })
89 92
90 this.queues[handlerName] = queue 93 this.queues[handlerName] = queue
@@ -162,10 +165,10 @@ class JobQueue {
162 return total 165 return total
163 } 166 }
164 167
165 removeOldJobs () { 168 async removeOldJobs () {
166 for (const key of Object.keys(this.queues)) { 169 for (const key of Object.keys(this.queues)) {
167 const queue = this.queues[key] 170 const queue = this.queues[key]
168 queue.clean(JOB_COMPLETED_LIFETIME, 'completed') 171 await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
169 } 172 }
170 } 173 }
171 174