diff options
author | Chocobozzz <me@florianbigard.com> | 2023-07-31 14:34:36 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-08-11 15:02:33 +0200 |
commit | 3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch) | |
tree | e4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/schedulers/videos-redundancy-scheduler.ts | |
parent | 04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff) | |
download | PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip |
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge
conflicts, but it's a major step forward:
* Server can be faster at startup because imports() are async and we can
easily lazy import big modules
* Angular doesn't seem to support ES import (with .js extension), so we
had to correctly organize peertube into a monorepo:
* Use yarn workspace feature
* Use typescript reference projects for dependencies
* Shared projects have been moved into "packages", each one is now a
node module (with a dedicated package.json/tsconfig.json)
* server/tools have been moved into apps/ and is now a dedicated app
bundled and published on NPM so users don't have to build peertube
cli tools manually
* server/tests have been moved into packages/ so we don't compile
them every time we want to run the server
* Use isolatedModule option:
* Had to move from const enum to const
(https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums)
* Had to explictely specify "type" imports when used in decorators
* Prefer tsx (that uses esbuild under the hood) instead of ts-node to
load typescript files (tests with mocha or scripts):
* To reduce test complexity as esbuild doesn't support decorator
metadata, we only test server files that do not import server
models
* We still build tests files into js files for a faster CI
* Remove unmaintained peertube CLI import script
* Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/schedulers/videos-redundancy-scheduler.ts')
-rw-r--r-- | server/lib/schedulers/videos-redundancy-scheduler.ts | 375 |
1 files changed, 0 insertions, 375 deletions
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts deleted file mode 100644 index 91625ccb5..000000000 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ /dev/null | |||
@@ -1,375 +0,0 @@ | |||
1 | import { move } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { getServerActor } from '@server/models/application/application' | ||
4 | import { VideoModel } from '@server/models/video/video' | ||
5 | import { | ||
6 | MStreamingPlaylistFiles, | ||
7 | MVideoAccountLight, | ||
8 | MVideoFile, | ||
9 | MVideoFileVideo, | ||
10 | MVideoRedundancyFileVideo, | ||
11 | MVideoRedundancyStreamingPlaylistVideo, | ||
12 | MVideoRedundancyVideo, | ||
13 | MVideoWithAllFiles | ||
14 | } from '@server/types/models' | ||
15 | import { VideosRedundancyStrategy } from '../../../shared/models/redundancy' | ||
16 | import { logger, loggerTagsFactory } from '../../helpers/logger' | ||
17 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' | ||
18 | import { CONFIG } from '../../initializers/config' | ||
19 | import { DIRECTORIES, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants' | ||
20 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' | ||
21 | import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' | ||
22 | import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url' | ||
23 | import { getOrCreateAPVideo } from '../activitypub/videos' | ||
24 | import { downloadPlaylistSegments } from '../hls' | ||
25 | import { removeVideoRedundancy } from '../redundancy' | ||
26 | import { generateHLSRedundancyUrl, generateWebVideoRedundancyUrl } from '../video-urls' | ||
27 | import { AbstractScheduler } from './abstract-scheduler' | ||
28 | |||
29 | const lTags = loggerTagsFactory('redundancy') | ||
30 | |||
31 | type CandidateToDuplicate = { | ||
32 | redundancy: VideosRedundancyStrategy | ||
33 | video: MVideoWithAllFiles | ||
34 | files: MVideoFile[] | ||
35 | streamingPlaylists: MStreamingPlaylistFiles[] | ||
36 | } | ||
37 | |||
38 | function isMVideoRedundancyFileVideo ( | ||
39 | o: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo | ||
40 | ): o is MVideoRedundancyFileVideo { | ||
41 | return !!(o as MVideoRedundancyFileVideo).VideoFile | ||
42 | } | ||
43 | |||
44 | export class VideosRedundancyScheduler extends AbstractScheduler { | ||
45 | |||
46 | private static instance: VideosRedundancyScheduler | ||
47 | |||
48 | protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL | ||
49 | |||
50 | private constructor () { | ||
51 | super() | ||
52 | } | ||
53 | |||
54 | async createManualRedundancy (videoId: number) { | ||
55 | const videoToDuplicate = await VideoModel.loadWithFiles(videoId) | ||
56 | |||
57 | if (!videoToDuplicate) { | ||
58 | logger.warn('Video to manually duplicate %d does not exist anymore.', videoId, lTags()) | ||
59 | return | ||
60 | } | ||
61 | |||
62 | return this.createVideoRedundancies({ | ||
63 | video: videoToDuplicate, | ||
64 | redundancy: null, | ||
65 | files: videoToDuplicate.VideoFiles, | ||
66 | streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists | ||
67 | }) | ||
68 | } | ||
69 | |||
70 | protected async internalExecute () { | ||
71 | for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { | ||
72 | logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy, lTags()) | ||
73 | |||
74 | try { | ||
75 | const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig) | ||
76 | if (!videoToDuplicate) continue | ||
77 | |||
78 | const candidateToDuplicate = { | ||
79 | video: videoToDuplicate, | ||
80 | redundancy: redundancyConfig, | ||
81 | files: videoToDuplicate.VideoFiles, | ||
82 | streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists | ||
83 | } | ||
84 | |||
85 | await this.purgeCacheIfNeeded(candidateToDuplicate) | ||
86 | |||
87 | if (await this.isTooHeavy(candidateToDuplicate)) { | ||
88 | logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url, lTags(videoToDuplicate.uuid)) | ||
89 | continue | ||
90 | } | ||
91 | |||
92 | logger.info( | ||
93 | 'Will duplicate video %s in redundancy scheduler "%s".', | ||
94 | videoToDuplicate.url, redundancyConfig.strategy, lTags(videoToDuplicate.uuid) | ||
95 | ) | ||
96 | |||
97 | await this.createVideoRedundancies(candidateToDuplicate) | ||
98 | } catch (err) { | ||
99 | logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err, ...lTags() }) | ||
100 | } | ||
101 | } | ||
102 | |||
103 | await this.extendsLocalExpiration() | ||
104 | |||
105 | await this.purgeRemoteExpired() | ||
106 | } | ||
107 | |||
108 | static get Instance () { | ||
109 | return this.instance || (this.instance = new this()) | ||
110 | } | ||
111 | |||
112 | private async extendsLocalExpiration () { | ||
113 | const expired = await VideoRedundancyModel.listLocalExpired() | ||
114 | |||
115 | for (const redundancyModel of expired) { | ||
116 | try { | ||
117 | const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) | ||
118 | |||
119 | // If the admin disabled the redundancy, remove this redundancy instead of extending it | ||
120 | if (!redundancyConfig) { | ||
121 | logger.info( | ||
122 | 'Destroying redundancy %s because the redundancy %s does not exist anymore.', | ||
123 | redundancyModel.url, redundancyModel.strategy | ||
124 | ) | ||
125 | |||
126 | await removeVideoRedundancy(redundancyModel) | ||
127 | continue | ||
128 | } | ||
129 | |||
130 | const { totalUsed } = await VideoRedundancyModel.getStats(redundancyConfig.strategy) | ||
131 | |||
132 | // If the admin decreased the cache size, remove this redundancy instead of extending it | ||
133 | if (totalUsed > redundancyConfig.size) { | ||
134 | logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy) | ||
135 | |||
136 | await removeVideoRedundancy(redundancyModel) | ||
137 | continue | ||
138 | } | ||
139 | |||
140 | await this.extendsRedundancy(redundancyModel) | ||
141 | } catch (err) { | ||
142 | logger.error( | ||
143 | 'Cannot extend or remove expiration of %s video from our redundancy system.', | ||
144 | this.buildEntryLogId(redundancyModel), { err, ...lTags(redundancyModel.getVideoUUID()) } | ||
145 | ) | ||
146 | } | ||
147 | } | ||
148 | } | ||
149 | |||
150 | private async extendsRedundancy (redundancyModel: MVideoRedundancyVideo) { | ||
151 | const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) | ||
152 | // Redundancy strategy disabled, remove our redundancy instead of extending expiration | ||
153 | if (!redundancy) { | ||
154 | await removeVideoRedundancy(redundancyModel) | ||
155 | return | ||
156 | } | ||
157 | |||
158 | await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime) | ||
159 | } | ||
160 | |||
161 | private async purgeRemoteExpired () { | ||
162 | const expired = await VideoRedundancyModel.listRemoteExpired() | ||
163 | |||
164 | for (const redundancyModel of expired) { | ||
165 | try { | ||
166 | await removeVideoRedundancy(redundancyModel) | ||
167 | } catch (err) { | ||
168 | logger.error( | ||
169 | 'Cannot remove redundancy %s from our redundancy system.', | ||
170 | this.buildEntryLogId(redundancyModel), lTags(redundancyModel.getVideoUUID()) | ||
171 | ) | ||
172 | } | ||
173 | } | ||
174 | } | ||
175 | |||
176 | private findVideoToDuplicate (cache: VideosRedundancyStrategy) { | ||
177 | if (cache.strategy === 'most-views') { | ||
178 | return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR) | ||
179 | } | ||
180 | |||
181 | if (cache.strategy === 'trending') { | ||
182 | return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR) | ||
183 | } | ||
184 | |||
185 | if (cache.strategy === 'recently-added') { | ||
186 | const minViews = cache.minViews | ||
187 | return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews) | ||
188 | } | ||
189 | } | ||
190 | |||
191 | private async createVideoRedundancies (data: CandidateToDuplicate) { | ||
192 | const video = await this.loadAndRefreshVideo(data.video.url) | ||
193 | |||
194 | if (!video) { | ||
195 | logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url, lTags(data.video.uuid)) | ||
196 | |||
197 | return | ||
198 | } | ||
199 | |||
200 | for (const file of data.files) { | ||
201 | const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id) | ||
202 | if (existingRedundancy) { | ||
203 | await this.extendsRedundancy(existingRedundancy) | ||
204 | |||
205 | continue | ||
206 | } | ||
207 | |||
208 | await this.createVideoFileRedundancy(data.redundancy, video, file) | ||
209 | } | ||
210 | |||
211 | for (const streamingPlaylist of data.streamingPlaylists) { | ||
212 | const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id) | ||
213 | if (existingRedundancy) { | ||
214 | await this.extendsRedundancy(existingRedundancy) | ||
215 | |||
216 | continue | ||
217 | } | ||
218 | |||
219 | await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist) | ||
220 | } | ||
221 | } | ||
222 | |||
223 | private async createVideoFileRedundancy (redundancy: VideosRedundancyStrategy | null, video: MVideoAccountLight, fileArg: MVideoFile) { | ||
224 | let strategy = 'manual' | ||
225 | let expiresOn: Date = null | ||
226 | |||
227 | if (redundancy) { | ||
228 | strategy = redundancy.strategy | ||
229 | expiresOn = this.buildNewExpiration(redundancy.minLifetime) | ||
230 | } | ||
231 | |||
232 | const file = fileArg as MVideoFileVideo | ||
233 | file.Video = video | ||
234 | |||
235 | const serverActor = await getServerActor() | ||
236 | |||
237 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy, lTags(video.uuid)) | ||
238 | |||
239 | const tmpPath = await downloadWebTorrentVideo({ uri: file.torrentUrl }, VIDEO_IMPORT_TIMEOUT) | ||
240 | |||
241 | const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, file.filename) | ||
242 | await move(tmpPath, destPath, { overwrite: true }) | ||
243 | |||
244 | const createdModel: MVideoRedundancyFileVideo = await VideoRedundancyModel.create({ | ||
245 | expiresOn, | ||
246 | url: getLocalVideoCacheFileActivityPubUrl(file), | ||
247 | fileUrl: generateWebVideoRedundancyUrl(file), | ||
248 | strategy, | ||
249 | videoFileId: file.id, | ||
250 | actorId: serverActor.id | ||
251 | }) | ||
252 | |||
253 | createdModel.VideoFile = file | ||
254 | |||
255 | await sendCreateCacheFile(serverActor, video, createdModel) | ||
256 | |||
257 | logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url, lTags(video.uuid)) | ||
258 | } | ||
259 | |||
260 | private async createStreamingPlaylistRedundancy ( | ||
261 | redundancy: VideosRedundancyStrategy, | ||
262 | video: MVideoAccountLight, | ||
263 | playlistArg: MStreamingPlaylistFiles | ||
264 | ) { | ||
265 | let strategy = 'manual' | ||
266 | let expiresOn: Date = null | ||
267 | |||
268 | if (redundancy) { | ||
269 | strategy = redundancy.strategy | ||
270 | expiresOn = this.buildNewExpiration(redundancy.minLifetime) | ||
271 | } | ||
272 | |||
273 | const playlist = Object.assign(playlistArg, { Video: video }) | ||
274 | const serverActor = await getServerActor() | ||
275 | |||
276 | logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy, lTags(video.uuid)) | ||
277 | |||
278 | const destDirectory = join(DIRECTORIES.HLS_REDUNDANCY, video.uuid) | ||
279 | const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video) | ||
280 | |||
281 | const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000 | ||
282 | const toleranceKB = maxSizeKB + ((5 * maxSizeKB) / 100) // 5% more tolerance | ||
283 | await downloadPlaylistSegments(masterPlaylistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT, toleranceKB) | ||
284 | |||
285 | const createdModel: MVideoRedundancyStreamingPlaylistVideo = await VideoRedundancyModel.create({ | ||
286 | expiresOn, | ||
287 | url: getLocalVideoCacheStreamingPlaylistActivityPubUrl(video, playlist), | ||
288 | fileUrl: generateHLSRedundancyUrl(video, playlistArg), | ||
289 | strategy, | ||
290 | videoStreamingPlaylistId: playlist.id, | ||
291 | actorId: serverActor.id | ||
292 | }) | ||
293 | |||
294 | createdModel.VideoStreamingPlaylist = playlist | ||
295 | |||
296 | await sendCreateCacheFile(serverActor, video, createdModel) | ||
297 | |||
298 | logger.info('Duplicated playlist %s -> %s.', masterPlaylistUrl, createdModel.url, lTags(video.uuid)) | ||
299 | } | ||
300 | |||
301 | private async extendsExpirationOf (redundancy: MVideoRedundancyVideo, expiresAfterMs: number) { | ||
302 | logger.info('Extending expiration of %s.', redundancy.url, lTags(redundancy.getVideoUUID())) | ||
303 | |||
304 | const serverActor = await getServerActor() | ||
305 | |||
306 | redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs) | ||
307 | await redundancy.save() | ||
308 | |||
309 | await sendUpdateCacheFile(serverActor, redundancy) | ||
310 | } | ||
311 | |||
312 | private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) { | ||
313 | while (await this.isTooHeavy(candidateToDuplicate)) { | ||
314 | const redundancy = candidateToDuplicate.redundancy | ||
315 | const toDelete = await VideoRedundancyModel.loadOldestLocalExpired(redundancy.strategy, redundancy.minLifetime) | ||
316 | if (!toDelete) return | ||
317 | |||
318 | const videoId = toDelete.VideoFile | ||
319 | ? toDelete.VideoFile.videoId | ||
320 | : toDelete.VideoStreamingPlaylist.videoId | ||
321 | |||
322 | const redundancies = await VideoRedundancyModel.listLocalByVideoId(videoId) | ||
323 | |||
324 | for (const redundancy of redundancies) { | ||
325 | await removeVideoRedundancy(redundancy) | ||
326 | } | ||
327 | } | ||
328 | } | ||
329 | |||
330 | private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) { | ||
331 | const maxSize = candidateToDuplicate.redundancy.size | ||
332 | |||
333 | const { totalUsed: alreadyUsed } = await VideoRedundancyModel.getStats(candidateToDuplicate.redundancy.strategy) | ||
334 | |||
335 | const videoSize = this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists) | ||
336 | const willUse = alreadyUsed + videoSize | ||
337 | |||
338 | logger.debug('Checking candidate size.', { maxSize, alreadyUsed, videoSize, willUse, ...lTags(candidateToDuplicate.video.uuid) }) | ||
339 | |||
340 | return willUse > maxSize | ||
341 | } | ||
342 | |||
343 | private buildNewExpiration (expiresAfterMs: number) { | ||
344 | return new Date(Date.now() + expiresAfterMs) | ||
345 | } | ||
346 | |||
347 | private buildEntryLogId (object: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo) { | ||
348 | if (isMVideoRedundancyFileVideo(object)) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}` | ||
349 | |||
350 | return `${object.VideoStreamingPlaylist.getMasterPlaylistUrl(object.VideoStreamingPlaylist.Video)}` | ||
351 | } | ||
352 | |||
353 | private getTotalFileSizes (files: MVideoFile[], playlists: MStreamingPlaylistFiles[]): number { | ||
354 | const fileReducer = (previous: number, current: MVideoFile) => previous + current.size | ||
355 | |||
356 | let allFiles = files | ||
357 | for (const p of playlists) { | ||
358 | allFiles = allFiles.concat(p.VideoFiles) | ||
359 | } | ||
360 | |||
361 | return allFiles.reduce(fileReducer, 0) | ||
362 | } | ||
363 | |||
364 | private async loadAndRefreshVideo (videoUrl: string) { | ||
365 | // We need more attributes and check if the video still exists | ||
366 | const getVideoOptions = { | ||
367 | videoObject: videoUrl, | ||
368 | syncParam: { rates: false, shares: false, comments: false, refreshVideo: true }, | ||
369 | fetchType: 'all' as 'all' | ||
370 | } | ||
371 | const { video } = await getOrCreateAPVideo(getVideoOptions) | ||
372 | |||
373 | return video | ||
374 | } | ||
375 | } | ||