aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2019-02-11 14:09:23 +0100
committerChocobozzz <me@florianbigard.com>2019-02-11 14:09:23 +0100
commitb718fd22374d64534bcfe69932cf562894abed6a (patch)
tree311d3c67e2a4d1f33ebdd1dc163527de9d33d0f7 /server/lib/job-queue
parentadb115f5522bea4d52456a9fc5eb4140bb064476 (diff)
parent501e961199578129629cf0567033d13efced9904 (diff)
downloadPeerTube-b718fd22374d64534bcfe69932cf562894abed6a.tar.gz
PeerTube-b718fd22374d64534bcfe69932cf562894abed6a.tar.zst
PeerTube-b718fd22374d64534bcfe69932cf562894abed6a.zip
Merge branch 'develop' into pr/1285
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts25
-rw-r--r--server/lib/job-queue/handlers/video-file.ts69
2 files changed, 75 insertions, 19 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 671b0f487..454b975fe 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -1,30 +1,33 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { fetchVideoByUrl } from '../../../helpers/video' 3import { fetchVideoByUrl } from '../../../helpers/video'
4import { refreshVideoIfNeeded } from '../../activitypub' 4import { refreshVideoIfNeeded, refreshActorIfNeeded } from '../../activitypub'
5import { ActorModel } from '../../../models/activitypub/actor'
5 6
6export type RefreshPayload = { 7export type RefreshPayload = {
7 videoUrl: string 8 type: 'video' | 'actor'
8 type: 'video' 9 url: string
9} 10}
10 11
11async function refreshAPObject (job: Bull.Job) { 12async function refreshAPObject (job: Bull.Job) {
12 const payload = job.data as RefreshPayload 13 const payload = job.data as RefreshPayload
13 14
14 logger.info('Processing AP refresher in job %d for video %s.', job.id, payload.videoUrl) 15 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url)
15 16
16 if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) 17 if (payload.type === 'video') return refreshVideo(payload.url)
18 if (payload.type === 'actor') return refreshActor(payload.url)
17} 19}
18 20
19// --------------------------------------------------------------------------- 21// ---------------------------------------------------------------------------
20 22
21export { 23export {
24 refreshActor,
22 refreshAPObject 25 refreshAPObject
23} 26}
24 27
25// --------------------------------------------------------------------------- 28// ---------------------------------------------------------------------------
26 29
27async function refreshAPVideo (videoUrl: string) { 30async function refreshVideo (videoUrl: string) {
28 const fetchType = 'all' as 'all' 31 const fetchType = 'all' as 'all'
29 const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } 32 const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
30 33
@@ -39,3 +42,13 @@ async function refreshAPVideo (videoUrl: string) {
39 await refreshVideoIfNeeded(refreshOptions) 42 await refreshVideoIfNeeded(refreshOptions)
40 } 43 }
41} 44}
45
46async function refreshActor (actorUrl: string) {
47 const fetchType = 'all' as 'all'
48 const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl)
49
50 if (actor) {
51 await refreshActorIfNeeded(actor, fetchType)
52 }
53
54}
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
index 593e43cc5..04983155c 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -5,17 +5,18 @@ import { VideoModel } from '../../../models/video/video'
5import { JobQueue } from '../job-queue' 5import { JobQueue } from '../job-queue'
6import { federateVideoIfNeeded } from '../../activitypub' 6import { federateVideoIfNeeded } from '../../activitypub'
7import { retryTransactionWrapper } from '../../../helpers/database-utils' 7import { retryTransactionWrapper } from '../../../helpers/database-utils'
8import { sequelizeTypescript } from '../../../initializers' 8import { sequelizeTypescript, CONFIG } from '../../../initializers'
9import * as Bluebird from 'bluebird' 9import * as Bluebird from 'bluebird'
10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' 10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
11import { importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' 11import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding'
12import { Notifier } from '../../notifier' 12import { Notifier } from '../../notifier'
13 13
14export type VideoFilePayload = { 14export type VideoFilePayload = {
15 videoUUID: string 15 videoUUID: string
16 isNewVideo?: boolean
17 resolution?: VideoResolution 16 resolution?: VideoResolution
17 isNewVideo?: boolean
18 isPortraitMode?: boolean 18 isPortraitMode?: boolean
19 generateHlsPlaylist?: boolean
19} 20}
20 21
21export type VideoFileImportPayload = { 22export type VideoFileImportPayload = {
@@ -51,21 +52,38 @@ async function processVideoFile (job: Bull.Job) {
51 return undefined 52 return undefined
52 } 53 }
53 54
54 // Transcoding in other resolution 55 if (payload.generateHlsPlaylist) {
55 if (payload.resolution) { 56 await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false)
57
58 await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
59 } else if (payload.resolution) { // Transcoding in other resolution
56 await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) 60 await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false)
57 61
58 await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video) 62 await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload)
59 } else { 63 } else {
60 await optimizeVideofile(video) 64 await optimizeVideofile(video)
61 65
62 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload.isNewVideo) 66 await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
63 } 67 }
64 68
65 return video 69 return video
66} 70}
67 71
68async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { 72async function onHlsPlaylistGenerationSuccess (video: VideoModel) {
73 if (video === undefined) return undefined
74
75 await sequelizeTypescript.transaction(async t => {
76 // Maybe the video changed in database, refresh it
77 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
78 // Video does not exist anymore
79 if (!videoDatabase) return undefined
80
81 // If the video was not published, we consider it is a new one for other instances
82 await federateVideoIfNeeded(videoDatabase, false, t)
83 })
84}
85
86async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoFilePayload) {
69 if (video === undefined) return undefined 87 if (video === undefined) return undefined
70 88
71 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { 89 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
@@ -91,13 +109,16 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) {
91 return { videoDatabase, videoPublished } 109 return { videoDatabase, videoPublished }
92 }) 110 })
93 111
94 if (videoPublished) { 112 // don't notify prior to scheduled video update
113 if (videoPublished && !videoDatabase.ScheduleVideoUpdate) {
95 Notifier.Instance.notifyOnNewVideo(videoDatabase) 114 Notifier.Instance.notifyOnNewVideo(videoDatabase)
96 Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) 115 Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
97 } 116 }
117
118 await createHlsJobIfEnabled(payload)
98} 119}
99 120
100async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: boolean) { 121async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoFilePayload) {
101 if (videoArg === undefined) return undefined 122 if (videoArg === undefined) return undefined
102 123
103 // Outside the transaction (IO on disk) 124 // Outside the transaction (IO on disk)
@@ -144,13 +165,18 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo
144 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) 165 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
145 } 166 }
146 167
147 await federateVideoIfNeeded(videoDatabase, isNewVideo, t) 168 await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
148 169
149 return { videoDatabase, videoPublished } 170 return { videoDatabase, videoPublished }
150 }) 171 })
151 172
152 if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) 173 // don't notify prior to scheduled video update
153 if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) 174 if (!videoDatabase.ScheduleVideoUpdate) {
175 if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
176 if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
177 }
178
179 await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution }))
154} 180}
155 181
156// --------------------------------------------------------------------------- 182// ---------------------------------------------------------------------------
@@ -159,3 +185,20 @@ export {
159 processVideoFile, 185 processVideoFile,
160 processVideoFileImport 186 processVideoFileImport
161} 187}
188
189// ---------------------------------------------------------------------------
190
191function createHlsJobIfEnabled (payload?: VideoFilePayload) {
192 // Generate HLS playlist?
193 if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
194 const hlsTranscodingPayload = {
195 videoUUID: payload.videoUUID,
196 resolution: payload.resolution,
197 isPortraitMode: payload.isPortraitMode,
198
199 generateHlsPlaylist: true
200 }
201
202 return JobQueue.Instance.createJob({ type: 'video-file', payload: hlsTranscodingPayload })
203 }
204}