| field | value | date |
|---|---|---|
| author | Chocobozzz <me@florianbigard.com> | 2018-09-11 16:27:07 +0200 |
| committer | Chocobozzz <me@florianbigard.com> | 2018-09-13 14:05:49 +0200 |
| commit | c48e82b5e0478434de30626d14594a97f2402e7c (patch) | |
| tree | a78e5272bd0fe4f5b41831e571e02d05f1515b82 /server/lib/schedulers/videos-redundancy-scheduler.ts | |
| parent | a651038487faa838bda3ce04695b08bc65baff70 (diff) | |
| download | PeerTube-c48e82b5e0478434de30626d14594a97f2402e7c.tar.gz, PeerTube-c48e82b5e0478434de30626d14594a97f2402e7c.tar.zst, PeerTube-c48e82b5e0478434de30626d14594a97f2402e7c.zip | |
Basic video redundancy implementation
Diffstat (limited to 'server/lib/schedulers/videos-redundancy-scheduler.ts')
-rw-r--r--  server/lib/schedulers/videos-redundancy-scheduler.ts | 161
1 file changed, 161 insertions(+), 0 deletions(-)
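
For context before reading the diff: execute() iterates over CONFIG.REDUNDANCY.VIDEOS, an array with one entry per redundancy strategy. Below is a minimal sketch of the shape this code expects, inferred from how obj.strategy and obj.size are used; the type and variable names are illustrative and not part of the commit (the real values come from the instance configuration parsed in ../../initializers).

// Illustrative sketch only, inferred from the diff below.
type VideoRedundancyStrategy = 'most-views' // the only strategy this first implementation handles

interface VideosRedundancyEntry {
  strategy: VideoRedundancyStrategy // passed to findVideoToDuplicate()
  size: number                      // cache budget in bytes, checked by isTooHeavy()
}

const videosRedundancyEntries: VideosRedundancyEntry[] = [
  { strategy: 'most-views', size: 10 * 1024 * 1024 * 1024 } // e.g. duplicate up to ~10 GB of popular videos
]

If isTooHeavy() reports that adding the candidate video's files would exceed that budget, the entry is skipped for this run.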
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts
new file mode 100644
index 000000000..ee9ba1766
--- /dev/null
+++ b/server/lib/schedulers/videos-redundancy-scheduler.ts
@@ -0,0 +1,161 @@
import { AbstractScheduler } from './abstract-scheduler'
import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers'
import { logger } from '../../helpers/logger'
import { VideoRedundancyStrategy } from '../../../shared/models/redundancy'
import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
import { VideoFileModel } from '../../models/video/video-file'
import { sortBy } from 'lodash'
import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
import { join } from 'path'
import { rename } from 'fs-extra'
import { getServerActor } from '../../helpers/utils'
import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
import { VideoModel } from '../../models/video/video'
import { getVideoCacheFileActivityPubUrl } from '../activitypub/url'
import { removeVideoRedundancy } from '../redundancy'
import { isTestInstance } from '../../helpers/core-utils'

export class VideosRedundancyScheduler extends AbstractScheduler {

  private static instance: AbstractScheduler
  private executing = false

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosRedundancy

  private constructor () {
    super()
  }

  async execute () {
    if (this.executing) return

    this.executing = true

    for (const obj of CONFIG.REDUNDANCY.VIDEOS) {

      try {
        const videoToDuplicate = await this.findVideoToDuplicate(obj.strategy)
        if (!videoToDuplicate) continue

        const videoFiles = videoToDuplicate.VideoFiles
        videoFiles.forEach(f => f.Video = videoToDuplicate)

        const videosRedundancy = await VideoRedundancyModel.getVideoFiles(obj.strategy)
        if (this.isTooHeavy(videosRedundancy, videoFiles, obj.size)) {
          if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
          continue
        }

        logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy)

        await this.createVideoRedundancy(obj.strategy, videoFiles)
      } catch (err) {
        logger.error('Cannot run videos redundancy %s.', obj.strategy, { err })
      }
    }

    const expired = await VideoRedundancyModel.listAllExpired()

    for (const m of expired) {
      logger.info('Removing expired video %s from our redundancy system.', this.buildEntryLogId(m))

      try {
        await m.destroy()
      } catch (err) {
        logger.error('Cannot remove %s video from our redundancy system.', this.buildEntryLogId(m))
      }
    }

    this.executing = false
  }

  static get Instance () {
    return this.instance || (this.instance = new this())
  }

  private findVideoToDuplicate (strategy: VideoRedundancyStrategy) {
    if (strategy === 'most-views') return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
  }

  private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) {
    const serverActor = await getServerActor()

    for (const file of filesToDuplicate) {
      const existing = await VideoRedundancyModel.loadByFileId(file.id)
      if (existing) {
        logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', file.Video.url, file.resolution, strategy)

        existing.expiresOn = this.buildNewExpiration()
        await existing.save()

        await sendUpdateCacheFile(serverActor, existing)
        continue
      }

      // We need more attributes and check if the video still exists
      const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id)
      if (!video) continue

      logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy)

      const { baseUrlHttp, baseUrlWs } = video.getBaseUrls()
      const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs)

      const tmpPath = await downloadWebTorrentVideo({ magnetUri }, JOB_TTL['video-import'])

      const destPath = join(CONFIG.STORAGE.VIDEOS_DIR, video.getVideoFilename(file))
      await rename(tmpPath, destPath)

      const createdModel = await VideoRedundancyModel.create({
        expiresOn: new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS),
        url: getVideoCacheFileActivityPubUrl(file),
        fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL),
        strategy,
        videoFileId: file.id,
        actorId: serverActor.id
      })
      createdModel.VideoFile = file

      await sendCreateCacheFile(serverActor, createdModel)
    }
  }

  // Unused, but could be useful in the future, with a custom strategy
  private async purgeVideosIfNeeded (videosRedundancy: VideoRedundancyModel[], filesToDuplicate: VideoFileModel[], maxSize: number) {
    const sortedVideosRedundancy = sortBy(videosRedundancy, 'createdAt')

    while (this.isTooHeavy(sortedVideosRedundancy, filesToDuplicate, maxSize)) {
      const toDelete = sortedVideosRedundancy.shift()

      const videoFile = toDelete.VideoFile
      logger.info('Purging video %s (resolution %d) from our redundancy system.', videoFile.Video.url, videoFile.resolution)

      await removeVideoRedundancy(toDelete, undefined)
    }

    return sortedVideosRedundancy
  }

  private isTooHeavy (videosRedundancy: VideoRedundancyModel[], filesToDuplicate: VideoFileModel[], maxSizeArg: number) {
    const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate)

    const redundancyReducer = (previous: number, current: VideoRedundancyModel) => previous + current.VideoFile.size
    const totalDuplicated = videosRedundancy.reduce(redundancyReducer, 0)

    return totalDuplicated > maxSize
  }

  private buildNewExpiration () {
    return new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS)
  }

  private buildEntryLogId (object: VideoRedundancyModel) {
    return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
  }

  private getTotalFileSizes (files: VideoFileModel[]) {
    const fileReducer = (previous: number, current: VideoFileModel) => previous + current.size

    return files.reduce(fileReducer, 0)
  }
}
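
The class is a lazy singleton, and nothing in this file starts the timer itself. A minimal usage sketch, assuming the inherited AbstractScheduler exposes an enable() method that calls execute() every schedulerIntervalMs, as the other PeerTube schedulers are wired up; the bootstrap function and import path below are illustrative only.

// Illustrative bootstrap sketch, not part of this commit.
// Assumes AbstractScheduler provides enable(), which schedules execute()
// on the SCHEDULER_INTERVALS_MS.videosRedundancy interval.
import { VideosRedundancyScheduler } from './lib/schedulers/videos-redundancy-scheduler'

function startVideosRedundancyScheduler () {
  // Instance builds the singleton on first access
  VideosRedundancyScheduler.Instance.enable()
}

startVideosRedundancyScheduler()

Note that execute() also guards itself with the private executing flag, so a run that is still downloading a large torrent when the next tick fires is skipped rather than overlapped.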