diff options
author | Chocobozzz <me@florianbigard.com> | 2018-09-24 13:07:33 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-09-24 13:38:39 +0200 |
commit | e5565833f62b97f62ea75eba5b479963ae78b873 (patch) | |
tree | 835793ce464f9666b0ceae79f3d278cc4e007b32 /server/lib | |
parent | d1a63fc7ac58a1db00d8ca4f43aadba02eb9b084 (diff) | |
download | PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.gz PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.zst PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.zip |
Improve redundancy: add 'min_lifetime' configuration
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/audience.ts | 7 | ||||
-rw-r--r-- | server/lib/activitypub/cache-file.ts | 22 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-create.ts | 8 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-undo.ts | 2 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-update.ts | 28 | ||||
-rw-r--r-- | server/lib/activitypub/send/send-update.ts | 4 | ||||
-rw-r--r-- | server/lib/activitypub/videos.ts | 50 | ||||
-rw-r--r-- | server/lib/cache/index.ts | 1 | ||||
-rw-r--r-- | server/lib/redundancy.ts | 3 | ||||
-rw-r--r-- | server/lib/schedulers/videos-redundancy-scheduler.ts | 86 |
10 files changed, 138 insertions, 73 deletions
diff --git a/server/lib/activitypub/audience.ts b/server/lib/activitypub/audience.ts index a86428461..10277eca7 100644 --- a/server/lib/activitypub/audience.ts +++ b/server/lib/activitypub/audience.ts | |||
@@ -50,7 +50,12 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: ActorModel[]): Acti | |||
50 | 50 | ||
51 | async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) { | 51 | async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) { |
52 | const actors = await VideoShareModel.loadActorsByShare(video.id, t) | 52 | const actors = await VideoShareModel.loadActorsByShare(video.id, t) |
53 | actors.push(video.VideoChannel.Account.Actor) | 53 | |
54 | const videoActor = video.VideoChannel && video.VideoChannel.Account | ||
55 | ? video.VideoChannel.Account.Actor | ||
56 | : await ActorModel.loadAccountActorByVideoId(video.id, t) | ||
57 | |||
58 | actors.push(videoActor) | ||
54 | 59 | ||
55 | return actors | 60 | return actors |
56 | } | 61 | } |
diff --git a/server/lib/activitypub/cache-file.ts b/server/lib/activitypub/cache-file.ts index 87f8a4162..5286d8e6d 100644 --- a/server/lib/activitypub/cache-file.ts +++ b/server/lib/activitypub/cache-file.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { CacheFileObject } from '../../../shared/index' | 1 | import { CacheFileObject } from '../../../shared/index' |
2 | import { VideoModel } from '../../models/video/video' | 2 | import { VideoModel } from '../../models/video/video' |
3 | import { sequelizeTypescript } from '../../initializers' | ||
4 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' | 3 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' |
4 | import { Transaction } from 'sequelize' | ||
5 | 5 | ||
6 | function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { | 6 | function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { |
7 | const url = cacheFileObject.url | 7 | const url = cacheFileObject.url |
@@ -22,25 +22,29 @@ function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject | |||
22 | } | 22 | } |
23 | } | 23 | } |
24 | 24 | ||
25 | function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { | 25 | function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }, t: Transaction) { |
26 | return sequelizeTypescript.transaction(async t => { | 26 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) |
27 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) | ||
28 | 27 | ||
29 | return VideoRedundancyModel.create(attributes, { transaction: t }) | 28 | return VideoRedundancyModel.create(attributes, { transaction: t }) |
30 | }) | ||
31 | } | 29 | } |
32 | 30 | ||
33 | function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: { id?: number }) { | 31 | function updateCacheFile ( |
32 | cacheFileObject: CacheFileObject, | ||
33 | redundancyModel: VideoRedundancyModel, | ||
34 | video: VideoModel, | ||
35 | byActor: { id?: number }, | ||
36 | t: Transaction | ||
37 | ) { | ||
34 | if (redundancyModel.actorId !== byActor.id) { | 38 | if (redundancyModel.actorId !== byActor.id) { |
35 | throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.') | 39 | throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.') |
36 | } | 40 | } |
37 | 41 | ||
38 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor) | 42 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) |
39 | 43 | ||
40 | redundancyModel.set('expires', attributes.expiresOn) | 44 | redundancyModel.set('expires', attributes.expiresOn) |
41 | redundancyModel.set('fileUrl', attributes.fileUrl) | 45 | redundancyModel.set('fileUrl', attributes.fileUrl) |
42 | 46 | ||
43 | return redundancyModel.save() | 47 | return redundancyModel.save({ transaction: t }) |
44 | } | 48 | } |
45 | 49 | ||
46 | export { | 50 | export { |
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index cff8dcfc6..ceb5413ca 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts | |||
@@ -95,7 +95,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate) | |||
95 | if (video.isOwned()) { | 95 | if (video.isOwned()) { |
96 | // Don't resend the activity to the sender | 96 | // Don't resend the activity to the sender |
97 | const exceptions = [ byActor ] | 97 | const exceptions = [ byActor ] |
98 | await forwardActivity(activity, undefined, exceptions) | 98 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
99 | } | 99 | } |
100 | } | 100 | } |
101 | 101 | ||
@@ -104,12 +104,14 @@ async function processCacheFile (byActor: ActorModel, activity: ActivityCreate) | |||
104 | 104 | ||
105 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object }) | 105 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object }) |
106 | 106 | ||
107 | await createCacheFile(cacheFile, video, byActor) | 107 | await sequelizeTypescript.transaction(async t => { |
108 | return createCacheFile(cacheFile, video, byActor, t) | ||
109 | }) | ||
108 | 110 | ||
109 | if (video.isOwned()) { | 111 | if (video.isOwned()) { |
110 | // Don't resend the activity to the sender | 112 | // Don't resend the activity to the sender |
111 | const exceptions = [ byActor ] | 113 | const exceptions = [ byActor ] |
112 | await forwardActivity(activity, undefined, exceptions) | 114 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
113 | } | 115 | } |
114 | } | 116 | } |
115 | 117 | ||
diff --git a/server/lib/activitypub/process/process-undo.ts b/server/lib/activitypub/process/process-undo.ts index 73ca0a17c..ff019cd8c 100644 --- a/server/lib/activitypub/process/process-undo.ts +++ b/server/lib/activitypub/process/process-undo.ts | |||
@@ -100,7 +100,7 @@ async function processUndoCacheFile (byActor: ActorModel, activity: ActivityUndo | |||
100 | 100 | ||
101 | return sequelizeTypescript.transaction(async t => { | 101 | return sequelizeTypescript.transaction(async t => { |
102 | const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) | 102 | const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) |
103 | if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url) | 103 | if (!cacheFile) throw new Error('Unknown video cache ' + cacheFileObject.id) |
104 | 104 | ||
105 | if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.') | 105 | if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.') |
106 | 106 | ||
diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts index ed3489ebf..e092a6729 100644 --- a/server/lib/activitypub/process/process-update.ts +++ b/server/lib/activitypub/process/process-update.ts | |||
@@ -12,6 +12,7 @@ import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-vali | |||
12 | import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' | 12 | import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' |
13 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' | 13 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' |
14 | import { createCacheFile, updateCacheFile } from '../cache-file' | 14 | import { createCacheFile, updateCacheFile } from '../cache-file' |
15 | import { forwardVideoRelatedActivity } from '../send/utils' | ||
15 | 16 | ||
16 | async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) { | 17 | async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) { |
17 | const objectType = activity.object.type | 18 | const objectType = activity.object.type |
@@ -68,18 +69,29 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate) | |||
68 | async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { | 69 | async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { |
69 | const cacheFileObject = activity.object as CacheFileObject | 70 | const cacheFileObject = activity.object as CacheFileObject |
70 | 71 | ||
71 | if (!isCacheFileObjectValid(cacheFileObject) === false) { | 72 | if (!isCacheFileObjectValid(cacheFileObject)) { |
72 | logger.debug('Cahe file object sent by update is not valid.', { cacheFileObject }) | 73 | logger.debug('Cache file object sent by update is not valid.', { cacheFileObject }) |
73 | return undefined | 74 | return undefined |
74 | } | 75 | } |
75 | 76 | ||
76 | const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) | 77 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.object }) |
77 | if (!redundancyModel) { | 78 | |
78 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.id }) | 79 | await sequelizeTypescript.transaction(async t => { |
79 | return createCacheFile(cacheFileObject, video, byActor) | 80 | const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id, t) |
80 | } | 81 | |
82 | if (!redundancyModel) { | ||
83 | await createCacheFile(cacheFileObject, video, byActor, t) | ||
84 | } else { | ||
85 | await updateCacheFile(cacheFileObject, redundancyModel, video, byActor, t) | ||
86 | } | ||
87 | }) | ||
88 | |||
89 | if (video.isOwned()) { | ||
90 | // Don't resend the activity to the sender | ||
91 | const exceptions = [ byActor ] | ||
81 | 92 | ||
82 | return updateCacheFile(cacheFileObject, redundancyModel, byActor) | 93 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
94 | } | ||
83 | } | 95 | } |
84 | 96 | ||
85 | async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { | 97 | async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { |
diff --git a/server/lib/activitypub/send/send-update.ts b/server/lib/activitypub/send/send-update.ts index ec46789b7..a68f03edf 100644 --- a/server/lib/activitypub/send/send-update.ts +++ b/server/lib/activitypub/send/send-update.ts | |||
@@ -7,8 +7,8 @@ import { VideoModel } from '../../../models/video/video' | |||
7 | import { VideoChannelModel } from '../../../models/video/video-channel' | 7 | import { VideoChannelModel } from '../../../models/video/video-channel' |
8 | import { VideoShareModel } from '../../../models/video/video-share' | 8 | import { VideoShareModel } from '../../../models/video/video-share' |
9 | import { getUpdateActivityPubUrl } from '../url' | 9 | import { getUpdateActivityPubUrl } from '../url' |
10 | import { broadcastToFollowers, sendVideoRelatedActivity, unicastTo } from './utils' | 10 | import { broadcastToFollowers, sendVideoRelatedActivity } from './utils' |
11 | import { audiencify, getActorsInvolvedInVideo, getAudience, getAudienceFromFollowersOf } from '../audience' | 11 | import { audiencify, getActorsInvolvedInVideo, getAudience } from '../audience' |
12 | import { logger } from '../../../helpers/logger' | 12 | import { logger } from '../../../helpers/logger' |
13 | import { VideoCaptionModel } from '../../../models/video/video-caption' | 13 | import { VideoCaptionModel } from '../../../models/video/video-caption' |
14 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' | 14 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' |
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 48c0e0a5c..db72ef23c 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts | |||
@@ -176,7 +176,7 @@ async function getOrCreateVideoAndAccountAndChannel (options: { | |||
176 | syncParam, | 176 | syncParam, |
177 | refreshViews | 177 | refreshViews |
178 | } | 178 | } |
179 | const p = retryTransactionWrapper(refreshVideoIfNeeded, refreshOptions) | 179 | const p = refreshVideoIfNeeded(refreshOptions) |
180 | if (syncParam.refreshVideo === true) videoFromDatabase = await p | 180 | if (syncParam.refreshVideo === true) videoFromDatabase = await p |
181 | 181 | ||
182 | return { video: videoFromDatabase } | 182 | return { video: videoFromDatabase } |
@@ -245,29 +245,37 @@ async function updateVideoFromAP (options: { | |||
245 | generateThumbnailFromUrl(options.video, options.videoObject.icon) | 245 | generateThumbnailFromUrl(options.video, options.videoObject.icon) |
246 | .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })) | 246 | .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })) |
247 | 247 | ||
248 | // Remove old video files | 248 | { |
249 | const videoFileDestroyTasks: Bluebird<void>[] = [] | 249 | const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) |
250 | for (const videoFile of options.video.VideoFiles) { | 250 | const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a)) |
251 | videoFileDestroyTasks.push(videoFile.destroy(sequelizeOptions)) | ||
252 | } | ||
253 | await Promise.all(videoFileDestroyTasks) | ||
254 | 251 | ||
255 | const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) | 252 | // Remove video files that do not exist anymore |
256 | const tasks = videoFileAttributes.map(f => VideoFileModel.create(f, sequelizeOptions)) | 253 | const destroyTasks = options.video.VideoFiles |
257 | await Promise.all(tasks) | 254 | .filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f))) |
255 | .map(f => f.destroy(sequelizeOptions)) | ||
256 | await Promise.all(destroyTasks) | ||
258 | 257 | ||
259 | // Update Tags | 258 | // Update or add other one |
260 | const tags = options.videoObject.tag.map(tag => tag.name) | 259 | const upsertTasks = videoFileAttributes.map(a => VideoFileModel.upsert(a, sequelizeOptions)) |
261 | const tagInstances = await TagModel.findOrCreateTags(tags, t) | 260 | await Promise.all(upsertTasks) |
262 | await options.video.$set('Tags', tagInstances, sequelizeOptions) | 261 | } |
263 | 262 | ||
264 | // Update captions | 263 | { |
265 | await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) | 264 | // Update Tags |
265 | const tags = options.videoObject.tag.map(tag => tag.name) | ||
266 | const tagInstances = await TagModel.findOrCreateTags(tags, t) | ||
267 | await options.video.$set('Tags', tagInstances, sequelizeOptions) | ||
268 | } | ||
266 | 269 | ||
267 | const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { | 270 | { |
268 | return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) | 271 | // Update captions |
269 | }) | 272 | await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) |
270 | await Promise.all(videoCaptionsPromises) | 273 | |
274 | const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { | ||
275 | return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) | ||
276 | }) | ||
277 | await Promise.all(videoCaptionsPromises) | ||
278 | } | ||
271 | }) | 279 | }) |
272 | 280 | ||
273 | logger.info('Remote video with uuid %s updated', options.videoObject.uuid) | 281 | logger.info('Remote video with uuid %s updated', options.videoObject.uuid) |
@@ -382,7 +390,7 @@ async function refreshVideoIfNeeded (options: { | |||
382 | channel: channelActor.VideoChannel, | 390 | channel: channelActor.VideoChannel, |
383 | updateViews: options.refreshViews | 391 | updateViews: options.refreshViews |
384 | } | 392 | } |
385 | await updateVideoFromAP(updateOptions) | 393 | await retryTransactionWrapper(updateVideoFromAP, updateOptions) |
386 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) | 394 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) |
387 | } catch (err) { | 395 | } catch (err) { |
388 | logger.warn('Cannot refresh video.', { err }) | 396 | logger.warn('Cannot refresh video.', { err }) |
diff --git a/server/lib/cache/index.ts b/server/lib/cache/index.ts index 7bf63790a..54eb983fa 100644 --- a/server/lib/cache/index.ts +++ b/server/lib/cache/index.ts | |||
@@ -1 +1,2 @@ | |||
1 | export * from './videos-preview-cache' | 1 | export * from './videos-preview-cache' |
2 | export * from './videos-caption-cache' | ||
diff --git a/server/lib/redundancy.ts b/server/lib/redundancy.ts index 78221cc3d..16b122658 100644 --- a/server/lib/redundancy.ts +++ b/server/lib/redundancy.ts | |||
@@ -6,7 +6,8 @@ import { getServerActor } from '../helpers/utils' | |||
6 | async function removeVideoRedundancy (videoRedundancy: VideoRedundancyModel, t?: Transaction) { | 6 | async function removeVideoRedundancy (videoRedundancy: VideoRedundancyModel, t?: Transaction) { |
7 | const serverActor = await getServerActor() | 7 | const serverActor = await getServerActor() |
8 | 8 | ||
9 | await sendUndoCacheFile(serverActor, videoRedundancy, t) | 9 | // Local cache, send undo to remote instances |
10 | if (videoRedundancy.actorId === serverActor.id) await sendUndoCacheFile(serverActor, videoRedundancy, t) | ||
10 | 11 | ||
11 | await videoRedundancy.destroy({ transaction: t }) | 12 | await videoRedundancy.destroy({ transaction: t }) |
12 | } | 13 | } |
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts index 998d2295a..97df3e4f5 100644 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { AbstractScheduler } from './abstract-scheduler' | 1 | import { AbstractScheduler } from './abstract-scheduler' |
2 | import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers' | 2 | import { CONFIG, JOB_TTL, REDUNDANCY } from '../../initializers' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { VideoRedundancyStrategy, VideosRedundancy } from '../../../shared/models/redundancy' | 4 | import { VideosRedundancy } from '../../../shared/models/redundancy' |
5 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' | 5 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' |
6 | import { VideoFileModel } from '../../models/video/video-file' | 6 | import { VideoFileModel } from '../../models/video/video-file' |
7 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' | 7 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' |
@@ -12,6 +12,7 @@ import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' | |||
12 | import { VideoModel } from '../../models/video/video' | 12 | import { VideoModel } from '../../models/video/video' |
13 | import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' | 13 | import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' |
14 | import { isTestInstance } from '../../helpers/core-utils' | 14 | import { isTestInstance } from '../../helpers/core-utils' |
15 | import { removeVideoRedundancy } from '../redundancy' | ||
15 | 16 | ||
16 | export class VideosRedundancyScheduler extends AbstractScheduler { | 17 | export class VideosRedundancyScheduler extends AbstractScheduler { |
17 | 18 | ||
@@ -30,7 +31,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
30 | this.executing = true | 31 | this.executing = true |
31 | 32 | ||
32 | for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { | 33 | for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { |
33 | if (!isTestInstance()) logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) | 34 | logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) |
34 | 35 | ||
35 | try { | 36 | try { |
36 | const videoToDuplicate = await this.findVideoToDuplicate(obj) | 37 | const videoToDuplicate = await this.findVideoToDuplicate(obj) |
@@ -39,20 +40,24 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
39 | const videoFiles = videoToDuplicate.VideoFiles | 40 | const videoFiles = videoToDuplicate.VideoFiles |
40 | videoFiles.forEach(f => f.Video = videoToDuplicate) | 41 | videoFiles.forEach(f => f.Video = videoToDuplicate) |
41 | 42 | ||
42 | if (await this.isTooHeavy(obj.strategy, videoFiles, obj.size)) { | 43 | await this.purgeCacheIfNeeded(obj, videoFiles) |
43 | if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) | 44 | |
45 | if (await this.isTooHeavy(obj, videoFiles)) { | ||
46 | logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) | ||
44 | continue | 47 | continue |
45 | } | 48 | } |
46 | 49 | ||
47 | logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) | 50 | logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) |
48 | 51 | ||
49 | await this.createVideoRedundancy(obj.strategy, videoFiles) | 52 | await this.createVideoRedundancy(obj, videoFiles) |
50 | } catch (err) { | 53 | } catch (err) { |
51 | logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) | 54 | logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) |
52 | } | 55 | } |
53 | } | 56 | } |
54 | 57 | ||
55 | await this.removeExpired() | 58 | await this.extendsLocalExpiration() |
59 | |||
60 | await this.purgeRemoteExpired() | ||
56 | 61 | ||
57 | this.executing = false | 62 | this.executing = false |
58 | } | 63 | } |
@@ -61,16 +66,27 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
61 | return this.instance || (this.instance = new this()) | 66 | return this.instance || (this.instance = new this()) |
62 | } | 67 | } |
63 | 68 | ||
64 | private async removeExpired () { | 69 | private async extendsLocalExpiration () { |
65 | const expired = await VideoRedundancyModel.listAllExpired() | 70 | const expired = await VideoRedundancyModel.listLocalExpired() |
71 | |||
72 | for (const redundancyModel of expired) { | ||
73 | try { | ||
74 | const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) | ||
75 | await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime) | ||
76 | } catch (err) { | ||
77 | logger.error('Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel)) | ||
78 | } | ||
79 | } | ||
80 | } | ||
66 | 81 | ||
67 | for (const m of expired) { | 82 | private async purgeRemoteExpired () { |
68 | logger.info('Removing expired video %s from our redundancy system.', this.buildEntryLogId(m)) | 83 | const expired = await VideoRedundancyModel.listRemoteExpired() |
69 | 84 | ||
85 | for (const redundancyModel of expired) { | ||
70 | try { | 86 | try { |
71 | await m.destroy() | 87 | await removeVideoRedundancy(redundancyModel) |
72 | } catch (err) { | 88 | } catch (err) { |
73 | logger.error('Cannot remove %s video from our redundancy system.', this.buildEntryLogId(m)) | 89 | logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel)) |
74 | } | 90 | } |
75 | } | 91 | } |
76 | } | 92 | } |
@@ -90,18 +106,14 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
90 | } | 106 | } |
91 | } | 107 | } |
92 | 108 | ||
93 | private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) { | 109 | private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { |
94 | const serverActor = await getServerActor() | 110 | const serverActor = await getServerActor() |
95 | 111 | ||
96 | for (const file of filesToDuplicate) { | 112 | for (const file of filesToDuplicate) { |
97 | const existing = await VideoRedundancyModel.loadByFileId(file.id) | 113 | const existing = await VideoRedundancyModel.loadByFileId(file.id) |
98 | if (existing) { | 114 | if (existing) { |
99 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', file.Video.url, file.resolution, strategy) | 115 | await this.extendsExpirationOf(existing, redundancy.minLifetime) |
100 | 116 | ||
101 | existing.expiresOn = this.buildNewExpiration() | ||
102 | await existing.save() | ||
103 | |||
104 | await sendUpdateCacheFile(serverActor, existing) | ||
105 | continue | 117 | continue |
106 | } | 118 | } |
107 | 119 | ||
@@ -109,7 +121,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
109 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) | 121 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) |
110 | if (!video) continue | 122 | if (!video) continue |
111 | 123 | ||
112 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy) | 124 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy) |
113 | 125 | ||
114 | const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() | 126 | const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() |
115 | const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) | 127 | const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) |
@@ -120,10 +132,10 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
120 | await rename(tmpPath, destPath) | 132 | await rename(tmpPath, destPath) |
121 | 133 | ||
122 | const createdModel = await VideoRedundancyModel.create({ | 134 | const createdModel = await VideoRedundancyModel.create({ |
123 | expiresOn: new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS), | 135 | expiresOn: this.buildNewExpiration(redundancy.minLifetime), |
124 | url: getVideoCacheFileActivityPubUrl(file), | 136 | url: getVideoCacheFileActivityPubUrl(file), |
125 | fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), | 137 | fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), |
126 | strategy, | 138 | strategy: redundancy.strategy, |
127 | videoFileId: file.id, | 139 | videoFileId: file.id, |
128 | actorId: serverActor.id | 140 | actorId: serverActor.id |
129 | }) | 141 | }) |
@@ -133,16 +145,36 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
133 | } | 145 | } |
134 | } | 146 | } |
135 | 147 | ||
136 | private async isTooHeavy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[], maxSizeArg: number) { | 148 | private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) { |
137 | const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate) | 149 | logger.info('Extending expiration of %s.', redundancy.url) |
150 | |||
151 | const serverActor = await getServerActor() | ||
152 | |||
153 | redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs) | ||
154 | await redundancy.save() | ||
155 | |||
156 | await sendUpdateCacheFile(serverActor, redundancy) | ||
157 | } | ||
158 | |||
159 | private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { | ||
160 | while (this.isTooHeavy(redundancy, filesToDuplicate)) { | ||
161 | const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime) | ||
162 | if (!toDelete) return | ||
163 | |||
164 | await removeVideoRedundancy(toDelete) | ||
165 | } | ||
166 | } | ||
167 | |||
168 | private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { | ||
169 | const maxSize = redundancy.size - this.getTotalFileSizes(filesToDuplicate) | ||
138 | 170 | ||
139 | const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(strategy) | 171 | const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy) |
140 | 172 | ||
141 | return totalDuplicated > maxSize | 173 | return totalDuplicated > maxSize |
142 | } | 174 | } |
143 | 175 | ||
144 | private buildNewExpiration () { | 176 | private buildNewExpiration (expiresAfterMs: number) { |
145 | return new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS) | 177 | return new Date(Date.now() + expiresAfterMs) |
146 | } | 178 | } |
147 | 179 | ||
148 | private buildEntryLogId (object: VideoRedundancyModel) { | 180 | private buildEntryLogId (object: VideoRedundancyModel) { |