aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts65
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts35
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-import.ts23
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts2
5 files changed, 71 insertions, 56 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
index d5e4508fe..509dd1cb5 100644
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -8,36 +8,35 @@ import {
8} from '@server/helpers/custom-validators/activitypub/activity' 8} from '@server/helpers/custom-validators/activitypub/activity'
9import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' 9import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
10import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' 10import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
11import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' 11import { AP_CLEANER } from '@server/initializers/constants'
12import { Redis } from '@server/lib/redis'
12import { VideoModel } from '@server/models/video/video' 13import { VideoModel } from '@server/models/video/video'
13import { VideoCommentModel } from '@server/models/video/video-comment' 14import { VideoCommentModel } from '@server/models/video/video-comment'
14import { VideoShareModel } from '@server/models/video/video-share' 15import { VideoShareModel } from '@server/models/video/video-share'
15import { HttpStatusCode } from '@shared/models' 16import { HttpStatusCode } from '@shared/models'
16import { logger } from '../../../helpers/logger' 17import { logger, loggerTagsFactory } from '../../../helpers/logger'
17import { AccountVideoRateModel } from '../../../models/account/account-video-rate' 18import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
18 19
20const lTags = loggerTagsFactory('ap-cleaner')
21
19// Job to clean remote interactions off local videos 22// Job to clean remote interactions off local videos
20 23
21async function processActivityPubCleaner (_job: Job) { 24async function processActivityPubCleaner (_job: Job) {
22 logger.info('Processing ActivityPub cleaner.') 25 logger.info('Processing ActivityPub cleaner.', lTags())
23 26
24 { 27 {
25 const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() 28 const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
26 const { bodyValidator, deleter, updater } = rateOptionsFactory() 29 const { bodyValidator, deleter, updater } = rateOptionsFactory()
27 30
28 await map(rateUrls, async rateUrl => { 31 await map(rateUrls, async rateUrl => {
29 try { 32 const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })
30 const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter)
31 33
32 if (result?.status === 'deleted') { 34 if (result?.status === 'deleted') {
33 const { videoId, type } = result.data 35 const { videoId, type } = result.data
34 36
35 await VideoModel.updateRatesOf(videoId, type, undefined) 37 await VideoModel.updateRatesOf(videoId, type, undefined)
36 }
37 } catch (err) {
38 logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
39 } 38 }
40 }, { concurrency: AP_CLEANER_CONCURRENCY }) 39 }, { concurrency: AP_CLEANER.CONCURRENCY })
41 } 40 }
42 41
43 { 42 {
@@ -45,12 +44,8 @@ async function processActivityPubCleaner (_job: Job) {
45 const { bodyValidator, deleter, updater } = shareOptionsFactory() 44 const { bodyValidator, deleter, updater } = shareOptionsFactory()
46 45
47 await map(shareUrls, async shareUrl => { 46 await map(shareUrls, async shareUrl => {
48 try { 47 await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
49 await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) 48 }, { concurrency: AP_CLEANER.CONCURRENCY })
50 } catch (err) {
51 logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
52 }
53 }, { concurrency: AP_CLEANER_CONCURRENCY })
54 } 49 }
55 50
56 { 51 {
@@ -58,12 +53,8 @@ async function processActivityPubCleaner (_job: Job) {
58 const { bodyValidator, deleter, updater } = commentOptionsFactory() 53 const { bodyValidator, deleter, updater } = commentOptionsFactory()
59 54
60 await map(commentUrls, async commentUrl => { 55 await map(commentUrls, async commentUrl => {
61 try { 56 await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
62 await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) 57 }, { concurrency: AP_CLEANER.CONCURRENCY })
63 } catch (err) {
64 logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
65 }
66 }, { concurrency: AP_CLEANER_CONCURRENCY })
67 } 58 }
68} 59}
69 60
@@ -75,14 +66,16 @@ export {
75 66
76// --------------------------------------------------------------------------- 67// ---------------------------------------------------------------------------
77 68
78async function updateObjectIfNeeded <T> ( 69async function updateObjectIfNeeded <T> (options: {
79 url: string, 70 url: string
80 bodyValidator: (body: any) => boolean, 71 bodyValidator: (body: any) => boolean
81 updater: (url: string, newUrl: string) => Promise<T>, 72 updater: (url: string, newUrl: string) => Promise<T>
82 deleter: (url: string) => Promise<T> 73 deleter: (url: string) => Promise<T> }
83): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { 74): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
75 const { url, bodyValidator, updater, deleter } = options
76
84 const on404OrTombstone = async () => { 77 const on404OrTombstone = async () => {
85 logger.info('Removing remote AP object %s.', url) 78 logger.info('Removing remote AP object %s.', url, lTags(url))
86 const data = await deleter(url) 79 const data = await deleter(url)
87 80
88 return { status: 'deleted' as 'deleted', data } 81 return { status: 'deleted' as 'deleted', data }
@@ -104,7 +97,7 @@ async function updateObjectIfNeeded <T> (
104 throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) 97 throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
105 } 98 }
106 99
107 logger.info('Updating remote AP object %s.', url) 100 logger.info('Updating remote AP object %s.', url, lTags(url))
108 const data = await updater(url, newUrl) 101 const data = await updater(url, newUrl)
109 102
110 return { status: 'updated', data } 103 return { status: 'updated', data }
@@ -117,7 +110,15 @@ async function updateObjectIfNeeded <T> (
117 return on404OrTombstone() 110 return on404OrTombstone()
118 } 111 }
119 112
120 throw err 113 logger.debug('Remote AP object %s is unavailable.', url, lTags(url))
114
115 const unavailability = await Redis.Instance.addAPUnavailability(url)
116 if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
117 logger.info('Removing unavailable AP resource %s.', url, lTags(url))
118 return on404OrTombstone()
119 }
120
121 return null
121 } 122 }
122} 123}
123 124
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index 54a7c566b..9e39322a8 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -2,16 +2,16 @@ import { Job } from 'bull'
2import { remove } from 'fs-extra' 2import { remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { logger } from '@server/helpers/logger' 4import { logger } from '@server/helpers/logger'
5import { updateTorrentUrls } from '@server/helpers/webtorrent' 5import { updateTorrentMetadata } from '@server/helpers/webtorrent'
6import { CONFIG } from '@server/initializers/config' 6import { CONFIG } from '@server/initializers/config'
7import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' 7import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
8import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' 8import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage'
9import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' 9import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
10import { moveToNextState } from '@server/lib/video-state' 10import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
11import { VideoModel } from '@server/models/video/video' 11import { VideoModel } from '@server/models/video/video'
12import { VideoJobInfoModel } from '@server/models/video/video-job-info' 12import { VideoJobInfoModel } from '@server/models/video/video-job-info'
13import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' 13import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models'
14import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared' 14import { MoveObjectStoragePayload, VideoStorage } from '@shared/models'
15 15
16export async function processMoveToObjectStorage (job: Job) { 16export async function processMoveToObjectStorage (job: Job) {
17 const payload = job.data as MoveObjectStoragePayload 17 const payload = job.data as MoveObjectStoragePayload
@@ -24,18 +24,25 @@ export async function processMoveToObjectStorage (job: Job) {
24 return undefined 24 return undefined
25 } 25 }
26 26
27 if (video.VideoFiles) { 27 try {
28 await moveWebTorrentFiles(video) 28 if (video.VideoFiles) {
29 } 29 await moveWebTorrentFiles(video)
30 }
30 31
31 if (video.VideoStreamingPlaylists) { 32 if (video.VideoStreamingPlaylists) {
32 await moveHLSFiles(video) 33 await moveHLSFiles(video)
33 } 34 }
35
36 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
37 if (pendingMove === 0) {
38 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id)
39 await doAfterLastJob(video, payload.isNewVideo)
40 }
41 } catch (err) {
42 logger.error('Cannot move video %s to object storage.', video.url, { err })
34 43
35 const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') 44 await moveToFailedMoveToObjectStorageState(video)
36 if (pendingMove === 0) { 45 await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove')
37 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id)
38 await doAfterLastJob(video, payload.isNewVideo)
39 } 46 }
40 47
41 return payload.videoUUID 48 return payload.videoUUID
@@ -113,7 +120,7 @@ async function onFileMoved (options: {
113 file.fileUrl = fileUrl 120 file.fileUrl = fileUrl
114 file.storage = VideoStorage.OBJECT_STORAGE 121 file.storage = VideoStorage.OBJECT_STORAGE
115 122
116 await updateTorrentUrls(videoOrPlaylist, file) 123 await updateTorrentMetadata(videoOrPlaylist, file)
117 await file.save() 124 await file.save()
118 125
119 logger.debug('Removing %s because it\'s now on object storage', oldPath) 126 logger.debug('Removing %s because it\'s now on object storage', oldPath)
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index a91c2ef80..0d9e80cb8 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -1,6 +1,6 @@
1import { Job } from 'bull' 1import { Job } from 'bull'
2import { copy, stat } from 'fs-extra' 2import { copy, stat } from 'fs-extra'
3import { getLowercaseExtension } from '@server/helpers/core-utils' 3import { getLowercaseExtension } from '@shared/core-utils'
4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
5import { CONFIG } from '@server/initializers/config' 5import { CONFIG } from '@server/initializers/config'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 4ce1a6c30..2f74e9fbd 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -1,6 +1,5 @@
1import { Job } from 'bull' 1import { Job } from 'bull'
2import { move, remove, stat } from 'fs-extra' 2import { move, remove, stat } from 'fs-extra'
3import { getLowercaseExtension } from '@server/helpers/core-utils'
4import { retryTransactionWrapper } from '@server/helpers/database-utils' 3import { retryTransactionWrapper } from '@server/helpers/database-utils'
5import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' 4import { YoutubeDLWrapper } from '@server/helpers/youtube-dl'
6import { isPostImportVideoAccepted } from '@server/lib/moderation' 5import { isPostImportVideoAccepted } from '@server/lib/moderation'
@@ -13,17 +12,20 @@ import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 12import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 13import { ThumbnailModel } from '@server/models/video/thumbnail'
15import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 14import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
15import { getLowercaseExtension } from '@shared/core-utils'
16import { isAudioFile } from '@shared/extra-utils'
16import { 17import {
18 ThumbnailType,
17 VideoImportPayload, 19 VideoImportPayload,
20 VideoImportState,
18 VideoImportTorrentPayload, 21 VideoImportTorrentPayload,
19 VideoImportTorrentPayloadType, 22 VideoImportTorrentPayloadType,
20 VideoImportYoutubeDLPayload, 23 VideoImportYoutubeDLPayload,
21 VideoImportYoutubeDLPayloadType, 24 VideoImportYoutubeDLPayloadType,
25 VideoResolution,
22 VideoState 26 VideoState
23} from '../../../../shared' 27} from '@shared/models'
24import { VideoImportState } from '../../../../shared/models/videos' 28import { ffprobePromise, getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils'
25import { ThumbnailType } from '../../../../shared/models/videos/thumbnail.type'
26import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils'
27import { logger } from '../../../helpers/logger' 29import { logger } from '../../../helpers/logger'
28import { getSecureTorrentName } from '../../../helpers/utils' 30import { getSecureTorrentName } from '../../../helpers/utils'
29import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' 31import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent'
@@ -114,9 +116,14 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
114 throw new Error('The user video quota is exceeded with this video to import.') 116 throw new Error('The user video quota is exceeded with this video to import.')
115 } 117 }
116 118
117 const { resolution } = await getVideoFileResolution(tempVideoPath) 119 const probe = await ffprobePromise(tempVideoPath)
118 const fps = await getVideoFileFPS(tempVideoPath) 120
119 const duration = await getDurationFromVideoFile(tempVideoPath) 121 const { resolution } = await isAudioFile(tempVideoPath, probe)
122 ? { resolution: VideoResolution.H_NOVIDEO }
123 : await getVideoFileResolution(tempVideoPath)
124
125 const fps = await getVideoFileFPS(tempVideoPath, probe)
126 const duration = await getDurationFromVideoFile(tempVideoPath, probe)
120 127
121 // Prepare video file object for creation in database 128 // Prepare video file object for creation in database
122 const fileExt = getLowercaseExtension(tempVideoPath) 129 const fileExt = getLowercaseExtension(tempVideoPath)
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 0edcdcba3..ef3abcbcd 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -12,7 +12,7 @@ import {
12 NewResolutionTranscodingPayload, 12 NewResolutionTranscodingPayload,
13 OptimizeTranscodingPayload, 13 OptimizeTranscodingPayload,
14 VideoTranscodingPayload 14 VideoTranscodingPayload
15} from '../../../../shared' 15} from '@shared/models'
16import { retryTransactionWrapper } from '../../../helpers/database-utils' 16import { retryTransactionWrapper } from '../../../helpers/database-utils'
17import { computeLowerResolutionsToTranscode } from '../../../helpers/ffprobe-utils' 17import { computeLowerResolutionsToTranscode } from '../../../helpers/ffprobe-utils'
18import { logger, loggerTagsFactory } from '../../../helpers/logger' 18import { logger, loggerTagsFactory } from '../../../helpers/logger'