diff options
Diffstat (limited to 'server/lib/schedulers')
15 files changed, 0 insertions, 1047 deletions
diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts deleted file mode 100644 index f3d51a22e..000000000 --- a/server/lib/schedulers/abstract-scheduler.ts +++ /dev/null | |||
@@ -1,35 +0,0 @@ | |||
1 | import Bluebird from 'bluebird' | ||
2 | import { logger } from '../../helpers/logger' | ||
3 | |||
4 | export abstract class AbstractScheduler { | ||
5 | |||
6 | protected abstract schedulerIntervalMs: number | ||
7 | |||
8 | private interval: NodeJS.Timer | ||
9 | private isRunning = false | ||
10 | |||
11 | enable () { | ||
12 | if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') | ||
13 | |||
14 | this.interval = setInterval(() => this.execute(), this.schedulerIntervalMs) | ||
15 | } | ||
16 | |||
17 | disable () { | ||
18 | clearInterval(this.interval) | ||
19 | } | ||
20 | |||
21 | async execute () { | ||
22 | if (this.isRunning === true) return | ||
23 | this.isRunning = true | ||
24 | |||
25 | try { | ||
26 | await this.internalExecute() | ||
27 | } catch (err) { | ||
28 | logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) | ||
29 | } finally { | ||
30 | this.isRunning = false | ||
31 | } | ||
32 | } | ||
33 | |||
34 | protected abstract internalExecute (): Promise<any> | Bluebird<any> | ||
35 | } | ||
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts deleted file mode 100644 index e1c56c135..000000000 --- a/server/lib/schedulers/actor-follow-scheduler.ts +++ /dev/null | |||
@@ -1,54 +0,0 @@ | |||
1 | import { isTestOrDevInstance } from '../../helpers/core-utils' | ||
2 | import { logger } from '../../helpers/logger' | ||
3 | import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { ActorFollowModel } from '../../models/actor/actor-follow' | ||
5 | import { ActorFollowHealthCache } from '../actor-follow-health-cache' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | export class ActorFollowScheduler extends AbstractScheduler { | ||
9 | |||
10 | private static instance: AbstractScheduler | ||
11 | |||
12 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.ACTOR_FOLLOW_SCORES | ||
13 | |||
14 | private constructor () { | ||
15 | super() | ||
16 | } | ||
17 | |||
18 | protected async internalExecute () { | ||
19 | await this.processPendingScores() | ||
20 | |||
21 | await this.removeBadActorFollows() | ||
22 | } | ||
23 | |||
24 | private async processPendingScores () { | ||
25 | const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore() | ||
26 | const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds() | ||
27 | const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds() | ||
28 | |||
29 | ActorFollowHealthCache.Instance.clearPendingFollowsScore() | ||
30 | ActorFollowHealthCache.Instance.clearBadFollowingServerIds() | ||
31 | ActorFollowHealthCache.Instance.clearGoodFollowingServerIds() | ||
32 | |||
33 | for (const inbox of Object.keys(pendingScores)) { | ||
34 | await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) | ||
35 | } | ||
36 | |||
37 | await ActorFollowModel.updateScoreByFollowingServers(badServerIds, ACTOR_FOLLOW_SCORE.PENALTY) | ||
38 | await ActorFollowModel.updateScoreByFollowingServers(goodServerIds, ACTOR_FOLLOW_SCORE.BONUS) | ||
39 | } | ||
40 | |||
41 | private async removeBadActorFollows () { | ||
42 | if (!isTestOrDevInstance()) logger.info('Removing bad actor follows (scheduler).') | ||
43 | |||
44 | try { | ||
45 | await ActorFollowModel.removeBadActorFollows() | ||
46 | } catch (err) { | ||
47 | logger.error('Error in bad actor follows scheduler.', { err }) | ||
48 | } | ||
49 | } | ||
50 | |||
51 | static get Instance () { | ||
52 | return this.instance || (this.instance = new this()) | ||
53 | } | ||
54 | } | ||
diff --git a/server/lib/schedulers/auto-follow-index-instances.ts b/server/lib/schedulers/auto-follow-index-instances.ts deleted file mode 100644 index 956ece749..000000000 --- a/server/lib/schedulers/auto-follow-index-instances.ts +++ /dev/null | |||
@@ -1,75 +0,0 @@ | |||
1 | import { chunk } from 'lodash' | ||
2 | import { doJSONRequest } from '@server/helpers/requests' | ||
3 | import { JobQueue } from '@server/lib/job-queue' | ||
4 | import { ActorFollowModel } from '@server/models/actor/actor-follow' | ||
5 | import { getServerActor } from '@server/models/application/application' | ||
6 | import { logger } from '../../helpers/logger' | ||
7 | import { CONFIG } from '../../initializers/config' | ||
8 | import { SCHEDULER_INTERVALS_MS, SERVER_ACTOR_NAME } from '../../initializers/constants' | ||
9 | import { AbstractScheduler } from './abstract-scheduler' | ||
10 | |||
11 | export class AutoFollowIndexInstances extends AbstractScheduler { | ||
12 | |||
13 | private static instance: AbstractScheduler | ||
14 | |||
15 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES | ||
16 | |||
17 | private lastCheck: Date | ||
18 | |||
19 | private constructor () { | ||
20 | super() | ||
21 | } | ||
22 | |||
23 | protected async internalExecute () { | ||
24 | return this.autoFollow() | ||
25 | } | ||
26 | |||
27 | private async autoFollow () { | ||
28 | if (CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.ENABLED === false) return | ||
29 | |||
30 | const indexUrl = CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.INDEX_URL | ||
31 | |||
32 | logger.info('Auto follow instances of index %s.', indexUrl) | ||
33 | |||
34 | try { | ||
35 | const serverActor = await getServerActor() | ||
36 | |||
37 | const searchParams = { count: 1000 } | ||
38 | if (this.lastCheck) Object.assign(searchParams, { since: this.lastCheck.toISOString() }) | ||
39 | |||
40 | this.lastCheck = new Date() | ||
41 | |||
42 | const { body } = await doJSONRequest<any>(indexUrl, { searchParams }) | ||
43 | if (!body.data || Array.isArray(body.data) === false) { | ||
44 | logger.error('Cannot auto follow instances of index %s. Please check the auto follow URL.', indexUrl, { body }) | ||
45 | return | ||
46 | } | ||
47 | |||
48 | const hosts: string[] = body.data.map(o => o.host) | ||
49 | const chunks = chunk(hosts, 20) | ||
50 | |||
51 | for (const chunk of chunks) { | ||
52 | const unfollowedHosts = await ActorFollowModel.keepUnfollowedInstance(chunk) | ||
53 | |||
54 | for (const unfollowedHost of unfollowedHosts) { | ||
55 | const payload = { | ||
56 | host: unfollowedHost, | ||
57 | name: SERVER_ACTOR_NAME, | ||
58 | followerActorId: serverActor.id, | ||
59 | isAutoFollow: true | ||
60 | } | ||
61 | |||
62 | JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) | ||
63 | } | ||
64 | } | ||
65 | |||
66 | } catch (err) { | ||
67 | logger.error('Cannot auto follow hosts of index %s.', indexUrl, { err }) | ||
68 | } | ||
69 | |||
70 | } | ||
71 | |||
72 | static get Instance () { | ||
73 | return this.instance || (this.instance = new this()) | ||
74 | } | ||
75 | } | ||
diff --git a/server/lib/schedulers/geo-ip-update-scheduler.ts b/server/lib/schedulers/geo-ip-update-scheduler.ts deleted file mode 100644 index b06f5a9b5..000000000 --- a/server/lib/schedulers/geo-ip-update-scheduler.ts +++ /dev/null | |||
@@ -1,22 +0,0 @@ | |||
1 | import { GeoIP } from '@server/helpers/geo-ip' | ||
2 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
3 | import { AbstractScheduler } from './abstract-scheduler' | ||
4 | |||
5 | export class GeoIPUpdateScheduler extends AbstractScheduler { | ||
6 | |||
7 | private static instance: AbstractScheduler | ||
8 | |||
9 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.GEO_IP_UPDATE | ||
10 | |||
11 | private constructor () { | ||
12 | super() | ||
13 | } | ||
14 | |||
15 | protected internalExecute () { | ||
16 | return GeoIP.Instance.updateDatabase() | ||
17 | } | ||
18 | |||
19 | static get Instance () { | ||
20 | return this.instance || (this.instance = new this()) | ||
21 | } | ||
22 | } | ||
diff --git a/server/lib/schedulers/peertube-version-check-scheduler.ts b/server/lib/schedulers/peertube-version-check-scheduler.ts deleted file mode 100644 index bc38ed49f..000000000 --- a/server/lib/schedulers/peertube-version-check-scheduler.ts +++ /dev/null | |||
@@ -1,55 +0,0 @@ | |||
1 | |||
2 | import { doJSONRequest } from '@server/helpers/requests' | ||
3 | import { ApplicationModel } from '@server/models/application/application' | ||
4 | import { compareSemVer } from '@shared/core-utils' | ||
5 | import { JoinPeerTubeVersions } from '@shared/models' | ||
6 | import { logger } from '../../helpers/logger' | ||
7 | import { CONFIG } from '../../initializers/config' | ||
8 | import { PEERTUBE_VERSION, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
9 | import { Notifier } from '../notifier' | ||
10 | import { AbstractScheduler } from './abstract-scheduler' | ||
11 | |||
12 | export class PeerTubeVersionCheckScheduler extends AbstractScheduler { | ||
13 | |||
14 | private static instance: AbstractScheduler | ||
15 | |||
16 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION | ||
17 | |||
18 | private constructor () { | ||
19 | super() | ||
20 | } | ||
21 | |||
22 | protected async internalExecute () { | ||
23 | return this.checkLatestVersion() | ||
24 | } | ||
25 | |||
26 | private async checkLatestVersion () { | ||
27 | if (CONFIG.PEERTUBE.CHECK_LATEST_VERSION.ENABLED === false) return | ||
28 | |||
29 | logger.info('Checking latest PeerTube version.') | ||
30 | |||
31 | const { body } = await doJSONRequest<JoinPeerTubeVersions>(CONFIG.PEERTUBE.CHECK_LATEST_VERSION.URL) | ||
32 | |||
33 | if (!body?.peertube?.latestVersion) { | ||
34 | logger.warn('Cannot check latest PeerTube version: body is invalid.', { body }) | ||
35 | return | ||
36 | } | ||
37 | |||
38 | const latestVersion = body.peertube.latestVersion | ||
39 | const application = await ApplicationModel.load() | ||
40 | |||
41 | // Already checked this version | ||
42 | if (application.latestPeerTubeVersion === latestVersion) return | ||
43 | |||
44 | if (compareSemVer(PEERTUBE_VERSION, latestVersion) < 0) { | ||
45 | application.latestPeerTubeVersion = latestVersion | ||
46 | await application.save() | ||
47 | |||
48 | Notifier.Instance.notifyOfNewPeerTubeVersion(application, latestVersion) | ||
49 | } | ||
50 | } | ||
51 | |||
52 | static get Instance () { | ||
53 | return this.instance || (this.instance = new this()) | ||
54 | } | ||
55 | } | ||
diff --git a/server/lib/schedulers/plugins-check-scheduler.ts b/server/lib/schedulers/plugins-check-scheduler.ts deleted file mode 100644 index 820c01693..000000000 --- a/server/lib/schedulers/plugins-check-scheduler.ts +++ /dev/null | |||
@@ -1,74 +0,0 @@ | |||
1 | import { chunk } from 'lodash' | ||
2 | import { compareSemVer } from '@shared/core-utils' | ||
3 | import { logger } from '../../helpers/logger' | ||
4 | import { CONFIG } from '../../initializers/config' | ||
5 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
6 | import { PluginModel } from '../../models/server/plugin' | ||
7 | import { Notifier } from '../notifier' | ||
8 | import { getLatestPluginsVersion } from '../plugins/plugin-index' | ||
9 | import { AbstractScheduler } from './abstract-scheduler' | ||
10 | |||
11 | export class PluginsCheckScheduler extends AbstractScheduler { | ||
12 | |||
13 | private static instance: AbstractScheduler | ||
14 | |||
15 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.CHECK_PLUGINS | ||
16 | |||
17 | private constructor () { | ||
18 | super() | ||
19 | } | ||
20 | |||
21 | protected async internalExecute () { | ||
22 | return this.checkLatestPluginsVersion() | ||
23 | } | ||
24 | |||
25 | private async checkLatestPluginsVersion () { | ||
26 | if (CONFIG.PLUGINS.INDEX.ENABLED === false) return | ||
27 | |||
28 | logger.info('Checking latest plugins version.') | ||
29 | |||
30 | const plugins = await PluginModel.listInstalled() | ||
31 | |||
32 | // Process 10 plugins in 1 HTTP request | ||
33 | const chunks = chunk(plugins, 10) | ||
34 | for (const chunk of chunks) { | ||
35 | // Find plugins according to their npm name | ||
36 | const pluginIndex: { [npmName: string]: PluginModel } = {} | ||
37 | for (const plugin of chunk) { | ||
38 | pluginIndex[PluginModel.buildNpmName(plugin.name, plugin.type)] = plugin | ||
39 | } | ||
40 | |||
41 | const npmNames = Object.keys(pluginIndex) | ||
42 | |||
43 | try { | ||
44 | const results = await getLatestPluginsVersion(npmNames) | ||
45 | |||
46 | for (const result of results) { | ||
47 | const plugin = pluginIndex[result.npmName] | ||
48 | if (!result.latestVersion) continue | ||
49 | |||
50 | if ( | ||
51 | !plugin.latestVersion || | ||
52 | (plugin.latestVersion !== result.latestVersion && compareSemVer(plugin.latestVersion, result.latestVersion) < 0) | ||
53 | ) { | ||
54 | plugin.latestVersion = result.latestVersion | ||
55 | await plugin.save() | ||
56 | |||
57 | // Notify if there is an higher plugin version available | ||
58 | if (compareSemVer(plugin.version, result.latestVersion) < 0) { | ||
59 | Notifier.Instance.notifyOfNewPluginVersion(plugin) | ||
60 | } | ||
61 | |||
62 | logger.info('Plugin %s has a new latest version %s.', result.npmName, plugin.latestVersion) | ||
63 | } | ||
64 | } | ||
65 | } catch (err) { | ||
66 | logger.error('Cannot get latest plugins version.', { npmNames, err }) | ||
67 | } | ||
68 | } | ||
69 | } | ||
70 | |||
71 | static get Instance () { | ||
72 | return this.instance || (this.instance = new this()) | ||
73 | } | ||
74 | } | ||
diff --git a/server/lib/schedulers/remove-dangling-resumable-uploads-scheduler.ts b/server/lib/schedulers/remove-dangling-resumable-uploads-scheduler.ts deleted file mode 100644 index 61e93eafa..000000000 --- a/server/lib/schedulers/remove-dangling-resumable-uploads-scheduler.ts +++ /dev/null | |||
@@ -1,40 +0,0 @@ | |||
1 | |||
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' | ||
4 | import { uploadx } from '../uploadx' | ||
5 | import { AbstractScheduler } from './abstract-scheduler' | ||
6 | |||
7 | const lTags = loggerTagsFactory('scheduler', 'resumable-upload', 'cleaner') | ||
8 | |||
9 | export class RemoveDanglingResumableUploadsScheduler extends AbstractScheduler { | ||
10 | |||
11 | private static instance: AbstractScheduler | ||
12 | private lastExecutionTimeMs: number | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.REMOVE_DANGLING_RESUMABLE_UPLOADS | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | |||
19 | this.lastExecutionTimeMs = new Date().getTime() | ||
20 | } | ||
21 | |||
22 | protected async internalExecute () { | ||
23 | logger.debug('Removing dangling resumable uploads', lTags()) | ||
24 | |||
25 | const now = new Date().getTime() | ||
26 | |||
27 | try { | ||
28 | // Remove files that were not updated since the last execution | ||
29 | await uploadx.storage.purge(now - this.lastExecutionTimeMs) | ||
30 | } catch (error) { | ||
31 | logger.error('Failed to handle file during resumable video upload folder cleanup', { error, ...lTags() }) | ||
32 | } finally { | ||
33 | this.lastExecutionTimeMs = now | ||
34 | } | ||
35 | } | ||
36 | |||
37 | static get Instance () { | ||
38 | return this.instance || (this.instance = new this()) | ||
39 | } | ||
40 | } | ||
diff --git a/server/lib/schedulers/remove-old-history-scheduler.ts b/server/lib/schedulers/remove-old-history-scheduler.ts deleted file mode 100644 index 34b160799..000000000 --- a/server/lib/schedulers/remove-old-history-scheduler.ts +++ /dev/null | |||
@@ -1,31 +0,0 @@ | |||
1 | import { logger } from '../../helpers/logger' | ||
2 | import { AbstractScheduler } from './abstract-scheduler' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { UserVideoHistoryModel } from '../../models/user/user-video-history' | ||
5 | import { CONFIG } from '../../initializers/config' | ||
6 | |||
7 | export class RemoveOldHistoryScheduler extends AbstractScheduler { | ||
8 | |||
9 | private static instance: AbstractScheduler | ||
10 | |||
11 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.REMOVE_OLD_HISTORY | ||
12 | |||
13 | private constructor () { | ||
14 | super() | ||
15 | } | ||
16 | |||
17 | protected internalExecute () { | ||
18 | if (CONFIG.HISTORY.VIDEOS.MAX_AGE === -1) return | ||
19 | |||
20 | logger.info('Removing old videos history.') | ||
21 | |||
22 | const now = new Date() | ||
23 | const beforeDate = new Date(now.getTime() - CONFIG.HISTORY.VIDEOS.MAX_AGE).toISOString() | ||
24 | |||
25 | return UserVideoHistoryModel.removeOldHistory(beforeDate) | ||
26 | } | ||
27 | |||
28 | static get Instance () { | ||
29 | return this.instance || (this.instance = new this()) | ||
30 | } | ||
31 | } | ||
diff --git a/server/lib/schedulers/remove-old-views-scheduler.ts b/server/lib/schedulers/remove-old-views-scheduler.ts deleted file mode 100644 index 8bc53a045..000000000 --- a/server/lib/schedulers/remove-old-views-scheduler.ts +++ /dev/null | |||
@@ -1,31 +0,0 @@ | |||
1 | import { VideoViewModel } from '@server/models/view/video-view' | ||
2 | import { logger } from '../../helpers/logger' | ||
3 | import { CONFIG } from '../../initializers/config' | ||
4 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
5 | import { AbstractScheduler } from './abstract-scheduler' | ||
6 | |||
7 | export class RemoveOldViewsScheduler extends AbstractScheduler { | ||
8 | |||
9 | private static instance: AbstractScheduler | ||
10 | |||
11 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.REMOVE_OLD_VIEWS | ||
12 | |||
13 | private constructor () { | ||
14 | super() | ||
15 | } | ||
16 | |||
17 | protected internalExecute () { | ||
18 | if (CONFIG.VIEWS.VIDEOS.REMOTE.MAX_AGE === -1) return | ||
19 | |||
20 | logger.info('Removing old videos views.') | ||
21 | |||
22 | const now = new Date() | ||
23 | const beforeDate = new Date(now.getTime() - CONFIG.VIEWS.VIDEOS.REMOTE.MAX_AGE).toISOString() | ||
24 | |||
25 | return VideoViewModel.removeOldRemoteViewsHistory(beforeDate) | ||
26 | } | ||
27 | |||
28 | static get Instance () { | ||
29 | return this.instance || (this.instance = new this()) | ||
30 | } | ||
31 | } | ||
diff --git a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts deleted file mode 100644 index f7a26d2bc..000000000 --- a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts +++ /dev/null | |||
@@ -1,42 +0,0 @@ | |||
1 | import { CONFIG } from '@server/initializers/config' | ||
2 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
3 | import { logger, loggerTagsFactory } from '../../helpers/logger' | ||
4 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
5 | import { getRunnerJobHandlerClass } from '../runners' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | const lTags = loggerTagsFactory('runner') | ||
9 | |||
10 | export class RunnerJobWatchDogScheduler extends AbstractScheduler { | ||
11 | |||
12 | private static instance: AbstractScheduler | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.RUNNER_JOB_WATCH_DOG | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | } | ||
19 | |||
20 | protected async internalExecute () { | ||
21 | const vodStalledJobs = await RunnerJobModel.listStalledJobs({ | ||
22 | staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.VOD, | ||
23 | types: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ] | ||
24 | }) | ||
25 | |||
26 | const liveStalledJobs = await RunnerJobModel.listStalledJobs({ | ||
27 | staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.LIVE, | ||
28 | types: [ 'live-rtmp-hls-transcoding' ] | ||
29 | }) | ||
30 | |||
31 | for (const stalled of [ ...vodStalledJobs, ...liveStalledJobs ]) { | ||
32 | logger.info('Abort stalled runner job %s (%s)', stalled.uuid, stalled.type, lTags(stalled.uuid, stalled.type)) | ||
33 | |||
34 | const Handler = getRunnerJobHandlerClass(stalled) | ||
35 | await new Handler().abort({ runnerJob: stalled }) | ||
36 | } | ||
37 | } | ||
38 | |||
39 | static get Instance () { | ||
40 | return this.instance || (this.instance = new this()) | ||
41 | } | ||
42 | } | ||
diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts deleted file mode 100644 index e38685c04..000000000 --- a/server/lib/schedulers/update-videos-scheduler.ts +++ /dev/null | |||
@@ -1,89 +0,0 @@ | |||
1 | import { VideoModel } from '@server/models/video/video' | ||
2 | import { MScheduleVideoUpdate } from '@server/types/models' | ||
3 | import { VideoPrivacy, VideoState } from '@shared/models' | ||
4 | import { logger } from '../../helpers/logger' | ||
5 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
6 | import { sequelizeTypescript } from '../../initializers/database' | ||
7 | import { ScheduleVideoUpdateModel } from '../../models/video/schedule-video-update' | ||
8 | import { Notifier } from '../notifier' | ||
9 | import { addVideoJobsAfterUpdate } from '../video' | ||
10 | import { VideoPathManager } from '../video-path-manager' | ||
11 | import { setVideoPrivacy } from '../video-privacy' | ||
12 | import { AbstractScheduler } from './abstract-scheduler' | ||
13 | |||
14 | export class UpdateVideosScheduler extends AbstractScheduler { | ||
15 | |||
16 | private static instance: AbstractScheduler | ||
17 | |||
18 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.UPDATE_VIDEOS | ||
19 | |||
20 | private constructor () { | ||
21 | super() | ||
22 | } | ||
23 | |||
24 | protected async internalExecute () { | ||
25 | return this.updateVideos() | ||
26 | } | ||
27 | |||
28 | private async updateVideos () { | ||
29 | if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined | ||
30 | |||
31 | const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate() | ||
32 | |||
33 | for (const schedule of schedules) { | ||
34 | const videoOnly = await VideoModel.load(schedule.videoId) | ||
35 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoOnly.uuid) | ||
36 | |||
37 | try { | ||
38 | const { video, published } = await this.updateAVideo(schedule) | ||
39 | |||
40 | if (published) Notifier.Instance.notifyOnVideoPublishedAfterScheduledUpdate(video) | ||
41 | } catch (err) { | ||
42 | logger.error('Cannot update video', { err }) | ||
43 | } | ||
44 | |||
45 | mutexReleaser() | ||
46 | } | ||
47 | } | ||
48 | |||
49 | private async updateAVideo (schedule: MScheduleVideoUpdate) { | ||
50 | let oldPrivacy: VideoPrivacy | ||
51 | let isNewVideo: boolean | ||
52 | let published = false | ||
53 | |||
54 | const video = await sequelizeTypescript.transaction(async t => { | ||
55 | const video = await VideoModel.loadFull(schedule.videoId, t) | ||
56 | if (video.state === VideoState.TO_TRANSCODE) return null | ||
57 | |||
58 | logger.info('Executing scheduled video update on %s.', video.uuid) | ||
59 | |||
60 | if (schedule.privacy) { | ||
61 | isNewVideo = video.isNewVideo(schedule.privacy) | ||
62 | oldPrivacy = video.privacy | ||
63 | |||
64 | setVideoPrivacy(video, schedule.privacy) | ||
65 | await video.save({ transaction: t }) | ||
66 | |||
67 | if (oldPrivacy === VideoPrivacy.PRIVATE) { | ||
68 | published = true | ||
69 | } | ||
70 | } | ||
71 | |||
72 | await schedule.destroy({ transaction: t }) | ||
73 | |||
74 | return video | ||
75 | }) | ||
76 | |||
77 | if (!video) { | ||
78 | return { video, published: false } | ||
79 | } | ||
80 | |||
81 | await addVideoJobsAfterUpdate({ video, oldPrivacy, isNewVideo, nameChanged: false }) | ||
82 | |||
83 | return { video, published } | ||
84 | } | ||
85 | |||
86 | static get Instance () { | ||
87 | return this.instance || (this.instance = new this()) | ||
88 | } | ||
89 | } | ||
diff --git a/server/lib/schedulers/video-channel-sync-latest-scheduler.ts b/server/lib/schedulers/video-channel-sync-latest-scheduler.ts deleted file mode 100644 index efb957fac..000000000 --- a/server/lib/schedulers/video-channel-sync-latest-scheduler.ts +++ /dev/null | |||
@@ -1,50 +0,0 @@ | |||
1 | import { logger } from '@server/helpers/logger' | ||
2 | import { CONFIG } from '@server/initializers/config' | ||
3 | import { VideoChannelModel } from '@server/models/video/video-channel' | ||
4 | import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync' | ||
5 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
6 | import { synchronizeChannel } from '../sync-channel' | ||
7 | import { AbstractScheduler } from './abstract-scheduler' | ||
8 | |||
9 | export class VideoChannelSyncLatestScheduler extends AbstractScheduler { | ||
10 | private static instance: AbstractScheduler | ||
11 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.CHANNEL_SYNC_CHECK_INTERVAL | ||
12 | |||
13 | private constructor () { | ||
14 | super() | ||
15 | } | ||
16 | |||
17 | protected async internalExecute () { | ||
18 | if (!CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.ENABLED) { | ||
19 | logger.debug('Discard channels synchronization as the feature is disabled') | ||
20 | return | ||
21 | } | ||
22 | |||
23 | logger.info('Checking channels to synchronize') | ||
24 | |||
25 | const channelSyncs = await VideoChannelSyncModel.listSyncs() | ||
26 | |||
27 | for (const sync of channelSyncs) { | ||
28 | const channel = await VideoChannelModel.loadAndPopulateAccount(sync.videoChannelId) | ||
29 | |||
30 | logger.info( | ||
31 | 'Creating video import jobs for "%s" sync with external channel "%s"', | ||
32 | channel.Actor.preferredUsername, sync.externalChannelUrl | ||
33 | ) | ||
34 | |||
35 | const onlyAfter = sync.lastSyncAt || sync.createdAt | ||
36 | |||
37 | await synchronizeChannel({ | ||
38 | channel, | ||
39 | externalChannelUrl: sync.externalChannelUrl, | ||
40 | videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.VIDEOS_LIMIT_PER_SYNCHRONIZATION, | ||
41 | channelSync: sync, | ||
42 | onlyAfter | ||
43 | }) | ||
44 | } | ||
45 | } | ||
46 | |||
47 | static get Instance () { | ||
48 | return this.instance || (this.instance = new this()) | ||
49 | } | ||
50 | } | ||
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts deleted file mode 100644 index 244a88b14..000000000 --- a/server/lib/schedulers/video-views-buffer-scheduler.ts +++ /dev/null | |||
@@ -1,52 +0,0 @@ | |||
1 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
2 | import { VideoModel } from '@server/models/video/video' | ||
3 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
4 | import { federateVideoIfNeeded } from '../activitypub/videos' | ||
5 | import { Redis } from '../redis' | ||
6 | import { AbstractScheduler } from './abstract-scheduler' | ||
7 | |||
8 | const lTags = loggerTagsFactory('views') | ||
9 | |||
10 | export class VideoViewsBufferScheduler extends AbstractScheduler { | ||
11 | |||
12 | private static instance: AbstractScheduler | ||
13 | |||
14 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE | ||
15 | |||
16 | private constructor () { | ||
17 | super() | ||
18 | } | ||
19 | |||
20 | protected async internalExecute () { | ||
21 | const videoIds = await Redis.Instance.listLocalVideosViewed() | ||
22 | if (videoIds.length === 0) return | ||
23 | |||
24 | for (const videoId of videoIds) { | ||
25 | try { | ||
26 | const views = await Redis.Instance.getLocalVideoViews(videoId) | ||
27 | await Redis.Instance.deleteLocalVideoViews(videoId) | ||
28 | |||
29 | const video = await VideoModel.loadFull(videoId) | ||
30 | if (!video) { | ||
31 | logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags()) | ||
32 | continue | ||
33 | } | ||
34 | |||
35 | logger.info('Processing local video %s views buffer.', video.uuid, lTags(video.uuid)) | ||
36 | |||
37 | // If this is a remote video, the origin instance will send us an update | ||
38 | await VideoModel.incrementViews(videoId, views) | ||
39 | |||
40 | // Send video update | ||
41 | video.views += views | ||
42 | await federateVideoIfNeeded(video, false) | ||
43 | } catch (err) { | ||
44 | logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() }) | ||
45 | } | ||
46 | } | ||
47 | } | ||
48 | |||
49 | static get Instance () { | ||
50 | return this.instance || (this.instance = new this()) | ||
51 | } | ||
52 | } | ||
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts deleted file mode 100644 index 91625ccb5..000000000 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ /dev/null | |||
@@ -1,375 +0,0 @@ | |||
1 | import { move } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { getServerActor } from '@server/models/application/application' | ||
4 | import { VideoModel } from '@server/models/video/video' | ||
5 | import { | ||
6 | MStreamingPlaylistFiles, | ||
7 | MVideoAccountLight, | ||
8 | MVideoFile, | ||
9 | MVideoFileVideo, | ||
10 | MVideoRedundancyFileVideo, | ||
11 | MVideoRedundancyStreamingPlaylistVideo, | ||
12 | MVideoRedundancyVideo, | ||
13 | MVideoWithAllFiles | ||
14 | } from '@server/types/models' | ||
15 | import { VideosRedundancyStrategy } from '../../../shared/models/redundancy' | ||
16 | import { logger, loggerTagsFactory } from '../../helpers/logger' | ||
17 | import { downloadWebTorrentVideo } from '../../helpers/webtorrent' | ||
18 | import { CONFIG } from '../../initializers/config' | ||
19 | import { DIRECTORIES, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants' | ||
20 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' | ||
21 | import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' | ||
22 | import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url' | ||
23 | import { getOrCreateAPVideo } from '../activitypub/videos' | ||
24 | import { downloadPlaylistSegments } from '../hls' | ||
25 | import { removeVideoRedundancy } from '../redundancy' | ||
26 | import { generateHLSRedundancyUrl, generateWebVideoRedundancyUrl } from '../video-urls' | ||
27 | import { AbstractScheduler } from './abstract-scheduler' | ||
28 | |||
29 | const lTags = loggerTagsFactory('redundancy') | ||
30 | |||
31 | type CandidateToDuplicate = { | ||
32 | redundancy: VideosRedundancyStrategy | ||
33 | video: MVideoWithAllFiles | ||
34 | files: MVideoFile[] | ||
35 | streamingPlaylists: MStreamingPlaylistFiles[] | ||
36 | } | ||
37 | |||
38 | function isMVideoRedundancyFileVideo ( | ||
39 | o: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo | ||
40 | ): o is MVideoRedundancyFileVideo { | ||
41 | return !!(o as MVideoRedundancyFileVideo).VideoFile | ||
42 | } | ||
43 | |||
44 | export class VideosRedundancyScheduler extends AbstractScheduler { | ||
45 | |||
46 | private static instance: VideosRedundancyScheduler | ||
47 | |||
48 | protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL | ||
49 | |||
50 | private constructor () { | ||
51 | super() | ||
52 | } | ||
53 | |||
54 | async createManualRedundancy (videoId: number) { | ||
55 | const videoToDuplicate = await VideoModel.loadWithFiles(videoId) | ||
56 | |||
57 | if (!videoToDuplicate) { | ||
58 | logger.warn('Video to manually duplicate %d does not exist anymore.', videoId, lTags()) | ||
59 | return | ||
60 | } | ||
61 | |||
62 | return this.createVideoRedundancies({ | ||
63 | video: videoToDuplicate, | ||
64 | redundancy: null, | ||
65 | files: videoToDuplicate.VideoFiles, | ||
66 | streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists | ||
67 | }) | ||
68 | } | ||
69 | |||
70 | protected async internalExecute () { | ||
71 | for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { | ||
72 | logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy, lTags()) | ||
73 | |||
74 | try { | ||
75 | const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig) | ||
76 | if (!videoToDuplicate) continue | ||
77 | |||
78 | const candidateToDuplicate = { | ||
79 | video: videoToDuplicate, | ||
80 | redundancy: redundancyConfig, | ||
81 | files: videoToDuplicate.VideoFiles, | ||
82 | streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists | ||
83 | } | ||
84 | |||
85 | await this.purgeCacheIfNeeded(candidateToDuplicate) | ||
86 | |||
87 | if (await this.isTooHeavy(candidateToDuplicate)) { | ||
88 | logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url, lTags(videoToDuplicate.uuid)) | ||
89 | continue | ||
90 | } | ||
91 | |||
92 | logger.info( | ||
93 | 'Will duplicate video %s in redundancy scheduler "%s".', | ||
94 | videoToDuplicate.url, redundancyConfig.strategy, lTags(videoToDuplicate.uuid) | ||
95 | ) | ||
96 | |||
97 | await this.createVideoRedundancies(candidateToDuplicate) | ||
98 | } catch (err) { | ||
99 | logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err, ...lTags() }) | ||
100 | } | ||
101 | } | ||
102 | |||
103 | await this.extendsLocalExpiration() | ||
104 | |||
105 | await this.purgeRemoteExpired() | ||
106 | } | ||
107 | |||
108 | static get Instance () { | ||
109 | return this.instance || (this.instance = new this()) | ||
110 | } | ||
111 | |||
112 | private async extendsLocalExpiration () { | ||
113 | const expired = await VideoRedundancyModel.listLocalExpired() | ||
114 | |||
115 | for (const redundancyModel of expired) { | ||
116 | try { | ||
117 | const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) | ||
118 | |||
119 | // If the admin disabled the redundancy, remove this redundancy instead of extending it | ||
120 | if (!redundancyConfig) { | ||
121 | logger.info( | ||
122 | 'Destroying redundancy %s because the redundancy %s does not exist anymore.', | ||
123 | redundancyModel.url, redundancyModel.strategy | ||
124 | ) | ||
125 | |||
126 | await removeVideoRedundancy(redundancyModel) | ||
127 | continue | ||
128 | } | ||
129 | |||
130 | const { totalUsed } = await VideoRedundancyModel.getStats(redundancyConfig.strategy) | ||
131 | |||
132 | // If the admin decreased the cache size, remove this redundancy instead of extending it | ||
133 | if (totalUsed > redundancyConfig.size) { | ||
134 | logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy) | ||
135 | |||
136 | await removeVideoRedundancy(redundancyModel) | ||
137 | continue | ||
138 | } | ||
139 | |||
140 | await this.extendsRedundancy(redundancyModel) | ||
141 | } catch (err) { | ||
142 | logger.error( | ||
143 | 'Cannot extend or remove expiration of %s video from our redundancy system.', | ||
144 | this.buildEntryLogId(redundancyModel), { err, ...lTags(redundancyModel.getVideoUUID()) } | ||
145 | ) | ||
146 | } | ||
147 | } | ||
148 | } | ||
149 | |||
150 | private async extendsRedundancy (redundancyModel: MVideoRedundancyVideo) { | ||
151 | const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) | ||
152 | // Redundancy strategy disabled, remove our redundancy instead of extending expiration | ||
153 | if (!redundancy) { | ||
154 | await removeVideoRedundancy(redundancyModel) | ||
155 | return | ||
156 | } | ||
157 | |||
158 | await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime) | ||
159 | } | ||
160 | |||
161 | private async purgeRemoteExpired () { | ||
162 | const expired = await VideoRedundancyModel.listRemoteExpired() | ||
163 | |||
164 | for (const redundancyModel of expired) { | ||
165 | try { | ||
166 | await removeVideoRedundancy(redundancyModel) | ||
167 | } catch (err) { | ||
168 | logger.error( | ||
169 | 'Cannot remove redundancy %s from our redundancy system.', | ||
170 | this.buildEntryLogId(redundancyModel), lTags(redundancyModel.getVideoUUID()) | ||
171 | ) | ||
172 | } | ||
173 | } | ||
174 | } | ||
175 | |||
176 | private findVideoToDuplicate (cache: VideosRedundancyStrategy) { | ||
177 | if (cache.strategy === 'most-views') { | ||
178 | return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR) | ||
179 | } | ||
180 | |||
181 | if (cache.strategy === 'trending') { | ||
182 | return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR) | ||
183 | } | ||
184 | |||
185 | if (cache.strategy === 'recently-added') { | ||
186 | const minViews = cache.minViews | ||
187 | return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews) | ||
188 | } | ||
189 | } | ||
190 | |||
191 | private async createVideoRedundancies (data: CandidateToDuplicate) { | ||
192 | const video = await this.loadAndRefreshVideo(data.video.url) | ||
193 | |||
194 | if (!video) { | ||
195 | logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url, lTags(data.video.uuid)) | ||
196 | |||
197 | return | ||
198 | } | ||
199 | |||
200 | for (const file of data.files) { | ||
201 | const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id) | ||
202 | if (existingRedundancy) { | ||
203 | await this.extendsRedundancy(existingRedundancy) | ||
204 | |||
205 | continue | ||
206 | } | ||
207 | |||
208 | await this.createVideoFileRedundancy(data.redundancy, video, file) | ||
209 | } | ||
210 | |||
211 | for (const streamingPlaylist of data.streamingPlaylists) { | ||
212 | const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id) | ||
213 | if (existingRedundancy) { | ||
214 | await this.extendsRedundancy(existingRedundancy) | ||
215 | |||
216 | continue | ||
217 | } | ||
218 | |||
219 | await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist) | ||
220 | } | ||
221 | } | ||
222 | |||
223 | private async createVideoFileRedundancy (redundancy: VideosRedundancyStrategy | null, video: MVideoAccountLight, fileArg: MVideoFile) { | ||
224 | let strategy = 'manual' | ||
225 | let expiresOn: Date = null | ||
226 | |||
227 | if (redundancy) { | ||
228 | strategy = redundancy.strategy | ||
229 | expiresOn = this.buildNewExpiration(redundancy.minLifetime) | ||
230 | } | ||
231 | |||
232 | const file = fileArg as MVideoFileVideo | ||
233 | file.Video = video | ||
234 | |||
235 | const serverActor = await getServerActor() | ||
236 | |||
237 | logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy, lTags(video.uuid)) | ||
238 | |||
239 | const tmpPath = await downloadWebTorrentVideo({ uri: file.torrentUrl }, VIDEO_IMPORT_TIMEOUT) | ||
240 | |||
241 | const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, file.filename) | ||
242 | await move(tmpPath, destPath, { overwrite: true }) | ||
243 | |||
244 | const createdModel: MVideoRedundancyFileVideo = await VideoRedundancyModel.create({ | ||
245 | expiresOn, | ||
246 | url: getLocalVideoCacheFileActivityPubUrl(file), | ||
247 | fileUrl: generateWebVideoRedundancyUrl(file), | ||
248 | strategy, | ||
249 | videoFileId: file.id, | ||
250 | actorId: serverActor.id | ||
251 | }) | ||
252 | |||
253 | createdModel.VideoFile = file | ||
254 | |||
255 | await sendCreateCacheFile(serverActor, video, createdModel) | ||
256 | |||
257 | logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url, lTags(video.uuid)) | ||
258 | } | ||
259 | |||
260 | private async createStreamingPlaylistRedundancy ( | ||
261 | redundancy: VideosRedundancyStrategy, | ||
262 | video: MVideoAccountLight, | ||
263 | playlistArg: MStreamingPlaylistFiles | ||
264 | ) { | ||
265 | let strategy = 'manual' | ||
266 | let expiresOn: Date = null | ||
267 | |||
268 | if (redundancy) { | ||
269 | strategy = redundancy.strategy | ||
270 | expiresOn = this.buildNewExpiration(redundancy.minLifetime) | ||
271 | } | ||
272 | |||
273 | const playlist = Object.assign(playlistArg, { Video: video }) | ||
274 | const serverActor = await getServerActor() | ||
275 | |||
276 | logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy, lTags(video.uuid)) | ||
277 | |||
278 | const destDirectory = join(DIRECTORIES.HLS_REDUNDANCY, video.uuid) | ||
279 | const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video) | ||
280 | |||
281 | const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000 | ||
282 | const toleranceKB = maxSizeKB + ((5 * maxSizeKB) / 100) // 5% more tolerance | ||
283 | await downloadPlaylistSegments(masterPlaylistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT, toleranceKB) | ||
284 | |||
285 | const createdModel: MVideoRedundancyStreamingPlaylistVideo = await VideoRedundancyModel.create({ | ||
286 | expiresOn, | ||
287 | url: getLocalVideoCacheStreamingPlaylistActivityPubUrl(video, playlist), | ||
288 | fileUrl: generateHLSRedundancyUrl(video, playlistArg), | ||
289 | strategy, | ||
290 | videoStreamingPlaylistId: playlist.id, | ||
291 | actorId: serverActor.id | ||
292 | }) | ||
293 | |||
294 | createdModel.VideoStreamingPlaylist = playlist | ||
295 | |||
296 | await sendCreateCacheFile(serverActor, video, createdModel) | ||
297 | |||
298 | logger.info('Duplicated playlist %s -> %s.', masterPlaylistUrl, createdModel.url, lTags(video.uuid)) | ||
299 | } | ||
300 | |||
301 | private async extendsExpirationOf (redundancy: MVideoRedundancyVideo, expiresAfterMs: number) { | ||
302 | logger.info('Extending expiration of %s.', redundancy.url, lTags(redundancy.getVideoUUID())) | ||
303 | |||
304 | const serverActor = await getServerActor() | ||
305 | |||
306 | redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs) | ||
307 | await redundancy.save() | ||
308 | |||
309 | await sendUpdateCacheFile(serverActor, redundancy) | ||
310 | } | ||
311 | |||
312 | private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) { | ||
313 | while (await this.isTooHeavy(candidateToDuplicate)) { | ||
314 | const redundancy = candidateToDuplicate.redundancy | ||
315 | const toDelete = await VideoRedundancyModel.loadOldestLocalExpired(redundancy.strategy, redundancy.minLifetime) | ||
316 | if (!toDelete) return | ||
317 | |||
318 | const videoId = toDelete.VideoFile | ||
319 | ? toDelete.VideoFile.videoId | ||
320 | : toDelete.VideoStreamingPlaylist.videoId | ||
321 | |||
322 | const redundancies = await VideoRedundancyModel.listLocalByVideoId(videoId) | ||
323 | |||
324 | for (const redundancy of redundancies) { | ||
325 | await removeVideoRedundancy(redundancy) | ||
326 | } | ||
327 | } | ||
328 | } | ||
329 | |||
330 | private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) { | ||
331 | const maxSize = candidateToDuplicate.redundancy.size | ||
332 | |||
333 | const { totalUsed: alreadyUsed } = await VideoRedundancyModel.getStats(candidateToDuplicate.redundancy.strategy) | ||
334 | |||
335 | const videoSize = this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists) | ||
336 | const willUse = alreadyUsed + videoSize | ||
337 | |||
338 | logger.debug('Checking candidate size.', { maxSize, alreadyUsed, videoSize, willUse, ...lTags(candidateToDuplicate.video.uuid) }) | ||
339 | |||
340 | return willUse > maxSize | ||
341 | } | ||
342 | |||
343 | private buildNewExpiration (expiresAfterMs: number) { | ||
344 | return new Date(Date.now() + expiresAfterMs) | ||
345 | } | ||
346 | |||
347 | private buildEntryLogId (object: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo) { | ||
348 | if (isMVideoRedundancyFileVideo(object)) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}` | ||
349 | |||
350 | return `${object.VideoStreamingPlaylist.getMasterPlaylistUrl(object.VideoStreamingPlaylist.Video)}` | ||
351 | } | ||
352 | |||
353 | private getTotalFileSizes (files: MVideoFile[], playlists: MStreamingPlaylistFiles[]): number { | ||
354 | const fileReducer = (previous: number, current: MVideoFile) => previous + current.size | ||
355 | |||
356 | let allFiles = files | ||
357 | for (const p of playlists) { | ||
358 | allFiles = allFiles.concat(p.VideoFiles) | ||
359 | } | ||
360 | |||
361 | return allFiles.reduce(fileReducer, 0) | ||
362 | } | ||
363 | |||
364 | private async loadAndRefreshVideo (videoUrl: string) { | ||
365 | // We need more attributes and check if the video still exists | ||
366 | const getVideoOptions = { | ||
367 | videoObject: videoUrl, | ||
368 | syncParam: { rates: false, shares: false, comments: false, refreshVideo: true }, | ||
369 | fetchType: 'all' as 'all' | ||
370 | } | ||
371 | const { video } = await getOrCreateAPVideo(getVideoOptions) | ||
372 | |||
373 | return video | ||
374 | } | ||
375 | } | ||
diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts deleted file mode 100644 index 1ee4ae1b2..000000000 --- a/server/lib/schedulers/youtube-dl-update-scheduler.ts +++ /dev/null | |||
@@ -1,22 +0,0 @@ | |||
1 | import { YoutubeDLCLI } from '@server/helpers/youtube-dl' | ||
2 | import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' | ||
3 | import { AbstractScheduler } from './abstract-scheduler' | ||
4 | |||
5 | export class YoutubeDlUpdateScheduler extends AbstractScheduler { | ||
6 | |||
7 | private static instance: AbstractScheduler | ||
8 | |||
9 | protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.YOUTUBE_DL_UPDATE | ||
10 | |||
11 | private constructor () { | ||
12 | super() | ||
13 | } | ||
14 | |||
15 | protected internalExecute () { | ||
16 | return YoutubeDLCLI.updateYoutubeDLBinary() | ||
17 | } | ||
18 | |||
19 | static get Instance () { | ||
20 | return this.instance || (this.instance = new this()) | ||
21 | } | ||
22 | } | ||