aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-07-31 14:34:36 +0200
committerChocobozzz <me@florianbigard.com>2023-08-11 15:02:33 +0200
commit3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch)
treee4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/job-queue/handlers
parent04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff)
downloadPeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge conflicts, but it's a major step forward: * Server can be faster at startup because imports() are async and we can easily lazy import big modules * Angular doesn't seem to support ES import (with .js extension), so we had to correctly organize peertube into a monorepo: * Use yarn workspace feature * Use typescript reference projects for dependencies * Shared projects have been moved into "packages", each one is now a node module (with a dedicated package.json/tsconfig.json) * server/tools have been moved into apps/ and is now a dedicated app bundled and published on NPM so users don't have to build peertube cli tools manually * server/tests have been moved into packages/ so we don't compile them every time we want to run the server * Use isolatedModule option: * Had to move from const enum to const (https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums) * Had to explictely specify "type" imports when used in decorators * Prefer tsx (that uses esbuild under the hood) instead of ts-node to load typescript files (tests with mocha or scripts): * To reduce test complexity as esbuild doesn't support decorator metadata, we only test server files that do not import server models * We still build tests files into js files for a faster CI * Remove unmaintained peertube CLI import script * Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts202
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts82
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts49
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts41
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts38
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts60
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts20
-rw-r--r--server/lib/job-queue/handlers/after-video-channel-import.ts37
-rw-r--r--server/lib/job-queue/handlers/email.ts17
-rw-r--r--server/lib/job-queue/handlers/federate-video.ts28
-rw-r--r--server/lib/job-queue/handlers/generate-storyboard.ts163
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts110
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts159
-rw-r--r--server/lib/job-queue/handlers/notify.ts27
-rw-r--r--server/lib/job-queue/handlers/transcoding-job-builder.ts48
-rw-r--r--server/lib/job-queue/handlers/video-channel-import.ts43
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts83
-rw-r--r--server/lib/job-queue/handlers/video-import.ts344
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts279
-rw-r--r--server/lib/job-queue/handlers/video-redundancy.ts17
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts180
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts150
-rw-r--r--server/lib/job-queue/handlers/video-views-stats.ts57
23 files changed, 0 insertions, 2234 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
deleted file mode 100644
index 6ee9e2429..000000000
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ /dev/null
@@ -1,202 +0,0 @@
1import { map } from 'bluebird'
2import { Job } from 'bullmq'
3import {
4 isAnnounceActivityValid,
5 isDislikeActivityValid,
6 isLikeActivityValid
7} from '@server/helpers/custom-validators/activitypub/activity'
8import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
9import { PeerTubeRequestError } from '@server/helpers/requests'
10import { AP_CLEANER } from '@server/initializers/constants'
11import { fetchAP } from '@server/lib/activitypub/activity'
12import { checkUrlsSameHost } from '@server/lib/activitypub/url'
13import { Redis } from '@server/lib/redis'
14import { VideoModel } from '@server/models/video/video'
15import { VideoCommentModel } from '@server/models/video/video-comment'
16import { VideoShareModel } from '@server/models/video/video-share'
17import { HttpStatusCode } from '@shared/models'
18import { logger, loggerTagsFactory } from '../../../helpers/logger'
19import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
20
21const lTags = loggerTagsFactory('ap-cleaner')
22
23// Job to clean remote interactions off local videos
24
25async function processActivityPubCleaner (_job: Job) {
26 logger.info('Processing ActivityPub cleaner.', lTags())
27
28 {
29 const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
30 const { bodyValidator, deleter, updater } = rateOptionsFactory()
31
32 await map(rateUrls, async rateUrl => {
33 // TODO: remove when https://github.com/mastodon/mastodon/issues/13571 is fixed
34 if (rateUrl.includes('#')) return
35
36 const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })
37
38 if (result?.status === 'deleted') {
39 const { videoId, type } = result.data
40
41 await VideoModel.syncLocalRates(videoId, type, undefined)
42 }
43 }, { concurrency: AP_CLEANER.CONCURRENCY })
44 }
45
46 {
47 const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
48 const { bodyValidator, deleter, updater } = shareOptionsFactory()
49
50 await map(shareUrls, async shareUrl => {
51 await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
52 }, { concurrency: AP_CLEANER.CONCURRENCY })
53 }
54
55 {
56 const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
57 const { bodyValidator, deleter, updater } = commentOptionsFactory()
58
59 await map(commentUrls, async commentUrl => {
60 await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
61 }, { concurrency: AP_CLEANER.CONCURRENCY })
62 }
63}
64
65// ---------------------------------------------------------------------------
66
67export {
68 processActivityPubCleaner
69}
70
71// ---------------------------------------------------------------------------
72
73async function updateObjectIfNeeded <T> (options: {
74 url: string
75 bodyValidator: (body: any) => boolean
76 updater: (url: string, newUrl: string) => Promise<T>
77 deleter: (url: string) => Promise<T> }
78): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
79 const { url, bodyValidator, updater, deleter } = options
80
81 const on404OrTombstone = async () => {
82 logger.info('Removing remote AP object %s.', url, lTags(url))
83 const data = await deleter(url)
84
85 return { status: 'deleted' as 'deleted', data }
86 }
87
88 try {
89 const { body } = await fetchAP<any>(url)
90
91 // If not same id, check same host and update
92 if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
93
94 if (body.type === 'Tombstone') {
95 return on404OrTombstone()
96 }
97
98 const newUrl = body.id
99 if (newUrl !== url) {
100 if (checkUrlsSameHost(newUrl, url) !== true) {
101 throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
102 }
103
104 logger.info('Updating remote AP object %s.', url, lTags(url))
105 const data = await updater(url, newUrl)
106
107 return { status: 'updated', data }
108 }
109
110 return null
111 } catch (err) {
112 // Does not exist anymore, remove entry
113 if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) {
114 return on404OrTombstone()
115 }
116
117 logger.debug('Remote AP object %s is unavailable.', url, lTags(url))
118
119 const unavailability = await Redis.Instance.addAPUnavailability(url)
120 if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
121 logger.info('Removing unavailable AP resource %s.', url, lTags(url))
122 return on404OrTombstone()
123 }
124
125 return null
126 }
127}
128
129function rateOptionsFactory () {
130 return {
131 bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body),
132
133 updater: async (url: string, newUrl: string) => {
134 const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
135 rate.url = newUrl
136
137 const videoId = rate.videoId
138 const type = rate.type
139
140 await rate.save()
141
142 return { videoId, type }
143 },
144
145 deleter: async (url) => {
146 const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
147
148 const videoId = rate.videoId
149 const type = rate.type
150
151 await rate.destroy()
152
153 return { videoId, type }
154 }
155 }
156}
157
158function shareOptionsFactory () {
159 return {
160 bodyValidator: (body: any) => isAnnounceActivityValid(body),
161
162 updater: async (url: string, newUrl: string) => {
163 const share = await VideoShareModel.loadByUrl(url, undefined)
164 share.url = newUrl
165
166 await share.save()
167
168 return undefined
169 },
170
171 deleter: async (url) => {
172 const share = await VideoShareModel.loadByUrl(url, undefined)
173
174 await share.destroy()
175
176 return undefined
177 }
178 }
179}
180
181function commentOptionsFactory () {
182 return {
183 bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body),
184
185 updater: async (url: string, newUrl: string) => {
186 const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
187 comment.url = newUrl
188
189 await comment.save()
190
191 return undefined
192 },
193
194 deleter: async (url) => {
195 const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
196
197 await comment.destroy()
198
199 return undefined
200 }
201 }
202}
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
deleted file mode 100644
index a68c32ba0..000000000
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ /dev/null
@@ -1,82 +0,0 @@
1import { Job } from 'bullmq'
2import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url'
3import { ActivitypubFollowPayload } from '@shared/models'
4import { sanitizeHost } from '../../../helpers/core-utils'
5import { retryTransactionWrapper } from '../../../helpers/database-utils'
6import { logger } from '../../../helpers/logger'
7import { REMOTE_SCHEME, WEBSERVER } from '../../../initializers/constants'
8import { sequelizeTypescript } from '../../../initializers/database'
9import { ActorModel } from '../../../models/actor/actor'
10import { ActorFollowModel } from '../../../models/actor/actor-follow'
11import { MActor, MActorFull } from '../../../types/models'
12import { getOrCreateAPActor, loadActorUrlOrGetFromWebfinger } from '../../activitypub/actors'
13import { sendFollow } from '../../activitypub/send'
14import { Notifier } from '../../notifier'
15
16async function processActivityPubFollow (job: Job) {
17 const payload = job.data as ActivitypubFollowPayload
18 const host = payload.host
19
20 logger.info('Processing ActivityPub follow in job %s.', job.id)
21
22 let targetActor: MActorFull
23 if (!host || host === WEBSERVER.HOST) {
24 targetActor = await ActorModel.loadLocalByName(payload.name)
25 } else {
26 const sanitizedHost = sanitizeHost(host, REMOTE_SCHEME.HTTP)
27 const actorUrl = await loadActorUrlOrGetFromWebfinger(payload.name + '@' + sanitizedHost)
28 targetActor = await getOrCreateAPActor(actorUrl, 'all')
29 }
30
31 if (payload.assertIsChannel && !targetActor.VideoChannel) {
32 logger.warn('Do not follow %s@%s because it is not a channel.', payload.name, host)
33 return
34 }
35
36 const fromActor = await ActorModel.load(payload.followerActorId)
37
38 return retryTransactionWrapper(follow, fromActor, targetActor, payload.isAutoFollow)
39}
40// ---------------------------------------------------------------------------
41
42export {
43 processActivityPubFollow
44}
45
46// ---------------------------------------------------------------------------
47
48async function follow (fromActor: MActor, targetActor: MActorFull, isAutoFollow = false) {
49 if (fromActor.id === targetActor.id) {
50 throw new Error('Follower is the same as target actor.')
51 }
52
53 // Same server, direct accept
54 const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending'
55
56 const actorFollow = await sequelizeTypescript.transaction(async t => {
57 const [ actorFollow ] = await ActorFollowModel.findOrCreateCustom({
58 byActor: fromActor,
59 state,
60 targetActor,
61 activityId: getLocalActorFollowActivityPubUrl(fromActor, targetActor),
62 transaction: t
63 })
64
65 // Send a notification to remote server if our follow is not already accepted
66 if (actorFollow.state !== 'accepted') sendFollow(actorFollow, t)
67
68 return actorFollow
69 })
70
71 const followerFull = await ActorModel.loadFull(fromActor.id)
72
73 const actorFollowFull = Object.assign(actorFollow, {
74 ActorFollowing: targetActor,
75 ActorFollower: followerFull
76 })
77
78 if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewUserFollow(actorFollowFull)
79 if (isAutoFollow === true) Notifier.Instance.notifyOfAutoInstanceFollowing(actorFollowFull)
80
81 return actorFollow
82}
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
deleted file mode 100644
index 8904d086f..000000000
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ /dev/null
@@ -1,49 +0,0 @@
1import { Job } from 'bullmq'
2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
4import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
6import { logger } from '../../../helpers/logger'
7
8// Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive
9
10async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
11 logger.info('Processing ActivityPub broadcast in job %s.', job.id)
12
13 const requestOptions = await buildRequestOptions(job.data)
14
15 const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
16
17 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
18}
19
20async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) {
21 logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id)
22
23 const requestOptions = await buildRequestOptions(job.data)
24
25 const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
26
27 return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
28}
29
30// ---------------------------------------------------------------------------
31
32export {
33 processActivityPubHttpSequentialBroadcast,
34 processActivityPubParallelHttpBroadcast
35}
36
37// ---------------------------------------------------------------------------
38
39async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
40 const body = await computeBody(payload)
41 const httpSignatureOptions = await buildSignedRequestOptions({ signatureActorId: payload.signatureActorId, hasPayload: true })
42
43 return {
44 method: 'POST' as 'POST',
45 json: body,
46 httpSignature: httpSignatureOptions,
47 headers: buildGlobalHeaders(body)
48 }
49}
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
deleted file mode 100644
index b6cb3c4a6..000000000
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ /dev/null
@@ -1,41 +0,0 @@
1import { Job } from 'bullmq'
2import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models'
3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video'
5import { VideoCommentModel } from '../../../models/video/video-comment'
6import { VideoShareModel } from '../../../models/video/video-share'
7import { MVideoFullLight } from '../../../types/models'
8import { crawlCollectionPage } from '../../activitypub/crawl'
9import { createAccountPlaylists } from '../../activitypub/playlists'
10import { processActivities } from '../../activitypub/process'
11import { addVideoShares } from '../../activitypub/share'
12import { addVideoComments } from '../../activitypub/video-comments'
13
14async function processActivityPubHttpFetcher (job: Job) {
15 logger.info('Processing ActivityPub fetcher in job %s.', job.id)
16
17 const payload = job.data as ActivitypubHttpFetcherPayload
18
19 let video: MVideoFullLight
20 if (payload.videoId) video = await VideoModel.loadFull(payload.videoId)
21
22 const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = {
23 'activity': items => processActivities(items, { outboxUrl: payload.uri, fromFetch: true }),
24 'video-shares': items => addVideoShares(items, video),
25 'video-comments': items => addVideoComments(items),
26 'account-playlists': items => createAccountPlaylists(items)
27 }
28
29 const cleanerType: { [ id in FetchType ]?: (crawlStartDate: Date) => Promise<any> } = {
30 'video-shares': crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate),
31 'video-comments': crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate)
32 }
33
34 return crawlCollectionPage(payload.uri, fetcherType[payload.type], cleanerType[payload.type])
35}
36
37// ---------------------------------------------------------------------------
38
39export {
40 processActivityPubHttpFetcher
41}
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
deleted file mode 100644
index 50fca3f94..000000000
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ /dev/null
@@ -1,38 +0,0 @@
1import { Job } from 'bullmq'
2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3import { ActivitypubHttpUnicastPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger'
5import { doRequest } from '../../../helpers/requests'
6import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
7
8async function processActivityPubHttpUnicast (job: Job) {
9 logger.info('Processing ActivityPub unicast in job %s.', job.id)
10
11 const payload = job.data as ActivitypubHttpUnicastPayload
12 const uri = payload.uri
13
14 const body = await computeBody(payload)
15 const httpSignatureOptions = await buildSignedRequestOptions({ signatureActorId: payload.signatureActorId, hasPayload: true })
16
17 const options = {
18 method: 'POST' as 'POST',
19 json: body,
20 httpSignature: httpSignatureOptions,
21 headers: buildGlobalHeaders(body)
22 }
23
24 try {
25 await doRequest(uri, options)
26 ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], [])
27 } catch (err) {
28 ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ])
29
30 throw err
31 }
32}
33
34// ---------------------------------------------------------------------------
35
36export {
37 processActivityPubHttpUnicast
38}
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
deleted file mode 100644
index 706bf17fa..000000000
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ /dev/null
@@ -1,60 +0,0 @@
1import { Job } from 'bullmq'
2import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists'
3import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos'
4import { loadVideoByUrl } from '@server/lib/model-loaders'
5import { RefreshPayload } from '@shared/models'
6import { logger } from '../../../helpers/logger'
7import { ActorModel } from '../../../models/actor/actor'
8import { VideoPlaylistModel } from '../../../models/video/video-playlist'
9import { refreshActorIfNeeded } from '../../activitypub/actors'
10
11async function refreshAPObject (job: Job) {
12 const payload = job.data as RefreshPayload
13
14 logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url)
15
16 if (payload.type === 'video') return refreshVideo(payload.url)
17 if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url)
18 if (payload.type === 'actor') return refreshActor(payload.url)
19}
20
21// ---------------------------------------------------------------------------
22
23export {
24 refreshAPObject
25}
26
27// ---------------------------------------------------------------------------
28
29async function refreshVideo (videoUrl: string) {
30 const fetchType = 'all' as 'all'
31 const syncParam = { rates: true, shares: true, comments: true }
32
33 const videoFromDatabase = await loadVideoByUrl(videoUrl, fetchType)
34 if (videoFromDatabase) {
35 const refreshOptions = {
36 video: videoFromDatabase,
37 fetchedType: fetchType,
38 syncParam
39 }
40
41 await refreshVideoIfNeeded(refreshOptions)
42 }
43}
44
45async function refreshActor (actorUrl: string) {
46 const fetchType = 'all' as 'all'
47 const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl)
48
49 if (actor) {
50 await refreshActorIfNeeded({ actor, fetchedType: fetchType })
51 }
52}
53
54async function refreshVideoPlaylist (playlistUrl: string) {
55 const playlist = await VideoPlaylistModel.loadByUrlAndPopulateAccount(playlistUrl)
56
57 if (playlist) {
58 await refreshVideoPlaylistIfNeeded(playlist)
59 }
60}
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
deleted file mode 100644
index 27a2d431b..000000000
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ /dev/null
@@ -1,20 +0,0 @@
1import { Job } from 'bullmq'
2import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors'
3import { ActorModel } from '@server/models/actor/actor'
4import { ActorKeysPayload } from '@shared/models'
5import { logger } from '../../../helpers/logger'
6
7async function processActorKeys (job: Job) {
8 const payload = job.data as ActorKeysPayload
9 logger.info('Processing actor keys in job %s.', job.id)
10
11 const actor = await ActorModel.load(payload.actorId)
12
13 await generateAndSaveActorKeys(actor)
14}
15
16// ---------------------------------------------------------------------------
17
18export {
19 processActorKeys
20}
diff --git a/server/lib/job-queue/handlers/after-video-channel-import.ts b/server/lib/job-queue/handlers/after-video-channel-import.ts
deleted file mode 100644
index ffdd8c5b5..000000000
--- a/server/lib/job-queue/handlers/after-video-channel-import.ts
+++ /dev/null
@@ -1,37 +0,0 @@
1import { Job } from 'bullmq'
2import { logger } from '@server/helpers/logger'
3import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync'
4import { AfterVideoChannelImportPayload, VideoChannelSyncState, VideoImportPreventExceptionResult } from '@shared/models'
5
6export async function processAfterVideoChannelImport (job: Job) {
7 const payload = job.data as AfterVideoChannelImportPayload
8 if (!payload.channelSyncId) return
9
10 logger.info('Processing after video channel import in job %s.', job.id)
11
12 const sync = await VideoChannelSyncModel.loadWithChannel(payload.channelSyncId)
13 if (!sync) {
14 logger.error('Unknown sync id %d.', payload.channelSyncId)
15 return
16 }
17
18 const childrenValues = await job.getChildrenValues<VideoImportPreventExceptionResult>()
19
20 let errors = 0
21 let successes = 0
22
23 for (const value of Object.values(childrenValues)) {
24 if (value.resultType === 'success') successes++
25 else if (value.resultType === 'error') errors++
26 }
27
28 if (errors > 0) {
29 sync.state = VideoChannelSyncState.FAILED
30 logger.error(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" with failures.`, { errors, successes })
31 } else {
32 sync.state = VideoChannelSyncState.SYNCED
33 logger.info(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" successfully.`, { successes })
34 }
35
36 await sync.save()
37}
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
deleted file mode 100644
index 567bcc076..000000000
--- a/server/lib/job-queue/handlers/email.ts
+++ /dev/null
@@ -1,17 +0,0 @@
1import { Job } from 'bullmq'
2import { EmailPayload } from '@shared/models'
3import { logger } from '../../../helpers/logger'
4import { Emailer } from '../../emailer'
5
6async function processEmail (job: Job) {
7 const payload = job.data as EmailPayload
8 logger.info('Processing email in job %s.', job.id)
9
10 return Emailer.Instance.sendMail(payload)
11}
12
13// ---------------------------------------------------------------------------
14
15export {
16 processEmail
17}
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts
deleted file mode 100644
index 6aac36741..000000000
--- a/server/lib/job-queue/handlers/federate-video.ts
+++ /dev/null
@@ -1,28 +0,0 @@
1import { Job } from 'bullmq'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { sequelizeTypescript } from '@server/initializers/database'
4import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
5import { VideoModel } from '@server/models/video/video'
6import { FederateVideoPayload } from '@shared/models'
7import { logger } from '../../../helpers/logger'
8
9function processFederateVideo (job: Job) {
10 const payload = job.data as FederateVideoPayload
11
12 logger.info('Processing video federation in job %s.', job.id)
13
14 return retryTransactionWrapper(() => {
15 return sequelizeTypescript.transaction(async t => {
16 const video = await VideoModel.loadFull(payload.videoUUID, t)
17 if (!video) return
18
19 return federateVideoIfNeeded(video, payload.isNewVideo, t)
20 })
21 })
22}
23
24// ---------------------------------------------------------------------------
25
26export {
27 processFederateVideo
28}
diff --git a/server/lib/job-queue/handlers/generate-storyboard.ts b/server/lib/job-queue/handlers/generate-storyboard.ts
deleted file mode 100644
index eea20274a..000000000
--- a/server/lib/job-queue/handlers/generate-storyboard.ts
+++ /dev/null
@@ -1,163 +0,0 @@
1import { Job } from 'bullmq'
2import { join } from 'path'
3import { retryTransactionWrapper } from '@server/helpers/database-utils'
4import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
5import { generateImageFilename, getImageSize } from '@server/helpers/image-utils'
6import { logger, loggerTagsFactory } from '@server/helpers/logger'
7import { deleteFileAndCatch } from '@server/helpers/utils'
8import { CONFIG } from '@server/initializers/config'
9import { STORYBOARD } from '@server/initializers/constants'
10import { sequelizeTypescript } from '@server/initializers/database'
11import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { StoryboardModel } from '@server/models/video/storyboard'
14import { VideoModel } from '@server/models/video/video'
15import { MVideo } from '@server/types/models'
16import { FFmpegImage, isAudioFile } from '@shared/ffmpeg'
17import { GenerateStoryboardPayload } from '@shared/models'
18
19const lTagsBase = loggerTagsFactory('storyboard')
20
21async function processGenerateStoryboard (job: Job): Promise<void> {
22 const payload = job.data as GenerateStoryboardPayload
23 const lTags = lTagsBase(payload.videoUUID)
24
25 logger.info('Processing generate storyboard of %s in job %s.', payload.videoUUID, job.id, lTags)
26
27 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(payload.videoUUID)
28
29 try {
30 const video = await VideoModel.loadFull(payload.videoUUID)
31 if (!video) {
32 logger.info('Video %s does not exist anymore, skipping storyboard generation.', payload.videoUUID, lTags)
33 return
34 }
35
36 const inputFile = video.getMaxQualityFile()
37
38 await VideoPathManager.Instance.makeAvailableVideoFile(inputFile, async videoPath => {
39 const isAudio = await isAudioFile(videoPath)
40
41 if (isAudio) {
42 logger.info('Do not generate a storyboard of %s since the video does not have a video stream', payload.videoUUID, lTags)
43 return
44 }
45
46 const ffmpeg = new FFmpegImage(getFFmpegCommandWrapperOptions('thumbnail'))
47
48 const filename = generateImageFilename()
49 const destination = join(CONFIG.STORAGE.STORYBOARDS_DIR, filename)
50
51 const totalSprites = buildTotalSprites(video)
52 if (totalSprites === 0) {
53 logger.info('Do not generate a storyboard of %s because the video is not long enough', payload.videoUUID, lTags)
54 return
55 }
56
57 const spriteDuration = Math.round(video.duration / totalSprites)
58
59 const spritesCount = findGridSize({
60 toFind: totalSprites,
61 maxEdgeCount: STORYBOARD.SPRITES_MAX_EDGE_COUNT
62 })
63
64 logger.debug(
65 'Generating storyboard from video of %s to %s', video.uuid, destination,
66 { ...lTags, spritesCount, spriteDuration, videoDuration: video.duration }
67 )
68
69 await ffmpeg.generateStoryboardFromVideo({
70 destination,
71 path: videoPath,
72 sprites: {
73 size: STORYBOARD.SPRITE_SIZE,
74 count: spritesCount,
75 duration: spriteDuration
76 }
77 })
78
79 const imageSize = await getImageSize(destination)
80
81 await retryTransactionWrapper(() => {
82 return sequelizeTypescript.transaction(async transaction => {
83 const videoStillExists = await VideoModel.load(video.id, transaction)
84 if (!videoStillExists) {
85 logger.info('Video %s does not exist anymore, skipping storyboard generation.', payload.videoUUID, lTags)
86 deleteFileAndCatch(destination)
87 return
88 }
89
90 const existing = await StoryboardModel.loadByVideo(video.id, transaction)
91 if (existing) await existing.destroy({ transaction })
92
93 await StoryboardModel.create({
94 filename,
95 totalHeight: imageSize.height,
96 totalWidth: imageSize.width,
97 spriteHeight: STORYBOARD.SPRITE_SIZE.height,
98 spriteWidth: STORYBOARD.SPRITE_SIZE.width,
99 spriteDuration,
100 videoId: video.id
101 }, { transaction })
102
103 logger.info('Storyboard generation %s ended for video %s.', destination, video.uuid, lTags)
104
105 if (payload.federate) {
106 await federateVideoIfNeeded(video, false, transaction)
107 }
108 })
109 })
110 })
111 } finally {
112 inputFileMutexReleaser()
113 }
114}
115
116// ---------------------------------------------------------------------------
117
118export {
119 processGenerateStoryboard
120}
121
122function buildTotalSprites (video: MVideo) {
123 const maxSprites = STORYBOARD.SPRITE_SIZE.height * STORYBOARD.SPRITE_SIZE.width
124 const totalSprites = Math.min(Math.ceil(video.duration), maxSprites)
125
126 // We can generate a single line
127 if (totalSprites <= STORYBOARD.SPRITES_MAX_EDGE_COUNT) return totalSprites
128
129 return findGridFit(totalSprites, STORYBOARD.SPRITES_MAX_EDGE_COUNT)
130}
131
132function findGridSize (options: {
133 toFind: number
134 maxEdgeCount: number
135}) {
136 const { toFind, maxEdgeCount } = options
137
138 for (let i = 1; i <= maxEdgeCount; i++) {
139 for (let j = i; j <= maxEdgeCount; j++) {
140 if (toFind === i * j) return { width: j, height: i }
141 }
142 }
143
144 throw new Error(`Could not find grid size (to find: ${toFind}, max edge count: ${maxEdgeCount}`)
145}
146
147function findGridFit (value: number, maxMultiplier: number) {
148 for (let i = value; i--; i > 0) {
149 if (!isPrimeWithin(i, maxMultiplier)) return i
150 }
151
152 throw new Error('Could not find prime number below ' + value)
153}
154
155function isPrimeWithin (value: number, maxMultiplier: number) {
156 if (value < 2) return false
157
158 for (let i = 2, end = Math.min(Math.sqrt(value), maxMultiplier); i <= end; i++) {
159 if (value % i === 0 && value / i <= maxMultiplier) return false
160 }
161
162 return true
163}
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
deleted file mode 100644
index edf52de0c..000000000
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ /dev/null
@@ -1,110 +0,0 @@
1import { Job } from 'bullmq'
2import { extractVideo } from '@server/helpers/video'
3import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
4import { VideoPathManager } from '@server/lib/video-path-manager'
5import { VideoModel } from '@server/models/video/video'
6import { VideoFileModel } from '@server/models/video/video-file'
7import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
8import { ManageVideoTorrentPayload } from '@shared/models'
9import { logger } from '../../../helpers/logger'
10
11async function processManageVideoTorrent (job: Job) {
12 const payload = job.data as ManageVideoTorrentPayload
13 logger.info('Processing torrent in job %s.', job.id)
14
15 if (payload.action === 'create') return doCreateAction(payload)
16 if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
17}
18
19// ---------------------------------------------------------------------------
20
21export {
22 processManageVideoTorrent
23}
24
25// ---------------------------------------------------------------------------
26
27async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'create' }) {
28 const [ video, file ] = await Promise.all([
29 loadVideoOrLog(payload.videoId),
30 loadFileOrLog(payload.videoFileId)
31 ])
32
33 if (!video || !file) return
34
35 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
36
37 try {
38 await video.reload()
39 await file.reload()
40
41 await createTorrentAndSetInfoHash(video, file)
42
43 // Refresh videoFile because the createTorrentAndSetInfoHash could be long
44 const refreshedFile = await VideoFileModel.loadWithVideo(file.id)
45 // File does not exist anymore, remove the generated torrent
46 if (!refreshedFile) return file.removeTorrent()
47
48 refreshedFile.infoHash = file.infoHash
49 refreshedFile.torrentFilename = file.torrentFilename
50
51 await refreshedFile.save()
52 } finally {
53 fileMutexReleaser()
54 }
55}
56
57async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) {
58 const [ video, streamingPlaylist, file ] = await Promise.all([
59 loadVideoOrLog(payload.videoId),
60 loadStreamingPlaylistOrLog(payload.streamingPlaylistId),
61 loadFileOrLog(payload.videoFileId)
62 ])
63
64 if ((!video && !streamingPlaylist) || !file) return
65
66 const extractedVideo = extractVideo(video || streamingPlaylist)
67 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(extractedVideo.uuid)
68
69 try {
70 await updateTorrentMetadata(video || streamingPlaylist, file)
71
72 await file.save()
73 } finally {
74 fileMutexReleaser()
75 }
76}
77
78async function loadVideoOrLog (videoId: number) {
79 if (!videoId) return undefined
80
81 const video = await VideoModel.load(videoId)
82 if (!video) {
83 logger.debug('Do not process torrent for video %d: does not exist anymore.', videoId)
84 }
85
86 return video
87}
88
89async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) {
90 if (!streamingPlaylistId) return undefined
91
92 const streamingPlaylist = await VideoStreamingPlaylistModel.loadWithVideo(streamingPlaylistId)
93 if (!streamingPlaylist) {
94 logger.debug('Do not process torrent for streaming playlist %d: does not exist anymore.', streamingPlaylistId)
95 }
96
97 return streamingPlaylist
98}
99
100async function loadFileOrLog (videoFileId: number) {
101 if (!videoFileId) return undefined
102
103 const file = await VideoFileModel.load(videoFileId)
104
105 if (!file) {
106 logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId)
107 }
108
109 return file
110}
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
deleted file mode 100644
index 9a99b6722..000000000
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ /dev/null
@@ -1,159 +0,0 @@
1import { Job } from 'bullmq'
2import { remove } from 'fs-extra'
3import { join } from 'path'
4import { logger, loggerTagsFactory } from '@server/helpers/logger'
5import { updateTorrentMetadata } from '@server/helpers/webtorrent'
6import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
7import { storeHLSFileFromFilename, storeWebVideoFile } from '@server/lib/object-storage'
8import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
9import { VideoPathManager } from '@server/lib/video-path-manager'
10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
11import { VideoModel } from '@server/models/video/video'
12import { VideoJobInfoModel } from '@server/models/video/video-job-info'
13import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models'
14import { MoveObjectStoragePayload, VideoState, VideoStorage } from '@shared/models'
15
16const lTagsBase = loggerTagsFactory('move-object-storage')
17
18export async function processMoveToObjectStorage (job: Job) {
19 const payload = job.data as MoveObjectStoragePayload
20 logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
21
22 const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(payload.videoUUID)
23
24 const video = await VideoModel.loadWithFiles(payload.videoUUID)
25 // No video, maybe deleted?
26 if (!video) {
27 logger.info('Can\'t process job %d, video does not exist.', job.id, lTagsBase(payload.videoUUID))
28 fileMutexReleaser()
29 return undefined
30 }
31
32 const lTags = lTagsBase(video.uuid, video.url)
33
34 try {
35 if (video.VideoFiles) {
36 logger.debug('Moving %d web video files for video %s.', video.VideoFiles.length, video.uuid, lTags)
37
38 await moveWebVideoFiles(video)
39 }
40
41 if (video.VideoStreamingPlaylists) {
42 logger.debug('Moving HLS playlist of %s.', video.uuid)
43
44 await moveHLSFiles(video)
45 }
46
47 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
48 if (pendingMove === 0) {
49 logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
50
51 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
52 }
53 } catch (err) {
54 await onMoveToObjectStorageFailure(job, err)
55
56 throw err
57 } finally {
58 fileMutexReleaser()
59 }
60
61 return payload.videoUUID
62}
63
64export async function onMoveToObjectStorageFailure (job: Job, err: any) {
65 const payload = job.data as MoveObjectStoragePayload
66
67 const video = await VideoModel.loadWithFiles(payload.videoUUID)
68 if (!video) return
69
70 logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTagsBase(video.uuid, video.url) })
71
72 await moveToFailedMoveToObjectStorageState(video)
73 await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove')
74}
75
76// ---------------------------------------------------------------------------
77
78async function moveWebVideoFiles (video: MVideoWithAllFiles) {
79 for (const file of video.VideoFiles) {
80 if (file.storage !== VideoStorage.FILE_SYSTEM) continue
81
82 const fileUrl = await storeWebVideoFile(video, file)
83
84 const oldPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file)
85 await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath })
86 }
87}
88
89async function moveHLSFiles (video: MVideoWithAllFiles) {
90 for (const playlist of video.VideoStreamingPlaylists) {
91 const playlistWithVideo = playlist.withVideo(video)
92
93 for (const file of playlist.VideoFiles) {
94 if (file.storage !== VideoStorage.FILE_SYSTEM) continue
95
96 // Resolution playlist
97 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
98 await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
99
100 // Resolution fragmented file
101 const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
102
103 const oldPath = join(getHLSDirectory(video), file.filename)
104
105 await onFileMoved({ videoOrPlaylist: Object.assign(playlist, { Video: video }), file, fileUrl, oldPath })
106 }
107 }
108}
109
110async function doAfterLastJob (options: {
111 video: MVideoWithAllFiles
112 previousVideoState: VideoState
113 isNewVideo: boolean
114}) {
115 const { video, previousVideoState, isNewVideo } = options
116
117 for (const playlist of video.VideoStreamingPlaylists) {
118 if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue
119
120 const playlistWithVideo = playlist.withVideo(video)
121
122 // Master playlist
123 playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
124 // Sha256 segments file
125 playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
126
127 playlist.storage = VideoStorage.OBJECT_STORAGE
128
129 playlist.assignP2PMediaLoaderInfoHashes(video, playlist.VideoFiles)
130 playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION
131
132 await playlist.save()
133 }
134
135 // Remove empty hls video directory
136 if (video.VideoStreamingPlaylists) {
137 await remove(getHLSDirectory(video))
138 }
139
140 await moveToNextState({ video, previousVideoState, isNewVideo })
141}
142
143async function onFileMoved (options: {
144 videoOrPlaylist: MVideo | MStreamingPlaylistVideo
145 file: MVideoFile
146 fileUrl: string
147 oldPath: string
148}) {
149 const { videoOrPlaylist, file, fileUrl, oldPath } = options
150
151 file.fileUrl = fileUrl
152 file.storage = VideoStorage.OBJECT_STORAGE
153
154 await updateTorrentMetadata(videoOrPlaylist, file)
155 await file.save()
156
157 logger.debug('Removing %s because it\'s now on object storage', oldPath)
158 await remove(oldPath)
159}
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts
deleted file mode 100644
index 83605396c..000000000
--- a/server/lib/job-queue/handlers/notify.ts
+++ /dev/null
@@ -1,27 +0,0 @@
1import { Job } from 'bullmq'
2import { Notifier } from '@server/lib/notifier'
3import { VideoModel } from '@server/models/video/video'
4import { NotifyPayload } from '@shared/models'
5import { logger } from '../../../helpers/logger'
6
7async function processNotify (job: Job) {
8 const payload = job.data as NotifyPayload
9 logger.info('Processing %s notification in job %s.', payload.action, job.id)
10
11 if (payload.action === 'new-video') return doNotifyNewVideo(payload)
12}
13
14// ---------------------------------------------------------------------------
15
16export {
17 processNotify
18}
19
20// ---------------------------------------------------------------------------
21
22async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) {
23 const refreshedVideo = await VideoModel.loadFull(payload.videoUUID)
24 if (!refreshedVideo) return
25
26 Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
27}
diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts
deleted file mode 100644
index 8621b109f..000000000
--- a/server/lib/job-queue/handlers/transcoding-job-builder.ts
+++ /dev/null
@@ -1,48 +0,0 @@
1import { Job } from 'bullmq'
2import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
3import { UserModel } from '@server/models/user/user'
4import { VideoModel } from '@server/models/video/video'
5import { VideoJobInfoModel } from '@server/models/video/video-job-info'
6import { pick } from '@shared/core-utils'
7import { TranscodingJobBuilderPayload } from '@shared/models'
8import { logger } from '../../../helpers/logger'
9import { JobQueue } from '../job-queue'
10
11async function processTranscodingJobBuilder (job: Job) {
12 const payload = job.data as TranscodingJobBuilderPayload
13
14 logger.info('Processing transcoding job builder in job %s.', job.id)
15
16 if (payload.optimizeJob) {
17 const video = await VideoModel.loadFull(payload.videoUUID)
18 const user = await UserModel.loadByVideoId(video.id)
19 const videoFile = video.getMaxQualityFile()
20
21 await createOptimizeOrMergeAudioJobs({
22 ...pick(payload.optimizeJob, [ 'isNewVideo' ]),
23
24 video,
25 videoFile,
26 user,
27 videoFileAlreadyLocked: false
28 })
29 }
30
31 for (const job of (payload.jobs || [])) {
32 await JobQueue.Instance.createJob(job)
33
34 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
35 }
36
37 for (const sequentialJobs of (payload.sequentialJobs || [])) {
38 await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs)
39
40 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.filter(s => !!s).length)
41 }
42}
43
44// ---------------------------------------------------------------------------
45
46export {
47 processTranscodingJobBuilder
48}
diff --git a/server/lib/job-queue/handlers/video-channel-import.ts b/server/lib/job-queue/handlers/video-channel-import.ts
deleted file mode 100644
index 035f88e96..000000000
--- a/server/lib/job-queue/handlers/video-channel-import.ts
+++ /dev/null
@@ -1,43 +0,0 @@
1import { Job } from 'bullmq'
2import { logger } from '@server/helpers/logger'
3import { CONFIG } from '@server/initializers/config'
4import { synchronizeChannel } from '@server/lib/sync-channel'
5import { VideoChannelModel } from '@server/models/video/video-channel'
6import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync'
7import { MChannelSync } from '@server/types/models'
8import { VideoChannelImportPayload } from '@shared/models'
9
10export async function processVideoChannelImport (job: Job) {
11 const payload = job.data as VideoChannelImportPayload
12
13 logger.info('Processing video channel import in job %s.', job.id)
14
15 // Channel import requires only http upload to be allowed
16 if (!CONFIG.IMPORT.VIDEOS.HTTP.ENABLED) {
17 throw new Error('Cannot import channel as the HTTP upload is disabled')
18 }
19
20 if (!CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.ENABLED) {
21 throw new Error('Cannot import channel as the synchronization is disabled')
22 }
23
24 let channelSync: MChannelSync
25 if (payload.partOfChannelSyncId) {
26 channelSync = await VideoChannelSyncModel.loadWithChannel(payload.partOfChannelSyncId)
27
28 if (!channelSync) {
29 throw new Error('Unlnown channel sync specified in videos channel import')
30 }
31 }
32
33 const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId)
34
35 logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `)
36
37 await synchronizeChannel({
38 channel: videoChannel,
39 externalChannelUrl: payload.externalChannelUrl,
40 channelSync,
41 videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.FULL_SYNC_VIDEOS_LIMIT
42 })
43}
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
deleted file mode 100644
index d221e8968..000000000
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ /dev/null
@@ -1,83 +0,0 @@
1import { Job } from 'bullmq'
2import { copy, stat } from 'fs-extra'
3import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
4import { CONFIG } from '@server/initializers/config'
5import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
6import { generateWebVideoFilename } from '@server/lib/paths'
7import { buildMoveToObjectStorageJob } from '@server/lib/video'
8import { VideoPathManager } from '@server/lib/video-path-manager'
9import { VideoModel } from '@server/models/video/video'
10import { VideoFileModel } from '@server/models/video/video-file'
11import { MVideoFullLight } from '@server/types/models'
12import { getLowercaseExtension } from '@shared/core-utils'
13import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg'
14import { VideoFileImportPayload, VideoStorage } from '@shared/models'
15import { logger } from '../../../helpers/logger'
16import { JobQueue } from '../job-queue'
17
18async function processVideoFileImport (job: Job) {
19 const payload = job.data as VideoFileImportPayload
20 logger.info('Processing video file import in job %s.', job.id)
21
22 const video = await VideoModel.loadFull(payload.videoUUID)
23 // No video, maybe deleted?
24 if (!video) {
25 logger.info('Do not process job %d, video does not exist.', job.id)
26 return undefined
27 }
28
29 await updateVideoFile(video, payload.filePath)
30
31 if (CONFIG.OBJECT_STORAGE.ENABLED) {
32 await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state }))
33 } else {
34 await federateVideoIfNeeded(video, false)
35 }
36
37 return video
38}
39
40// ---------------------------------------------------------------------------
41
42export {
43 processVideoFileImport
44}
45
46// ---------------------------------------------------------------------------
47
48async function updateVideoFile (video: MVideoFullLight, inputFilePath: string) {
49 const { resolution } = await getVideoStreamDimensionsInfo(inputFilePath)
50 const { size } = await stat(inputFilePath)
51 const fps = await getVideoStreamFPS(inputFilePath)
52
53 const fileExt = getLowercaseExtension(inputFilePath)
54
55 const currentVideoFile = video.VideoFiles.find(videoFile => videoFile.resolution === resolution)
56
57 if (currentVideoFile) {
58 // Remove old file and old torrent
59 await video.removeWebVideoFile(currentVideoFile)
60 // Remove the old video file from the array
61 video.VideoFiles = video.VideoFiles.filter(f => f !== currentVideoFile)
62
63 await currentVideoFile.destroy()
64 }
65
66 const newVideoFile = new VideoFileModel({
67 resolution,
68 extname: fileExt,
69 filename: generateWebVideoFilename(resolution, fileExt),
70 storage: VideoStorage.FILE_SYSTEM,
71 size,
72 fps,
73 videoId: video.id
74 })
75
76 const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile)
77 await copy(inputFilePath, outputPath)
78
79 video.VideoFiles.push(newVideoFile)
80 await createTorrentAndSetInfoHash(video, newVideoFile)
81
82 await newVideoFile.save()
83}
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
deleted file mode 100644
index e5cd258d6..000000000
--- a/server/lib/job-queue/handlers/video-import.ts
+++ /dev/null
@@ -1,344 +0,0 @@
1import { Job } from 'bullmq'
2import { move, remove, stat } from 'fs-extra'
3import { retryTransactionWrapper } from '@server/helpers/database-utils'
4import { YoutubeDLWrapper } from '@server/helpers/youtube-dl'
5import { CONFIG } from '@server/initializers/config'
6import { isPostImportVideoAccepted } from '@server/lib/moderation'
7import { generateWebVideoFilename } from '@server/lib/paths'
8import { Hooks } from '@server/lib/plugins/hooks'
9import { ServerConfigManager } from '@server/lib/server-config-manager'
10import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job'
11import { isAbleToUploadVideo } from '@server/lib/user'
12import { buildMoveToObjectStorageJob } from '@server/lib/video'
13import { VideoPathManager } from '@server/lib/video-path-manager'
14import { buildNextVideoState } from '@server/lib/video-state'
15import { ThumbnailModel } from '@server/models/video/thumbnail'
16import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
17import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
18import { getLowercaseExtension } from '@shared/core-utils'
19import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg'
20import {
21 ThumbnailType,
22 VideoImportPayload,
23 VideoImportPreventExceptionResult,
24 VideoImportState,
25 VideoImportTorrentPayload,
26 VideoImportTorrentPayloadType,
27 VideoImportYoutubeDLPayload,
28 VideoImportYoutubeDLPayloadType,
29 VideoResolution,
30 VideoState
31} from '@shared/models'
32import { logger } from '../../../helpers/logger'
33import { getSecureTorrentName } from '../../../helpers/utils'
34import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent'
35import { JOB_TTL } from '../../../initializers/constants'
36import { sequelizeTypescript } from '../../../initializers/database'
37import { VideoModel } from '../../../models/video/video'
38import { VideoFileModel } from '../../../models/video/video-file'
39import { VideoImportModel } from '../../../models/video/video-import'
40import { federateVideoIfNeeded } from '../../activitypub/videos'
41import { Notifier } from '../../notifier'
42import { generateLocalVideoMiniature } from '../../thumbnail'
43import { JobQueue } from '../job-queue'
44
45async function processVideoImport (job: Job): Promise<VideoImportPreventExceptionResult> {
46 const payload = job.data as VideoImportPayload
47
48 const videoImport = await getVideoImportOrDie(payload)
49 if (videoImport.state === VideoImportState.CANCELLED) {
50 logger.info('Do not process import since it has been cancelled', { payload })
51 return { resultType: 'success' }
52 }
53
54 videoImport.state = VideoImportState.PROCESSING
55 await videoImport.save()
56
57 try {
58 if (payload.type === 'youtube-dl') await processYoutubeDLImport(job, videoImport, payload)
59 if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') await processTorrentImport(job, videoImport, payload)
60
61 return { resultType: 'success' }
62 } catch (err) {
63 if (!payload.preventException) throw err
64
65 logger.warn('Catch error in video import to send value to parent job.', { payload, err })
66 return { resultType: 'error' }
67 }
68}
69
70// ---------------------------------------------------------------------------
71
72export {
73 processVideoImport
74}
75
76// ---------------------------------------------------------------------------
77
78async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) {
79 logger.info('Processing torrent video import in job %s.', job.id)
80
81 const options = { type: payload.type, videoImportId: payload.videoImportId }
82
83 const target = {
84 torrentName: videoImport.torrentName ? getSecureTorrentName(videoImport.torrentName) : undefined,
85 uri: videoImport.magnetUri
86 }
87 return processFile(() => downloadWebTorrentVideo(target, JOB_TTL['video-import']), videoImport, options)
88}
89
90async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) {
91 logger.info('Processing youtubeDL video import in job %s.', job.id)
92
93 const options = { type: payload.type, videoImportId: videoImport.id }
94
95 const youtubeDL = new YoutubeDLWrapper(
96 videoImport.targetUrl,
97 ServerConfigManager.Instance.getEnabledResolutions('vod'),
98 CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION
99 )
100
101 return processFile(
102 () => youtubeDL.downloadVideo(payload.fileExt, JOB_TTL['video-import']),
103 videoImport,
104 options
105 )
106}
107
108async function getVideoImportOrDie (payload: VideoImportPayload) {
109 const videoImport = await VideoImportModel.loadAndPopulateVideo(payload.videoImportId)
110 if (!videoImport?.Video) {
111 throw new Error(`Cannot import video ${payload.videoImportId}: the video import or video linked to this import does not exist anymore.`)
112 }
113
114 return videoImport
115}
116
117type ProcessFileOptions = {
118 type: VideoImportYoutubeDLPayloadType | VideoImportTorrentPayloadType
119 videoImportId: number
120}
121async function processFile (downloader: () => Promise<string>, videoImport: MVideoImportDefault, options: ProcessFileOptions) {
122 let tempVideoPath: string
123 let videoFile: VideoFileModel
124
125 try {
126 // Download video from youtubeDL
127 tempVideoPath = await downloader()
128
129 // Get information about this video
130 const stats = await stat(tempVideoPath)
131 const isAble = await isAbleToUploadVideo(videoImport.User.id, stats.size)
132 if (isAble === false) {
133 throw new Error('The user video quota is exceeded with this video to import.')
134 }
135
136 const probe = await ffprobePromise(tempVideoPath)
137
138 const { resolution } = await isAudioFile(tempVideoPath, probe)
139 ? { resolution: VideoResolution.H_NOVIDEO }
140 : await getVideoStreamDimensionsInfo(tempVideoPath, probe)
141
142 const fps = await getVideoStreamFPS(tempVideoPath, probe)
143 const duration = await getVideoStreamDuration(tempVideoPath, probe)
144
145 // Prepare video file object for creation in database
146 const fileExt = getLowercaseExtension(tempVideoPath)
147 const videoFileData = {
148 extname: fileExt,
149 resolution,
150 size: stats.size,
151 filename: generateWebVideoFilename(resolution, fileExt),
152 fps,
153 videoId: videoImport.videoId
154 }
155 videoFile = new VideoFileModel(videoFileData)
156
157 const hookName = options.type === 'youtube-dl'
158 ? 'filter:api.video.post-import-url.accept.result'
159 : 'filter:api.video.post-import-torrent.accept.result'
160
161 // Check we accept this video
162 const acceptParameters = {
163 videoImport,
164 video: videoImport.Video,
165 videoFilePath: tempVideoPath,
166 videoFile,
167 user: videoImport.User
168 }
169 const acceptedResult = await Hooks.wrapFun(isPostImportVideoAccepted, acceptParameters, hookName)
170
171 if (acceptedResult.accepted !== true) {
172 logger.info('Refused imported video.', { acceptedResult, acceptParameters })
173
174 videoImport.state = VideoImportState.REJECTED
175 await videoImport.save()
176
177 throw new Error(acceptedResult.errorMessage)
178 }
179
180 // Video is accepted, resuming preparation
181 const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid)
182
183 try {
184 const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile)
185
186 // Move file
187 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
188 await move(tempVideoPath, videoDestFile)
189
190 tempVideoPath = null // This path is not used anymore
191
192 let {
193 miniatureModel: thumbnailModel,
194 miniatureJSONSave: thumbnailSave
195 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE)
196
197 let {
198 miniatureModel: previewModel,
199 miniatureJSONSave: previewSave
200 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW)
201
202 // Create torrent
203 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile)
204
205 const videoFileSave = videoFile.toJSON()
206
207 const { videoImportUpdated, video } = await retryTransactionWrapper(() => {
208 return sequelizeTypescript.transaction(async t => {
209 // Refresh video
210 const video = await VideoModel.load(videoImportWithFiles.videoId, t)
211 if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.')
212
213 await videoFile.save({ transaction: t })
214
215 // Update video DB object
216 video.duration = duration
217 video.state = buildNextVideoState(video.state)
218 await video.save({ transaction: t })
219
220 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t)
221 if (previewModel) await video.addAndSaveThumbnail(previewModel, t)
222
223 // Now we can federate the video (reload from database, we need more attributes)
224 const videoForFederation = await VideoModel.loadFull(video.uuid, t)
225 await federateVideoIfNeeded(videoForFederation, true, t)
226
227 // Update video import object
228 videoImportWithFiles.state = VideoImportState.SUCCESS
229 const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport
230
231 logger.info('Video %s imported.', video.uuid)
232
233 return { videoImportUpdated, video: videoForFederation }
234 }).catch(err => {
235 // Reset fields
236 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
237 if (previewModel) previewModel = new ThumbnailModel(previewSave)
238
239 videoFile = new VideoFileModel(videoFileSave)
240
241 throw err
242 })
243 })
244
245 await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User, videoFileAlreadyLocked: true })
246 } finally {
247 videoFileLockReleaser()
248 }
249 } catch (err) {
250 await onImportError(err, tempVideoPath, videoImport)
251
252 throw err
253 }
254}
255
256async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> {
257 // Refresh video, privacy may have changed
258 const video = await videoImport.Video.reload()
259 const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] })
260
261 return Object.assign(videoImport, { Video: videoWithFiles })
262}
263
264async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) {
265 // Generate miniature if the import did not created it
266 const needsMiniature = thumbnailType === ThumbnailType.MINIATURE
267 ? !videoImportWithFiles.Video.getMiniature()
268 : !videoImportWithFiles.Video.getPreview()
269
270 if (!needsMiniature) {
271 return {
272 miniatureModel: null,
273 miniatureJSONSave: null
274 }
275 }
276
277 const miniatureModel = await generateLocalVideoMiniature({
278 video: videoImportWithFiles.Video,
279 videoFile,
280 type: thumbnailType
281 })
282 const miniatureJSONSave = miniatureModel.toJSON()
283
284 return {
285 miniatureModel,
286 miniatureJSONSave
287 }
288}
289
290async function afterImportSuccess (options: {
291 videoImport: MVideoImport
292 video: MVideoFullLight
293 videoFile: MVideoFile
294 user: MUserId
295 videoFileAlreadyLocked: boolean
296}) {
297 const { video, videoFile, videoImport, user, videoFileAlreadyLocked } = options
298
299 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true })
300
301 if (video.isBlacklisted()) {
302 const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video })
303
304 Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist)
305 } else {
306 Notifier.Instance.notifyOnNewVideoIfNeeded(video)
307 }
308
309 // Generate the storyboard in the job queue, and don't forget to federate an update after
310 await JobQueue.Instance.createJob({
311 type: 'generate-video-storyboard' as 'generate-video-storyboard',
312 payload: {
313 videoUUID: video.uuid,
314 federate: true
315 }
316 })
317
318 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
319 await JobQueue.Instance.createJob(
320 await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT })
321 )
322 return
323 }
324
325 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs?
326 await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user, videoFileAlreadyLocked })
327 }
328}
329
330async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) {
331 try {
332 if (tempVideoPath) await remove(tempVideoPath)
333 } catch (errUnlink) {
334 logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink })
335 }
336
337 videoImport.error = err.message
338 if (videoImport.state !== VideoImportState.REJECTED) {
339 videoImport.state = VideoImportState.FAILED
340 }
341 await videoImport.save()
342
343 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false })
344}
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
deleted file mode 100644
index 070d1d7a2..000000000
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ /dev/null
@@ -1,279 +0,0 @@
1import { Job } from 'bullmq'
2import { readdir, remove } from 'fs-extra'
3import { join } from 'path'
4import { peertubeTruncate } from '@server/helpers/core-utils'
5import { CONSTRAINTS_FIELDS } from '@server/initializers/constants'
6import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
7import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
8import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
9import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
10import { generateLocalVideoMiniature, regenerateMiniaturesIfNeeded } from '@server/lib/thumbnail'
11import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding'
12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { moveToNextState } from '@server/lib/video-state'
14import { VideoModel } from '@server/models/video/video'
15import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
16import { VideoFileModel } from '@server/models/video/video-file'
17import { VideoLiveModel } from '@server/models/video/video-live'
18import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
19import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
20import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
21import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
22import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg'
23import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
24import { logger, loggerTagsFactory } from '../../../helpers/logger'
25import { JobQueue } from '../job-queue'
26
27const lTags = loggerTagsFactory('live', 'job')
28
29async function processVideoLiveEnding (job: Job) {
30 const payload = job.data as VideoLiveEndingPayload
31
32 logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() })
33
34 function logError () {
35 logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags())
36 }
37
38 const video = await VideoModel.load(payload.videoId)
39 const live = await VideoLiveModel.loadByVideoId(payload.videoId)
40 const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId)
41
42 if (!video || !live || !liveSession) {
43 logError()
44 return
45 }
46
47 const permanentLive = live.permanentLive
48
49 liveSession.endingProcessed = true
50 await liveSession.save()
51
52 if (liveSession.saveReplay !== true) {
53 return cleanupLiveAndFederate({ permanentLive, video, streamingPlaylistId: payload.streamingPlaylistId })
54 }
55
56 if (permanentLive) {
57 await saveReplayToExternalVideo({
58 liveVideo: video,
59 liveSession,
60 publishedAt: payload.publishedAt,
61 replayDirectory: payload.replayDirectory
62 })
63
64 return cleanupLiveAndFederate({ permanentLive, video, streamingPlaylistId: payload.streamingPlaylistId })
65 }
66
67 return replaceLiveByReplay({
68 video,
69 liveSession,
70 live,
71 permanentLive,
72 replayDirectory: payload.replayDirectory
73 })
74}
75
76// ---------------------------------------------------------------------------
77
78export {
79 processVideoLiveEnding
80}
81
82// ---------------------------------------------------------------------------
83
84async function saveReplayToExternalVideo (options: {
85 liveVideo: MVideo
86 liveSession: MVideoLiveSession
87 publishedAt: string
88 replayDirectory: string
89}) {
90 const { liveVideo, liveSession, publishedAt, replayDirectory } = options
91
92 const replaySettings = await VideoLiveReplaySettingModel.load(liveSession.replaySettingId)
93
94 const videoNameSuffix = ` - ${new Date(publishedAt).toLocaleString()}`
95 const truncatedVideoName = peertubeTruncate(liveVideo.name, {
96 length: CONSTRAINTS_FIELDS.VIDEOS.NAME.max - videoNameSuffix.length
97 })
98
99 const replayVideo = new VideoModel({
100 name: truncatedVideoName + videoNameSuffix,
101 isLive: false,
102 state: VideoState.TO_TRANSCODE,
103 duration: 0,
104
105 remote: liveVideo.remote,
106 category: liveVideo.category,
107 licence: liveVideo.licence,
108 language: liveVideo.language,
109 commentsEnabled: liveVideo.commentsEnabled,
110 downloadEnabled: liveVideo.downloadEnabled,
111 waitTranscoding: true,
112 nsfw: liveVideo.nsfw,
113 description: liveVideo.description,
114 support: liveVideo.support,
115 privacy: replaySettings.privacy,
116 channelId: liveVideo.channelId
117 }) as MVideoWithAllFiles
118
119 replayVideo.Thumbnails = []
120 replayVideo.VideoFiles = []
121 replayVideo.VideoStreamingPlaylists = []
122
123 replayVideo.url = getLocalVideoActivityPubUrl(replayVideo)
124
125 await replayVideo.save()
126
127 liveSession.replayVideoId = replayVideo.id
128 await liveSession.save()
129
130 // If live is blacklisted, also blacklist the replay
131 const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id)
132 if (blacklist) {
133 await VideoBlacklistModel.create({
134 videoId: replayVideo.id,
135 unfederated: blacklist.unfederated,
136 reason: blacklist.reason,
137 type: blacklist.type
138 })
139 }
140
141 await assignReplayFilesToVideo({ video: replayVideo, replayDirectory })
142
143 await remove(replayDirectory)
144
145 for (const type of [ ThumbnailType.MINIATURE, ThumbnailType.PREVIEW ]) {
146 const image = await generateLocalVideoMiniature({ video: replayVideo, videoFile: replayVideo.getMaxQualityFile(), type })
147 await replayVideo.addAndSaveThumbnail(image)
148 }
149
150 await moveToNextState({ video: replayVideo, isNewVideo: true })
151
152 await createStoryboardJob(replayVideo)
153}
154
155async function replaceLiveByReplay (options: {
156 video: MVideo
157 liveSession: MVideoLiveSession
158 live: MVideoLive
159 permanentLive: boolean
160 replayDirectory: string
161}) {
162 const { video, liveSession, live, permanentLive, replayDirectory } = options
163
164 const replaySettings = await VideoLiveReplaySettingModel.load(liveSession.replaySettingId)
165 const videoWithFiles = await VideoModel.loadFull(video.id)
166 const hlsPlaylist = videoWithFiles.getHLSPlaylist()
167
168 await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist)
169
170 await live.destroy()
171
172 videoWithFiles.isLive = false
173 videoWithFiles.privacy = replaySettings.privacy
174 videoWithFiles.waitTranscoding = true
175 videoWithFiles.state = VideoState.TO_TRANSCODE
176
177 await videoWithFiles.save()
178
179 liveSession.replayVideoId = videoWithFiles.id
180 await liveSession.save()
181
182 await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
183
184 // Reset playlist
185 hlsPlaylist.VideoFiles = []
186 hlsPlaylist.playlistFilename = generateHLSMasterPlaylistFilename()
187 hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename()
188 await hlsPlaylist.save()
189
190 await assignReplayFilesToVideo({ video: videoWithFiles, replayDirectory })
191
192 // Should not happen in this function, but we keep the code if in the future we can replace the permanent live by a replay
193 if (permanentLive) { // Remove session replay
194 await remove(replayDirectory)
195 } else { // We won't stream again in this live, we can delete the base replay directory
196 await remove(getLiveReplayBaseDirectory(videoWithFiles))
197 }
198
199 // Regenerate the thumbnail & preview?
200 await regenerateMiniaturesIfNeeded(videoWithFiles)
201
202 // We consider this is a new video
203 await moveToNextState({ video: videoWithFiles, isNewVideo: true })
204
205 await createStoryboardJob(videoWithFiles)
206}
207
208async function assignReplayFilesToVideo (options: {
209 video: MVideo
210 replayDirectory: string
211}) {
212 const { video, replayDirectory } = options
213
214 const concatenatedTsFiles = await readdir(replayDirectory)
215
216 for (const concatenatedTsFile of concatenatedTsFiles) {
217 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
218 await video.reload()
219
220 const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile)
221
222 const probe = await ffprobePromise(concatenatedTsFilePath)
223 const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe)
224 const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe)
225 const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe)
226
227 try {
228 await generateHlsPlaylistResolutionFromTS({
229 video,
230 inputFileMutexReleaser,
231 concatenatedTsFilePath,
232 resolution,
233 fps,
234 isAAC: audioStream?.codec_name === 'aac'
235 })
236 } catch (err) {
237 logger.error('Cannot generate HLS playlist resolution from TS files.', { err })
238 }
239
240 inputFileMutexReleaser()
241 }
242
243 return video
244}
245
246async function cleanupLiveAndFederate (options: {
247 video: MVideo
248 permanentLive: boolean
249 streamingPlaylistId: number
250}) {
251 const { permanentLive, video, streamingPlaylistId } = options
252
253 const streamingPlaylist = await VideoStreamingPlaylistModel.loadWithVideo(streamingPlaylistId)
254
255 if (streamingPlaylist) {
256 if (permanentLive) {
257 await cleanupAndDestroyPermanentLive(video, streamingPlaylist)
258 } else {
259 await cleanupUnsavedNormalLive(video, streamingPlaylist)
260 }
261 }
262
263 try {
264 const fullVideo = await VideoModel.loadFull(video.id)
265 return federateVideoIfNeeded(fullVideo, false, undefined)
266 } catch (err) {
267 logger.warn('Cannot federate live after cleanup', { videoId: video.id, err })
268 }
269}
270
271function createStoryboardJob (video: MVideo) {
272 return JobQueue.Instance.createJob({
273 type: 'generate-video-storyboard' as 'generate-video-storyboard',
274 payload: {
275 videoUUID: video.uuid,
276 federate: true
277 }
278 })
279}
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts
deleted file mode 100644
index bac99fdb7..000000000
--- a/server/lib/job-queue/handlers/video-redundancy.ts
+++ /dev/null
@@ -1,17 +0,0 @@
1import { Job } from 'bullmq'
2import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler'
3import { VideoRedundancyPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger'
5
6async function processVideoRedundancy (job: Job) {
7 const payload = job.data as VideoRedundancyPayload
8 logger.info('Processing video redundancy in job %s.', job.id)
9
10 return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId)
11}
12
13// ---------------------------------------------------------------------------
14
15export {
16 processVideoRedundancy
17}
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
deleted file mode 100644
index caf051bfa..000000000
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ /dev/null
@@ -1,180 +0,0 @@
1import { Job } from 'bullmq'
2import { remove } from 'fs-extra'
3import { join } from 'path'
4import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg'
5import { CONFIG } from '@server/initializers/config'
6import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
7import { isAbleToUploadVideo } from '@server/lib/user'
8import { VideoPathManager } from '@server/lib/video-path-manager'
9import { approximateIntroOutroAdditionalSize, onVideoStudioEnded, safeCleanupStudioTMPFiles } from '@server/lib/video-studio'
10import { UserModel } from '@server/models/user/user'
11import { VideoModel } from '@server/models/video/video'
12import { MVideo, MVideoFullLight } from '@server/types/models'
13import { pick } from '@shared/core-utils'
14import { buildUUID } from '@shared/extra-utils'
15import { FFmpegEdition } from '@shared/ffmpeg'
16import {
17 VideoStudioEditionPayload,
18 VideoStudioTask,
19 VideoStudioTaskCutPayload,
20 VideoStudioTaskIntroPayload,
21 VideoStudioTaskOutroPayload,
22 VideoStudioTaskPayload,
23 VideoStudioTaskWatermarkPayload
24} from '@shared/models'
25import { logger, loggerTagsFactory } from '../../../helpers/logger'
26
27const lTagsBase = loggerTagsFactory('video-studio')
28
29async function processVideoStudioEdition (job: Job) {
30 const payload = job.data as VideoStudioEditionPayload
31 const lTags = lTagsBase(payload.videoUUID)
32
33 logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags)
34
35 try {
36 const video = await VideoModel.loadFull(payload.videoUUID)
37
38 // No video, maybe deleted?
39 if (!video) {
40 logger.info('Can\'t process job %d, video does not exist.', job.id, lTags)
41
42 await safeCleanupStudioTMPFiles(payload.tasks)
43 return undefined
44 }
45
46 await checkUserQuotaOrThrow(video, payload)
47
48 const inputFile = video.getMaxQualityFile()
49
50 const editionResultPath = await VideoPathManager.Instance.makeAvailableVideoFile(inputFile, async originalFilePath => {
51 let tmpInputFilePath: string
52 let outputPath: string
53
54 for (const task of payload.tasks) {
55 const outputFilename = buildUUID() + inputFile.extname
56 outputPath = join(CONFIG.STORAGE.TMP_DIR, outputFilename)
57
58 await processTask({
59 inputPath: tmpInputFilePath ?? originalFilePath,
60 video,
61 outputPath,
62 task,
63 lTags
64 })
65
66 if (tmpInputFilePath) await remove(tmpInputFilePath)
67
68 // For the next iteration
69 tmpInputFilePath = outputPath
70 }
71
72 return outputPath
73 })
74
75 logger.info('Video edition ended for video %s.', video.uuid, lTags)
76
77 await onVideoStudioEnded({ video, editionResultPath, tasks: payload.tasks })
78 } catch (err) {
79 await safeCleanupStudioTMPFiles(payload.tasks)
80
81 throw err
82 }
83}
84
85// ---------------------------------------------------------------------------
86
87export {
88 processVideoStudioEdition
89}
90
91// ---------------------------------------------------------------------------
92
93type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = {
94 inputPath: string
95 outputPath: string
96 video: MVideo
97 task: T
98 lTags: { tags: string[] }
99}
100
101const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = {
102 'add-intro': processAddIntroOutro,
103 'add-outro': processAddIntroOutro,
104 'cut': processCut,
105 'add-watermark': processAddWatermark
106}
107
108async function processTask (options: TaskProcessorOptions) {
109 const { video, task, lTags } = options
110
111 logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags })
112
113 const processor = taskProcessors[options.task.name]
114 if (!process) throw new Error('Unknown task ' + task.name)
115
116 return processor(options)
117}
118
119function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) {
120 const { task, lTags } = options
121
122 logger.debug('Will add intro/outro to the video.', { options, ...lTags })
123
124 return buildFFmpegEdition().addIntroOutro({
125 ...pick(options, [ 'inputPath', 'outputPath' ]),
126
127 introOutroPath: task.options.file,
128 type: task.name === 'add-intro'
129 ? 'intro'
130 : 'outro'
131 })
132}
133
134function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) {
135 const { task, lTags } = options
136
137 logger.debug('Will cut the video.', { options, ...lTags })
138
139 return buildFFmpegEdition().cutVideo({
140 ...pick(options, [ 'inputPath', 'outputPath' ]),
141
142 start: task.options.start,
143 end: task.options.end
144 })
145}
146
147function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) {
148 const { task, lTags } = options
149
150 logger.debug('Will add watermark to the video.', { options, ...lTags })
151
152 return buildFFmpegEdition().addWatermark({
153 ...pick(options, [ 'inputPath', 'outputPath' ]),
154
155 watermarkPath: task.options.file,
156
157 videoFilters: {
158 watermarkSizeRatio: task.options.watermarkSizeRatio,
159 horitonzalMarginRatio: task.options.horitonzalMarginRatio,
160 verticalMarginRatio: task.options.verticalMarginRatio
161 }
162 })
163}
164
165// ---------------------------------------------------------------------------
166
167async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStudioEditionPayload) {
168 const user = await UserModel.loadByVideoId(video.id)
169
170 const filePathFinder = (i: number) => (payload.tasks[i] as VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload).options.file
171
172 const additionalBytes = await approximateIntroOutroAdditionalSize(video, payload.tasks, filePathFinder)
173 if (await isAbleToUploadVideo(user.id, additionalBytes) === false) {
174 throw new Error('Quota exceeded for this user to edit the video')
175 }
176}
177
178function buildFFmpegEdition () {
179 return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders()))
180}
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
deleted file mode 100644
index 1c8f4fd9f..000000000
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ /dev/null
@@ -1,150 +0,0 @@
1import { Job } from 'bullmq'
2import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
3import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding'
4import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebVideoResolution } from '@server/lib/transcoding/web-transcoding'
5import { removeAllWebVideoFiles } from '@server/lib/video-file'
6import { VideoPathManager } from '@server/lib/video-path-manager'
7import { moveToFailedTranscodingState } from '@server/lib/video-state'
8import { UserModel } from '@server/models/user/user'
9import { VideoJobInfoModel } from '@server/models/video/video-job-info'
10import { MUser, MUserId, MVideoFullLight } from '@server/types/models'
11import {
12 HLSTranscodingPayload,
13 MergeAudioTranscodingPayload,
14 NewWebVideoResolutionTranscodingPayload,
15 OptimizeTranscodingPayload,
16 VideoTranscodingPayload
17} from '@shared/models'
18import { logger, loggerTagsFactory } from '../../../helpers/logger'
19import { VideoModel } from '../../../models/video/video'
20
21type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
22
23const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = {
24 'new-resolution-to-hls': handleHLSJob,
25 'new-resolution-to-web-video': handleNewWebVideoResolutionJob,
26 'merge-audio-to-web-video': handleWebVideoMergeAudioJob,
27 'optimize-to-web-video': handleWebVideoOptimizeJob
28}
29
30const lTags = loggerTagsFactory('transcoding')
31
32async function processVideoTranscoding (job: Job) {
33 const payload = job.data as VideoTranscodingPayload
34 logger.info('Processing transcoding job %s.', job.id, lTags(payload.videoUUID))
35
36 const video = await VideoModel.loadFull(payload.videoUUID)
37 // No video, maybe deleted?
38 if (!video) {
39 logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID))
40 return undefined
41 }
42
43 const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
44
45 const handler = handlers[payload.type]
46
47 if (!handler) {
48 await moveToFailedTranscodingState(video)
49 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
50
51 throw new Error('Cannot find transcoding handler for ' + payload.type)
52 }
53
54 try {
55 await handler(job, payload, video, user)
56 } catch (error) {
57 await moveToFailedTranscodingState(video)
58
59 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
60
61 throw error
62 }
63
64 return video
65}
66
67// ---------------------------------------------------------------------------
68
69export {
70 processVideoTranscoding
71}
72
73// ---------------------------------------------------------------------------
74// Job handlers
75// ---------------------------------------------------------------------------
76
77async function handleWebVideoMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
78 logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid), { payload })
79
80 await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job })
81
82 logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload })
83
84 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: !payload.hasChildren, video })
85}
86
87async function handleWebVideoOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
88 logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid), { payload })
89
90 await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job })
91
92 logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload })
93
94 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: !payload.hasChildren, video })
95}
96
97// ---------------------------------------------------------------------------
98
99async function handleNewWebVideoResolutionJob (job: Job, payload: NewWebVideoResolutionTranscodingPayload, video: MVideoFullLight) {
100 logger.info('Handling Web Video transcoding job for %s.', video.uuid, lTags(video.uuid), { payload })
101
102 await transcodeNewWebVideoResolution({ video, resolution: payload.resolution, fps: payload.fps, job })
103
104 logger.info('Web Video transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload })
105
106 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
107}
108
109// ---------------------------------------------------------------------------
110
111async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, videoArg: MVideoFullLight) {
112 logger.info('Handling HLS transcoding job for %s.', videoArg.uuid, lTags(videoArg.uuid), { payload })
113
114 const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid)
115 let video: MVideoFullLight
116
117 try {
118 video = await VideoModel.loadFull(videoArg.uuid)
119
120 const videoFileInput = payload.copyCodecs
121 ? video.getWebVideoFile(payload.resolution)
122 : video.getMaxQualityFile()
123
124 const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
125
126 await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
127 return generateHlsPlaylistResolution({
128 video,
129 videoInputPath,
130 inputFileMutexReleaser,
131 resolution: payload.resolution,
132 fps: payload.fps,
133 copyCodecs: payload.copyCodecs,
134 job
135 })
136 })
137 } finally {
138 inputFileMutexReleaser()
139 }
140
141 logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload })
142
143 if (payload.deleteWebVideoFiles === true) {
144 logger.info('Removing Web Video files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid))
145
146 await removeAllWebVideoFiles(video)
147 }
148
149 await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video })
150}
diff --git a/server/lib/job-queue/handlers/video-views-stats.ts b/server/lib/job-queue/handlers/video-views-stats.ts
deleted file mode 100644
index c9aa218e5..000000000
--- a/server/lib/job-queue/handlers/video-views-stats.ts
+++ /dev/null
@@ -1,57 +0,0 @@
1import { VideoViewModel } from '@server/models/view/video-view'
2import { isTestOrDevInstance } from '../../../helpers/core-utils'
3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video'
5import { Redis } from '../../redis'
6
7async function processVideosViewsStats () {
8 const lastHour = new Date()
9
10 // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour
11 if (!isTestOrDevInstance()) lastHour.setHours(lastHour.getHours() - 1)
12
13 const hour = lastHour.getHours()
14 const startDate = lastHour.setMinutes(0, 0, 0)
15 const endDate = lastHour.setMinutes(59, 59, 999)
16
17 const videoIds = await Redis.Instance.listVideosViewedForStats(hour)
18 if (videoIds.length === 0) return
19
20 logger.info('Processing videos views stats in job for hour %d.', hour)
21
22 for (const videoId of videoIds) {
23 try {
24 const views = await Redis.Instance.getVideoViewsStats(videoId, hour)
25 await Redis.Instance.deleteVideoViewsStats(videoId, hour)
26
27 if (views) {
28 logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour)
29
30 try {
31 const video = await VideoModel.load(videoId)
32 if (!video) {
33 logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId)
34 continue
35 }
36
37 await VideoViewModel.create({
38 startDate: new Date(startDate),
39 endDate: new Date(endDate),
40 views,
41 videoId
42 })
43 } catch (err) {
44 logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err })
45 }
46 }
47 } catch (err) {
48 logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err })
49 }
50 }
51}
52
53// ---------------------------------------------------------------------------
54
55export {
56 processVideosViewsStats
57}