aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/schedulers/videos-redundancy-scheduler.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-09-24 13:07:33 +0200
committerChocobozzz <me@florianbigard.com>2018-09-24 13:38:39 +0200
commite5565833f62b97f62ea75eba5b479963ae78b873 (patch)
tree835793ce464f9666b0ceae79f3d278cc4e007b32 /server/lib/schedulers/videos-redundancy-scheduler.ts
parentd1a63fc7ac58a1db00d8ca4f43aadba02eb9b084 (diff)
downloadPeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.gz
PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.zst
PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.zip
Improve redundancy: add 'min_lifetime' configuration
Diffstat (limited to 'server/lib/schedulers/videos-redundancy-scheduler.ts')
-rw-r--r--server/lib/schedulers/videos-redundancy-scheduler.ts86
1 files changed, 59 insertions, 27 deletions
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts
index 998d2295a..97df3e4f5 100644
--- a/server/lib/schedulers/videos-redundancy-scheduler.ts
+++ b/server/lib/schedulers/videos-redundancy-scheduler.ts
@@ -1,7 +1,7 @@
1import { AbstractScheduler } from './abstract-scheduler' 1import { AbstractScheduler } from './abstract-scheduler'
2import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers' 2import { CONFIG, JOB_TTL, REDUNDANCY } from '../../initializers'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { VideoRedundancyStrategy, VideosRedundancy } from '../../../shared/models/redundancy' 4import { VideosRedundancy } from '../../../shared/models/redundancy'
5import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' 5import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
6import { VideoFileModel } from '../../models/video/video-file' 6import { VideoFileModel } from '../../models/video/video-file'
7import { downloadWebTorrentVideo } from '../../helpers/webtorrent' 7import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
@@ -12,6 +12,7 @@ import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
12import { VideoModel } from '../../models/video/video' 12import { VideoModel } from '../../models/video/video'
13import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' 13import { getVideoCacheFileActivityPubUrl } from '../activitypub/url'
14import { isTestInstance } from '../../helpers/core-utils' 14import { isTestInstance } from '../../helpers/core-utils'
15import { removeVideoRedundancy } from '../redundancy'
15 16
16export class VideosRedundancyScheduler extends AbstractScheduler { 17export class VideosRedundancyScheduler extends AbstractScheduler {
17 18
@@ -30,7 +31,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
30 this.executing = true 31 this.executing = true
31 32
32 for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { 33 for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
33 if (!isTestInstance()) logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) 34 logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)
34 35
35 try { 36 try {
36 const videoToDuplicate = await this.findVideoToDuplicate(obj) 37 const videoToDuplicate = await this.findVideoToDuplicate(obj)
@@ -39,20 +40,24 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
39 const videoFiles = videoToDuplicate.VideoFiles 40 const videoFiles = videoToDuplicate.VideoFiles
40 videoFiles.forEach(f => f.Video = videoToDuplicate) 41 videoFiles.forEach(f => f.Video = videoToDuplicate)
41 42
42 if (await this.isTooHeavy(obj.strategy, videoFiles, obj.size)) { 43 await this.purgeCacheIfNeeded(obj, videoFiles)
43 if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) 44
45 if (await this.isTooHeavy(obj, videoFiles)) {
46 logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
44 continue 47 continue
45 } 48 }
46 49
47 logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) 50 logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy)
48 51
49 await this.createVideoRedundancy(obj.strategy, videoFiles) 52 await this.createVideoRedundancy(obj, videoFiles)
50 } catch (err) { 53 } catch (err) {
51 logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) 54 logger.error('Cannot run videos redundancy %s.', obj.strategy, { err })
52 } 55 }
53 } 56 }
54 57
55 await this.removeExpired() 58 await this.extendsLocalExpiration()
59
60 await this.purgeRemoteExpired()
56 61
57 this.executing = false 62 this.executing = false
58 } 63 }
@@ -61,16 +66,27 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
61 return this.instance || (this.instance = new this()) 66 return this.instance || (this.instance = new this())
62 } 67 }
63 68
64 private async removeExpired () { 69 private async extendsLocalExpiration () {
65 const expired = await VideoRedundancyModel.listAllExpired() 70 const expired = await VideoRedundancyModel.listLocalExpired()
71
72 for (const redundancyModel of expired) {
73 try {
74 const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
75 await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
76 } catch (err) {
77 logger.error('Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel))
78 }
79 }
80 }
66 81
67 for (const m of expired) { 82 private async purgeRemoteExpired () {
68 logger.info('Removing expired video %s from our redundancy system.', this.buildEntryLogId(m)) 83 const expired = await VideoRedundancyModel.listRemoteExpired()
69 84
85 for (const redundancyModel of expired) {
70 try { 86 try {
71 await m.destroy() 87 await removeVideoRedundancy(redundancyModel)
72 } catch (err) { 88 } catch (err) {
73 logger.error('Cannot remove %s video from our redundancy system.', this.buildEntryLogId(m)) 89 logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel))
74 } 90 }
75 } 91 }
76 } 92 }
@@ -90,18 +106,14 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
90 } 106 }
91 } 107 }
92 108
93 private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) { 109 private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
94 const serverActor = await getServerActor() 110 const serverActor = await getServerActor()
95 111
96 for (const file of filesToDuplicate) { 112 for (const file of filesToDuplicate) {
97 const existing = await VideoRedundancyModel.loadByFileId(file.id) 113 const existing = await VideoRedundancyModel.loadByFileId(file.id)
98 if (existing) { 114 if (existing) {
99 logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', file.Video.url, file.resolution, strategy) 115 await this.extendsExpirationOf(existing, redundancy.minLifetime)
100 116
101 existing.expiresOn = this.buildNewExpiration()
102 await existing.save()
103
104 await sendUpdateCacheFile(serverActor, existing)
105 continue 117 continue
106 } 118 }
107 119
@@ -109,7 +121,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
109 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) 121 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id)
110 if (!video) continue 122 if (!video) continue
111 123
112 logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy) 124 logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy)
113 125
114 const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() 126 const { baseUrlHttp, baseUrlWs } = video.getBaseUrls()
115 const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) 127 const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs)
@@ -120,10 +132,10 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
120 await rename(tmpPath, destPath) 132 await rename(tmpPath, destPath)
121 133
122 const createdModel = await VideoRedundancyModel.create({ 134 const createdModel = await VideoRedundancyModel.create({
123 expiresOn: new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS), 135 expiresOn: this.buildNewExpiration(redundancy.minLifetime),
124 url: getVideoCacheFileActivityPubUrl(file), 136 url: getVideoCacheFileActivityPubUrl(file),
125 fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), 137 fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL),
126 strategy, 138 strategy: redundancy.strategy,
127 videoFileId: file.id, 139 videoFileId: file.id,
128 actorId: serverActor.id 140 actorId: serverActor.id
129 }) 141 })
@@ -133,16 +145,36 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
133 } 145 }
134 } 146 }
135 147
136 private async isTooHeavy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[], maxSizeArg: number) { 148 private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) {
137 const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate) 149 logger.info('Extending expiration of %s.', redundancy.url)
150
151 const serverActor = await getServerActor()
152
153 redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
154 await redundancy.save()
155
156 await sendUpdateCacheFile(serverActor, redundancy)
157 }
158
159 private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
160 while (this.isTooHeavy(redundancy, filesToDuplicate)) {
161 const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime)
162 if (!toDelete) return
163
164 await removeVideoRedundancy(toDelete)
165 }
166 }
167
168 private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
169 const maxSize = redundancy.size - this.getTotalFileSizes(filesToDuplicate)
138 170
139 const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(strategy) 171 const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy)
140 172
141 return totalDuplicated > maxSize 173 return totalDuplicated > maxSize
142 } 174 }
143 175
144 private buildNewExpiration () { 176 private buildNewExpiration (expiresAfterMs: number) {
145 return new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS) 177 return new Date(Date.now() + expiresAfterMs)
146 } 178 }
147 179
148 private buildEntryLogId (object: VideoRedundancyModel) { 180 private buildEntryLogId (object: VideoRedundancyModel) {