diff options
author | Chocobozzz <me@florianbigard.com> | 2023-07-31 14:34:36 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-08-11 15:02:33 +0200 |
commit | 3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch) | |
tree | e4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/job-queue/handlers | |
parent | 04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff) | |
download | PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip |
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge
conflicts, but it's a major step forward:
* Server can be faster at startup because imports() are async and we can
easily lazy import big modules
* Angular doesn't seem to support ES import (with .js extension), so we
had to correctly organize peertube into a monorepo:
* Use yarn workspace feature
* Use typescript reference projects for dependencies
* Shared projects have been moved into "packages", each one is now a
node module (with a dedicated package.json/tsconfig.json)
* server/tools have been moved into apps/ and is now a dedicated app
bundled and published on NPM so users don't have to build peertube
cli tools manually
* server/tests have been moved into packages/ so we don't compile
them every time we want to run the server
* Use isolatedModule option:
* Had to move from const enum to const
(https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums)
* Had to explictely specify "type" imports when used in decorators
* Prefer tsx (that uses esbuild under the hood) instead of ts-node to
load typescript files (tests with mocha or scripts):
* To reduce test complexity as esbuild doesn't support decorator
metadata, we only test server files that do not import server
models
* We still build tests files into js files for a faster CI
* Remove unmaintained peertube CLI import script
* Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/job-queue/handlers')
23 files changed, 0 insertions, 2234 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts deleted file mode 100644 index 6ee9e2429..000000000 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ /dev/null | |||
@@ -1,202 +0,0 @@ | |||
1 | import { map } from 'bluebird' | ||
2 | import { Job } from 'bullmq' | ||
3 | import { | ||
4 | isAnnounceActivityValid, | ||
5 | isDislikeActivityValid, | ||
6 | isLikeActivityValid | ||
7 | } from '@server/helpers/custom-validators/activitypub/activity' | ||
8 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | ||
9 | import { PeerTubeRequestError } from '@server/helpers/requests' | ||
10 | import { AP_CLEANER } from '@server/initializers/constants' | ||
11 | import { fetchAP } from '@server/lib/activitypub/activity' | ||
12 | import { checkUrlsSameHost } from '@server/lib/activitypub/url' | ||
13 | import { Redis } from '@server/lib/redis' | ||
14 | import { VideoModel } from '@server/models/video/video' | ||
15 | import { VideoCommentModel } from '@server/models/video/video-comment' | ||
16 | import { VideoShareModel } from '@server/models/video/video-share' | ||
17 | import { HttpStatusCode } from '@shared/models' | ||
18 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | ||
19 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | ||
20 | |||
21 | const lTags = loggerTagsFactory('ap-cleaner') | ||
22 | |||
23 | // Job to clean remote interactions off local videos | ||
24 | |||
25 | async function processActivityPubCleaner (_job: Job) { | ||
26 | logger.info('Processing ActivityPub cleaner.', lTags()) | ||
27 | |||
28 | { | ||
29 | const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() | ||
30 | const { bodyValidator, deleter, updater } = rateOptionsFactory() | ||
31 | |||
32 | await map(rateUrls, async rateUrl => { | ||
33 | // TODO: remove when https://github.com/mastodon/mastodon/issues/13571 is fixed | ||
34 | if (rateUrl.includes('#')) return | ||
35 | |||
36 | const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) | ||
37 | |||
38 | if (result?.status === 'deleted') { | ||
39 | const { videoId, type } = result.data | ||
40 | |||
41 | await VideoModel.syncLocalRates(videoId, type, undefined) | ||
42 | } | ||
43 | }, { concurrency: AP_CLEANER.CONCURRENCY }) | ||
44 | } | ||
45 | |||
46 | { | ||
47 | const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() | ||
48 | const { bodyValidator, deleter, updater } = shareOptionsFactory() | ||
49 | |||
50 | await map(shareUrls, async shareUrl => { | ||
51 | await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) | ||
52 | }, { concurrency: AP_CLEANER.CONCURRENCY }) | ||
53 | } | ||
54 | |||
55 | { | ||
56 | const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() | ||
57 | const { bodyValidator, deleter, updater } = commentOptionsFactory() | ||
58 | |||
59 | await map(commentUrls, async commentUrl => { | ||
60 | await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) | ||
61 | }, { concurrency: AP_CLEANER.CONCURRENCY }) | ||
62 | } | ||
63 | } | ||
64 | |||
65 | // --------------------------------------------------------------------------- | ||
66 | |||
67 | export { | ||
68 | processActivityPubCleaner | ||
69 | } | ||
70 | |||
71 | // --------------------------------------------------------------------------- | ||
72 | |||
73 | async function updateObjectIfNeeded <T> (options: { | ||
74 | url: string | ||
75 | bodyValidator: (body: any) => boolean | ||
76 | updater: (url: string, newUrl: string) => Promise<T> | ||
77 | deleter: (url: string) => Promise<T> } | ||
78 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | ||
79 | const { url, bodyValidator, updater, deleter } = options | ||
80 | |||
81 | const on404OrTombstone = async () => { | ||
82 | logger.info('Removing remote AP object %s.', url, lTags(url)) | ||
83 | const data = await deleter(url) | ||
84 | |||
85 | return { status: 'deleted' as 'deleted', data } | ||
86 | } | ||
87 | |||
88 | try { | ||
89 | const { body } = await fetchAP<any>(url) | ||
90 | |||
91 | // If not same id, check same host and update | ||
92 | if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) | ||
93 | |||
94 | if (body.type === 'Tombstone') { | ||
95 | return on404OrTombstone() | ||
96 | } | ||
97 | |||
98 | const newUrl = body.id | ||
99 | if (newUrl !== url) { | ||
100 | if (checkUrlsSameHost(newUrl, url) !== true) { | ||
101 | throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) | ||
102 | } | ||
103 | |||
104 | logger.info('Updating remote AP object %s.', url, lTags(url)) | ||
105 | const data = await updater(url, newUrl) | ||
106 | |||
107 | return { status: 'updated', data } | ||
108 | } | ||
109 | |||
110 | return null | ||
111 | } catch (err) { | ||
112 | // Does not exist anymore, remove entry | ||
113 | if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) { | ||
114 | return on404OrTombstone() | ||
115 | } | ||
116 | |||
117 | logger.debug('Remote AP object %s is unavailable.', url, lTags(url)) | ||
118 | |||
119 | const unavailability = await Redis.Instance.addAPUnavailability(url) | ||
120 | if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { | ||
121 | logger.info('Removing unavailable AP resource %s.', url, lTags(url)) | ||
122 | return on404OrTombstone() | ||
123 | } | ||
124 | |||
125 | return null | ||
126 | } | ||
127 | } | ||
128 | |||
129 | function rateOptionsFactory () { | ||
130 | return { | ||
131 | bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body), | ||
132 | |||
133 | updater: async (url: string, newUrl: string) => { | ||
134 | const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | ||
135 | rate.url = newUrl | ||
136 | |||
137 | const videoId = rate.videoId | ||
138 | const type = rate.type | ||
139 | |||
140 | await rate.save() | ||
141 | |||
142 | return { videoId, type } | ||
143 | }, | ||
144 | |||
145 | deleter: async (url) => { | ||
146 | const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | ||
147 | |||
148 | const videoId = rate.videoId | ||
149 | const type = rate.type | ||
150 | |||
151 | await rate.destroy() | ||
152 | |||
153 | return { videoId, type } | ||
154 | } | ||
155 | } | ||
156 | } | ||
157 | |||
158 | function shareOptionsFactory () { | ||
159 | return { | ||
160 | bodyValidator: (body: any) => isAnnounceActivityValid(body), | ||
161 | |||
162 | updater: async (url: string, newUrl: string) => { | ||
163 | const share = await VideoShareModel.loadByUrl(url, undefined) | ||
164 | share.url = newUrl | ||
165 | |||
166 | await share.save() | ||
167 | |||
168 | return undefined | ||
169 | }, | ||
170 | |||
171 | deleter: async (url) => { | ||
172 | const share = await VideoShareModel.loadByUrl(url, undefined) | ||
173 | |||
174 | await share.destroy() | ||
175 | |||
176 | return undefined | ||
177 | } | ||
178 | } | ||
179 | } | ||
180 | |||
181 | function commentOptionsFactory () { | ||
182 | return { | ||
183 | bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body), | ||
184 | |||
185 | updater: async (url: string, newUrl: string) => { | ||
186 | const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | ||
187 | comment.url = newUrl | ||
188 | |||
189 | await comment.save() | ||
190 | |||
191 | return undefined | ||
192 | }, | ||
193 | |||
194 | deleter: async (url) => { | ||
195 | const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | ||
196 | |||
197 | await comment.destroy() | ||
198 | |||
199 | return undefined | ||
200 | } | ||
201 | } | ||
202 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts deleted file mode 100644 index a68c32ba0..000000000 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ /dev/null | |||
@@ -1,82 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' | ||
3 | import { ActivitypubFollowPayload } from '@shared/models' | ||
4 | import { sanitizeHost } from '../../../helpers/core-utils' | ||
5 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
6 | import { logger } from '../../../helpers/logger' | ||
7 | import { REMOTE_SCHEME, WEBSERVER } from '../../../initializers/constants' | ||
8 | import { sequelizeTypescript } from '../../../initializers/database' | ||
9 | import { ActorModel } from '../../../models/actor/actor' | ||
10 | import { ActorFollowModel } from '../../../models/actor/actor-follow' | ||
11 | import { MActor, MActorFull } from '../../../types/models' | ||
12 | import { getOrCreateAPActor, loadActorUrlOrGetFromWebfinger } from '../../activitypub/actors' | ||
13 | import { sendFollow } from '../../activitypub/send' | ||
14 | import { Notifier } from '../../notifier' | ||
15 | |||
16 | async function processActivityPubFollow (job: Job) { | ||
17 | const payload = job.data as ActivitypubFollowPayload | ||
18 | const host = payload.host | ||
19 | |||
20 | logger.info('Processing ActivityPub follow in job %s.', job.id) | ||
21 | |||
22 | let targetActor: MActorFull | ||
23 | if (!host || host === WEBSERVER.HOST) { | ||
24 | targetActor = await ActorModel.loadLocalByName(payload.name) | ||
25 | } else { | ||
26 | const sanitizedHost = sanitizeHost(host, REMOTE_SCHEME.HTTP) | ||
27 | const actorUrl = await loadActorUrlOrGetFromWebfinger(payload.name + '@' + sanitizedHost) | ||
28 | targetActor = await getOrCreateAPActor(actorUrl, 'all') | ||
29 | } | ||
30 | |||
31 | if (payload.assertIsChannel && !targetActor.VideoChannel) { | ||
32 | logger.warn('Do not follow %s@%s because it is not a channel.', payload.name, host) | ||
33 | return | ||
34 | } | ||
35 | |||
36 | const fromActor = await ActorModel.load(payload.followerActorId) | ||
37 | |||
38 | return retryTransactionWrapper(follow, fromActor, targetActor, payload.isAutoFollow) | ||
39 | } | ||
40 | // --------------------------------------------------------------------------- | ||
41 | |||
42 | export { | ||
43 | processActivityPubFollow | ||
44 | } | ||
45 | |||
46 | // --------------------------------------------------------------------------- | ||
47 | |||
48 | async function follow (fromActor: MActor, targetActor: MActorFull, isAutoFollow = false) { | ||
49 | if (fromActor.id === targetActor.id) { | ||
50 | throw new Error('Follower is the same as target actor.') | ||
51 | } | ||
52 | |||
53 | // Same server, direct accept | ||
54 | const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' | ||
55 | |||
56 | const actorFollow = await sequelizeTypescript.transaction(async t => { | ||
57 | const [ actorFollow ] = await ActorFollowModel.findOrCreateCustom({ | ||
58 | byActor: fromActor, | ||
59 | state, | ||
60 | targetActor, | ||
61 | activityId: getLocalActorFollowActivityPubUrl(fromActor, targetActor), | ||
62 | transaction: t | ||
63 | }) | ||
64 | |||
65 | // Send a notification to remote server if our follow is not already accepted | ||
66 | if (actorFollow.state !== 'accepted') sendFollow(actorFollow, t) | ||
67 | |||
68 | return actorFollow | ||
69 | }) | ||
70 | |||
71 | const followerFull = await ActorModel.loadFull(fromActor.id) | ||
72 | |||
73 | const actorFollowFull = Object.assign(actorFollow, { | ||
74 | ActorFollowing: targetActor, | ||
75 | ActorFollower: followerFull | ||
76 | }) | ||
77 | |||
78 | if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewUserFollow(actorFollowFull) | ||
79 | if (isAutoFollow === true) Notifier.Instance.notifyOfAutoInstanceFollowing(actorFollowFull) | ||
80 | |||
81 | return actorFollow | ||
82 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts deleted file mode 100644 index 8904d086f..000000000 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ /dev/null | |||
@@ -1,49 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' | ||
3 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | ||
4 | import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' | ||
5 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | ||
6 | import { logger } from '../../../helpers/logger' | ||
7 | |||
8 | // Prefer using a worker thread for HTTP requests because on high load we may have to sign many requests, which can be CPU intensive | ||
9 | |||
10 | async function processActivityPubHttpSequentialBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) { | ||
11 | logger.info('Processing ActivityPub broadcast in job %s.', job.id) | ||
12 | |||
13 | const requestOptions = await buildRequestOptions(job.data) | ||
14 | |||
15 | const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) | ||
16 | |||
17 | return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) | ||
18 | } | ||
19 | |||
20 | async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttpBroadcastPayload>) { | ||
21 | logger.info('Processing ActivityPub parallel broadcast in job %s.', job.id) | ||
22 | |||
23 | const requestOptions = await buildRequestOptions(job.data) | ||
24 | |||
25 | const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) | ||
26 | |||
27 | return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) | ||
28 | } | ||
29 | |||
30 | // --------------------------------------------------------------------------- | ||
31 | |||
32 | export { | ||
33 | processActivityPubHttpSequentialBroadcast, | ||
34 | processActivityPubParallelHttpBroadcast | ||
35 | } | ||
36 | |||
37 | // --------------------------------------------------------------------------- | ||
38 | |||
39 | async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) { | ||
40 | const body = await computeBody(payload) | ||
41 | const httpSignatureOptions = await buildSignedRequestOptions({ signatureActorId: payload.signatureActorId, hasPayload: true }) | ||
42 | |||
43 | return { | ||
44 | method: 'POST' as 'POST', | ||
45 | json: body, | ||
46 | httpSignature: httpSignatureOptions, | ||
47 | headers: buildGlobalHeaders(body) | ||
48 | } | ||
49 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts deleted file mode 100644 index b6cb3c4a6..000000000 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ /dev/null | |||
@@ -1,41 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { VideoModel } from '../../../models/video/video' | ||
5 | import { VideoCommentModel } from '../../../models/video/video-comment' | ||
6 | import { VideoShareModel } from '../../../models/video/video-share' | ||
7 | import { MVideoFullLight } from '../../../types/models' | ||
8 | import { crawlCollectionPage } from '../../activitypub/crawl' | ||
9 | import { createAccountPlaylists } from '../../activitypub/playlists' | ||
10 | import { processActivities } from '../../activitypub/process' | ||
11 | import { addVideoShares } from '../../activitypub/share' | ||
12 | import { addVideoComments } from '../../activitypub/video-comments' | ||
13 | |||
14 | async function processActivityPubHttpFetcher (job: Job) { | ||
15 | logger.info('Processing ActivityPub fetcher in job %s.', job.id) | ||
16 | |||
17 | const payload = job.data as ActivitypubHttpFetcherPayload | ||
18 | |||
19 | let video: MVideoFullLight | ||
20 | if (payload.videoId) video = await VideoModel.loadFull(payload.videoId) | ||
21 | |||
22 | const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = { | ||
23 | 'activity': items => processActivities(items, { outboxUrl: payload.uri, fromFetch: true }), | ||
24 | 'video-shares': items => addVideoShares(items, video), | ||
25 | 'video-comments': items => addVideoComments(items), | ||
26 | 'account-playlists': items => createAccountPlaylists(items) | ||
27 | } | ||
28 | |||
29 | const cleanerType: { [ id in FetchType ]?: (crawlStartDate: Date) => Promise<any> } = { | ||
30 | 'video-shares': crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate), | ||
31 | 'video-comments': crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate) | ||
32 | } | ||
33 | |||
34 | return crawlCollectionPage(payload.uri, fetcherType[payload.type], cleanerType[payload.type]) | ||
35 | } | ||
36 | |||
37 | // --------------------------------------------------------------------------- | ||
38 | |||
39 | export { | ||
40 | processActivityPubHttpFetcher | ||
41 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts deleted file mode 100644 index 50fca3f94..000000000 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ /dev/null | |||
@@ -1,38 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' | ||
3 | import { ActivitypubHttpUnicastPayload } from '@shared/models' | ||
4 | import { logger } from '../../../helpers/logger' | ||
5 | import { doRequest } from '../../../helpers/requests' | ||
6 | import { ActorFollowHealthCache } from '../../actor-follow-health-cache' | ||
7 | |||
8 | async function processActivityPubHttpUnicast (job: Job) { | ||
9 | logger.info('Processing ActivityPub unicast in job %s.', job.id) | ||
10 | |||
11 | const payload = job.data as ActivitypubHttpUnicastPayload | ||
12 | const uri = payload.uri | ||
13 | |||
14 | const body = await computeBody(payload) | ||
15 | const httpSignatureOptions = await buildSignedRequestOptions({ signatureActorId: payload.signatureActorId, hasPayload: true }) | ||
16 | |||
17 | const options = { | ||
18 | method: 'POST' as 'POST', | ||
19 | json: body, | ||
20 | httpSignature: httpSignatureOptions, | ||
21 | headers: buildGlobalHeaders(body) | ||
22 | } | ||
23 | |||
24 | try { | ||
25 | await doRequest(uri, options) | ||
26 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], []) | ||
27 | } catch (err) { | ||
28 | ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ]) | ||
29 | |||
30 | throw err | ||
31 | } | ||
32 | } | ||
33 | |||
34 | // --------------------------------------------------------------------------- | ||
35 | |||
36 | export { | ||
37 | processActivityPubHttpUnicast | ||
38 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts deleted file mode 100644 index 706bf17fa..000000000 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ /dev/null | |||
@@ -1,60 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' | ||
3 | import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
4 | import { loadVideoByUrl } from '@server/lib/model-loaders' | ||
5 | import { RefreshPayload } from '@shared/models' | ||
6 | import { logger } from '../../../helpers/logger' | ||
7 | import { ActorModel } from '../../../models/actor/actor' | ||
8 | import { VideoPlaylistModel } from '../../../models/video/video-playlist' | ||
9 | import { refreshActorIfNeeded } from '../../activitypub/actors' | ||
10 | |||
11 | async function refreshAPObject (job: Job) { | ||
12 | const payload = job.data as RefreshPayload | ||
13 | |||
14 | logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url) | ||
15 | |||
16 | if (payload.type === 'video') return refreshVideo(payload.url) | ||
17 | if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) | ||
18 | if (payload.type === 'actor') return refreshActor(payload.url) | ||
19 | } | ||
20 | |||
21 | // --------------------------------------------------------------------------- | ||
22 | |||
23 | export { | ||
24 | refreshAPObject | ||
25 | } | ||
26 | |||
27 | // --------------------------------------------------------------------------- | ||
28 | |||
29 | async function refreshVideo (videoUrl: string) { | ||
30 | const fetchType = 'all' as 'all' | ||
31 | const syncParam = { rates: true, shares: true, comments: true } | ||
32 | |||
33 | const videoFromDatabase = await loadVideoByUrl(videoUrl, fetchType) | ||
34 | if (videoFromDatabase) { | ||
35 | const refreshOptions = { | ||
36 | video: videoFromDatabase, | ||
37 | fetchedType: fetchType, | ||
38 | syncParam | ||
39 | } | ||
40 | |||
41 | await refreshVideoIfNeeded(refreshOptions) | ||
42 | } | ||
43 | } | ||
44 | |||
45 | async function refreshActor (actorUrl: string) { | ||
46 | const fetchType = 'all' as 'all' | ||
47 | const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl) | ||
48 | |||
49 | if (actor) { | ||
50 | await refreshActorIfNeeded({ actor, fetchedType: fetchType }) | ||
51 | } | ||
52 | } | ||
53 | |||
54 | async function refreshVideoPlaylist (playlistUrl: string) { | ||
55 | const playlist = await VideoPlaylistModel.loadByUrlAndPopulateAccount(playlistUrl) | ||
56 | |||
57 | if (playlist) { | ||
58 | await refreshVideoPlaylistIfNeeded(playlist) | ||
59 | } | ||
60 | } | ||
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts deleted file mode 100644 index 27a2d431b..000000000 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ /dev/null | |||
@@ -1,20 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' | ||
3 | import { ActorModel } from '@server/models/actor/actor' | ||
4 | import { ActorKeysPayload } from '@shared/models' | ||
5 | import { logger } from '../../../helpers/logger' | ||
6 | |||
7 | async function processActorKeys (job: Job) { | ||
8 | const payload = job.data as ActorKeysPayload | ||
9 | logger.info('Processing actor keys in job %s.', job.id) | ||
10 | |||
11 | const actor = await ActorModel.load(payload.actorId) | ||
12 | |||
13 | await generateAndSaveActorKeys(actor) | ||
14 | } | ||
15 | |||
16 | // --------------------------------------------------------------------------- | ||
17 | |||
18 | export { | ||
19 | processActorKeys | ||
20 | } | ||
diff --git a/server/lib/job-queue/handlers/after-video-channel-import.ts b/server/lib/job-queue/handlers/after-video-channel-import.ts deleted file mode 100644 index ffdd8c5b5..000000000 --- a/server/lib/job-queue/handlers/after-video-channel-import.ts +++ /dev/null | |||
@@ -1,37 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' | ||
4 | import { AfterVideoChannelImportPayload, VideoChannelSyncState, VideoImportPreventExceptionResult } from '@shared/models' | ||
5 | |||
6 | export async function processAfterVideoChannelImport (job: Job) { | ||
7 | const payload = job.data as AfterVideoChannelImportPayload | ||
8 | if (!payload.channelSyncId) return | ||
9 | |||
10 | logger.info('Processing after video channel import in job %s.', job.id) | ||
11 | |||
12 | const sync = await VideoChannelSyncModel.loadWithChannel(payload.channelSyncId) | ||
13 | if (!sync) { | ||
14 | logger.error('Unknown sync id %d.', payload.channelSyncId) | ||
15 | return | ||
16 | } | ||
17 | |||
18 | const childrenValues = await job.getChildrenValues<VideoImportPreventExceptionResult>() | ||
19 | |||
20 | let errors = 0 | ||
21 | let successes = 0 | ||
22 | |||
23 | for (const value of Object.values(childrenValues)) { | ||
24 | if (value.resultType === 'success') successes++ | ||
25 | else if (value.resultType === 'error') errors++ | ||
26 | } | ||
27 | |||
28 | if (errors > 0) { | ||
29 | sync.state = VideoChannelSyncState.FAILED | ||
30 | logger.error(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" with failures.`, { errors, successes }) | ||
31 | } else { | ||
32 | sync.state = VideoChannelSyncState.SYNCED | ||
33 | logger.info(`Finished synchronizing "${sync.VideoChannel.Actor.preferredUsername}" successfully.`, { successes }) | ||
34 | } | ||
35 | |||
36 | await sync.save() | ||
37 | } | ||
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts deleted file mode 100644 index 567bcc076..000000000 --- a/server/lib/job-queue/handlers/email.ts +++ /dev/null | |||
@@ -1,17 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { EmailPayload } from '@shared/models' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { Emailer } from '../../emailer' | ||
5 | |||
6 | async function processEmail (job: Job) { | ||
7 | const payload = job.data as EmailPayload | ||
8 | logger.info('Processing email in job %s.', job.id) | ||
9 | |||
10 | return Emailer.Instance.sendMail(payload) | ||
11 | } | ||
12 | |||
13 | // --------------------------------------------------------------------------- | ||
14 | |||
15 | export { | ||
16 | processEmail | ||
17 | } | ||
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts deleted file mode 100644 index 6aac36741..000000000 --- a/server/lib/job-queue/handlers/federate-video.ts +++ /dev/null | |||
@@ -1,28 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { sequelizeTypescript } from '@server/initializers/database' | ||
4 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
5 | import { VideoModel } from '@server/models/video/video' | ||
6 | import { FederateVideoPayload } from '@shared/models' | ||
7 | import { logger } from '../../../helpers/logger' | ||
8 | |||
9 | function processFederateVideo (job: Job) { | ||
10 | const payload = job.data as FederateVideoPayload | ||
11 | |||
12 | logger.info('Processing video federation in job %s.', job.id) | ||
13 | |||
14 | return retryTransactionWrapper(() => { | ||
15 | return sequelizeTypescript.transaction(async t => { | ||
16 | const video = await VideoModel.loadFull(payload.videoUUID, t) | ||
17 | if (!video) return | ||
18 | |||
19 | return federateVideoIfNeeded(video, payload.isNewVideo, t) | ||
20 | }) | ||
21 | }) | ||
22 | } | ||
23 | |||
24 | // --------------------------------------------------------------------------- | ||
25 | |||
26 | export { | ||
27 | processFederateVideo | ||
28 | } | ||
diff --git a/server/lib/job-queue/handlers/generate-storyboard.ts b/server/lib/job-queue/handlers/generate-storyboard.ts deleted file mode 100644 index eea20274a..000000000 --- a/server/lib/job-queue/handlers/generate-storyboard.ts +++ /dev/null | |||
@@ -1,163 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { join } from 'path' | ||
3 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
4 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' | ||
5 | import { generateImageFilename, getImageSize } from '@server/helpers/image-utils' | ||
6 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
7 | import { deleteFileAndCatch } from '@server/helpers/utils' | ||
8 | import { CONFIG } from '@server/initializers/config' | ||
9 | import { STORYBOARD } from '@server/initializers/constants' | ||
10 | import { sequelizeTypescript } from '@server/initializers/database' | ||
11 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
13 | import { StoryboardModel } from '@server/models/video/storyboard' | ||
14 | import { VideoModel } from '@server/models/video/video' | ||
15 | import { MVideo } from '@server/types/models' | ||
16 | import { FFmpegImage, isAudioFile } from '@shared/ffmpeg' | ||
17 | import { GenerateStoryboardPayload } from '@shared/models' | ||
18 | |||
19 | const lTagsBase = loggerTagsFactory('storyboard') | ||
20 | |||
21 | async function processGenerateStoryboard (job: Job): Promise<void> { | ||
22 | const payload = job.data as GenerateStoryboardPayload | ||
23 | const lTags = lTagsBase(payload.videoUUID) | ||
24 | |||
25 | logger.info('Processing generate storyboard of %s in job %s.', payload.videoUUID, job.id, lTags) | ||
26 | |||
27 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(payload.videoUUID) | ||
28 | |||
29 | try { | ||
30 | const video = await VideoModel.loadFull(payload.videoUUID) | ||
31 | if (!video) { | ||
32 | logger.info('Video %s does not exist anymore, skipping storyboard generation.', payload.videoUUID, lTags) | ||
33 | return | ||
34 | } | ||
35 | |||
36 | const inputFile = video.getMaxQualityFile() | ||
37 | |||
38 | await VideoPathManager.Instance.makeAvailableVideoFile(inputFile, async videoPath => { | ||
39 | const isAudio = await isAudioFile(videoPath) | ||
40 | |||
41 | if (isAudio) { | ||
42 | logger.info('Do not generate a storyboard of %s since the video does not have a video stream', payload.videoUUID, lTags) | ||
43 | return | ||
44 | } | ||
45 | |||
46 | const ffmpeg = new FFmpegImage(getFFmpegCommandWrapperOptions('thumbnail')) | ||
47 | |||
48 | const filename = generateImageFilename() | ||
49 | const destination = join(CONFIG.STORAGE.STORYBOARDS_DIR, filename) | ||
50 | |||
51 | const totalSprites = buildTotalSprites(video) | ||
52 | if (totalSprites === 0) { | ||
53 | logger.info('Do not generate a storyboard of %s because the video is not long enough', payload.videoUUID, lTags) | ||
54 | return | ||
55 | } | ||
56 | |||
57 | const spriteDuration = Math.round(video.duration / totalSprites) | ||
58 | |||
59 | const spritesCount = findGridSize({ | ||
60 | toFind: totalSprites, | ||
61 | maxEdgeCount: STORYBOARD.SPRITES_MAX_EDGE_COUNT | ||
62 | }) | ||
63 | |||
64 | logger.debug( | ||
65 | 'Generating storyboard from video of %s to %s', video.uuid, destination, | ||
66 | { ...lTags, spritesCount, spriteDuration, videoDuration: video.duration } | ||
67 | ) | ||
68 | |||
69 | await ffmpeg.generateStoryboardFromVideo({ | ||
70 | destination, | ||
71 | path: videoPath, | ||
72 | sprites: { | ||
73 | size: STORYBOARD.SPRITE_SIZE, | ||
74 | count: spritesCount, | ||
75 | duration: spriteDuration | ||
76 | } | ||
77 | }) | ||
78 | |||
79 | const imageSize = await getImageSize(destination) | ||
80 | |||
81 | await retryTransactionWrapper(() => { | ||
82 | return sequelizeTypescript.transaction(async transaction => { | ||
83 | const videoStillExists = await VideoModel.load(video.id, transaction) | ||
84 | if (!videoStillExists) { | ||
85 | logger.info('Video %s does not exist anymore, skipping storyboard generation.', payload.videoUUID, lTags) | ||
86 | deleteFileAndCatch(destination) | ||
87 | return | ||
88 | } | ||
89 | |||
90 | const existing = await StoryboardModel.loadByVideo(video.id, transaction) | ||
91 | if (existing) await existing.destroy({ transaction }) | ||
92 | |||
93 | await StoryboardModel.create({ | ||
94 | filename, | ||
95 | totalHeight: imageSize.height, | ||
96 | totalWidth: imageSize.width, | ||
97 | spriteHeight: STORYBOARD.SPRITE_SIZE.height, | ||
98 | spriteWidth: STORYBOARD.SPRITE_SIZE.width, | ||
99 | spriteDuration, | ||
100 | videoId: video.id | ||
101 | }, { transaction }) | ||
102 | |||
103 | logger.info('Storyboard generation %s ended for video %s.', destination, video.uuid, lTags) | ||
104 | |||
105 | if (payload.federate) { | ||
106 | await federateVideoIfNeeded(video, false, transaction) | ||
107 | } | ||
108 | }) | ||
109 | }) | ||
110 | }) | ||
111 | } finally { | ||
112 | inputFileMutexReleaser() | ||
113 | } | ||
114 | } | ||
115 | |||
116 | // --------------------------------------------------------------------------- | ||
117 | |||
118 | export { | ||
119 | processGenerateStoryboard | ||
120 | } | ||
121 | |||
122 | function buildTotalSprites (video: MVideo) { | ||
123 | const maxSprites = STORYBOARD.SPRITE_SIZE.height * STORYBOARD.SPRITE_SIZE.width | ||
124 | const totalSprites = Math.min(Math.ceil(video.duration), maxSprites) | ||
125 | |||
126 | // We can generate a single line | ||
127 | if (totalSprites <= STORYBOARD.SPRITES_MAX_EDGE_COUNT) return totalSprites | ||
128 | |||
129 | return findGridFit(totalSprites, STORYBOARD.SPRITES_MAX_EDGE_COUNT) | ||
130 | } | ||
131 | |||
132 | function findGridSize (options: { | ||
133 | toFind: number | ||
134 | maxEdgeCount: number | ||
135 | }) { | ||
136 | const { toFind, maxEdgeCount } = options | ||
137 | |||
138 | for (let i = 1; i <= maxEdgeCount; i++) { | ||
139 | for (let j = i; j <= maxEdgeCount; j++) { | ||
140 | if (toFind === i * j) return { width: j, height: i } | ||
141 | } | ||
142 | } | ||
143 | |||
144 | throw new Error(`Could not find grid size (to find: ${toFind}, max edge count: ${maxEdgeCount}`) | ||
145 | } | ||
146 | |||
147 | function findGridFit (value: number, maxMultiplier: number) { | ||
148 | for (let i = value; i--; i > 0) { | ||
149 | if (!isPrimeWithin(i, maxMultiplier)) return i | ||
150 | } | ||
151 | |||
152 | throw new Error('Could not find prime number below ' + value) | ||
153 | } | ||
154 | |||
155 | function isPrimeWithin (value: number, maxMultiplier: number) { | ||
156 | if (value < 2) return false | ||
157 | |||
158 | for (let i = 2, end = Math.min(Math.sqrt(value), maxMultiplier); i <= end; i++) { | ||
159 | if (value % i === 0 && value / i <= maxMultiplier) return false | ||
160 | } | ||
161 | |||
162 | return true | ||
163 | } | ||
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts deleted file mode 100644 index edf52de0c..000000000 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ /dev/null | |||
@@ -1,110 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { extractVideo } from '@server/helpers/video' | ||
3 | import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' | ||
4 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
5 | import { VideoModel } from '@server/models/video/video' | ||
6 | import { VideoFileModel } from '@server/models/video/video-file' | ||
7 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | ||
8 | import { ManageVideoTorrentPayload } from '@shared/models' | ||
9 | import { logger } from '../../../helpers/logger' | ||
10 | |||
11 | async function processManageVideoTorrent (job: Job) { | ||
12 | const payload = job.data as ManageVideoTorrentPayload | ||
13 | logger.info('Processing torrent in job %s.', job.id) | ||
14 | |||
15 | if (payload.action === 'create') return doCreateAction(payload) | ||
16 | if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) | ||
17 | } | ||
18 | |||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | export { | ||
22 | processManageVideoTorrent | ||
23 | } | ||
24 | |||
25 | // --------------------------------------------------------------------------- | ||
26 | |||
27 | async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'create' }) { | ||
28 | const [ video, file ] = await Promise.all([ | ||
29 | loadVideoOrLog(payload.videoId), | ||
30 | loadFileOrLog(payload.videoFileId) | ||
31 | ]) | ||
32 | |||
33 | if (!video || !file) return | ||
34 | |||
35 | const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
36 | |||
37 | try { | ||
38 | await video.reload() | ||
39 | await file.reload() | ||
40 | |||
41 | await createTorrentAndSetInfoHash(video, file) | ||
42 | |||
43 | // Refresh videoFile because the createTorrentAndSetInfoHash could be long | ||
44 | const refreshedFile = await VideoFileModel.loadWithVideo(file.id) | ||
45 | // File does not exist anymore, remove the generated torrent | ||
46 | if (!refreshedFile) return file.removeTorrent() | ||
47 | |||
48 | refreshedFile.infoHash = file.infoHash | ||
49 | refreshedFile.torrentFilename = file.torrentFilename | ||
50 | |||
51 | await refreshedFile.save() | ||
52 | } finally { | ||
53 | fileMutexReleaser() | ||
54 | } | ||
55 | } | ||
56 | |||
57 | async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) { | ||
58 | const [ video, streamingPlaylist, file ] = await Promise.all([ | ||
59 | loadVideoOrLog(payload.videoId), | ||
60 | loadStreamingPlaylistOrLog(payload.streamingPlaylistId), | ||
61 | loadFileOrLog(payload.videoFileId) | ||
62 | ]) | ||
63 | |||
64 | if ((!video && !streamingPlaylist) || !file) return | ||
65 | |||
66 | const extractedVideo = extractVideo(video || streamingPlaylist) | ||
67 | const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(extractedVideo.uuid) | ||
68 | |||
69 | try { | ||
70 | await updateTorrentMetadata(video || streamingPlaylist, file) | ||
71 | |||
72 | await file.save() | ||
73 | } finally { | ||
74 | fileMutexReleaser() | ||
75 | } | ||
76 | } | ||
77 | |||
78 | async function loadVideoOrLog (videoId: number) { | ||
79 | if (!videoId) return undefined | ||
80 | |||
81 | const video = await VideoModel.load(videoId) | ||
82 | if (!video) { | ||
83 | logger.debug('Do not process torrent for video %d: does not exist anymore.', videoId) | ||
84 | } | ||
85 | |||
86 | return video | ||
87 | } | ||
88 | |||
89 | async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) { | ||
90 | if (!streamingPlaylistId) return undefined | ||
91 | |||
92 | const streamingPlaylist = await VideoStreamingPlaylistModel.loadWithVideo(streamingPlaylistId) | ||
93 | if (!streamingPlaylist) { | ||
94 | logger.debug('Do not process torrent for streaming playlist %d: does not exist anymore.', streamingPlaylistId) | ||
95 | } | ||
96 | |||
97 | return streamingPlaylist | ||
98 | } | ||
99 | |||
100 | async function loadFileOrLog (videoFileId: number) { | ||
101 | if (!videoFileId) return undefined | ||
102 | |||
103 | const file = await VideoFileModel.load(videoFileId) | ||
104 | |||
105 | if (!file) { | ||
106 | logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId) | ||
107 | } | ||
108 | |||
109 | return file | ||
110 | } | ||
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts deleted file mode 100644 index 9a99b6722..000000000 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ /dev/null | |||
@@ -1,159 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { remove } from 'fs-extra' | ||
3 | import { join } from 'path' | ||
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
5 | import { updateTorrentMetadata } from '@server/helpers/webtorrent' | ||
6 | import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' | ||
7 | import { storeHLSFileFromFilename, storeWebVideoFile } from '@server/lib/object-storage' | ||
8 | import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' | ||
9 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
10 | import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' | ||
11 | import { VideoModel } from '@server/models/video/video' | ||
12 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
13 | import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' | ||
14 | import { MoveObjectStoragePayload, VideoState, VideoStorage } from '@shared/models' | ||
15 | |||
16 | const lTagsBase = loggerTagsFactory('move-object-storage') | ||
17 | |||
18 | export async function processMoveToObjectStorage (job: Job) { | ||
19 | const payload = job.data as MoveObjectStoragePayload | ||
20 | logger.info('Moving video %s in job %s.', payload.videoUUID, job.id) | ||
21 | |||
22 | const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(payload.videoUUID) | ||
23 | |||
24 | const video = await VideoModel.loadWithFiles(payload.videoUUID) | ||
25 | // No video, maybe deleted? | ||
26 | if (!video) { | ||
27 | logger.info('Can\'t process job %d, video does not exist.', job.id, lTagsBase(payload.videoUUID)) | ||
28 | fileMutexReleaser() | ||
29 | return undefined | ||
30 | } | ||
31 | |||
32 | const lTags = lTagsBase(video.uuid, video.url) | ||
33 | |||
34 | try { | ||
35 | if (video.VideoFiles) { | ||
36 | logger.debug('Moving %d web video files for video %s.', video.VideoFiles.length, video.uuid, lTags) | ||
37 | |||
38 | await moveWebVideoFiles(video) | ||
39 | } | ||
40 | |||
41 | if (video.VideoStreamingPlaylists) { | ||
42 | logger.debug('Moving HLS playlist of %s.', video.uuid) | ||
43 | |||
44 | await moveHLSFiles(video) | ||
45 | } | ||
46 | |||
47 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') | ||
48 | if (pendingMove === 0) { | ||
49 | logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags) | ||
50 | |||
51 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) | ||
52 | } | ||
53 | } catch (err) { | ||
54 | await onMoveToObjectStorageFailure(job, err) | ||
55 | |||
56 | throw err | ||
57 | } finally { | ||
58 | fileMutexReleaser() | ||
59 | } | ||
60 | |||
61 | return payload.videoUUID | ||
62 | } | ||
63 | |||
64 | export async function onMoveToObjectStorageFailure (job: Job, err: any) { | ||
65 | const payload = job.data as MoveObjectStoragePayload | ||
66 | |||
67 | const video = await VideoModel.loadWithFiles(payload.videoUUID) | ||
68 | if (!video) return | ||
69 | |||
70 | logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTagsBase(video.uuid, video.url) }) | ||
71 | |||
72 | await moveToFailedMoveToObjectStorageState(video) | ||
73 | await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove') | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | async function moveWebVideoFiles (video: MVideoWithAllFiles) { | ||
79 | for (const file of video.VideoFiles) { | ||
80 | if (file.storage !== VideoStorage.FILE_SYSTEM) continue | ||
81 | |||
82 | const fileUrl = await storeWebVideoFile(video, file) | ||
83 | |||
84 | const oldPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file) | ||
85 | await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath }) | ||
86 | } | ||
87 | } | ||
88 | |||
89 | async function moveHLSFiles (video: MVideoWithAllFiles) { | ||
90 | for (const playlist of video.VideoStreamingPlaylists) { | ||
91 | const playlistWithVideo = playlist.withVideo(video) | ||
92 | |||
93 | for (const file of playlist.VideoFiles) { | ||
94 | if (file.storage !== VideoStorage.FILE_SYSTEM) continue | ||
95 | |||
96 | // Resolution playlist | ||
97 | const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) | ||
98 | await storeHLSFileFromFilename(playlistWithVideo, playlistFilename) | ||
99 | |||
100 | // Resolution fragmented file | ||
101 | const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename) | ||
102 | |||
103 | const oldPath = join(getHLSDirectory(video), file.filename) | ||
104 | |||
105 | await onFileMoved({ videoOrPlaylist: Object.assign(playlist, { Video: video }), file, fileUrl, oldPath }) | ||
106 | } | ||
107 | } | ||
108 | } | ||
109 | |||
110 | async function doAfterLastJob (options: { | ||
111 | video: MVideoWithAllFiles | ||
112 | previousVideoState: VideoState | ||
113 | isNewVideo: boolean | ||
114 | }) { | ||
115 | const { video, previousVideoState, isNewVideo } = options | ||
116 | |||
117 | for (const playlist of video.VideoStreamingPlaylists) { | ||
118 | if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue | ||
119 | |||
120 | const playlistWithVideo = playlist.withVideo(video) | ||
121 | |||
122 | // Master playlist | ||
123 | playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename) | ||
124 | // Sha256 segments file | ||
125 | playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename) | ||
126 | |||
127 | playlist.storage = VideoStorage.OBJECT_STORAGE | ||
128 | |||
129 | playlist.assignP2PMediaLoaderInfoHashes(video, playlist.VideoFiles) | ||
130 | playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION | ||
131 | |||
132 | await playlist.save() | ||
133 | } | ||
134 | |||
135 | // Remove empty hls video directory | ||
136 | if (video.VideoStreamingPlaylists) { | ||
137 | await remove(getHLSDirectory(video)) | ||
138 | } | ||
139 | |||
140 | await moveToNextState({ video, previousVideoState, isNewVideo }) | ||
141 | } | ||
142 | |||
143 | async function onFileMoved (options: { | ||
144 | videoOrPlaylist: MVideo | MStreamingPlaylistVideo | ||
145 | file: MVideoFile | ||
146 | fileUrl: string | ||
147 | oldPath: string | ||
148 | }) { | ||
149 | const { videoOrPlaylist, file, fileUrl, oldPath } = options | ||
150 | |||
151 | file.fileUrl = fileUrl | ||
152 | file.storage = VideoStorage.OBJECT_STORAGE | ||
153 | |||
154 | await updateTorrentMetadata(videoOrPlaylist, file) | ||
155 | await file.save() | ||
156 | |||
157 | logger.debug('Removing %s because it\'s now on object storage', oldPath) | ||
158 | await remove(oldPath) | ||
159 | } | ||
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts deleted file mode 100644 index 83605396c..000000000 --- a/server/lib/job-queue/handlers/notify.ts +++ /dev/null | |||
@@ -1,27 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { Notifier } from '@server/lib/notifier' | ||
3 | import { VideoModel } from '@server/models/video/video' | ||
4 | import { NotifyPayload } from '@shared/models' | ||
5 | import { logger } from '../../../helpers/logger' | ||
6 | |||
7 | async function processNotify (job: Job) { | ||
8 | const payload = job.data as NotifyPayload | ||
9 | logger.info('Processing %s notification in job %s.', payload.action, job.id) | ||
10 | |||
11 | if (payload.action === 'new-video') return doNotifyNewVideo(payload) | ||
12 | } | ||
13 | |||
14 | // --------------------------------------------------------------------------- | ||
15 | |||
16 | export { | ||
17 | processNotify | ||
18 | } | ||
19 | |||
20 | // --------------------------------------------------------------------------- | ||
21 | |||
22 | async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) { | ||
23 | const refreshedVideo = await VideoModel.loadFull(payload.videoUUID) | ||
24 | if (!refreshedVideo) return | ||
25 | |||
26 | Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) | ||
27 | } | ||
diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts deleted file mode 100644 index 8621b109f..000000000 --- a/server/lib/job-queue/handlers/transcoding-job-builder.ts +++ /dev/null | |||
@@ -1,48 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
3 | import { UserModel } from '@server/models/user/user' | ||
4 | import { VideoModel } from '@server/models/video/video' | ||
5 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
6 | import { pick } from '@shared/core-utils' | ||
7 | import { TranscodingJobBuilderPayload } from '@shared/models' | ||
8 | import { logger } from '../../../helpers/logger' | ||
9 | import { JobQueue } from '../job-queue' | ||
10 | |||
11 | async function processTranscodingJobBuilder (job: Job) { | ||
12 | const payload = job.data as TranscodingJobBuilderPayload | ||
13 | |||
14 | logger.info('Processing transcoding job builder in job %s.', job.id) | ||
15 | |||
16 | if (payload.optimizeJob) { | ||
17 | const video = await VideoModel.loadFull(payload.videoUUID) | ||
18 | const user = await UserModel.loadByVideoId(video.id) | ||
19 | const videoFile = video.getMaxQualityFile() | ||
20 | |||
21 | await createOptimizeOrMergeAudioJobs({ | ||
22 | ...pick(payload.optimizeJob, [ 'isNewVideo' ]), | ||
23 | |||
24 | video, | ||
25 | videoFile, | ||
26 | user, | ||
27 | videoFileAlreadyLocked: false | ||
28 | }) | ||
29 | } | ||
30 | |||
31 | for (const job of (payload.jobs || [])) { | ||
32 | await JobQueue.Instance.createJob(job) | ||
33 | |||
34 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | ||
35 | } | ||
36 | |||
37 | for (const sequentialJobs of (payload.sequentialJobs || [])) { | ||
38 | await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs) | ||
39 | |||
40 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.filter(s => !!s).length) | ||
41 | } | ||
42 | } | ||
43 | |||
44 | // --------------------------------------------------------------------------- | ||
45 | |||
46 | export { | ||
47 | processTranscodingJobBuilder | ||
48 | } | ||
diff --git a/server/lib/job-queue/handlers/video-channel-import.ts b/server/lib/job-queue/handlers/video-channel-import.ts deleted file mode 100644 index 035f88e96..000000000 --- a/server/lib/job-queue/handlers/video-channel-import.ts +++ /dev/null | |||
@@ -1,43 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { CONFIG } from '@server/initializers/config' | ||
4 | import { synchronizeChannel } from '@server/lib/sync-channel' | ||
5 | import { VideoChannelModel } from '@server/models/video/video-channel' | ||
6 | import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' | ||
7 | import { MChannelSync } from '@server/types/models' | ||
8 | import { VideoChannelImportPayload } from '@shared/models' | ||
9 | |||
10 | export async function processVideoChannelImport (job: Job) { | ||
11 | const payload = job.data as VideoChannelImportPayload | ||
12 | |||
13 | logger.info('Processing video channel import in job %s.', job.id) | ||
14 | |||
15 | // Channel import requires only http upload to be allowed | ||
16 | if (!CONFIG.IMPORT.VIDEOS.HTTP.ENABLED) { | ||
17 | throw new Error('Cannot import channel as the HTTP upload is disabled') | ||
18 | } | ||
19 | |||
20 | if (!CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.ENABLED) { | ||
21 | throw new Error('Cannot import channel as the synchronization is disabled') | ||
22 | } | ||
23 | |||
24 | let channelSync: MChannelSync | ||
25 | if (payload.partOfChannelSyncId) { | ||
26 | channelSync = await VideoChannelSyncModel.loadWithChannel(payload.partOfChannelSyncId) | ||
27 | |||
28 | if (!channelSync) { | ||
29 | throw new Error('Unlnown channel sync specified in videos channel import') | ||
30 | } | ||
31 | } | ||
32 | |||
33 | const videoChannel = await VideoChannelModel.loadAndPopulateAccount(payload.videoChannelId) | ||
34 | |||
35 | logger.info(`Starting importing videos from external channel "${payload.externalChannelUrl}" to "${videoChannel.name}" `) | ||
36 | |||
37 | await synchronizeChannel({ | ||
38 | channel: videoChannel, | ||
39 | externalChannelUrl: payload.externalChannelUrl, | ||
40 | channelSync, | ||
41 | videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.FULL_SYNC_VIDEOS_LIMIT | ||
42 | }) | ||
43 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts deleted file mode 100644 index d221e8968..000000000 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ /dev/null | |||
@@ -1,83 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { copy, stat } from 'fs-extra' | ||
3 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | ||
4 | import { CONFIG } from '@server/initializers/config' | ||
5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
6 | import { generateWebVideoFilename } from '@server/lib/paths' | ||
7 | import { buildMoveToObjectStorageJob } from '@server/lib/video' | ||
8 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
9 | import { VideoModel } from '@server/models/video/video' | ||
10 | import { VideoFileModel } from '@server/models/video/video-file' | ||
11 | import { MVideoFullLight } from '@server/types/models' | ||
12 | import { getLowercaseExtension } from '@shared/core-utils' | ||
13 | import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' | ||
14 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' | ||
15 | import { logger } from '../../../helpers/logger' | ||
16 | import { JobQueue } from '../job-queue' | ||
17 | |||
18 | async function processVideoFileImport (job: Job) { | ||
19 | const payload = job.data as VideoFileImportPayload | ||
20 | logger.info('Processing video file import in job %s.', job.id) | ||
21 | |||
22 | const video = await VideoModel.loadFull(payload.videoUUID) | ||
23 | // No video, maybe deleted? | ||
24 | if (!video) { | ||
25 | logger.info('Do not process job %d, video does not exist.', job.id) | ||
26 | return undefined | ||
27 | } | ||
28 | |||
29 | await updateVideoFile(video, payload.filePath) | ||
30 | |||
31 | if (CONFIG.OBJECT_STORAGE.ENABLED) { | ||
32 | await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state })) | ||
33 | } else { | ||
34 | await federateVideoIfNeeded(video, false) | ||
35 | } | ||
36 | |||
37 | return video | ||
38 | } | ||
39 | |||
40 | // --------------------------------------------------------------------------- | ||
41 | |||
42 | export { | ||
43 | processVideoFileImport | ||
44 | } | ||
45 | |||
46 | // --------------------------------------------------------------------------- | ||
47 | |||
48 | async function updateVideoFile (video: MVideoFullLight, inputFilePath: string) { | ||
49 | const { resolution } = await getVideoStreamDimensionsInfo(inputFilePath) | ||
50 | const { size } = await stat(inputFilePath) | ||
51 | const fps = await getVideoStreamFPS(inputFilePath) | ||
52 | |||
53 | const fileExt = getLowercaseExtension(inputFilePath) | ||
54 | |||
55 | const currentVideoFile = video.VideoFiles.find(videoFile => videoFile.resolution === resolution) | ||
56 | |||
57 | if (currentVideoFile) { | ||
58 | // Remove old file and old torrent | ||
59 | await video.removeWebVideoFile(currentVideoFile) | ||
60 | // Remove the old video file from the array | ||
61 | video.VideoFiles = video.VideoFiles.filter(f => f !== currentVideoFile) | ||
62 | |||
63 | await currentVideoFile.destroy() | ||
64 | } | ||
65 | |||
66 | const newVideoFile = new VideoFileModel({ | ||
67 | resolution, | ||
68 | extname: fileExt, | ||
69 | filename: generateWebVideoFilename(resolution, fileExt), | ||
70 | storage: VideoStorage.FILE_SYSTEM, | ||
71 | size, | ||
72 | fps, | ||
73 | videoId: video.id | ||
74 | }) | ||
75 | |||
76 | const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile) | ||
77 | await copy(inputFilePath, outputPath) | ||
78 | |||
79 | video.VideoFiles.push(newVideoFile) | ||
80 | await createTorrentAndSetInfoHash(video, newVideoFile) | ||
81 | |||
82 | await newVideoFile.save() | ||
83 | } | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts deleted file mode 100644 index e5cd258d6..000000000 --- a/server/lib/job-queue/handlers/video-import.ts +++ /dev/null | |||
@@ -1,344 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { move, remove, stat } from 'fs-extra' | ||
3 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
4 | import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' | ||
5 | import { CONFIG } from '@server/initializers/config' | ||
6 | import { isPostImportVideoAccepted } from '@server/lib/moderation' | ||
7 | import { generateWebVideoFilename } from '@server/lib/paths' | ||
8 | import { Hooks } from '@server/lib/plugins/hooks' | ||
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | ||
10 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
11 | import { isAbleToUploadVideo } from '@server/lib/user' | ||
12 | import { buildMoveToObjectStorageJob } from '@server/lib/video' | ||
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
14 | import { buildNextVideoState } from '@server/lib/video-state' | ||
15 | import { ThumbnailModel } from '@server/models/video/thumbnail' | ||
16 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' | ||
17 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | ||
18 | import { getLowercaseExtension } from '@shared/core-utils' | ||
19 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' | ||
20 | import { | ||
21 | ThumbnailType, | ||
22 | VideoImportPayload, | ||
23 | VideoImportPreventExceptionResult, | ||
24 | VideoImportState, | ||
25 | VideoImportTorrentPayload, | ||
26 | VideoImportTorrentPayloadType, | ||
27 | VideoImportYoutubeDLPayload, | ||
28 | VideoImportYoutubeDLPayloadType, | ||
29 | VideoResolution, | ||
30 | VideoState | ||
31 | } from '@shared/models' | ||
32 | import { logger } from '../../../helpers/logger' | ||
33 | import { getSecureTorrentName } from '../../../helpers/utils' | ||
34 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' | ||
35 | import { JOB_TTL } from '../../../initializers/constants' | ||
36 | import { sequelizeTypescript } from '../../../initializers/database' | ||
37 | import { VideoModel } from '../../../models/video/video' | ||
38 | import { VideoFileModel } from '../../../models/video/video-file' | ||
39 | import { VideoImportModel } from '../../../models/video/video-import' | ||
40 | import { federateVideoIfNeeded } from '../../activitypub/videos' | ||
41 | import { Notifier } from '../../notifier' | ||
42 | import { generateLocalVideoMiniature } from '../../thumbnail' | ||
43 | import { JobQueue } from '../job-queue' | ||
44 | |||
45 | async function processVideoImport (job: Job): Promise<VideoImportPreventExceptionResult> { | ||
46 | const payload = job.data as VideoImportPayload | ||
47 | |||
48 | const videoImport = await getVideoImportOrDie(payload) | ||
49 | if (videoImport.state === VideoImportState.CANCELLED) { | ||
50 | logger.info('Do not process import since it has been cancelled', { payload }) | ||
51 | return { resultType: 'success' } | ||
52 | } | ||
53 | |||
54 | videoImport.state = VideoImportState.PROCESSING | ||
55 | await videoImport.save() | ||
56 | |||
57 | try { | ||
58 | if (payload.type === 'youtube-dl') await processYoutubeDLImport(job, videoImport, payload) | ||
59 | if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') await processTorrentImport(job, videoImport, payload) | ||
60 | |||
61 | return { resultType: 'success' } | ||
62 | } catch (err) { | ||
63 | if (!payload.preventException) throw err | ||
64 | |||
65 | logger.warn('Catch error in video import to send value to parent job.', { payload, err }) | ||
66 | return { resultType: 'error' } | ||
67 | } | ||
68 | } | ||
69 | |||
70 | // --------------------------------------------------------------------------- | ||
71 | |||
72 | export { | ||
73 | processVideoImport | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { | ||
79 | logger.info('Processing torrent video import in job %s.', job.id) | ||
80 | |||
81 | const options = { type: payload.type, videoImportId: payload.videoImportId } | ||
82 | |||
83 | const target = { | ||
84 | torrentName: videoImport.torrentName ? getSecureTorrentName(videoImport.torrentName) : undefined, | ||
85 | uri: videoImport.magnetUri | ||
86 | } | ||
87 | return processFile(() => downloadWebTorrentVideo(target, JOB_TTL['video-import']), videoImport, options) | ||
88 | } | ||
89 | |||
90 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { | ||
91 | logger.info('Processing youtubeDL video import in job %s.', job.id) | ||
92 | |||
93 | const options = { type: payload.type, videoImportId: videoImport.id } | ||
94 | |||
95 | const youtubeDL = new YoutubeDLWrapper( | ||
96 | videoImport.targetUrl, | ||
97 | ServerConfigManager.Instance.getEnabledResolutions('vod'), | ||
98 | CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION | ||
99 | ) | ||
100 | |||
101 | return processFile( | ||
102 | () => youtubeDL.downloadVideo(payload.fileExt, JOB_TTL['video-import']), | ||
103 | videoImport, | ||
104 | options | ||
105 | ) | ||
106 | } | ||
107 | |||
108 | async function getVideoImportOrDie (payload: VideoImportPayload) { | ||
109 | const videoImport = await VideoImportModel.loadAndPopulateVideo(payload.videoImportId) | ||
110 | if (!videoImport?.Video) { | ||
111 | throw new Error(`Cannot import video ${payload.videoImportId}: the video import or video linked to this import does not exist anymore.`) | ||
112 | } | ||
113 | |||
114 | return videoImport | ||
115 | } | ||
116 | |||
117 | type ProcessFileOptions = { | ||
118 | type: VideoImportYoutubeDLPayloadType | VideoImportTorrentPayloadType | ||
119 | videoImportId: number | ||
120 | } | ||
121 | async function processFile (downloader: () => Promise<string>, videoImport: MVideoImportDefault, options: ProcessFileOptions) { | ||
122 | let tempVideoPath: string | ||
123 | let videoFile: VideoFileModel | ||
124 | |||
125 | try { | ||
126 | // Download video from youtubeDL | ||
127 | tempVideoPath = await downloader() | ||
128 | |||
129 | // Get information about this video | ||
130 | const stats = await stat(tempVideoPath) | ||
131 | const isAble = await isAbleToUploadVideo(videoImport.User.id, stats.size) | ||
132 | if (isAble === false) { | ||
133 | throw new Error('The user video quota is exceeded with this video to import.') | ||
134 | } | ||
135 | |||
136 | const probe = await ffprobePromise(tempVideoPath) | ||
137 | |||
138 | const { resolution } = await isAudioFile(tempVideoPath, probe) | ||
139 | ? { resolution: VideoResolution.H_NOVIDEO } | ||
140 | : await getVideoStreamDimensionsInfo(tempVideoPath, probe) | ||
141 | |||
142 | const fps = await getVideoStreamFPS(tempVideoPath, probe) | ||
143 | const duration = await getVideoStreamDuration(tempVideoPath, probe) | ||
144 | |||
145 | // Prepare video file object for creation in database | ||
146 | const fileExt = getLowercaseExtension(tempVideoPath) | ||
147 | const videoFileData = { | ||
148 | extname: fileExt, | ||
149 | resolution, | ||
150 | size: stats.size, | ||
151 | filename: generateWebVideoFilename(resolution, fileExt), | ||
152 | fps, | ||
153 | videoId: videoImport.videoId | ||
154 | } | ||
155 | videoFile = new VideoFileModel(videoFileData) | ||
156 | |||
157 | const hookName = options.type === 'youtube-dl' | ||
158 | ? 'filter:api.video.post-import-url.accept.result' | ||
159 | : 'filter:api.video.post-import-torrent.accept.result' | ||
160 | |||
161 | // Check we accept this video | ||
162 | const acceptParameters = { | ||
163 | videoImport, | ||
164 | video: videoImport.Video, | ||
165 | videoFilePath: tempVideoPath, | ||
166 | videoFile, | ||
167 | user: videoImport.User | ||
168 | } | ||
169 | const acceptedResult = await Hooks.wrapFun(isPostImportVideoAccepted, acceptParameters, hookName) | ||
170 | |||
171 | if (acceptedResult.accepted !== true) { | ||
172 | logger.info('Refused imported video.', { acceptedResult, acceptParameters }) | ||
173 | |||
174 | videoImport.state = VideoImportState.REJECTED | ||
175 | await videoImport.save() | ||
176 | |||
177 | throw new Error(acceptedResult.errorMessage) | ||
178 | } | ||
179 | |||
180 | // Video is accepted, resuming preparation | ||
181 | const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid) | ||
182 | |||
183 | try { | ||
184 | const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile) | ||
185 | |||
186 | // Move file | ||
187 | const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile) | ||
188 | await move(tempVideoPath, videoDestFile) | ||
189 | |||
190 | tempVideoPath = null // This path is not used anymore | ||
191 | |||
192 | let { | ||
193 | miniatureModel: thumbnailModel, | ||
194 | miniatureJSONSave: thumbnailSave | ||
195 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE) | ||
196 | |||
197 | let { | ||
198 | miniatureModel: previewModel, | ||
199 | miniatureJSONSave: previewSave | ||
200 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW) | ||
201 | |||
202 | // Create torrent | ||
203 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) | ||
204 | |||
205 | const videoFileSave = videoFile.toJSON() | ||
206 | |||
207 | const { videoImportUpdated, video } = await retryTransactionWrapper(() => { | ||
208 | return sequelizeTypescript.transaction(async t => { | ||
209 | // Refresh video | ||
210 | const video = await VideoModel.load(videoImportWithFiles.videoId, t) | ||
211 | if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.') | ||
212 | |||
213 | await videoFile.save({ transaction: t }) | ||
214 | |||
215 | // Update video DB object | ||
216 | video.duration = duration | ||
217 | video.state = buildNextVideoState(video.state) | ||
218 | await video.save({ transaction: t }) | ||
219 | |||
220 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) | ||
221 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) | ||
222 | |||
223 | // Now we can federate the video (reload from database, we need more attributes) | ||
224 | const videoForFederation = await VideoModel.loadFull(video.uuid, t) | ||
225 | await federateVideoIfNeeded(videoForFederation, true, t) | ||
226 | |||
227 | // Update video import object | ||
228 | videoImportWithFiles.state = VideoImportState.SUCCESS | ||
229 | const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport | ||
230 | |||
231 | logger.info('Video %s imported.', video.uuid) | ||
232 | |||
233 | return { videoImportUpdated, video: videoForFederation } | ||
234 | }).catch(err => { | ||
235 | // Reset fields | ||
236 | if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave) | ||
237 | if (previewModel) previewModel = new ThumbnailModel(previewSave) | ||
238 | |||
239 | videoFile = new VideoFileModel(videoFileSave) | ||
240 | |||
241 | throw err | ||
242 | }) | ||
243 | }) | ||
244 | |||
245 | await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User, videoFileAlreadyLocked: true }) | ||
246 | } finally { | ||
247 | videoFileLockReleaser() | ||
248 | } | ||
249 | } catch (err) { | ||
250 | await onImportError(err, tempVideoPath, videoImport) | ||
251 | |||
252 | throw err | ||
253 | } | ||
254 | } | ||
255 | |||
256 | async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> { | ||
257 | // Refresh video, privacy may have changed | ||
258 | const video = await videoImport.Video.reload() | ||
259 | const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) | ||
260 | |||
261 | return Object.assign(videoImport, { Video: videoWithFiles }) | ||
262 | } | ||
263 | |||
264 | async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) { | ||
265 | // Generate miniature if the import did not created it | ||
266 | const needsMiniature = thumbnailType === ThumbnailType.MINIATURE | ||
267 | ? !videoImportWithFiles.Video.getMiniature() | ||
268 | : !videoImportWithFiles.Video.getPreview() | ||
269 | |||
270 | if (!needsMiniature) { | ||
271 | return { | ||
272 | miniatureModel: null, | ||
273 | miniatureJSONSave: null | ||
274 | } | ||
275 | } | ||
276 | |||
277 | const miniatureModel = await generateLocalVideoMiniature({ | ||
278 | video: videoImportWithFiles.Video, | ||
279 | videoFile, | ||
280 | type: thumbnailType | ||
281 | }) | ||
282 | const miniatureJSONSave = miniatureModel.toJSON() | ||
283 | |||
284 | return { | ||
285 | miniatureModel, | ||
286 | miniatureJSONSave | ||
287 | } | ||
288 | } | ||
289 | |||
290 | async function afterImportSuccess (options: { | ||
291 | videoImport: MVideoImport | ||
292 | video: MVideoFullLight | ||
293 | videoFile: MVideoFile | ||
294 | user: MUserId | ||
295 | videoFileAlreadyLocked: boolean | ||
296 | }) { | ||
297 | const { video, videoFile, videoImport, user, videoFileAlreadyLocked } = options | ||
298 | |||
299 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true }) | ||
300 | |||
301 | if (video.isBlacklisted()) { | ||
302 | const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) | ||
303 | |||
304 | Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) | ||
305 | } else { | ||
306 | Notifier.Instance.notifyOnNewVideoIfNeeded(video) | ||
307 | } | ||
308 | |||
309 | // Generate the storyboard in the job queue, and don't forget to federate an update after | ||
310 | await JobQueue.Instance.createJob({ | ||
311 | type: 'generate-video-storyboard' as 'generate-video-storyboard', | ||
312 | payload: { | ||
313 | videoUUID: video.uuid, | ||
314 | federate: true | ||
315 | } | ||
316 | }) | ||
317 | |||
318 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | ||
319 | await JobQueue.Instance.createJob( | ||
320 | await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT }) | ||
321 | ) | ||
322 | return | ||
323 | } | ||
324 | |||
325 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? | ||
326 | await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user, videoFileAlreadyLocked }) | ||
327 | } | ||
328 | } | ||
329 | |||
330 | async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) { | ||
331 | try { | ||
332 | if (tempVideoPath) await remove(tempVideoPath) | ||
333 | } catch (errUnlink) { | ||
334 | logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink }) | ||
335 | } | ||
336 | |||
337 | videoImport.error = err.message | ||
338 | if (videoImport.state !== VideoImportState.REJECTED) { | ||
339 | videoImport.state = VideoImportState.FAILED | ||
340 | } | ||
341 | await videoImport.save() | ||
342 | |||
343 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) | ||
344 | } | ||
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts deleted file mode 100644 index 070d1d7a2..000000000 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ /dev/null | |||
@@ -1,279 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { readdir, remove } from 'fs-extra' | ||
3 | import { join } from 'path' | ||
4 | import { peertubeTruncate } from '@server/helpers/core-utils' | ||
5 | import { CONSTRAINTS_FIELDS } from '@server/initializers/constants' | ||
6 | import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' | ||
7 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | ||
8 | import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' | ||
9 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' | ||
10 | import { generateLocalVideoMiniature, regenerateMiniaturesIfNeeded } from '@server/lib/thumbnail' | ||
11 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding' | ||
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
13 | import { moveToNextState } from '@server/lib/video-state' | ||
14 | import { VideoModel } from '@server/models/video/video' | ||
15 | import { VideoBlacklistModel } from '@server/models/video/video-blacklist' | ||
16 | import { VideoFileModel } from '@server/models/video/video-file' | ||
17 | import { VideoLiveModel } from '@server/models/video/video-live' | ||
18 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
19 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | ||
20 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | ||
21 | import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' | ||
22 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' | ||
23 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' | ||
24 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | ||
25 | import { JobQueue } from '../job-queue' | ||
26 | |||
27 | const lTags = loggerTagsFactory('live', 'job') | ||
28 | |||
29 | async function processVideoLiveEnding (job: Job) { | ||
30 | const payload = job.data as VideoLiveEndingPayload | ||
31 | |||
32 | logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() }) | ||
33 | |||
34 | function logError () { | ||
35 | logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags()) | ||
36 | } | ||
37 | |||
38 | const video = await VideoModel.load(payload.videoId) | ||
39 | const live = await VideoLiveModel.loadByVideoId(payload.videoId) | ||
40 | const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId) | ||
41 | |||
42 | if (!video || !live || !liveSession) { | ||
43 | logError() | ||
44 | return | ||
45 | } | ||
46 | |||
47 | const permanentLive = live.permanentLive | ||
48 | |||
49 | liveSession.endingProcessed = true | ||
50 | await liveSession.save() | ||
51 | |||
52 | if (liveSession.saveReplay !== true) { | ||
53 | return cleanupLiveAndFederate({ permanentLive, video, streamingPlaylistId: payload.streamingPlaylistId }) | ||
54 | } | ||
55 | |||
56 | if (permanentLive) { | ||
57 | await saveReplayToExternalVideo({ | ||
58 | liveVideo: video, | ||
59 | liveSession, | ||
60 | publishedAt: payload.publishedAt, | ||
61 | replayDirectory: payload.replayDirectory | ||
62 | }) | ||
63 | |||
64 | return cleanupLiveAndFederate({ permanentLive, video, streamingPlaylistId: payload.streamingPlaylistId }) | ||
65 | } | ||
66 | |||
67 | return replaceLiveByReplay({ | ||
68 | video, | ||
69 | liveSession, | ||
70 | live, | ||
71 | permanentLive, | ||
72 | replayDirectory: payload.replayDirectory | ||
73 | }) | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | export { | ||
79 | processVideoLiveEnding | ||
80 | } | ||
81 | |||
82 | // --------------------------------------------------------------------------- | ||
83 | |||
84 | async function saveReplayToExternalVideo (options: { | ||
85 | liveVideo: MVideo | ||
86 | liveSession: MVideoLiveSession | ||
87 | publishedAt: string | ||
88 | replayDirectory: string | ||
89 | }) { | ||
90 | const { liveVideo, liveSession, publishedAt, replayDirectory } = options | ||
91 | |||
92 | const replaySettings = await VideoLiveReplaySettingModel.load(liveSession.replaySettingId) | ||
93 | |||
94 | const videoNameSuffix = ` - ${new Date(publishedAt).toLocaleString()}` | ||
95 | const truncatedVideoName = peertubeTruncate(liveVideo.name, { | ||
96 | length: CONSTRAINTS_FIELDS.VIDEOS.NAME.max - videoNameSuffix.length | ||
97 | }) | ||
98 | |||
99 | const replayVideo = new VideoModel({ | ||
100 | name: truncatedVideoName + videoNameSuffix, | ||
101 | isLive: false, | ||
102 | state: VideoState.TO_TRANSCODE, | ||
103 | duration: 0, | ||
104 | |||
105 | remote: liveVideo.remote, | ||
106 | category: liveVideo.category, | ||
107 | licence: liveVideo.licence, | ||
108 | language: liveVideo.language, | ||
109 | commentsEnabled: liveVideo.commentsEnabled, | ||
110 | downloadEnabled: liveVideo.downloadEnabled, | ||
111 | waitTranscoding: true, | ||
112 | nsfw: liveVideo.nsfw, | ||
113 | description: liveVideo.description, | ||
114 | support: liveVideo.support, | ||
115 | privacy: replaySettings.privacy, | ||
116 | channelId: liveVideo.channelId | ||
117 | }) as MVideoWithAllFiles | ||
118 | |||
119 | replayVideo.Thumbnails = [] | ||
120 | replayVideo.VideoFiles = [] | ||
121 | replayVideo.VideoStreamingPlaylists = [] | ||
122 | |||
123 | replayVideo.url = getLocalVideoActivityPubUrl(replayVideo) | ||
124 | |||
125 | await replayVideo.save() | ||
126 | |||
127 | liveSession.replayVideoId = replayVideo.id | ||
128 | await liveSession.save() | ||
129 | |||
130 | // If live is blacklisted, also blacklist the replay | ||
131 | const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id) | ||
132 | if (blacklist) { | ||
133 | await VideoBlacklistModel.create({ | ||
134 | videoId: replayVideo.id, | ||
135 | unfederated: blacklist.unfederated, | ||
136 | reason: blacklist.reason, | ||
137 | type: blacklist.type | ||
138 | }) | ||
139 | } | ||
140 | |||
141 | await assignReplayFilesToVideo({ video: replayVideo, replayDirectory }) | ||
142 | |||
143 | await remove(replayDirectory) | ||
144 | |||
145 | for (const type of [ ThumbnailType.MINIATURE, ThumbnailType.PREVIEW ]) { | ||
146 | const image = await generateLocalVideoMiniature({ video: replayVideo, videoFile: replayVideo.getMaxQualityFile(), type }) | ||
147 | await replayVideo.addAndSaveThumbnail(image) | ||
148 | } | ||
149 | |||
150 | await moveToNextState({ video: replayVideo, isNewVideo: true }) | ||
151 | |||
152 | await createStoryboardJob(replayVideo) | ||
153 | } | ||
154 | |||
155 | async function replaceLiveByReplay (options: { | ||
156 | video: MVideo | ||
157 | liveSession: MVideoLiveSession | ||
158 | live: MVideoLive | ||
159 | permanentLive: boolean | ||
160 | replayDirectory: string | ||
161 | }) { | ||
162 | const { video, liveSession, live, permanentLive, replayDirectory } = options | ||
163 | |||
164 | const replaySettings = await VideoLiveReplaySettingModel.load(liveSession.replaySettingId) | ||
165 | const videoWithFiles = await VideoModel.loadFull(video.id) | ||
166 | const hlsPlaylist = videoWithFiles.getHLSPlaylist() | ||
167 | |||
168 | await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist) | ||
169 | |||
170 | await live.destroy() | ||
171 | |||
172 | videoWithFiles.isLive = false | ||
173 | videoWithFiles.privacy = replaySettings.privacy | ||
174 | videoWithFiles.waitTranscoding = true | ||
175 | videoWithFiles.state = VideoState.TO_TRANSCODE | ||
176 | |||
177 | await videoWithFiles.save() | ||
178 | |||
179 | liveSession.replayVideoId = videoWithFiles.id | ||
180 | await liveSession.save() | ||
181 | |||
182 | await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) | ||
183 | |||
184 | // Reset playlist | ||
185 | hlsPlaylist.VideoFiles = [] | ||
186 | hlsPlaylist.playlistFilename = generateHLSMasterPlaylistFilename() | ||
187 | hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename() | ||
188 | await hlsPlaylist.save() | ||
189 | |||
190 | await assignReplayFilesToVideo({ video: videoWithFiles, replayDirectory }) | ||
191 | |||
192 | // Should not happen in this function, but we keep the code if in the future we can replace the permanent live by a replay | ||
193 | if (permanentLive) { // Remove session replay | ||
194 | await remove(replayDirectory) | ||
195 | } else { // We won't stream again in this live, we can delete the base replay directory | ||
196 | await remove(getLiveReplayBaseDirectory(videoWithFiles)) | ||
197 | } | ||
198 | |||
199 | // Regenerate the thumbnail & preview? | ||
200 | await regenerateMiniaturesIfNeeded(videoWithFiles) | ||
201 | |||
202 | // We consider this is a new video | ||
203 | await moveToNextState({ video: videoWithFiles, isNewVideo: true }) | ||
204 | |||
205 | await createStoryboardJob(videoWithFiles) | ||
206 | } | ||
207 | |||
208 | async function assignReplayFilesToVideo (options: { | ||
209 | video: MVideo | ||
210 | replayDirectory: string | ||
211 | }) { | ||
212 | const { video, replayDirectory } = options | ||
213 | |||
214 | const concatenatedTsFiles = await readdir(replayDirectory) | ||
215 | |||
216 | for (const concatenatedTsFile of concatenatedTsFiles) { | ||
217 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
218 | await video.reload() | ||
219 | |||
220 | const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile) | ||
221 | |||
222 | const probe = await ffprobePromise(concatenatedTsFilePath) | ||
223 | const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) | ||
224 | const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) | ||
225 | const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe) | ||
226 | |||
227 | try { | ||
228 | await generateHlsPlaylistResolutionFromTS({ | ||
229 | video, | ||
230 | inputFileMutexReleaser, | ||
231 | concatenatedTsFilePath, | ||
232 | resolution, | ||
233 | fps, | ||
234 | isAAC: audioStream?.codec_name === 'aac' | ||
235 | }) | ||
236 | } catch (err) { | ||
237 | logger.error('Cannot generate HLS playlist resolution from TS files.', { err }) | ||
238 | } | ||
239 | |||
240 | inputFileMutexReleaser() | ||
241 | } | ||
242 | |||
243 | return video | ||
244 | } | ||
245 | |||
246 | async function cleanupLiveAndFederate (options: { | ||
247 | video: MVideo | ||
248 | permanentLive: boolean | ||
249 | streamingPlaylistId: number | ||
250 | }) { | ||
251 | const { permanentLive, video, streamingPlaylistId } = options | ||
252 | |||
253 | const streamingPlaylist = await VideoStreamingPlaylistModel.loadWithVideo(streamingPlaylistId) | ||
254 | |||
255 | if (streamingPlaylist) { | ||
256 | if (permanentLive) { | ||
257 | await cleanupAndDestroyPermanentLive(video, streamingPlaylist) | ||
258 | } else { | ||
259 | await cleanupUnsavedNormalLive(video, streamingPlaylist) | ||
260 | } | ||
261 | } | ||
262 | |||
263 | try { | ||
264 | const fullVideo = await VideoModel.loadFull(video.id) | ||
265 | return federateVideoIfNeeded(fullVideo, false, undefined) | ||
266 | } catch (err) { | ||
267 | logger.warn('Cannot federate live after cleanup', { videoId: video.id, err }) | ||
268 | } | ||
269 | } | ||
270 | |||
271 | function createStoryboardJob (video: MVideo) { | ||
272 | return JobQueue.Instance.createJob({ | ||
273 | type: 'generate-video-storyboard' as 'generate-video-storyboard', | ||
274 | payload: { | ||
275 | videoUUID: video.uuid, | ||
276 | federate: true | ||
277 | } | ||
278 | }) | ||
279 | } | ||
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts deleted file mode 100644 index bac99fdb7..000000000 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ /dev/null | |||
@@ -1,17 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' | ||
3 | import { VideoRedundancyPayload } from '@shared/models' | ||
4 | import { logger } from '../../../helpers/logger' | ||
5 | |||
6 | async function processVideoRedundancy (job: Job) { | ||
7 | const payload = job.data as VideoRedundancyPayload | ||
8 | logger.info('Processing video redundancy in job %s.', job.id) | ||
9 | |||
10 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) | ||
11 | } | ||
12 | |||
13 | // --------------------------------------------------------------------------- | ||
14 | |||
15 | export { | ||
16 | processVideoRedundancy | ||
17 | } | ||
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts deleted file mode 100644 index caf051bfa..000000000 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ /dev/null | |||
@@ -1,180 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { remove } from 'fs-extra' | ||
3 | import { join } from 'path' | ||
4 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' | ||
5 | import { CONFIG } from '@server/initializers/config' | ||
6 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | ||
7 | import { isAbleToUploadVideo } from '@server/lib/user' | ||
8 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
9 | import { approximateIntroOutroAdditionalSize, onVideoStudioEnded, safeCleanupStudioTMPFiles } from '@server/lib/video-studio' | ||
10 | import { UserModel } from '@server/models/user/user' | ||
11 | import { VideoModel } from '@server/models/video/video' | ||
12 | import { MVideo, MVideoFullLight } from '@server/types/models' | ||
13 | import { pick } from '@shared/core-utils' | ||
14 | import { buildUUID } from '@shared/extra-utils' | ||
15 | import { FFmpegEdition } from '@shared/ffmpeg' | ||
16 | import { | ||
17 | VideoStudioEditionPayload, | ||
18 | VideoStudioTask, | ||
19 | VideoStudioTaskCutPayload, | ||
20 | VideoStudioTaskIntroPayload, | ||
21 | VideoStudioTaskOutroPayload, | ||
22 | VideoStudioTaskPayload, | ||
23 | VideoStudioTaskWatermarkPayload | ||
24 | } from '@shared/models' | ||
25 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | ||
26 | |||
27 | const lTagsBase = loggerTagsFactory('video-studio') | ||
28 | |||
29 | async function processVideoStudioEdition (job: Job) { | ||
30 | const payload = job.data as VideoStudioEditionPayload | ||
31 | const lTags = lTagsBase(payload.videoUUID) | ||
32 | |||
33 | logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags) | ||
34 | |||
35 | try { | ||
36 | const video = await VideoModel.loadFull(payload.videoUUID) | ||
37 | |||
38 | // No video, maybe deleted? | ||
39 | if (!video) { | ||
40 | logger.info('Can\'t process job %d, video does not exist.', job.id, lTags) | ||
41 | |||
42 | await safeCleanupStudioTMPFiles(payload.tasks) | ||
43 | return undefined | ||
44 | } | ||
45 | |||
46 | await checkUserQuotaOrThrow(video, payload) | ||
47 | |||
48 | const inputFile = video.getMaxQualityFile() | ||
49 | |||
50 | const editionResultPath = await VideoPathManager.Instance.makeAvailableVideoFile(inputFile, async originalFilePath => { | ||
51 | let tmpInputFilePath: string | ||
52 | let outputPath: string | ||
53 | |||
54 | for (const task of payload.tasks) { | ||
55 | const outputFilename = buildUUID() + inputFile.extname | ||
56 | outputPath = join(CONFIG.STORAGE.TMP_DIR, outputFilename) | ||
57 | |||
58 | await processTask({ | ||
59 | inputPath: tmpInputFilePath ?? originalFilePath, | ||
60 | video, | ||
61 | outputPath, | ||
62 | task, | ||
63 | lTags | ||
64 | }) | ||
65 | |||
66 | if (tmpInputFilePath) await remove(tmpInputFilePath) | ||
67 | |||
68 | // For the next iteration | ||
69 | tmpInputFilePath = outputPath | ||
70 | } | ||
71 | |||
72 | return outputPath | ||
73 | }) | ||
74 | |||
75 | logger.info('Video edition ended for video %s.', video.uuid, lTags) | ||
76 | |||
77 | await onVideoStudioEnded({ video, editionResultPath, tasks: payload.tasks }) | ||
78 | } catch (err) { | ||
79 | await safeCleanupStudioTMPFiles(payload.tasks) | ||
80 | |||
81 | throw err | ||
82 | } | ||
83 | } | ||
84 | |||
85 | // --------------------------------------------------------------------------- | ||
86 | |||
87 | export { | ||
88 | processVideoStudioEdition | ||
89 | } | ||
90 | |||
91 | // --------------------------------------------------------------------------- | ||
92 | |||
93 | type TaskProcessorOptions <T extends VideoStudioTaskPayload = VideoStudioTaskPayload> = { | ||
94 | inputPath: string | ||
95 | outputPath: string | ||
96 | video: MVideo | ||
97 | task: T | ||
98 | lTags: { tags: string[] } | ||
99 | } | ||
100 | |||
101 | const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = { | ||
102 | 'add-intro': processAddIntroOutro, | ||
103 | 'add-outro': processAddIntroOutro, | ||
104 | 'cut': processCut, | ||
105 | 'add-watermark': processAddWatermark | ||
106 | } | ||
107 | |||
108 | async function processTask (options: TaskProcessorOptions) { | ||
109 | const { video, task, lTags } = options | ||
110 | |||
111 | logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags }) | ||
112 | |||
113 | const processor = taskProcessors[options.task.name] | ||
114 | if (!process) throw new Error('Unknown task ' + task.name) | ||
115 | |||
116 | return processor(options) | ||
117 | } | ||
118 | |||
119 | function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { | ||
120 | const { task, lTags } = options | ||
121 | |||
122 | logger.debug('Will add intro/outro to the video.', { options, ...lTags }) | ||
123 | |||
124 | return buildFFmpegEdition().addIntroOutro({ | ||
125 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
126 | |||
127 | introOutroPath: task.options.file, | ||
128 | type: task.name === 'add-intro' | ||
129 | ? 'intro' | ||
130 | : 'outro' | ||
131 | }) | ||
132 | } | ||
133 | |||
134 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { | ||
135 | const { task, lTags } = options | ||
136 | |||
137 | logger.debug('Will cut the video.', { options, ...lTags }) | ||
138 | |||
139 | return buildFFmpegEdition().cutVideo({ | ||
140 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
141 | |||
142 | start: task.options.start, | ||
143 | end: task.options.end | ||
144 | }) | ||
145 | } | ||
146 | |||
147 | function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { | ||
148 | const { task, lTags } = options | ||
149 | |||
150 | logger.debug('Will add watermark to the video.', { options, ...lTags }) | ||
151 | |||
152 | return buildFFmpegEdition().addWatermark({ | ||
153 | ...pick(options, [ 'inputPath', 'outputPath' ]), | ||
154 | |||
155 | watermarkPath: task.options.file, | ||
156 | |||
157 | videoFilters: { | ||
158 | watermarkSizeRatio: task.options.watermarkSizeRatio, | ||
159 | horitonzalMarginRatio: task.options.horitonzalMarginRatio, | ||
160 | verticalMarginRatio: task.options.verticalMarginRatio | ||
161 | } | ||
162 | }) | ||
163 | } | ||
164 | |||
165 | // --------------------------------------------------------------------------- | ||
166 | |||
167 | async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStudioEditionPayload) { | ||
168 | const user = await UserModel.loadByVideoId(video.id) | ||
169 | |||
170 | const filePathFinder = (i: number) => (payload.tasks[i] as VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload).options.file | ||
171 | |||
172 | const additionalBytes = await approximateIntroOutroAdditionalSize(video, payload.tasks, filePathFinder) | ||
173 | if (await isAbleToUploadVideo(user.id, additionalBytes) === false) { | ||
174 | throw new Error('Quota exceeded for this user to edit the video') | ||
175 | } | ||
176 | } | ||
177 | |||
178 | function buildFFmpegEdition () { | ||
179 | return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) | ||
180 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts deleted file mode 100644 index 1c8f4fd9f..000000000 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ /dev/null | |||
@@ -1,150 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' | ||
3 | import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding' | ||
4 | import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebVideoResolution } from '@server/lib/transcoding/web-transcoding' | ||
5 | import { removeAllWebVideoFiles } from '@server/lib/video-file' | ||
6 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
7 | import { moveToFailedTranscodingState } from '@server/lib/video-state' | ||
8 | import { UserModel } from '@server/models/user/user' | ||
9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
10 | import { MUser, MUserId, MVideoFullLight } from '@server/types/models' | ||
11 | import { | ||
12 | HLSTranscodingPayload, | ||
13 | MergeAudioTranscodingPayload, | ||
14 | NewWebVideoResolutionTranscodingPayload, | ||
15 | OptimizeTranscodingPayload, | ||
16 | VideoTranscodingPayload | ||
17 | } from '@shared/models' | ||
18 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | ||
19 | import { VideoModel } from '../../../models/video/video' | ||
20 | |||
21 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | ||
22 | |||
23 | const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { | ||
24 | 'new-resolution-to-hls': handleHLSJob, | ||
25 | 'new-resolution-to-web-video': handleNewWebVideoResolutionJob, | ||
26 | 'merge-audio-to-web-video': handleWebVideoMergeAudioJob, | ||
27 | 'optimize-to-web-video': handleWebVideoOptimizeJob | ||
28 | } | ||
29 | |||
30 | const lTags = loggerTagsFactory('transcoding') | ||
31 | |||
32 | async function processVideoTranscoding (job: Job) { | ||
33 | const payload = job.data as VideoTranscodingPayload | ||
34 | logger.info('Processing transcoding job %s.', job.id, lTags(payload.videoUUID)) | ||
35 | |||
36 | const video = await VideoModel.loadFull(payload.videoUUID) | ||
37 | // No video, maybe deleted? | ||
38 | if (!video) { | ||
39 | logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID)) | ||
40 | return undefined | ||
41 | } | ||
42 | |||
43 | const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) | ||
44 | |||
45 | const handler = handlers[payload.type] | ||
46 | |||
47 | if (!handler) { | ||
48 | await moveToFailedTranscodingState(video) | ||
49 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
50 | |||
51 | throw new Error('Cannot find transcoding handler for ' + payload.type) | ||
52 | } | ||
53 | |||
54 | try { | ||
55 | await handler(job, payload, video, user) | ||
56 | } catch (error) { | ||
57 | await moveToFailedTranscodingState(video) | ||
58 | |||
59 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
60 | |||
61 | throw error | ||
62 | } | ||
63 | |||
64 | return video | ||
65 | } | ||
66 | |||
67 | // --------------------------------------------------------------------------- | ||
68 | |||
69 | export { | ||
70 | processVideoTranscoding | ||
71 | } | ||
72 | |||
73 | // --------------------------------------------------------------------------- | ||
74 | // Job handlers | ||
75 | // --------------------------------------------------------------------------- | ||
76 | |||
77 | async function handleWebVideoMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { | ||
78 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid), { payload }) | ||
79 | |||
80 | await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job }) | ||
81 | |||
82 | logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload }) | ||
83 | |||
84 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: !payload.hasChildren, video }) | ||
85 | } | ||
86 | |||
87 | async function handleWebVideoOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { | ||
88 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid), { payload }) | ||
89 | |||
90 | await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job }) | ||
91 | |||
92 | logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload }) | ||
93 | |||
94 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: !payload.hasChildren, video }) | ||
95 | } | ||
96 | |||
97 | // --------------------------------------------------------------------------- | ||
98 | |||
99 | async function handleNewWebVideoResolutionJob (job: Job, payload: NewWebVideoResolutionTranscodingPayload, video: MVideoFullLight) { | ||
100 | logger.info('Handling Web Video transcoding job for %s.', video.uuid, lTags(video.uuid), { payload }) | ||
101 | |||
102 | await transcodeNewWebVideoResolution({ video, resolution: payload.resolution, fps: payload.fps, job }) | ||
103 | |||
104 | logger.info('Web Video transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload }) | ||
105 | |||
106 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) | ||
107 | } | ||
108 | |||
109 | // --------------------------------------------------------------------------- | ||
110 | |||
111 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, videoArg: MVideoFullLight) { | ||
112 | logger.info('Handling HLS transcoding job for %s.', videoArg.uuid, lTags(videoArg.uuid), { payload }) | ||
113 | |||
114 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid) | ||
115 | let video: MVideoFullLight | ||
116 | |||
117 | try { | ||
118 | video = await VideoModel.loadFull(videoArg.uuid) | ||
119 | |||
120 | const videoFileInput = payload.copyCodecs | ||
121 | ? video.getWebVideoFile(payload.resolution) | ||
122 | : video.getMaxQualityFile() | ||
123 | |||
124 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | ||
125 | |||
126 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { | ||
127 | return generateHlsPlaylistResolution({ | ||
128 | video, | ||
129 | videoInputPath, | ||
130 | inputFileMutexReleaser, | ||
131 | resolution: payload.resolution, | ||
132 | fps: payload.fps, | ||
133 | copyCodecs: payload.copyCodecs, | ||
134 | job | ||
135 | }) | ||
136 | }) | ||
137 | } finally { | ||
138 | inputFileMutexReleaser() | ||
139 | } | ||
140 | |||
141 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid), { payload }) | ||
142 | |||
143 | if (payload.deleteWebVideoFiles === true) { | ||
144 | logger.info('Removing Web Video files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid)) | ||
145 | |||
146 | await removeAllWebVideoFiles(video) | ||
147 | } | ||
148 | |||
149 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) | ||
150 | } | ||
diff --git a/server/lib/job-queue/handlers/video-views-stats.ts b/server/lib/job-queue/handlers/video-views-stats.ts deleted file mode 100644 index c9aa218e5..000000000 --- a/server/lib/job-queue/handlers/video-views-stats.ts +++ /dev/null | |||
@@ -1,57 +0,0 @@ | |||
1 | import { VideoViewModel } from '@server/models/view/video-view' | ||
2 | import { isTestOrDevInstance } from '../../../helpers/core-utils' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { VideoModel } from '../../../models/video/video' | ||
5 | import { Redis } from '../../redis' | ||
6 | |||
7 | async function processVideosViewsStats () { | ||
8 | const lastHour = new Date() | ||
9 | |||
10 | // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour | ||
11 | if (!isTestOrDevInstance()) lastHour.setHours(lastHour.getHours() - 1) | ||
12 | |||
13 | const hour = lastHour.getHours() | ||
14 | const startDate = lastHour.setMinutes(0, 0, 0) | ||
15 | const endDate = lastHour.setMinutes(59, 59, 999) | ||
16 | |||
17 | const videoIds = await Redis.Instance.listVideosViewedForStats(hour) | ||
18 | if (videoIds.length === 0) return | ||
19 | |||
20 | logger.info('Processing videos views stats in job for hour %d.', hour) | ||
21 | |||
22 | for (const videoId of videoIds) { | ||
23 | try { | ||
24 | const views = await Redis.Instance.getVideoViewsStats(videoId, hour) | ||
25 | await Redis.Instance.deleteVideoViewsStats(videoId, hour) | ||
26 | |||
27 | if (views) { | ||
28 | logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour) | ||
29 | |||
30 | try { | ||
31 | const video = await VideoModel.load(videoId) | ||
32 | if (!video) { | ||
33 | logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId) | ||
34 | continue | ||
35 | } | ||
36 | |||
37 | await VideoViewModel.create({ | ||
38 | startDate: new Date(startDate), | ||
39 | endDate: new Date(endDate), | ||
40 | views, | ||
41 | videoId | ||
42 | }) | ||
43 | } catch (err) { | ||
44 | logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err }) | ||
45 | } | ||
46 | } | ||
47 | } catch (err) { | ||
48 | logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err }) | ||
49 | } | ||
50 | } | ||
51 | } | ||
52 | |||
53 | // --------------------------------------------------------------------------- | ||
54 | |||
55 | export { | ||
56 | processVideosViewsStats | ||
57 | } | ||