aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/schedulers
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/schedulers')
-rw-r--r--server/lib/schedulers/abstract-scheduler.ts35
-rw-r--r--server/lib/schedulers/actor-follow-scheduler.ts54
-rw-r--r--server/lib/schedulers/auto-follow-index-instances.ts75
-rw-r--r--server/lib/schedulers/geo-ip-update-scheduler.ts22
-rw-r--r--server/lib/schedulers/peertube-version-check-scheduler.ts55
-rw-r--r--server/lib/schedulers/plugins-check-scheduler.ts74
-rw-r--r--server/lib/schedulers/remove-dangling-resumable-uploads-scheduler.ts40
-rw-r--r--server/lib/schedulers/remove-old-history-scheduler.ts31
-rw-r--r--server/lib/schedulers/remove-old-views-scheduler.ts31
-rw-r--r--server/lib/schedulers/runner-job-watch-dog-scheduler.ts42
-rw-r--r--server/lib/schedulers/update-videos-scheduler.ts89
-rw-r--r--server/lib/schedulers/video-channel-sync-latest-scheduler.ts50
-rw-r--r--server/lib/schedulers/video-views-buffer-scheduler.ts52
-rw-r--r--server/lib/schedulers/videos-redundancy-scheduler.ts375
-rw-r--r--server/lib/schedulers/youtube-dl-update-scheduler.ts22
15 files changed, 0 insertions, 1047 deletions
diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts
deleted file mode 100644
index f3d51a22e..000000000
--- a/server/lib/schedulers/abstract-scheduler.ts
+++ /dev/null
@@ -1,35 +0,0 @@
1import Bluebird from 'bluebird'
2import { logger } from '../../helpers/logger'
3
4export abstract class AbstractScheduler {
5
6 protected abstract schedulerIntervalMs: number
7
8 private interval: NodeJS.Timer
9 private isRunning = false
10
11 enable () {
12 if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.')
13
14 this.interval = setInterval(() => this.execute(), this.schedulerIntervalMs)
15 }
16
17 disable () {
18 clearInterval(this.interval)
19 }
20
21 async execute () {
22 if (this.isRunning === true) return
23 this.isRunning = true
24
25 try {
26 await this.internalExecute()
27 } catch (err) {
28 logger.error('Cannot execute %s scheduler.', this.constructor.name, { err })
29 } finally {
30 this.isRunning = false
31 }
32 }
33
34 protected abstract internalExecute (): Promise<any> | Bluebird<any>
35}
diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts
deleted file mode 100644
index e1c56c135..000000000
--- a/server/lib/schedulers/actor-follow-scheduler.ts
+++ /dev/null
@@ -1,54 +0,0 @@
1import { isTestOrDevInstance } from '../../helpers/core-utils'
2import { logger } from '../../helpers/logger'
3import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { ActorFollowModel } from '../../models/actor/actor-follow'
5import { ActorFollowHealthCache } from '../actor-follow-health-cache'
6import { AbstractScheduler } from './abstract-scheduler'
7
8export class ActorFollowScheduler extends AbstractScheduler {
9
10 private static instance: AbstractScheduler
11
12 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.ACTOR_FOLLOW_SCORES
13
14 private constructor () {
15 super()
16 }
17
18 protected async internalExecute () {
19 await this.processPendingScores()
20
21 await this.removeBadActorFollows()
22 }
23
24 private async processPendingScores () {
25 const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore()
26 const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds()
27 const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds()
28
29 ActorFollowHealthCache.Instance.clearPendingFollowsScore()
30 ActorFollowHealthCache.Instance.clearBadFollowingServerIds()
31 ActorFollowHealthCache.Instance.clearGoodFollowingServerIds()
32
33 for (const inbox of Object.keys(pendingScores)) {
34 await ActorFollowModel.updateScore(inbox, pendingScores[inbox])
35 }
36
37 await ActorFollowModel.updateScoreByFollowingServers(badServerIds, ACTOR_FOLLOW_SCORE.PENALTY)
38 await ActorFollowModel.updateScoreByFollowingServers(goodServerIds, ACTOR_FOLLOW_SCORE.BONUS)
39 }
40
41 private async removeBadActorFollows () {
42 if (!isTestOrDevInstance()) logger.info('Removing bad actor follows (scheduler).')
43
44 try {
45 await ActorFollowModel.removeBadActorFollows()
46 } catch (err) {
47 logger.error('Error in bad actor follows scheduler.', { err })
48 }
49 }
50
51 static get Instance () {
52 return this.instance || (this.instance = new this())
53 }
54}
diff --git a/server/lib/schedulers/auto-follow-index-instances.ts b/server/lib/schedulers/auto-follow-index-instances.ts
deleted file mode 100644
index 956ece749..000000000
--- a/server/lib/schedulers/auto-follow-index-instances.ts
+++ /dev/null
@@ -1,75 +0,0 @@
1import { chunk } from 'lodash'
2import { doJSONRequest } from '@server/helpers/requests'
3import { JobQueue } from '@server/lib/job-queue'
4import { ActorFollowModel } from '@server/models/actor/actor-follow'
5import { getServerActor } from '@server/models/application/application'
6import { logger } from '../../helpers/logger'
7import { CONFIG } from '../../initializers/config'
8import { SCHEDULER_INTERVALS_MS, SERVER_ACTOR_NAME } from '../../initializers/constants'
9import { AbstractScheduler } from './abstract-scheduler'
10
11export class AutoFollowIndexInstances extends AbstractScheduler {
12
13 private static instance: AbstractScheduler
14
15 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.AUTO_FOLLOW_INDEX_INSTANCES
16
17 private lastCheck: Date
18
19 private constructor () {
20 super()
21 }
22
23 protected async internalExecute () {
24 return this.autoFollow()
25 }
26
27 private async autoFollow () {
28 if (CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.ENABLED === false) return
29
30 const indexUrl = CONFIG.FOLLOWINGS.INSTANCE.AUTO_FOLLOW_INDEX.INDEX_URL
31
32 logger.info('Auto follow instances of index %s.', indexUrl)
33
34 try {
35 const serverActor = await getServerActor()
36
37 const searchParams = { count: 1000 }
38 if (this.lastCheck) Object.assign(searchParams, { since: this.lastCheck.toISOString() })
39
40 this.lastCheck = new Date()
41
42 const { body } = await doJSONRequest<any>(indexUrl, { searchParams })
43 if (!body.data || Array.isArray(body.data) === false) {
44 logger.error('Cannot auto follow instances of index %s. Please check the auto follow URL.', indexUrl, { body })
45 return
46 }
47
48 const hosts: string[] = body.data.map(o => o.host)
49 const chunks = chunk(hosts, 20)
50
51 for (const chunk of chunks) {
52 const unfollowedHosts = await ActorFollowModel.keepUnfollowedInstance(chunk)
53
54 for (const unfollowedHost of unfollowedHosts) {
55 const payload = {
56 host: unfollowedHost,
57 name: SERVER_ACTOR_NAME,
58 followerActorId: serverActor.id,
59 isAutoFollow: true
60 }
61
62 JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
63 }
64 }
65
66 } catch (err) {
67 logger.error('Cannot auto follow hosts of index %s.', indexUrl, { err })
68 }
69
70 }
71
72 static get Instance () {
73 return this.instance || (this.instance = new this())
74 }
75}
diff --git a/server/lib/schedulers/geo-ip-update-scheduler.ts b/server/lib/schedulers/geo-ip-update-scheduler.ts
deleted file mode 100644
index b06f5a9b5..000000000
--- a/server/lib/schedulers/geo-ip-update-scheduler.ts
+++ /dev/null
@@ -1,22 +0,0 @@
1import { GeoIP } from '@server/helpers/geo-ip'
2import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
3import { AbstractScheduler } from './abstract-scheduler'
4
5export class GeoIPUpdateScheduler extends AbstractScheduler {
6
7 private static instance: AbstractScheduler
8
9 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.GEO_IP_UPDATE
10
11 private constructor () {
12 super()
13 }
14
15 protected internalExecute () {
16 return GeoIP.Instance.updateDatabase()
17 }
18
19 static get Instance () {
20 return this.instance || (this.instance = new this())
21 }
22}
diff --git a/server/lib/schedulers/peertube-version-check-scheduler.ts b/server/lib/schedulers/peertube-version-check-scheduler.ts
deleted file mode 100644
index bc38ed49f..000000000
--- a/server/lib/schedulers/peertube-version-check-scheduler.ts
+++ /dev/null
@@ -1,55 +0,0 @@
1
2import { doJSONRequest } from '@server/helpers/requests'
3import { ApplicationModel } from '@server/models/application/application'
4import { compareSemVer } from '@shared/core-utils'
5import { JoinPeerTubeVersions } from '@shared/models'
6import { logger } from '../../helpers/logger'
7import { CONFIG } from '../../initializers/config'
8import { PEERTUBE_VERSION, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
9import { Notifier } from '../notifier'
10import { AbstractScheduler } from './abstract-scheduler'
11
12export class PeerTubeVersionCheckScheduler extends AbstractScheduler {
13
14 private static instance: AbstractScheduler
15
16 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.CHECK_PEERTUBE_VERSION
17
18 private constructor () {
19 super()
20 }
21
22 protected async internalExecute () {
23 return this.checkLatestVersion()
24 }
25
26 private async checkLatestVersion () {
27 if (CONFIG.PEERTUBE.CHECK_LATEST_VERSION.ENABLED === false) return
28
29 logger.info('Checking latest PeerTube version.')
30
31 const { body } = await doJSONRequest<JoinPeerTubeVersions>(CONFIG.PEERTUBE.CHECK_LATEST_VERSION.URL)
32
33 if (!body?.peertube?.latestVersion) {
34 logger.warn('Cannot check latest PeerTube version: body is invalid.', { body })
35 return
36 }
37
38 const latestVersion = body.peertube.latestVersion
39 const application = await ApplicationModel.load()
40
41 // Already checked this version
42 if (application.latestPeerTubeVersion === latestVersion) return
43
44 if (compareSemVer(PEERTUBE_VERSION, latestVersion) < 0) {
45 application.latestPeerTubeVersion = latestVersion
46 await application.save()
47
48 Notifier.Instance.notifyOfNewPeerTubeVersion(application, latestVersion)
49 }
50 }
51
52 static get Instance () {
53 return this.instance || (this.instance = new this())
54 }
55}
diff --git a/server/lib/schedulers/plugins-check-scheduler.ts b/server/lib/schedulers/plugins-check-scheduler.ts
deleted file mode 100644
index 820c01693..000000000
--- a/server/lib/schedulers/plugins-check-scheduler.ts
+++ /dev/null
@@ -1,74 +0,0 @@
1import { chunk } from 'lodash'
2import { compareSemVer } from '@shared/core-utils'
3import { logger } from '../../helpers/logger'
4import { CONFIG } from '../../initializers/config'
5import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
6import { PluginModel } from '../../models/server/plugin'
7import { Notifier } from '../notifier'
8import { getLatestPluginsVersion } from '../plugins/plugin-index'
9import { AbstractScheduler } from './abstract-scheduler'
10
11export class PluginsCheckScheduler extends AbstractScheduler {
12
13 private static instance: AbstractScheduler
14
15 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.CHECK_PLUGINS
16
17 private constructor () {
18 super()
19 }
20
21 protected async internalExecute () {
22 return this.checkLatestPluginsVersion()
23 }
24
25 private async checkLatestPluginsVersion () {
26 if (CONFIG.PLUGINS.INDEX.ENABLED === false) return
27
28 logger.info('Checking latest plugins version.')
29
30 const plugins = await PluginModel.listInstalled()
31
32 // Process 10 plugins in 1 HTTP request
33 const chunks = chunk(plugins, 10)
34 for (const chunk of chunks) {
35 // Find plugins according to their npm name
36 const pluginIndex: { [npmName: string]: PluginModel } = {}
37 for (const plugin of chunk) {
38 pluginIndex[PluginModel.buildNpmName(plugin.name, plugin.type)] = plugin
39 }
40
41 const npmNames = Object.keys(pluginIndex)
42
43 try {
44 const results = await getLatestPluginsVersion(npmNames)
45
46 for (const result of results) {
47 const plugin = pluginIndex[result.npmName]
48 if (!result.latestVersion) continue
49
50 if (
51 !plugin.latestVersion ||
52 (plugin.latestVersion !== result.latestVersion && compareSemVer(plugin.latestVersion, result.latestVersion) < 0)
53 ) {
54 plugin.latestVersion = result.latestVersion
55 await plugin.save()
56
57 // Notify if there is an higher plugin version available
58 if (compareSemVer(plugin.version, result.latestVersion) < 0) {
59 Notifier.Instance.notifyOfNewPluginVersion(plugin)
60 }
61
62 logger.info('Plugin %s has a new latest version %s.', result.npmName, plugin.latestVersion)
63 }
64 }
65 } catch (err) {
66 logger.error('Cannot get latest plugins version.', { npmNames, err })
67 }
68 }
69 }
70
71 static get Instance () {
72 return this.instance || (this.instance = new this())
73 }
74}
diff --git a/server/lib/schedulers/remove-dangling-resumable-uploads-scheduler.ts b/server/lib/schedulers/remove-dangling-resumable-uploads-scheduler.ts
deleted file mode 100644
index 61e93eafa..000000000
--- a/server/lib/schedulers/remove-dangling-resumable-uploads-scheduler.ts
+++ /dev/null
@@ -1,40 +0,0 @@
1
2import { logger, loggerTagsFactory } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { uploadx } from '../uploadx'
5import { AbstractScheduler } from './abstract-scheduler'
6
7const lTags = loggerTagsFactory('scheduler', 'resumable-upload', 'cleaner')
8
9export class RemoveDanglingResumableUploadsScheduler extends AbstractScheduler {
10
11 private static instance: AbstractScheduler
12 private lastExecutionTimeMs: number
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.REMOVE_DANGLING_RESUMABLE_UPLOADS
15
16 private constructor () {
17 super()
18
19 this.lastExecutionTimeMs = new Date().getTime()
20 }
21
22 protected async internalExecute () {
23 logger.debug('Removing dangling resumable uploads', lTags())
24
25 const now = new Date().getTime()
26
27 try {
28 // Remove files that were not updated since the last execution
29 await uploadx.storage.purge(now - this.lastExecutionTimeMs)
30 } catch (error) {
31 logger.error('Failed to handle file during resumable video upload folder cleanup', { error, ...lTags() })
32 } finally {
33 this.lastExecutionTimeMs = now
34 }
35 }
36
37 static get Instance () {
38 return this.instance || (this.instance = new this())
39 }
40}
diff --git a/server/lib/schedulers/remove-old-history-scheduler.ts b/server/lib/schedulers/remove-old-history-scheduler.ts
deleted file mode 100644
index 34b160799..000000000
--- a/server/lib/schedulers/remove-old-history-scheduler.ts
+++ /dev/null
@@ -1,31 +0,0 @@
1import { logger } from '../../helpers/logger'
2import { AbstractScheduler } from './abstract-scheduler'
3import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { UserVideoHistoryModel } from '../../models/user/user-video-history'
5import { CONFIG } from '../../initializers/config'
6
7export class RemoveOldHistoryScheduler extends AbstractScheduler {
8
9 private static instance: AbstractScheduler
10
11 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.REMOVE_OLD_HISTORY
12
13 private constructor () {
14 super()
15 }
16
17 protected internalExecute () {
18 if (CONFIG.HISTORY.VIDEOS.MAX_AGE === -1) return
19
20 logger.info('Removing old videos history.')
21
22 const now = new Date()
23 const beforeDate = new Date(now.getTime() - CONFIG.HISTORY.VIDEOS.MAX_AGE).toISOString()
24
25 return UserVideoHistoryModel.removeOldHistory(beforeDate)
26 }
27
28 static get Instance () {
29 return this.instance || (this.instance = new this())
30 }
31}
diff --git a/server/lib/schedulers/remove-old-views-scheduler.ts b/server/lib/schedulers/remove-old-views-scheduler.ts
deleted file mode 100644
index 8bc53a045..000000000
--- a/server/lib/schedulers/remove-old-views-scheduler.ts
+++ /dev/null
@@ -1,31 +0,0 @@
1import { VideoViewModel } from '@server/models/view/video-view'
2import { logger } from '../../helpers/logger'
3import { CONFIG } from '../../initializers/config'
4import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
5import { AbstractScheduler } from './abstract-scheduler'
6
7export class RemoveOldViewsScheduler extends AbstractScheduler {
8
9 private static instance: AbstractScheduler
10
11 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.REMOVE_OLD_VIEWS
12
13 private constructor () {
14 super()
15 }
16
17 protected internalExecute () {
18 if (CONFIG.VIEWS.VIDEOS.REMOTE.MAX_AGE === -1) return
19
20 logger.info('Removing old videos views.')
21
22 const now = new Date()
23 const beforeDate = new Date(now.getTime() - CONFIG.VIEWS.VIDEOS.REMOTE.MAX_AGE).toISOString()
24
25 return VideoViewModel.removeOldRemoteViewsHistory(beforeDate)
26 }
27
28 static get Instance () {
29 return this.instance || (this.instance = new this())
30 }
31}
diff --git a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts b/server/lib/schedulers/runner-job-watch-dog-scheduler.ts
deleted file mode 100644
index f7a26d2bc..000000000
--- a/server/lib/schedulers/runner-job-watch-dog-scheduler.ts
+++ /dev/null
@@ -1,42 +0,0 @@
1import { CONFIG } from '@server/initializers/config'
2import { RunnerJobModel } from '@server/models/runner/runner-job'
3import { logger, loggerTagsFactory } from '../../helpers/logger'
4import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
5import { getRunnerJobHandlerClass } from '../runners'
6import { AbstractScheduler } from './abstract-scheduler'
7
8const lTags = loggerTagsFactory('runner')
9
10export class RunnerJobWatchDogScheduler extends AbstractScheduler {
11
12 private static instance: AbstractScheduler
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.RUNNER_JOB_WATCH_DOG
15
16 private constructor () {
17 super()
18 }
19
20 protected async internalExecute () {
21 const vodStalledJobs = await RunnerJobModel.listStalledJobs({
22 staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.VOD,
23 types: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]
24 })
25
26 const liveStalledJobs = await RunnerJobModel.listStalledJobs({
27 staleTimeMS: CONFIG.REMOTE_RUNNERS.STALLED_JOBS.LIVE,
28 types: [ 'live-rtmp-hls-transcoding' ]
29 })
30
31 for (const stalled of [ ...vodStalledJobs, ...liveStalledJobs ]) {
32 logger.info('Abort stalled runner job %s (%s)', stalled.uuid, stalled.type, lTags(stalled.uuid, stalled.type))
33
34 const Handler = getRunnerJobHandlerClass(stalled)
35 await new Handler().abort({ runnerJob: stalled })
36 }
37 }
38
39 static get Instance () {
40 return this.instance || (this.instance = new this())
41 }
42}
diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts
deleted file mode 100644
index e38685c04..000000000
--- a/server/lib/schedulers/update-videos-scheduler.ts
+++ /dev/null
@@ -1,89 +0,0 @@
1import { VideoModel } from '@server/models/video/video'
2import { MScheduleVideoUpdate } from '@server/types/models'
3import { VideoPrivacy, VideoState } from '@shared/models'
4import { logger } from '../../helpers/logger'
5import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
6import { sequelizeTypescript } from '../../initializers/database'
7import { ScheduleVideoUpdateModel } from '../../models/video/schedule-video-update'
8import { Notifier } from '../notifier'
9import { addVideoJobsAfterUpdate } from '../video'
10import { VideoPathManager } from '../video-path-manager'
11import { setVideoPrivacy } from '../video-privacy'
12import { AbstractScheduler } from './abstract-scheduler'
13
14export class UpdateVideosScheduler extends AbstractScheduler {
15
16 private static instance: AbstractScheduler
17
18 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.UPDATE_VIDEOS
19
20 private constructor () {
21 super()
22 }
23
24 protected async internalExecute () {
25 return this.updateVideos()
26 }
27
28 private async updateVideos () {
29 if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined
30
31 const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate()
32
33 for (const schedule of schedules) {
34 const videoOnly = await VideoModel.load(schedule.videoId)
35 const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoOnly.uuid)
36
37 try {
38 const { video, published } = await this.updateAVideo(schedule)
39
40 if (published) Notifier.Instance.notifyOnVideoPublishedAfterScheduledUpdate(video)
41 } catch (err) {
42 logger.error('Cannot update video', { err })
43 }
44
45 mutexReleaser()
46 }
47 }
48
49 private async updateAVideo (schedule: MScheduleVideoUpdate) {
50 let oldPrivacy: VideoPrivacy
51 let isNewVideo: boolean
52 let published = false
53
54 const video = await sequelizeTypescript.transaction(async t => {
55 const video = await VideoModel.loadFull(schedule.videoId, t)
56 if (video.state === VideoState.TO_TRANSCODE) return null
57
58 logger.info('Executing scheduled video update on %s.', video.uuid)
59
60 if (schedule.privacy) {
61 isNewVideo = video.isNewVideo(schedule.privacy)
62 oldPrivacy = video.privacy
63
64 setVideoPrivacy(video, schedule.privacy)
65 await video.save({ transaction: t })
66
67 if (oldPrivacy === VideoPrivacy.PRIVATE) {
68 published = true
69 }
70 }
71
72 await schedule.destroy({ transaction: t })
73
74 return video
75 })
76
77 if (!video) {
78 return { video, published: false }
79 }
80
81 await addVideoJobsAfterUpdate({ video, oldPrivacy, isNewVideo, nameChanged: false })
82
83 return { video, published }
84 }
85
86 static get Instance () {
87 return this.instance || (this.instance = new this())
88 }
89}
diff --git a/server/lib/schedulers/video-channel-sync-latest-scheduler.ts b/server/lib/schedulers/video-channel-sync-latest-scheduler.ts
deleted file mode 100644
index efb957fac..000000000
--- a/server/lib/schedulers/video-channel-sync-latest-scheduler.ts
+++ /dev/null
@@ -1,50 +0,0 @@
1import { logger } from '@server/helpers/logger'
2import { CONFIG } from '@server/initializers/config'
3import { VideoChannelModel } from '@server/models/video/video-channel'
4import { VideoChannelSyncModel } from '@server/models/video/video-channel-sync'
5import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
6import { synchronizeChannel } from '../sync-channel'
7import { AbstractScheduler } from './abstract-scheduler'
8
9export class VideoChannelSyncLatestScheduler extends AbstractScheduler {
10 private static instance: AbstractScheduler
11 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.CHANNEL_SYNC_CHECK_INTERVAL
12
13 private constructor () {
14 super()
15 }
16
17 protected async internalExecute () {
18 if (!CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.ENABLED) {
19 logger.debug('Discard channels synchronization as the feature is disabled')
20 return
21 }
22
23 logger.info('Checking channels to synchronize')
24
25 const channelSyncs = await VideoChannelSyncModel.listSyncs()
26
27 for (const sync of channelSyncs) {
28 const channel = await VideoChannelModel.loadAndPopulateAccount(sync.videoChannelId)
29
30 logger.info(
31 'Creating video import jobs for "%s" sync with external channel "%s"',
32 channel.Actor.preferredUsername, sync.externalChannelUrl
33 )
34
35 const onlyAfter = sync.lastSyncAt || sync.createdAt
36
37 await synchronizeChannel({
38 channel,
39 externalChannelUrl: sync.externalChannelUrl,
40 videosCountLimit: CONFIG.IMPORT.VIDEO_CHANNEL_SYNCHRONIZATION.VIDEOS_LIMIT_PER_SYNCHRONIZATION,
41 channelSync: sync,
42 onlyAfter
43 })
44 }
45 }
46
47 static get Instance () {
48 return this.instance || (this.instance = new this())
49 }
50}
diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts
deleted file mode 100644
index 244a88b14..000000000
--- a/server/lib/schedulers/video-views-buffer-scheduler.ts
+++ /dev/null
@@ -1,52 +0,0 @@
1import { logger, loggerTagsFactory } from '@server/helpers/logger'
2import { VideoModel } from '@server/models/video/video'
3import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { federateVideoIfNeeded } from '../activitypub/videos'
5import { Redis } from '../redis'
6import { AbstractScheduler } from './abstract-scheduler'
7
8const lTags = loggerTagsFactory('views')
9
10export class VideoViewsBufferScheduler extends AbstractScheduler {
11
12 private static instance: AbstractScheduler
13
14 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE
15
16 private constructor () {
17 super()
18 }
19
20 protected async internalExecute () {
21 const videoIds = await Redis.Instance.listLocalVideosViewed()
22 if (videoIds.length === 0) return
23
24 for (const videoId of videoIds) {
25 try {
26 const views = await Redis.Instance.getLocalVideoViews(videoId)
27 await Redis.Instance.deleteLocalVideoViews(videoId)
28
29 const video = await VideoModel.loadFull(videoId)
30 if (!video) {
31 logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags())
32 continue
33 }
34
35 logger.info('Processing local video %s views buffer.', video.uuid, lTags(video.uuid))
36
37 // If this is a remote video, the origin instance will send us an update
38 await VideoModel.incrementViews(videoId, views)
39
40 // Send video update
41 video.views += views
42 await federateVideoIfNeeded(video, false)
43 } catch (err) {
44 logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() })
45 }
46 }
47 }
48
49 static get Instance () {
50 return this.instance || (this.instance = new this())
51 }
52}
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts
deleted file mode 100644
index 91625ccb5..000000000
--- a/server/lib/schedulers/videos-redundancy-scheduler.ts
+++ /dev/null
@@ -1,375 +0,0 @@
1import { move } from 'fs-extra'
2import { join } from 'path'
3import { getServerActor } from '@server/models/application/application'
4import { VideoModel } from '@server/models/video/video'
5import {
6 MStreamingPlaylistFiles,
7 MVideoAccountLight,
8 MVideoFile,
9 MVideoFileVideo,
10 MVideoRedundancyFileVideo,
11 MVideoRedundancyStreamingPlaylistVideo,
12 MVideoRedundancyVideo,
13 MVideoWithAllFiles
14} from '@server/types/models'
15import { VideosRedundancyStrategy } from '../../../shared/models/redundancy'
16import { logger, loggerTagsFactory } from '../../helpers/logger'
17import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
18import { CONFIG } from '../../initializers/config'
19import { DIRECTORIES, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants'
20import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
21import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
22import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url'
23import { getOrCreateAPVideo } from '../activitypub/videos'
24import { downloadPlaylistSegments } from '../hls'
25import { removeVideoRedundancy } from '../redundancy'
26import { generateHLSRedundancyUrl, generateWebVideoRedundancyUrl } from '../video-urls'
27import { AbstractScheduler } from './abstract-scheduler'
28
29const lTags = loggerTagsFactory('redundancy')
30
31type CandidateToDuplicate = {
32 redundancy: VideosRedundancyStrategy
33 video: MVideoWithAllFiles
34 files: MVideoFile[]
35 streamingPlaylists: MStreamingPlaylistFiles[]
36}
37
38function isMVideoRedundancyFileVideo (
39 o: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo
40): o is MVideoRedundancyFileVideo {
41 return !!(o as MVideoRedundancyFileVideo).VideoFile
42}
43
44export class VideosRedundancyScheduler extends AbstractScheduler {
45
46 private static instance: VideosRedundancyScheduler
47
48 protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
49
50 private constructor () {
51 super()
52 }
53
54 async createManualRedundancy (videoId: number) {
55 const videoToDuplicate = await VideoModel.loadWithFiles(videoId)
56
57 if (!videoToDuplicate) {
58 logger.warn('Video to manually duplicate %d does not exist anymore.', videoId, lTags())
59 return
60 }
61
62 return this.createVideoRedundancies({
63 video: videoToDuplicate,
64 redundancy: null,
65 files: videoToDuplicate.VideoFiles,
66 streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
67 })
68 }
69
70 protected async internalExecute () {
71 for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
72 logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy, lTags())
73
74 try {
75 const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig)
76 if (!videoToDuplicate) continue
77
78 const candidateToDuplicate = {
79 video: videoToDuplicate,
80 redundancy: redundancyConfig,
81 files: videoToDuplicate.VideoFiles,
82 streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
83 }
84
85 await this.purgeCacheIfNeeded(candidateToDuplicate)
86
87 if (await this.isTooHeavy(candidateToDuplicate)) {
88 logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url, lTags(videoToDuplicate.uuid))
89 continue
90 }
91
92 logger.info(
93 'Will duplicate video %s in redundancy scheduler "%s".',
94 videoToDuplicate.url, redundancyConfig.strategy, lTags(videoToDuplicate.uuid)
95 )
96
97 await this.createVideoRedundancies(candidateToDuplicate)
98 } catch (err) {
99 logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err, ...lTags() })
100 }
101 }
102
103 await this.extendsLocalExpiration()
104
105 await this.purgeRemoteExpired()
106 }
107
108 static get Instance () {
109 return this.instance || (this.instance = new this())
110 }
111
112 private async extendsLocalExpiration () {
113 const expired = await VideoRedundancyModel.listLocalExpired()
114
115 for (const redundancyModel of expired) {
116 try {
117 const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
118
119 // If the admin disabled the redundancy, remove this redundancy instead of extending it
120 if (!redundancyConfig) {
121 logger.info(
122 'Destroying redundancy %s because the redundancy %s does not exist anymore.',
123 redundancyModel.url, redundancyModel.strategy
124 )
125
126 await removeVideoRedundancy(redundancyModel)
127 continue
128 }
129
130 const { totalUsed } = await VideoRedundancyModel.getStats(redundancyConfig.strategy)
131
132 // If the admin decreased the cache size, remove this redundancy instead of extending it
133 if (totalUsed > redundancyConfig.size) {
134 logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy)
135
136 await removeVideoRedundancy(redundancyModel)
137 continue
138 }
139
140 await this.extendsRedundancy(redundancyModel)
141 } catch (err) {
142 logger.error(
143 'Cannot extend or remove expiration of %s video from our redundancy system.',
144 this.buildEntryLogId(redundancyModel), { err, ...lTags(redundancyModel.getVideoUUID()) }
145 )
146 }
147 }
148 }
149
150 private async extendsRedundancy (redundancyModel: MVideoRedundancyVideo) {
151 const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
152 // Redundancy strategy disabled, remove our redundancy instead of extending expiration
153 if (!redundancy) {
154 await removeVideoRedundancy(redundancyModel)
155 return
156 }
157
158 await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
159 }
160
161 private async purgeRemoteExpired () {
162 const expired = await VideoRedundancyModel.listRemoteExpired()
163
164 for (const redundancyModel of expired) {
165 try {
166 await removeVideoRedundancy(redundancyModel)
167 } catch (err) {
168 logger.error(
169 'Cannot remove redundancy %s from our redundancy system.',
170 this.buildEntryLogId(redundancyModel), lTags(redundancyModel.getVideoUUID())
171 )
172 }
173 }
174 }
175
176 private findVideoToDuplicate (cache: VideosRedundancyStrategy) {
177 if (cache.strategy === 'most-views') {
178 return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
179 }
180
181 if (cache.strategy === 'trending') {
182 return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
183 }
184
185 if (cache.strategy === 'recently-added') {
186 const minViews = cache.minViews
187 return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
188 }
189 }
190
191 private async createVideoRedundancies (data: CandidateToDuplicate) {
192 const video = await this.loadAndRefreshVideo(data.video.url)
193
194 if (!video) {
195 logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url, lTags(data.video.uuid))
196
197 return
198 }
199
200 for (const file of data.files) {
201 const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id)
202 if (existingRedundancy) {
203 await this.extendsRedundancy(existingRedundancy)
204
205 continue
206 }
207
208 await this.createVideoFileRedundancy(data.redundancy, video, file)
209 }
210
211 for (const streamingPlaylist of data.streamingPlaylists) {
212 const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id)
213 if (existingRedundancy) {
214 await this.extendsRedundancy(existingRedundancy)
215
216 continue
217 }
218
219 await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist)
220 }
221 }
222
223 private async createVideoFileRedundancy (redundancy: VideosRedundancyStrategy | null, video: MVideoAccountLight, fileArg: MVideoFile) {
224 let strategy = 'manual'
225 let expiresOn: Date = null
226
227 if (redundancy) {
228 strategy = redundancy.strategy
229 expiresOn = this.buildNewExpiration(redundancy.minLifetime)
230 }
231
232 const file = fileArg as MVideoFileVideo
233 file.Video = video
234
235 const serverActor = await getServerActor()
236
237 logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy, lTags(video.uuid))
238
239 const tmpPath = await downloadWebTorrentVideo({ uri: file.torrentUrl }, VIDEO_IMPORT_TIMEOUT)
240
241 const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, file.filename)
242 await move(tmpPath, destPath, { overwrite: true })
243
244 const createdModel: MVideoRedundancyFileVideo = await VideoRedundancyModel.create({
245 expiresOn,
246 url: getLocalVideoCacheFileActivityPubUrl(file),
247 fileUrl: generateWebVideoRedundancyUrl(file),
248 strategy,
249 videoFileId: file.id,
250 actorId: serverActor.id
251 })
252
253 createdModel.VideoFile = file
254
255 await sendCreateCacheFile(serverActor, video, createdModel)
256
257 logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url, lTags(video.uuid))
258 }
259
260 private async createStreamingPlaylistRedundancy (
261 redundancy: VideosRedundancyStrategy,
262 video: MVideoAccountLight,
263 playlistArg: MStreamingPlaylistFiles
264 ) {
265 let strategy = 'manual'
266 let expiresOn: Date = null
267
268 if (redundancy) {
269 strategy = redundancy.strategy
270 expiresOn = this.buildNewExpiration(redundancy.minLifetime)
271 }
272
273 const playlist = Object.assign(playlistArg, { Video: video })
274 const serverActor = await getServerActor()
275
276 logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy, lTags(video.uuid))
277
278 const destDirectory = join(DIRECTORIES.HLS_REDUNDANCY, video.uuid)
279 const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video)
280
281 const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000
282 const toleranceKB = maxSizeKB + ((5 * maxSizeKB) / 100) // 5% more tolerance
283 await downloadPlaylistSegments(masterPlaylistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT, toleranceKB)
284
285 const createdModel: MVideoRedundancyStreamingPlaylistVideo = await VideoRedundancyModel.create({
286 expiresOn,
287 url: getLocalVideoCacheStreamingPlaylistActivityPubUrl(video, playlist),
288 fileUrl: generateHLSRedundancyUrl(video, playlistArg),
289 strategy,
290 videoStreamingPlaylistId: playlist.id,
291 actorId: serverActor.id
292 })
293
294 createdModel.VideoStreamingPlaylist = playlist
295
296 await sendCreateCacheFile(serverActor, video, createdModel)
297
298 logger.info('Duplicated playlist %s -> %s.', masterPlaylistUrl, createdModel.url, lTags(video.uuid))
299 }
300
301 private async extendsExpirationOf (redundancy: MVideoRedundancyVideo, expiresAfterMs: number) {
302 logger.info('Extending expiration of %s.', redundancy.url, lTags(redundancy.getVideoUUID()))
303
304 const serverActor = await getServerActor()
305
306 redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
307 await redundancy.save()
308
309 await sendUpdateCacheFile(serverActor, redundancy)
310 }
311
312 private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) {
313 while (await this.isTooHeavy(candidateToDuplicate)) {
314 const redundancy = candidateToDuplicate.redundancy
315 const toDelete = await VideoRedundancyModel.loadOldestLocalExpired(redundancy.strategy, redundancy.minLifetime)
316 if (!toDelete) return
317
318 const videoId = toDelete.VideoFile
319 ? toDelete.VideoFile.videoId
320 : toDelete.VideoStreamingPlaylist.videoId
321
322 const redundancies = await VideoRedundancyModel.listLocalByVideoId(videoId)
323
324 for (const redundancy of redundancies) {
325 await removeVideoRedundancy(redundancy)
326 }
327 }
328 }
329
330 private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) {
331 const maxSize = candidateToDuplicate.redundancy.size
332
333 const { totalUsed: alreadyUsed } = await VideoRedundancyModel.getStats(candidateToDuplicate.redundancy.strategy)
334
335 const videoSize = this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists)
336 const willUse = alreadyUsed + videoSize
337
338 logger.debug('Checking candidate size.', { maxSize, alreadyUsed, videoSize, willUse, ...lTags(candidateToDuplicate.video.uuid) })
339
340 return willUse > maxSize
341 }
342
343 private buildNewExpiration (expiresAfterMs: number) {
344 return new Date(Date.now() + expiresAfterMs)
345 }
346
347 private buildEntryLogId (object: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo) {
348 if (isMVideoRedundancyFileVideo(object)) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
349
350 return `${object.VideoStreamingPlaylist.getMasterPlaylistUrl(object.VideoStreamingPlaylist.Video)}`
351 }
352
353 private getTotalFileSizes (files: MVideoFile[], playlists: MStreamingPlaylistFiles[]): number {
354 const fileReducer = (previous: number, current: MVideoFile) => previous + current.size
355
356 let allFiles = files
357 for (const p of playlists) {
358 allFiles = allFiles.concat(p.VideoFiles)
359 }
360
361 return allFiles.reduce(fileReducer, 0)
362 }
363
364 private async loadAndRefreshVideo (videoUrl: string) {
365 // We need more attributes and check if the video still exists
366 const getVideoOptions = {
367 videoObject: videoUrl,
368 syncParam: { rates: false, shares: false, comments: false, refreshVideo: true },
369 fetchType: 'all' as 'all'
370 }
371 const { video } = await getOrCreateAPVideo(getVideoOptions)
372
373 return video
374 }
375}
diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts
deleted file mode 100644
index 1ee4ae1b2..000000000
--- a/server/lib/schedulers/youtube-dl-update-scheduler.ts
+++ /dev/null
@@ -1,22 +0,0 @@
1import { YoutubeDLCLI } from '@server/helpers/youtube-dl'
2import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
3import { AbstractScheduler } from './abstract-scheduler'
4
5export class YoutubeDlUpdateScheduler extends AbstractScheduler {
6
7 private static instance: AbstractScheduler
8
9 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.YOUTUBE_DL_UPDATE
10
11 private constructor () {
12 super()
13 }
14
15 protected internalExecute () {
16 return YoutubeDLCLI.updateYoutubeDLBinary()
17 }
18
19 static get Instance () {
20 return this.instance || (this.instance = new this())
21 }
22}