diff options
Diffstat (limited to 'server/lib/job-queue')
5 files changed, 71 insertions, 56 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index d5e4508fe..509dd1cb5 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -8,36 +8,35 @@ import { | |||
8 | } from '@server/helpers/custom-validators/activitypub/activity' | 8 | } from '@server/helpers/custom-validators/activitypub/activity' |
9 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | 9 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' |
10 | import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' | 10 | import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' |
11 | import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' | 11 | import { AP_CLEANER } from '@server/initializers/constants' |
12 | import { Redis } from '@server/lib/redis' | ||
12 | import { VideoModel } from '@server/models/video/video' | 13 | import { VideoModel } from '@server/models/video/video' |
13 | import { VideoCommentModel } from '@server/models/video/video-comment' | 14 | import { VideoCommentModel } from '@server/models/video/video-comment' |
14 | import { VideoShareModel } from '@server/models/video/video-share' | 15 | import { VideoShareModel } from '@server/models/video/video-share' |
15 | import { HttpStatusCode } from '@shared/models' | 16 | import { HttpStatusCode } from '@shared/models' |
16 | import { logger } from '../../../helpers/logger' | 17 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
17 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | 18 | import { AccountVideoRateModel } from '../../../models/account/account-video-rate' |
18 | 19 | ||
20 | const lTags = loggerTagsFactory('ap-cleaner') | ||
21 | |||
19 | // Job to clean remote interactions off local videos | 22 | // Job to clean remote interactions off local videos |
20 | 23 | ||
21 | async function processActivityPubCleaner (_job: Job) { | 24 | async function processActivityPubCleaner (_job: Job) { |
22 | logger.info('Processing ActivityPub cleaner.') | 25 | logger.info('Processing ActivityPub cleaner.', lTags()) |
23 | 26 | ||
24 | { | 27 | { |
25 | const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() | 28 | const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() |
26 | const { bodyValidator, deleter, updater } = rateOptionsFactory() | 29 | const { bodyValidator, deleter, updater } = rateOptionsFactory() |
27 | 30 | ||
28 | await map(rateUrls, async rateUrl => { | 31 | await map(rateUrls, async rateUrl => { |
29 | try { | 32 | const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter }) |
30 | const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) | ||
31 | 33 | ||
32 | if (result?.status === 'deleted') { | 34 | if (result?.status === 'deleted') { |
33 | const { videoId, type } = result.data | 35 | const { videoId, type } = result.data |
34 | 36 | ||
35 | await VideoModel.updateRatesOf(videoId, type, undefined) | 37 | await VideoModel.updateRatesOf(videoId, type, undefined) |
36 | } | ||
37 | } catch (err) { | ||
38 | logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) | ||
39 | } | 38 | } |
40 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | 39 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
41 | } | 40 | } |
42 | 41 | ||
43 | { | 42 | { |
@@ -45,12 +44,8 @@ async function processActivityPubCleaner (_job: Job) { | |||
45 | const { bodyValidator, deleter, updater } = shareOptionsFactory() | 44 | const { bodyValidator, deleter, updater } = shareOptionsFactory() |
46 | 45 | ||
47 | await map(shareUrls, async shareUrl => { | 46 | await map(shareUrls, async shareUrl => { |
48 | try { | 47 | await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter }) |
49 | await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) | 48 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
50 | } catch (err) { | ||
51 | logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) | ||
52 | } | ||
53 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
54 | } | 49 | } |
55 | 50 | ||
56 | { | 51 | { |
@@ -58,12 +53,8 @@ async function processActivityPubCleaner (_job: Job) { | |||
58 | const { bodyValidator, deleter, updater } = commentOptionsFactory() | 53 | const { bodyValidator, deleter, updater } = commentOptionsFactory() |
59 | 54 | ||
60 | await map(commentUrls, async commentUrl => { | 55 | await map(commentUrls, async commentUrl => { |
61 | try { | 56 | await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter }) |
62 | await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) | 57 | }, { concurrency: AP_CLEANER.CONCURRENCY }) |
63 | } catch (err) { | ||
64 | logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) | ||
65 | } | ||
66 | }, { concurrency: AP_CLEANER_CONCURRENCY }) | ||
67 | } | 58 | } |
68 | } | 59 | } |
69 | 60 | ||
@@ -75,14 +66,16 @@ export { | |||
75 | 66 | ||
76 | // --------------------------------------------------------------------------- | 67 | // --------------------------------------------------------------------------- |
77 | 68 | ||
78 | async function updateObjectIfNeeded <T> ( | 69 | async function updateObjectIfNeeded <T> (options: { |
79 | url: string, | 70 | url: string |
80 | bodyValidator: (body: any) => boolean, | 71 | bodyValidator: (body: any) => boolean |
81 | updater: (url: string, newUrl: string) => Promise<T>, | 72 | updater: (url: string, newUrl: string) => Promise<T> |
82 | deleter: (url: string) => Promise<T> | 73 | deleter: (url: string) => Promise<T> } |
83 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | 74 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { |
75 | const { url, bodyValidator, updater, deleter } = options | ||
76 | |||
84 | const on404OrTombstone = async () => { | 77 | const on404OrTombstone = async () => { |
85 | logger.info('Removing remote AP object %s.', url) | 78 | logger.info('Removing remote AP object %s.', url, lTags(url)) |
86 | const data = await deleter(url) | 79 | const data = await deleter(url) |
87 | 80 | ||
88 | return { status: 'deleted' as 'deleted', data } | 81 | return { status: 'deleted' as 'deleted', data } |
@@ -104,7 +97,7 @@ async function updateObjectIfNeeded <T> ( | |||
104 | throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) | 97 | throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) |
105 | } | 98 | } |
106 | 99 | ||
107 | logger.info('Updating remote AP object %s.', url) | 100 | logger.info('Updating remote AP object %s.', url, lTags(url)) |
108 | const data = await updater(url, newUrl) | 101 | const data = await updater(url, newUrl) |
109 | 102 | ||
110 | return { status: 'updated', data } | 103 | return { status: 'updated', data } |
@@ -117,7 +110,15 @@ async function updateObjectIfNeeded <T> ( | |||
117 | return on404OrTombstone() | 110 | return on404OrTombstone() |
118 | } | 111 | } |
119 | 112 | ||
120 | throw err | 113 | logger.debug('Remote AP object %s is unavailable.', url, lTags(url)) |
114 | |||
115 | const unavailability = await Redis.Instance.addAPUnavailability(url) | ||
116 | if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) { | ||
117 | logger.info('Removing unavailable AP resource %s.', url, lTags(url)) | ||
118 | return on404OrTombstone() | ||
119 | } | ||
120 | |||
121 | return null | ||
121 | } | 122 | } |
122 | } | 123 | } |
123 | 124 | ||
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 54a7c566b..9e39322a8 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -2,16 +2,16 @@ import { Job } from 'bull' | |||
2 | import { remove } from 'fs-extra' | 2 | import { remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { logger } from '@server/helpers/logger' | 4 | import { logger } from '@server/helpers/logger' |
5 | import { updateTorrentUrls } from '@server/helpers/webtorrent' | 5 | import { updateTorrentMetadata } from '@server/helpers/webtorrent' |
6 | import { CONFIG } from '@server/initializers/config' | 6 | import { CONFIG } from '@server/initializers/config' |
7 | import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' | 7 | import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' |
8 | import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' | 8 | import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' |
9 | import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' | 9 | import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' |
10 | import { moveToNextState } from '@server/lib/video-state' | 10 | import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' |
11 | import { VideoModel } from '@server/models/video/video' | 11 | import { VideoModel } from '@server/models/video/video' |
12 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 12 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
13 | import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' | 13 | import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' |
14 | import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared' | 14 | import { MoveObjectStoragePayload, VideoStorage } from '@shared/models' |
15 | 15 | ||
16 | export async function processMoveToObjectStorage (job: Job) { | 16 | export async function processMoveToObjectStorage (job: Job) { |
17 | const payload = job.data as MoveObjectStoragePayload | 17 | const payload = job.data as MoveObjectStoragePayload |
@@ -24,18 +24,25 @@ export async function processMoveToObjectStorage (job: Job) { | |||
24 | return undefined | 24 | return undefined |
25 | } | 25 | } |
26 | 26 | ||
27 | if (video.VideoFiles) { | 27 | try { |
28 | await moveWebTorrentFiles(video) | 28 | if (video.VideoFiles) { |
29 | } | 29 | await moveWebTorrentFiles(video) |
30 | } | ||
30 | 31 | ||
31 | if (video.VideoStreamingPlaylists) { | 32 | if (video.VideoStreamingPlaylists) { |
32 | await moveHLSFiles(video) | 33 | await moveHLSFiles(video) |
33 | } | 34 | } |
35 | |||
36 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') | ||
37 | if (pendingMove === 0) { | ||
38 | logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id) | ||
39 | await doAfterLastJob(video, payload.isNewVideo) | ||
40 | } | ||
41 | } catch (err) { | ||
42 | logger.error('Cannot move video %s to object storage.', video.url, { err }) | ||
34 | 43 | ||
35 | const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') | 44 | await moveToFailedMoveToObjectStorageState(video) |
36 | if (pendingMove === 0) { | 45 | await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove') |
37 | logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id) | ||
38 | await doAfterLastJob(video, payload.isNewVideo) | ||
39 | } | 46 | } |
40 | 47 | ||
41 | return payload.videoUUID | 48 | return payload.videoUUID |
@@ -113,7 +120,7 @@ async function onFileMoved (options: { | |||
113 | file.fileUrl = fileUrl | 120 | file.fileUrl = fileUrl |
114 | file.storage = VideoStorage.OBJECT_STORAGE | 121 | file.storage = VideoStorage.OBJECT_STORAGE |
115 | 122 | ||
116 | await updateTorrentUrls(videoOrPlaylist, file) | 123 | await updateTorrentMetadata(videoOrPlaylist, file) |
117 | await file.save() | 124 | await file.save() |
118 | 125 | ||
119 | logger.debug('Removing %s because it\'s now on object storage', oldPath) | 126 | logger.debug('Removing %s because it\'s now on object storage', oldPath) |
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index a91c2ef80..0d9e80cb8 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -1,6 +1,6 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bull' |
2 | import { copy, stat } from 'fs-extra' | 2 | import { copy, stat } from 'fs-extra' |
3 | import { getLowercaseExtension } from '@server/helpers/core-utils' | 3 | import { getLowercaseExtension } from '@shared/core-utils' |
4 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | 4 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' |
5 | import { CONFIG } from '@server/initializers/config' | 5 | import { CONFIG } from '@server/initializers/config' |
6 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 6 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4ce1a6c30..2f74e9fbd 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -1,6 +1,5 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bull' |
2 | import { move, remove, stat } from 'fs-extra' | 2 | import { move, remove, stat } from 'fs-extra' |
3 | import { getLowercaseExtension } from '@server/helpers/core-utils' | ||
4 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | 3 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
5 | import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' | 4 | import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' |
6 | import { isPostImportVideoAccepted } from '@server/lib/moderation' | 5 | import { isPostImportVideoAccepted } from '@server/lib/moderation' |
@@ -13,17 +12,20 @@ import { VideoPathManager } from '@server/lib/video-path-manager' | |||
13 | import { buildNextVideoState } from '@server/lib/video-state' | 12 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 13 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
15 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | 14 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' |
15 | import { getLowercaseExtension } from '@shared/core-utils' | ||
16 | import { isAudioFile } from '@shared/extra-utils' | ||
16 | import { | 17 | import { |
18 | ThumbnailType, | ||
17 | VideoImportPayload, | 19 | VideoImportPayload, |
20 | VideoImportState, | ||
18 | VideoImportTorrentPayload, | 21 | VideoImportTorrentPayload, |
19 | VideoImportTorrentPayloadType, | 22 | VideoImportTorrentPayloadType, |
20 | VideoImportYoutubeDLPayload, | 23 | VideoImportYoutubeDLPayload, |
21 | VideoImportYoutubeDLPayloadType, | 24 | VideoImportYoutubeDLPayloadType, |
25 | VideoResolution, | ||
22 | VideoState | 26 | VideoState |
23 | } from '../../../../shared' | 27 | } from '@shared/models' |
24 | import { VideoImportState } from '../../../../shared/models/videos' | 28 | import { ffprobePromise, getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' |
25 | import { ThumbnailType } from '../../../../shared/models/videos/thumbnail.type' | ||
26 | import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' | ||
27 | import { logger } from '../../../helpers/logger' | 29 | import { logger } from '../../../helpers/logger' |
28 | import { getSecureTorrentName } from '../../../helpers/utils' | 30 | import { getSecureTorrentName } from '../../../helpers/utils' |
29 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' | 31 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' |
@@ -114,9 +116,14 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
114 | throw new Error('The user video quota is exceeded with this video to import.') | 116 | throw new Error('The user video quota is exceeded with this video to import.') |
115 | } | 117 | } |
116 | 118 | ||
117 | const { resolution } = await getVideoFileResolution(tempVideoPath) | 119 | const probe = await ffprobePromise(tempVideoPath) |
118 | const fps = await getVideoFileFPS(tempVideoPath) | 120 | |
119 | const duration = await getDurationFromVideoFile(tempVideoPath) | 121 | const { resolution } = await isAudioFile(tempVideoPath, probe) |
122 | ? { resolution: VideoResolution.H_NOVIDEO } | ||
123 | : await getVideoFileResolution(tempVideoPath) | ||
124 | |||
125 | const fps = await getVideoFileFPS(tempVideoPath, probe) | ||
126 | const duration = await getDurationFromVideoFile(tempVideoPath, probe) | ||
120 | 127 | ||
121 | // Prepare video file object for creation in database | 128 | // Prepare video file object for creation in database |
122 | const fileExt = getLowercaseExtension(tempVideoPath) | 129 | const fileExt = getLowercaseExtension(tempVideoPath) |
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 0edcdcba3..ef3abcbcd 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -12,7 +12,7 @@ import { | |||
12 | NewResolutionTranscodingPayload, | 12 | NewResolutionTranscodingPayload, |
13 | OptimizeTranscodingPayload, | 13 | OptimizeTranscodingPayload, |
14 | VideoTranscodingPayload | 14 | VideoTranscodingPayload |
15 | } from '../../../../shared' | 15 | } from '@shared/models' |
16 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 16 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
17 | import { computeLowerResolutionsToTranscode } from '../../../helpers/ffprobe-utils' | 17 | import { computeLowerResolutionsToTranscode } from '../../../helpers/ffprobe-utils' |
18 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 18 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |