aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/schedulers
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/schedulers')
-rw-r--r--server/lib/schedulers/abstract-scheduler.ts18
-rw-r--r--server/lib/schedulers/actor-follow-scheduler.ts (renamed from server/lib/schedulers/bad-actor-follow-scheduler.ts)23
-rw-r--r--server/lib/schedulers/remove-old-jobs-scheduler.ts6
-rw-r--r--server/lib/schedulers/update-videos-scheduler.ts32
-rw-r--r--server/lib/schedulers/videos-redundancy-scheduler.ts17
-rw-r--r--server/lib/schedulers/youtube-dl-update-scheduler.ts2
6 files changed, 64 insertions, 34 deletions
diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts
index b9d0a4d17..86ea7aa38 100644
--- a/server/lib/schedulers/abstract-scheduler.ts
+++ b/server/lib/schedulers/abstract-scheduler.ts
@@ -1,8 +1,11 @@
1import { logger } from '../../helpers/logger'
2
1export abstract class AbstractScheduler { 3export abstract class AbstractScheduler {
2 4
3 protected abstract schedulerIntervalMs: number 5 protected abstract schedulerIntervalMs: number
4 6
5 private interval: NodeJS.Timer 7 private interval: NodeJS.Timer
8 private isRunning = false
6 9
7 enable () { 10 enable () {
8 if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') 11 if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.')
@@ -14,5 +17,18 @@ export abstract class AbstractScheduler {
14 clearInterval(this.interval) 17 clearInterval(this.interval)
15 } 18 }
16 19
17 abstract execute () 20 async execute () {
21 if (this.isRunning === true) return
22 this.isRunning = true
23
24 try {
25 await this.internalExecute()
26 } catch (err) {
27 logger.error('Cannot execute %s scheduler.', this.constructor.name, { err })
28 } finally {
29 this.isRunning = false
30 }
31 }
32
33 protected abstract internalExecute (): Promise<any>
18} 34}
diff --git a/server/lib/schedulers/bad-actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts
index 617149aaf..3967be7f8 100644
--- a/server/lib/schedulers/bad-actor-follow-scheduler.ts
+++ b/server/lib/schedulers/actor-follow-scheduler.ts
@@ -3,18 +3,35 @@ import { logger } from '../../helpers/logger'
3import { ActorFollowModel } from '../../models/activitypub/actor-follow' 3import { ActorFollowModel } from '../../models/activitypub/actor-follow'
4import { AbstractScheduler } from './abstract-scheduler' 4import { AbstractScheduler } from './abstract-scheduler'
5import { SCHEDULER_INTERVALS_MS } from '../../initializers' 5import { SCHEDULER_INTERVALS_MS } from '../../initializers'
6import { ActorFollowScoreCache } from '../cache'
6 7
7export class BadActorFollowScheduler extends AbstractScheduler { 8export class ActorFollowScheduler extends AbstractScheduler {
8 9
9 private static instance: AbstractScheduler 10 private static instance: AbstractScheduler
10 11
11 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow 12 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores
12 13
13 private constructor () { 14 private constructor () {
14 super() 15 super()
15 } 16 }
16 17
17 async execute () { 18 protected async internalExecute () {
19 await this.processPendingScores()
20
21 await this.removeBadActorFollows()
22 }
23
24 private async processPendingScores () {
25 const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy()
26
27 ActorFollowScoreCache.Instance.clearPendingFollowsScore()
28
29 for (const inbox of Object.keys(pendingScores)) {
30 await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox])
31 }
32 }
33
34 private async removeBadActorFollows () {
18 if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') 35 if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).')
19 36
20 try { 37 try {
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts
index a29a6b800..4a4341ba9 100644
--- a/server/lib/schedulers/remove-old-jobs-scheduler.ts
+++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts
@@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler {
14 super() 14 super()
15 } 15 }
16 16
17 async execute () { 17 protected internalExecute () {
18 if (!isTestInstance()) logger.info('Removing old jobs (scheduler).') 18 if (!isTestInstance()) logger.info('Removing old jobs in scheduler.')
19 19
20 JobQueue.Instance.removeOldJobs() 20 return JobQueue.Instance.removeOldJobs()
21 } 21 }
22 22
23 static get Instance () { 23 static get Instance () {
diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts
index fd2edfd17..2618a5857 100644
--- a/server/lib/schedulers/update-videos-scheduler.ts
+++ b/server/lib/schedulers/update-videos-scheduler.ts
@@ -5,6 +5,8 @@ import { retryTransactionWrapper } from '../../helpers/database-utils'
5import { federateVideoIfNeeded } from '../activitypub' 5import { federateVideoIfNeeded } from '../activitypub'
6import { SCHEDULER_INTERVALS_MS, sequelizeTypescript } from '../../initializers' 6import { SCHEDULER_INTERVALS_MS, sequelizeTypescript } from '../../initializers'
7import { VideoPrivacy } from '../../../shared/models/videos' 7import { VideoPrivacy } from '../../../shared/models/videos'
8import { Notifier } from '../notifier'
9import { VideoModel } from '../../models/video/video'
8 10
9export class UpdateVideosScheduler extends AbstractScheduler { 11export class UpdateVideosScheduler extends AbstractScheduler {
10 12
@@ -12,30 +14,20 @@ export class UpdateVideosScheduler extends AbstractScheduler {
12 14
13 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos 15 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos
14 16
15 private isRunning = false
16
17 private constructor () { 17 private constructor () {
18 super() 18 super()
19 } 19 }
20 20
21 async execute () { 21 protected async internalExecute () {
22 if (this.isRunning === true) return 22 return retryTransactionWrapper(this.updateVideos.bind(this))
23 this.isRunning = true
24
25 try {
26 await retryTransactionWrapper(this.updateVideos.bind(this))
27 } catch (err) {
28 logger.error('Cannot execute update videos scheduler.', { err })
29 } finally {
30 this.isRunning = false
31 }
32 } 23 }
33 24
34 private async updateVideos () { 25 private async updateVideos () {
35 if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined 26 if (!await ScheduleVideoUpdateModel.areVideosToUpdate()) return undefined
36 27
37 return sequelizeTypescript.transaction(async t => { 28 const publishedVideos = await sequelizeTypescript.transaction(async t => {
38 const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate(t) 29 const schedules = await ScheduleVideoUpdateModel.listVideosToUpdate(t)
30 const publishedVideos: VideoModel[] = []
39 31
40 for (const schedule of schedules) { 32 for (const schedule of schedules) {
41 const video = schedule.Video 33 const video = schedule.Video
@@ -50,11 +42,23 @@ export class UpdateVideosScheduler extends AbstractScheduler {
50 42
51 await video.save({ transaction: t }) 43 await video.save({ transaction: t })
52 await federateVideoIfNeeded(video, isNewVideo, t) 44 await federateVideoIfNeeded(video, isNewVideo, t)
45
46 if (oldPrivacy === VideoPrivacy.UNLISTED || oldPrivacy === VideoPrivacy.PRIVATE) {
47 video.ScheduleVideoUpdate = schedule
48 publishedVideos.push(video)
49 }
53 } 50 }
54 51
55 await schedule.destroy({ transaction: t }) 52 await schedule.destroy({ transaction: t })
56 } 53 }
54
55 return publishedVideos
57 }) 56 })
57
58 for (const v of publishedVideos) {
59 Notifier.Instance.notifyOnNewVideo(v)
60 Notifier.Instance.notifyOnPendingVideoPublished(v)
61 }
58 } 62 }
59 63
60 static get Instance () { 64 static get Instance () {
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts
index 8b7f33539..f643ee226 100644
--- a/server/lib/schedulers/videos-redundancy-scheduler.ts
+++ b/server/lib/schedulers/videos-redundancy-scheduler.ts
@@ -6,7 +6,7 @@ import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
6import { VideoFileModel } from '../../models/video/video-file' 6import { VideoFileModel } from '../../models/video/video-file'
7import { downloadWebTorrentVideo } from '../../helpers/webtorrent' 7import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
8import { join } from 'path' 8import { join } from 'path'
9import { rename } from 'fs-extra' 9import { move } from 'fs-extra'
10import { getServerActor } from '../../helpers/utils' 10import { getServerActor } from '../../helpers/utils'
11import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' 11import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
12import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' 12import { getVideoCacheFileActivityPubUrl } from '../activitypub/url'
@@ -16,7 +16,6 @@ import { getOrCreateVideoAndAccountAndChannel } from '../activitypub'
16export class VideosRedundancyScheduler extends AbstractScheduler { 16export class VideosRedundancyScheduler extends AbstractScheduler {
17 17
18 private static instance: AbstractScheduler 18 private static instance: AbstractScheduler
19 private executing = false
20 19
21 protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL 20 protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
22 21
@@ -24,11 +23,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
24 super() 23 super()
25 } 24 }
26 25
27 async execute () { 26 protected async internalExecute () {
28 if (this.executing) return
29
30 this.executing = true
31
32 for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { 27 for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
33 logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) 28 logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)
34 29
@@ -57,8 +52,6 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
57 await this.extendsLocalExpiration() 52 await this.extendsLocalExpiration()
58 53
59 await this.purgeRemoteExpired() 54 await this.purgeRemoteExpired()
60
61 this.executing = false
62 } 55 }
63 56
64 static get Instance () { 57 static get Instance () {
@@ -145,13 +138,13 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
145 138
146 const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT) 139 const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT)
147 140
148 const destPath = join(CONFIG.STORAGE.VIDEOS_DIR, video.getVideoFilename(file)) 141 const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, video.getVideoFilename(file))
149 await rename(tmpPath, destPath) 142 await move(tmpPath, destPath)
150 143
151 const createdModel = await VideoRedundancyModel.create({ 144 const createdModel = await VideoRedundancyModel.create({
152 expiresOn: this.buildNewExpiration(redundancy.minLifetime), 145 expiresOn: this.buildNewExpiration(redundancy.minLifetime),
153 url: getVideoCacheFileActivityPubUrl(file), 146 url: getVideoCacheFileActivityPubUrl(file),
154 fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), 147 fileUrl: video.getVideoRedundancyUrl(file, CONFIG.WEBSERVER.URL),
155 strategy: redundancy.strategy, 148 strategy: redundancy.strategy,
156 videoFileId: file.id, 149 videoFileId: file.id,
157 actorId: serverActor.id 150 actorId: serverActor.id
diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts
index 461cd045e..aa027116d 100644
--- a/server/lib/schedulers/youtube-dl-update-scheduler.ts
+++ b/server/lib/schedulers/youtube-dl-update-scheduler.ts
@@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler {
12 super() 12 super()
13 } 13 }
14 14
15 execute () { 15 protected internalExecute () {
16 return updateYoutubeDLBinary() 16 return updateYoutubeDLBinary()
17 } 17 }
18 18