diff options
Diffstat (limited to 'server/lib/schedulers')
-rw-r--r-- | server/lib/schedulers/abstract-scheduler.ts | 18 | ||||
-rw-r--r-- | server/lib/schedulers/actor-follow-scheduler.ts (renamed from server/lib/schedulers/bad-actor-follow-scheduler.ts) | 23 | ||||
-rw-r--r-- | server/lib/schedulers/remove-old-jobs-scheduler.ts | 6 | ||||
-rw-r--r-- | server/lib/schedulers/update-videos-scheduler.ts | 32 | ||||
-rw-r--r-- | server/lib/schedulers/videos-redundancy-scheduler.ts | 201 | ||||
-rw-r--r-- | server/lib/schedulers/youtube-dl-update-scheduler.ts | 2 |
6 files changed, 192 insertions, 90 deletions
diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts index b9d0a4d17..86ea7aa38 100644 --- a/server/lib/schedulers/abstract-scheduler.ts +++ b/server/lib/schedulers/abstract-scheduler.ts | |||
@@ -1,8 +1,11 @@ | |||
1 | import { logger } from '../../helpers/logger' | ||
2 | |||
1 | export abstract class AbstractScheduler { | 3 | export abstract class AbstractScheduler { |
2 | 4 | ||
3 | protected abstract schedulerIntervalMs: number | 5 | protected abstract schedulerIntervalMs: number |
4 | 6 | ||
5 | private interval: NodeJS.Timer | 7 | private interval: NodeJS.Timer |
8 | private isRunning = false | ||
6 | 9 | ||
7 | enable () { | 10 | enable () { |
8 | if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') | 11 | if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') |
@@ -14,5 +17,18 @@ export abstract class AbstractScheduler { | |||
14 | clearInterval(this.interval) | 17 | clearInterval(this.interval) |
15 | } | 18 | } |
16 | 19 | ||
17 | abstract execute () | 20 | async execute () { |
21 | if (this.isRunning === true) return | ||
22 | this.isRunning = true | ||
23 | |||
24 | try { | ||
25 | await this.internalExecute() | ||
26 | } catch (err) { | ||
27 | logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) | ||
28 | } finally { | ||
29 | this.isRunning = false | ||
30 | } | ||
31 | } | ||
32 | |||
33 | protected abstract internalExecute (): Promise<any> | ||
18 | } | 34 | } |
diff --git a/server/lib/schedulers/bad-actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts index 617149aaf..3967be7f8 100644 --- a/server/lib/schedulers/bad-actor-follow-scheduler.ts +++ b/server/lib/schedulers/actor-follow-scheduler.ts | |||
@@ -3,18 +3,35 @@ import { logger } from '../../helpers/logger' | |||
3 | import { ActorFollowModel } from '../../models/activitypub/actor-follow' | 3 | import { ActorFollowModel } from '../../models/activitypub/actor-follow' |
4 | import { AbstractScheduler } from './abstract-scheduler' | 4 | import { AbstractScheduler } from './abstract-scheduler' |
5 | import { SCHEDULER_INTERVALS_MS } from '../../initializers' | 5 | import { SCHEDULER_INTERVALS_MS } from '../../initializers' |
6 | import { ActorFollowScoreCache } from '../cache' | ||
6 | 7 | ||
7 | export class BadActorFollowScheduler extends AbstractScheduler { | 8 | export class ActorFollowScheduler extends AbstractScheduler { |
8 | 9 | ||
9 | private static instance: AbstractScheduler | 10 | private static instance: AbstractScheduler |
10 | 11 | ||
11 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow | 12 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores |
12 | 13 | ||
13 | private constructor () { | 14 | private constructor () { |
14 | super() | 15 | super() |
15 | } | 16 | } |
16 | 17 | ||
17 | async execute () { | 18 | protected async internalExecute () { |
19 | await this.processPendingScores() | ||
20 | |||
21 | await this.removeBadActorFollows() | ||
22 | } | ||
23 | |||
24 | private async processPendingScores () { | ||
25 | const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy() | ||
26 | |||
27 | ActorFollowScoreCache.Instance.clearPendingFollowsScore() | ||
28 | |||
29 | for (const inbox of Object.keys(pendingScores)) { | ||
30 | await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox]) | ||
31 | } | ||
32 | } | ||
33 | |||
34 | private async removeBadActorFollows () { | ||
18 | if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') | 35 | if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') |
19 | 36 | ||
20 | try { | 37 | try { |
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts index a29a6b800..4a4341ba9 100644 --- a/server/lib/schedulers/remove-old-jobs-scheduler.ts +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts | |||
@@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler { | |||
14 | super() | 14 | super() |
15 | } | 15 | } |
16 | 16 | ||
17 | async execute () { | 17 | protected internalExecute () { |
18 | if (!isTestInstance()) logger.info('Removing old jobs (scheduler).') | 18 | if (!isTestInstance()) logger.info('Removing old jobs in scheduler.') |
19 | 19 | ||
20 | JobQueue.Instance.removeOldJobs() | 20 | return JobQueue.Instance.removeOldJobs() |
21 | } | 21 | } |
22 | 22 | ||
23 | static get Instance () { | 23 | static get Instance () { |
diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts index fd2edfd17..2618a5857 100644 --- a/server/lib/schedulers/update-videos-scheduler.ts +++ b/server/lib/schedulers/update-videos-scheduler.ts | |||
@@ -5,6 +5,8 @@ import { retryTransactionWrapper } from '../../helpers/database-utils' | |||
5 | import { federateVideoIfNeeded } from '../activitypub' | 5 | import { federateVideoIfNeeded } from '../activitypub' |
6 | import { SCHEDULER_INTERVALS_MS, sequelizeTypescript } from '../../initializers' | 6 | import { SCHEDULER_INTERVALS_MS, sequelizeTypescript } from '../../initializers' |
7 | import { VideoPrivacy } from '../../../shared/models/videos' | 7 | import { VideoPrivacy } from '../../../shared/models/videos' |
8 | import { Notifier } from '../notifier' | ||
9 | import { VideoModel } from '../../models/video/video' | ||
8 | 10 | ||
9 | export class UpdateVideosScheduler extends AbstractScheduler { | 11 | export class UpdateVideosScheduler extends AbstractScheduler { |
10 | 12 | ||
@@ -12,30 +14,20 @@ export class UpdateVideosScheduler extends AbstractScheduler { | |||
12 | 14 | ||
13 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos | 15 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos |
14 | 16 | ||
15 | private isRunning = false | ||
16 | |||
17 | private constructor () { | 17 | private constructor () { |
18 | super() | 18 | super() |
19 | } | 19 | } |
20 | 20 | ||
21 | async execute () { | 21 | protected async internalExecute () { |
22 | if (this.isRunning === true) return | 22 | return retryTransactionWrapper(this.updateVideos.bind(this)) |
23 | this.isRunning = true | ||
24 | |||
25 | try { | ||
26 | await retryTransactionWrapper(this.updateVideos.bind(this)) | ||
27 | } catch (err) { | ||
28 | logger.error('Cannot execute update videos scheduler.', { err }) | ||
29 | } finally { | ||
30 | this.isRunning = false | ||
31 | } | ||
32 | } | 23 | } |
33 | 24 | ||
34 | private async updateVideos () { | 25 | private async updateVideos () { |
35 | if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined | 26 | if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined |
36 | 27 | ||
37 | return sequelizeTypescript.transaction(async t => { | 28 | const publishedVideos = await sequelizeTypescript.transaction(async t => { |
38 | const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate(t) | 29 | const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate(t) |
30 | const publishedVideos: VideoModel[] = [] | ||
39 | 31 | ||
40 | for (const schedule of schedules) { | 32 | for (const schedule of schedules) { |
41 | const video = schedule.Video | 33 | const video = schedule.Video |
@@ -50,11 +42,23 @@ export class UpdateVideosScheduler extends AbstractScheduler { | |||
50 | 42 | ||
51 | await video.save({ transaction: t }) | 43 | await video.save({ transaction: t }) |
52 | await federateVideoIfNeeded(video, isNewVideo, t) | 44 | await federateVideoIfNeeded(video, isNewVideo, t) |
45 | |||
46 | if (oldPrivacy === VideoPrivacy.UNLISTED || oldPrivacy === VideoPrivacy.PRIVATE) { | ||
47 | video.ScheduleVideoUpdate = schedule | ||
48 | publishedVideos.push(video) | ||
49 | } | ||
53 | } | 50 | } |
54 | 51 | ||
55 | await schedule.destroy({ transaction: t }) | 52 | await schedule.destroy({ transaction: t }) |
56 | } | 53 | } |
54 | |||
55 | return publishedVideos | ||
57 | }) | 56 | }) |
57 | |||
58 | for (const v of publishedVideos) { | ||
59 | Notifier.Instance.notifyOnNewVideo(v) | ||
60 | Notifier.Instance.notifyOnPendingVideoPublished(v) | ||
61 | } | ||
58 | } | 62 | } |
59 | 63 | ||
60 | static get Instance () { | 64 | static get Instance () { |
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts index c49a8c89a..1a48f2bd0 100644 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts | |||
@@ -1,22 +1,31 @@ | |||
1 | import { AbstractScheduler } from './abstract-scheduler' | 1 | import { AbstractScheduler } from './abstract-scheduler' |
2 | import { CONFIG, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers' | 2 | import { CONFIG, HLS_REDUNDANCY_DIRECTORY, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { VideosRedundancy } from '../../../shared/models/redundancy' | 4 | import { VideosRedundancy } from '../../../shared/models/redundancy' |
5 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' | 5 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' |
6 | import { VideoFileModel } from '../../models/video/video-file' | 6 | import { VideoFileModel } from '../../models/video/video-file' |
7 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' | 7 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' |
8 | import { join } from 'path' | 8 | import { join } from 'path' |
9 | import { rename } from 'fs-extra' | 9 | import { move } from 'fs-extra' |
10 | import { getServerActor } from '../../helpers/utils' | 10 | import { getServerActor } from '../../helpers/utils' |
11 | import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' | 11 | import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' |
12 | import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' | 12 | import { getVideoCacheFileActivityPubUrl, getVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url' |
13 | import { removeVideoRedundancy } from '../redundancy' | 13 | import { removeVideoRedundancy } from '../redundancy' |
14 | import { getOrCreateVideoAndAccountAndChannel } from '../activitypub' | 14 | import { getOrCreateVideoAndAccountAndChannel } from '../activitypub' |
15 | import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist' | ||
16 | import { VideoModel } from '../../models/video/video' | ||
17 | import { downloadPlaylistSegments } from '../hls' | ||
18 | |||
19 | type CandidateToDuplicate = { | ||
20 | redundancy: VideosRedundancy, | ||
21 | video: VideoModel, | ||
22 | files: VideoFileModel[], | ||
23 | streamingPlaylists: VideoStreamingPlaylistModel[] | ||
24 | } | ||
15 | 25 | ||
16 | export class VideosRedundancyScheduler extends AbstractScheduler { | 26 | export class VideosRedundancyScheduler extends AbstractScheduler { |
17 | 27 | ||
18 | private static instance: AbstractScheduler | 28 | private static instance: AbstractScheduler |
19 | private executing = false | ||
20 | 29 | ||
21 | protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL | 30 | protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL |
22 | 31 | ||
@@ -24,41 +33,39 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
24 | super() | 33 | super() |
25 | } | 34 | } |
26 | 35 | ||
27 | async execute () { | 36 | protected async internalExecute () { |
28 | if (this.executing) return | 37 | for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { |
29 | 38 | logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy) | |
30 | this.executing = true | ||
31 | |||
32 | for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { | ||
33 | logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) | ||
34 | 39 | ||
35 | try { | 40 | try { |
36 | const videoToDuplicate = await this.findVideoToDuplicate(obj) | 41 | const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig) |
37 | if (!videoToDuplicate) continue | 42 | if (!videoToDuplicate) continue |
38 | 43 | ||
39 | const videoFiles = videoToDuplicate.VideoFiles | 44 | const candidateToDuplicate = { |
40 | videoFiles.forEach(f => f.Video = videoToDuplicate) | 45 | video: videoToDuplicate, |
46 | redundancy: redundancyConfig, | ||
47 | files: videoToDuplicate.VideoFiles, | ||
48 | streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists | ||
49 | } | ||
41 | 50 | ||
42 | await this.purgeCacheIfNeeded(obj, videoFiles) | 51 | await this.purgeCacheIfNeeded(candidateToDuplicate) |
43 | 52 | ||
44 | if (await this.isTooHeavy(obj, videoFiles)) { | 53 | if (await this.isTooHeavy(candidateToDuplicate)) { |
45 | logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) | 54 | logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) |
46 | continue | 55 | continue |
47 | } | 56 | } |
48 | 57 | ||
49 | logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) | 58 | logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, redundancyConfig.strategy) |
50 | 59 | ||
51 | await this.createVideoRedundancy(obj, videoFiles) | 60 | await this.createVideoRedundancies(candidateToDuplicate) |
52 | } catch (err) { | 61 | } catch (err) { |
53 | logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) | 62 | logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err }) |
54 | } | 63 | } |
55 | } | 64 | } |
56 | 65 | ||
57 | await this.extendsLocalExpiration() | 66 | await this.extendsLocalExpiration() |
58 | 67 | ||
59 | await this.purgeRemoteExpired() | 68 | await this.purgeRemoteExpired() |
60 | |||
61 | this.executing = false | ||
62 | } | 69 | } |
63 | 70 | ||
64 | static get Instance () { | 71 | static get Instance () { |
@@ -70,25 +77,35 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
70 | 77 | ||
71 | for (const redundancyModel of expired) { | 78 | for (const redundancyModel of expired) { |
72 | try { | 79 | try { |
73 | await this.extendsOrDeleteRedundancy(redundancyModel) | 80 | const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) |
81 | const candidate = { | ||
82 | redundancy: redundancyConfig, | ||
83 | video: null, | ||
84 | files: [], | ||
85 | streamingPlaylists: [] | ||
86 | } | ||
87 | |||
88 | // If the administrator disabled the redundancy or decreased the cache size, remove this redundancy instead of extending it | ||
89 | if (!redundancyConfig || await this.isTooHeavy(candidate)) { | ||
90 | logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy) | ||
91 | await removeVideoRedundancy(redundancyModel) | ||
92 | } else { | ||
93 | await this.extendsRedundancy(redundancyModel) | ||
94 | } | ||
74 | } catch (err) { | 95 | } catch (err) { |
75 | logger.error('Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel)) | 96 | logger.error( |
97 | 'Cannot extend or remove expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel), | ||
98 | { err } | ||
99 | ) | ||
76 | } | 100 | } |
77 | } | 101 | } |
78 | } | 102 | } |
79 | 103 | ||
80 | private async extendsOrDeleteRedundancy (redundancyModel: VideoRedundancyModel) { | 104 | private async extendsRedundancy (redundancyModel: VideoRedundancyModel) { |
81 | // Refresh the video, maybe it was deleted | ||
82 | const video = await this.loadAndRefreshVideo(redundancyModel.VideoFile.Video.url) | ||
83 | |||
84 | if (!video) { | ||
85 | logger.info('Destroying existing redundancy %s, because the associated video does not exist anymore.', redundancyModel.url) | ||
86 | |||
87 | await redundancyModel.destroy() | ||
88 | return | ||
89 | } | ||
90 | |||
91 | const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) | 105 | const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) |
106 | // Redundancy strategy disabled, remove our redundancy instead of extending expiration | ||
107 | if (!redundancy) await removeVideoRedundancy(redundancyModel) | ||
108 | |||
92 | await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime) | 109 | await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime) |
93 | } | 110 | } |
94 | 111 | ||
@@ -119,49 +136,93 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
119 | } | 136 | } |
120 | } | 137 | } |
121 | 138 | ||
122 | private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { | 139 | private async createVideoRedundancies (data: CandidateToDuplicate) { |
123 | const serverActor = await getServerActor() | 140 | const video = await this.loadAndRefreshVideo(data.video.url) |
124 | 141 | ||
125 | for (const file of filesToDuplicate) { | 142 | if (!video) { |
126 | const video = await this.loadAndRefreshVideo(file.Video.url) | 143 | logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url) |
127 | 144 | ||
145 | return | ||
146 | } | ||
147 | |||
148 | for (const file of data.files) { | ||
128 | const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id) | 149 | const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id) |
129 | if (existingRedundancy) { | 150 | if (existingRedundancy) { |
130 | await this.extendsOrDeleteRedundancy(existingRedundancy) | 151 | await this.extendsRedundancy(existingRedundancy) |
131 | 152 | ||
132 | continue | 153 | continue |
133 | } | 154 | } |
134 | 155 | ||
135 | if (!video) { | 156 | await this.createVideoFileRedundancy(data.redundancy, video, file) |
136 | logger.info('Video %s we want to duplicate does not existing anymore, skipping.', file.Video.url) | 157 | } |
158 | |||
159 | for (const streamingPlaylist of data.streamingPlaylists) { | ||
160 | const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id) | ||
161 | if (existingRedundancy) { | ||
162 | await this.extendsRedundancy(existingRedundancy) | ||
137 | 163 | ||
138 | continue | 164 | continue |
139 | } | 165 | } |
140 | 166 | ||
141 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy) | 167 | await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist) |
168 | } | ||
169 | } | ||
142 | 170 | ||
143 | const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() | 171 | private async createVideoFileRedundancy (redundancy: VideosRedundancy, video: VideoModel, file: VideoFileModel) { |
144 | const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) | 172 | file.Video = video |
145 | 173 | ||
146 | const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT) | 174 | const serverActor = await getServerActor() |
147 | 175 | ||
148 | const destPath = join(CONFIG.STORAGE.VIDEOS_DIR, video.getVideoFilename(file)) | 176 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy) |
149 | await rename(tmpPath, destPath) | ||
150 | 177 | ||
151 | const createdModel = await VideoRedundancyModel.create({ | 178 | const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() |
152 | expiresOn: this.buildNewExpiration(redundancy.minLifetime), | 179 | const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) |
153 | url: getVideoCacheFileActivityPubUrl(file), | ||
154 | fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), | ||
155 | strategy: redundancy.strategy, | ||
156 | videoFileId: file.id, | ||
157 | actorId: serverActor.id | ||
158 | }) | ||
159 | createdModel.VideoFile = file | ||
160 | 180 | ||
161 | await sendCreateCacheFile(serverActor, createdModel) | 181 | const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT) |
162 | 182 | ||
163 | logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url) | 183 | const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, video.getVideoFilename(file)) |
164 | } | 184 | await move(tmpPath, destPath) |
185 | |||
186 | const createdModel = await VideoRedundancyModel.create({ | ||
187 | expiresOn: this.buildNewExpiration(redundancy.minLifetime), | ||
188 | url: getVideoCacheFileActivityPubUrl(file), | ||
189 | fileUrl: video.getVideoRedundancyUrl(file, CONFIG.WEBSERVER.URL), | ||
190 | strategy: redundancy.strategy, | ||
191 | videoFileId: file.id, | ||
192 | actorId: serverActor.id | ||
193 | }) | ||
194 | |||
195 | createdModel.VideoFile = file | ||
196 | |||
197 | await sendCreateCacheFile(serverActor, video, createdModel) | ||
198 | |||
199 | logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url) | ||
200 | } | ||
201 | |||
202 | private async createStreamingPlaylistRedundancy (redundancy: VideosRedundancy, video: VideoModel, playlist: VideoStreamingPlaylistModel) { | ||
203 | playlist.Video = video | ||
204 | |||
205 | const serverActor = await getServerActor() | ||
206 | |||
207 | logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, redundancy.strategy) | ||
208 | |||
209 | const destDirectory = join(HLS_REDUNDANCY_DIRECTORY, video.uuid) | ||
210 | await downloadPlaylistSegments(playlist.playlistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT) | ||
211 | |||
212 | const createdModel = await VideoRedundancyModel.create({ | ||
213 | expiresOn: this.buildNewExpiration(redundancy.minLifetime), | ||
214 | url: getVideoCacheStreamingPlaylistActivityPubUrl(video, playlist), | ||
215 | fileUrl: playlist.getVideoRedundancyUrl(CONFIG.WEBSERVER.URL), | ||
216 | strategy: redundancy.strategy, | ||
217 | videoStreamingPlaylistId: playlist.id, | ||
218 | actorId: serverActor.id | ||
219 | }) | ||
220 | |||
221 | createdModel.VideoStreamingPlaylist = playlist | ||
222 | |||
223 | await sendCreateCacheFile(serverActor, video, createdModel) | ||
224 | |||
225 | logger.info('Duplicated playlist %s -> %s.', playlist.playlistUrl, createdModel.url) | ||
165 | } | 226 | } |
166 | 227 | ||
167 | private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) { | 228 | private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) { |
@@ -175,8 +236,9 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
175 | await sendUpdateCacheFile(serverActor, redundancy) | 236 | await sendUpdateCacheFile(serverActor, redundancy) |
176 | } | 237 | } |
177 | 238 | ||
178 | private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { | 239 | private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) { |
179 | while (this.isTooHeavy(redundancy, filesToDuplicate)) { | 240 | while (this.isTooHeavy(candidateToDuplicate)) { |
241 | const redundancy = candidateToDuplicate.redundancy | ||
180 | const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime) | 242 | const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime) |
181 | if (!toDelete) return | 243 | if (!toDelete) return |
182 | 244 | ||
@@ -184,12 +246,13 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
184 | } | 246 | } |
185 | } | 247 | } |
186 | 248 | ||
187 | private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { | 249 | private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) { |
188 | const maxSize = redundancy.size - this.getTotalFileSizes(filesToDuplicate) | 250 | const maxSize = candidateToDuplicate.redundancy.size |
189 | 251 | ||
190 | const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy) | 252 | const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(candidateToDuplicate.redundancy.strategy) |
253 | const totalWillDuplicate = totalDuplicated + this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists) | ||
191 | 254 | ||
192 | return totalDuplicated > maxSize | 255 | return totalWillDuplicate > maxSize |
193 | } | 256 | } |
194 | 257 | ||
195 | private buildNewExpiration (expiresAfterMs: number) { | 258 | private buildNewExpiration (expiresAfterMs: number) { |
@@ -197,13 +260,15 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
197 | } | 260 | } |
198 | 261 | ||
199 | private buildEntryLogId (object: VideoRedundancyModel) { | 262 | private buildEntryLogId (object: VideoRedundancyModel) { |
200 | return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}` | 263 | if (object.VideoFile) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}` |
264 | |||
265 | return `${object.VideoStreamingPlaylist.playlistUrl}` | ||
201 | } | 266 | } |
202 | 267 | ||
203 | private getTotalFileSizes (files: VideoFileModel[]) { | 268 | private getTotalFileSizes (files: VideoFileModel[], playlists: VideoStreamingPlaylistModel[]) { |
204 | const fileReducer = (previous: number, current: VideoFileModel) => previous + current.size | 269 | const fileReducer = (previous: number, current: VideoFileModel) => previous + current.size |
205 | 270 | ||
206 | return files.reduce(fileReducer, 0) | 271 | return files.reduce(fileReducer, 0) * playlists.length |
207 | } | 272 | } |
208 | 273 | ||
209 | private async loadAndRefreshVideo (videoUrl: string) { | 274 | private async loadAndRefreshVideo (videoUrl: string) { |
diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts index 461cd045e..aa027116d 100644 --- a/server/lib/schedulers/youtube-dl-update-scheduler.ts +++ b/server/lib/schedulers/youtube-dl-update-scheduler.ts | |||
@@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler { | |||
12 | super() | 12 | super() |
13 | } | 13 | } |
14 | 14 | ||
15 | execute () { | 15 | protected internalExecute () { |
16 | return updateYoutubeDLBinary() | 16 | return updateYoutubeDLBinary() |
17 | } | 17 | } |
18 | 18 | ||