diff options
Diffstat (limited to 'server/lib/schedulers')
-rw-r--r-- | server/lib/schedulers/videos-redundancy-scheduler.ts | 86 |
1 files changed, 59 insertions, 27 deletions
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts index 998d2295a..97df3e4f5 100644 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { AbstractScheduler } from './abstract-scheduler' | 1 | import { AbstractScheduler } from './abstract-scheduler' |
2 | import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers' | 2 | import { CONFIG, JOB_TTL, REDUNDANCY } from '../../initializers' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { VideoRedundancyStrategy, VideosRedundancy } from '../../../shared/models/redundancy' | 4 | import { VideosRedundancy } from '../../../shared/models/redundancy' |
5 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' | 5 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' |
6 | import { VideoFileModel } from '../../models/video/video-file' | 6 | import { VideoFileModel } from '../../models/video/video-file' |
7 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' | 7 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' |
@@ -12,6 +12,7 @@ import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' | |||
12 | import { VideoModel } from '../../models/video/video' | 12 | import { VideoModel } from '../../models/video/video' |
13 | import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' | 13 | import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' |
14 | import { isTestInstance } from '../../helpers/core-utils' | 14 | import { isTestInstance } from '../../helpers/core-utils' |
15 | import { removeVideoRedundancy } from '../redundancy' | ||
15 | 16 | ||
16 | export class VideosRedundancyScheduler extends AbstractScheduler { | 17 | export class VideosRedundancyScheduler extends AbstractScheduler { |
17 | 18 | ||
@@ -30,7 +31,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
30 | this.executing = true | 31 | this.executing = true |
31 | 32 | ||
32 | for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { | 33 | for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { |
33 | if (!isTestInstance()) logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) | 34 | logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) |
34 | 35 | ||
35 | try { | 36 | try { |
36 | const videoToDuplicate = await this.findVideoToDuplicate(obj) | 37 | const videoToDuplicate = await this.findVideoToDuplicate(obj) |
@@ -39,20 +40,24 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
39 | const videoFiles = videoToDuplicate.VideoFiles | 40 | const videoFiles = videoToDuplicate.VideoFiles |
40 | videoFiles.forEach(f => f.Video = videoToDuplicate) | 41 | videoFiles.forEach(f => f.Video = videoToDuplicate) |
41 | 42 | ||
42 | if (await this.isTooHeavy(obj.strategy, videoFiles, obj.size)) { | 43 | await this.purgeCacheIfNeeded(obj, videoFiles) |
43 | if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) | 44 | |
45 | if (await this.isTooHeavy(obj, videoFiles)) { | ||
46 | logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) | ||
44 | continue | 47 | continue |
45 | } | 48 | } |
46 | 49 | ||
47 | logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) | 50 | logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) |
48 | 51 | ||
49 | await this.createVideoRedundancy(obj.strategy, videoFiles) | 52 | await this.createVideoRedundancy(obj, videoFiles) |
50 | } catch (err) { | 53 | } catch (err) { |
51 | logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) | 54 | logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) |
52 | } | 55 | } |
53 | } | 56 | } |
54 | 57 | ||
55 | await this.removeExpired() | 58 | await this.extendsLocalExpiration() |
59 | |||
60 | await this.purgeRemoteExpired() | ||
56 | 61 | ||
57 | this.executing = false | 62 | this.executing = false |
58 | } | 63 | } |
@@ -61,16 +66,27 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
61 | return this.instance || (this.instance = new this()) | 66 | return this.instance || (this.instance = new this()) |
62 | } | 67 | } |
63 | 68 | ||
64 | private async removeExpired () { | 69 | private async extendsLocalExpiration () { |
65 | const expired = await VideoRedundancyModel.listAllExpired() | 70 | const expired = await VideoRedundancyModel.listLocalExpired() |
71 | |||
72 | for (const redundancyModel of expired) { | ||
73 | try { | ||
74 | const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) | ||
75 | await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime) | ||
76 | } catch (err) { | ||
77 | logger.error('Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel)) | ||
78 | } | ||
79 | } | ||
80 | } | ||
66 | 81 | ||
67 | for (const m of expired) { | 82 | private async purgeRemoteExpired () { |
68 | logger.info('Removing expired video %s from our redundancy system.', this.buildEntryLogId(m)) | 83 | const expired = await VideoRedundancyModel.listRemoteExpired() |
69 | 84 | ||
85 | for (const redundancyModel of expired) { | ||
70 | try { | 86 | try { |
71 | await m.destroy() | 87 | await removeVideoRedundancy(redundancyModel) |
72 | } catch (err) { | 88 | } catch (err) { |
73 | logger.error('Cannot remove %s video from our redundancy system.', this.buildEntryLogId(m)) | 89 | logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel)) |
74 | } | 90 | } |
75 | } | 91 | } |
76 | } | 92 | } |
@@ -90,18 +106,14 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
90 | } | 106 | } |
91 | } | 107 | } |
92 | 108 | ||
93 | private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) { | 109 | private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { |
94 | const serverActor = await getServerActor() | 110 | const serverActor = await getServerActor() |
95 | 111 | ||
96 | for (const file of filesToDuplicate) { | 112 | for (const file of filesToDuplicate) { |
97 | const existing = await VideoRedundancyModel.loadByFileId(file.id) | 113 | const existing = await VideoRedundancyModel.loadByFileId(file.id) |
98 | if (existing) { | 114 | if (existing) { |
99 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', file.Video.url, file.resolution, strategy) | 115 | await this.extendsExpirationOf(existing, redundancy.minLifetime) |
100 | 116 | ||
101 | existing.expiresOn = this.buildNewExpiration() | ||
102 | await existing.save() | ||
103 | |||
104 | await sendUpdateCacheFile(serverActor, existing) | ||
105 | continue | 117 | continue |
106 | } | 118 | } |
107 | 119 | ||
@@ -109,7 +121,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
109 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) | 121 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) |
110 | if (!video) continue | 122 | if (!video) continue |
111 | 123 | ||
112 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy) | 124 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy) |
113 | 125 | ||
114 | const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() | 126 | const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() |
115 | const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) | 127 | const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) |
@@ -120,10 +132,10 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
120 | await rename(tmpPath, destPath) | 132 | await rename(tmpPath, destPath) |
121 | 133 | ||
122 | const createdModel = await VideoRedundancyModel.create({ | 134 | const createdModel = await VideoRedundancyModel.create({ |
123 | expiresOn: new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS), | 135 | expiresOn: this.buildNewExpiration(redundancy.minLifetime), |
124 | url: getVideoCacheFileActivityPubUrl(file), | 136 | url: getVideoCacheFileActivityPubUrl(file), |
125 | fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), | 137 | fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), |
126 | strategy, | 138 | strategy: redundancy.strategy, |
127 | videoFileId: file.id, | 139 | videoFileId: file.id, |
128 | actorId: serverActor.id | 140 | actorId: serverActor.id |
129 | }) | 141 | }) |
@@ -133,16 +145,36 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
133 | } | 145 | } |
134 | } | 146 | } |
135 | 147 | ||
136 | private async isTooHeavy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[], maxSizeArg: number) { | 148 | private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) { |
137 | const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate) | 149 | logger.info('Extending expiration of %s.', redundancy.url) |
150 | |||
151 | const serverActor = await getServerActor() | ||
152 | |||
153 | redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs) | ||
154 | await redundancy.save() | ||
155 | |||
156 | await sendUpdateCacheFile(serverActor, redundancy) | ||
157 | } | ||
158 | |||
159 | private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { | ||
160 | while (this.isTooHeavy(redundancy, filesToDuplicate)) { | ||
161 | const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime) | ||
162 | if (!toDelete) return | ||
163 | |||
164 | await removeVideoRedundancy(toDelete) | ||
165 | } | ||
166 | } | ||
167 | |||
168 | private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { | ||
169 | const maxSize = redundancy.size - this.getTotalFileSizes(filesToDuplicate) | ||
138 | 170 | ||
139 | const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(strategy) | 171 | const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy) |
140 | 172 | ||
141 | return totalDuplicated > maxSize | 173 | return totalDuplicated > maxSize |
142 | } | 174 | } |
143 | 175 | ||
144 | private buildNewExpiration () { | 176 | private buildNewExpiration (expiresAfterMs: number) { |
145 | return new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS) | 177 | return new Date(Date.now() + expiresAfterMs) |
146 | } | 178 | } |
147 | 179 | ||
148 | private buildEntryLogId (object: VideoRedundancyModel) { | 180 | private buildEntryLogId (object: VideoRedundancyModel) { |