diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/lib/hls.ts | 6 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 8 | ||||
-rw-r--r-- | server/lib/video-state.ts | 38 | ||||
-rw-r--r-- | server/models/video/video.ts | 9 | ||||
-rw-r--r-- | server/tests/cli/create-move-video-storage-job.ts | 114 | ||||
-rw-r--r-- | server/tests/cli/index.ts | 1 |
6 files changed, 155 insertions, 21 deletions
diff --git a/server/lib/hls.ts b/server/lib/hls.ts index 0828a2d0f..8160e7949 100644 --- a/server/lib/hls.ts +++ b/server/lib/hls.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra' | 1 | import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra' |
2 | import { flatten, uniq } from 'lodash' | 2 | import { flatten, uniq } from 'lodash' |
3 | import { basename, dirname, join } from 'path' | 3 | import { basename, dirname, join } from 'path' |
4 | import { MStreamingPlaylistFilesVideo, MVideoWithFile } from '@server/types/models' | 4 | import { MStreamingPlaylistFilesVideo, MVideo, MVideoUUID } from '@server/types/models' |
5 | import { sha256 } from '../helpers/core-utils' | 5 | import { sha256 } from '../helpers/core-utils' |
6 | import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamSize } from '../helpers/ffprobe-utils' | 6 | import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamSize } from '../helpers/ffprobe-utils' |
7 | import { logger } from '../helpers/logger' | 7 | import { logger } from '../helpers/logger' |
@@ -31,7 +31,7 @@ async function updateStreamingPlaylistsInfohashesIfNeeded () { | |||
31 | } | 31 | } |
32 | } | 32 | } |
33 | 33 | ||
34 | async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { | 34 | async function updateMasterHLSPlaylist (video: MVideo, playlist: MStreamingPlaylistFilesVideo) { |
35 | const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ] | 35 | const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ] |
36 | 36 | ||
37 | for (const file of playlist.VideoFiles) { | 37 | for (const file of playlist.VideoFiles) { |
@@ -63,7 +63,7 @@ async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStream | |||
63 | }) | 63 | }) |
64 | } | 64 | } |
65 | 65 | ||
66 | async function updateSha256VODSegments (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { | 66 | async function updateSha256VODSegments (video: MVideoUUID, playlist: MStreamingPlaylistFilesVideo) { |
67 | const json: { [filename: string]: { [range: string]: string } } = {} | 67 | const json: { [filename: string]: { [range: string]: string } } = {} |
68 | 68 | ||
69 | // For all the resolutions available for this video | 69 | // For all the resolutions available for this video |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 53d6b6a9c..0eab720d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -108,7 +108,7 @@ class JobQueue { | |||
108 | private constructor () { | 108 | private constructor () { |
109 | } | 109 | } |
110 | 110 | ||
111 | init () { | 111 | init (produceOnly = false) { |
112 | // Already initialized | 112 | // Already initialized |
113 | if (this.initialized === true) return | 113 | if (this.initialized === true) return |
114 | this.initialized = true | 114 | this.initialized = true |
@@ -124,6 +124,12 @@ class JobQueue { | |||
124 | 124 | ||
125 | for (const handlerName of (Object.keys(handlers) as JobType[])) { | 125 | for (const handlerName of (Object.keys(handlers) as JobType[])) { |
126 | const queue = new Bull(handlerName, queueOptions) | 126 | const queue = new Bull(handlerName, queueOptions) |
127 | |||
128 | if (produceOnly) { | ||
129 | queue.pause(true) | ||
130 | .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) | ||
131 | } | ||
132 | |||
127 | const handler = handlers[handlerName] | 133 | const handler = handlers[handlerName] |
128 | 134 | ||
129 | queue.process(this.getJobConcurrency(handlerName), handler) | 135 | queue.process(this.getJobConcurrency(handlerName), handler) |
diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts index 9352a67d1..d5bbbec43 100644 --- a/server/lib/video-state.ts +++ b/server/lib/video-state.ts | |||
@@ -57,10 +57,33 @@ function moveToNextState (video: MVideoUUID, isNewVideo = true) { | |||
57 | }) | 57 | }) |
58 | } | 58 | } |
59 | 59 | ||
60 | async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) { | ||
61 | const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction) | ||
62 | const pendingTranscode = videoJobInfo?.pendingTranscode || 0 | ||
63 | |||
64 | // We want to wait all transcoding jobs before moving the video on an external storage | ||
65 | if (pendingTranscode !== 0) return false | ||
66 | |||
67 | await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction) | ||
68 | |||
69 | logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) | ||
70 | |||
71 | try { | ||
72 | await addMoveToObjectStorageJob(video, isNewVideo) | ||
73 | |||
74 | return true | ||
75 | } catch (err) { | ||
76 | logger.error('Cannot add move to object storage job', { err }) | ||
77 | |||
78 | return false | ||
79 | } | ||
80 | } | ||
81 | |||
60 | // --------------------------------------------------------------------------- | 82 | // --------------------------------------------------------------------------- |
61 | 83 | ||
62 | export { | 84 | export { |
63 | buildNextVideoState, | 85 | buildNextVideoState, |
86 | moveToExternalStorageState, | ||
64 | moveToNextState | 87 | moveToNextState |
65 | } | 88 | } |
66 | 89 | ||
@@ -82,18 +105,3 @@ async function moveToPublishedState (video: MVideoFullLight, isNewVideo: boolean | |||
82 | Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(video) | 105 | Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(video) |
83 | } | 106 | } |
84 | } | 107 | } |
85 | |||
86 | async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) { | ||
87 | const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction) | ||
88 | const pendingTranscode = videoJobInfo?.pendingTranscode || 0 | ||
89 | |||
90 | // We want to wait all transcoding jobs before moving the video on an external storage | ||
91 | if (pendingTranscode !== 0) return | ||
92 | |||
93 | await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction) | ||
94 | |||
95 | logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) | ||
96 | |||
97 | addMoveToObjectStorageJob(video, isNewVideo) | ||
98 | .catch(err => logger.error('Cannot add move to object storage job', { err })) | ||
99 | } | ||
diff --git a/server/models/video/video.ts b/server/models/video/video.ts index aef4fd20a..3eed1b58d 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts | |||
@@ -805,14 +805,17 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> { | |||
805 | await Promise.all(tasks) | 805 | await Promise.all(tasks) |
806 | } | 806 | } |
807 | 807 | ||
808 | static listLocal (): Promise<MVideo[]> { | 808 | static listLocalIds (): Promise<number[]> { |
809 | const query = { | 809 | const query = { |
810 | attributes: [ 'id' ], | ||
811 | raw: true, | ||
810 | where: { | 812 | where: { |
811 | remote: false | 813 | remote: false |
812 | } | 814 | } |
813 | } | 815 | } |
814 | 816 | ||
815 | return VideoModel.findAll(query) | 817 | return VideoModel.findAll(query) |
818 | .then(rows => rows.map(r => r.id)) | ||
816 | } | 819 | } |
817 | 820 | ||
818 | static listAllAndSharedByActorForOutbox (actorId: number, start: number, count: number) { | 821 | static listAllAndSharedByActorForOutbox (actorId: number, start: number, count: number) { |
@@ -1674,6 +1677,8 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> { | |||
1674 | if (!this.VideoStreamingPlaylists) return undefined | 1677 | if (!this.VideoStreamingPlaylists) return undefined |
1675 | 1678 | ||
1676 | const playlist = this.VideoStreamingPlaylists.find(p => p.type === VideoStreamingPlaylistType.HLS) | 1679 | const playlist = this.VideoStreamingPlaylists.find(p => p.type === VideoStreamingPlaylistType.HLS) |
1680 | if (!playlist) return undefined | ||
1681 | |||
1677 | playlist.Video = this | 1682 | playlist.Video = this |
1678 | 1683 | ||
1679 | return playlist | 1684 | return playlist |
@@ -1785,7 +1790,7 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> { | |||
1785 | await this.save({ transaction }) | 1790 | await this.save({ transaction }) |
1786 | } | 1791 | } |
1787 | 1792 | ||
1788 | getBandwidthBits (videoFile: MVideoFile) { | 1793 | getBandwidthBits (this: MVideo, videoFile: MVideoFile) { |
1789 | return Math.ceil((videoFile.size * 8) / this.duration) | 1794 | return Math.ceil((videoFile.size * 8) / this.duration) |
1790 | } | 1795 | } |
1791 | 1796 | ||
diff --git a/server/tests/cli/create-move-video-storage-job.ts b/server/tests/cli/create-move-video-storage-job.ts new file mode 100644 index 000000000..b598c8359 --- /dev/null +++ b/server/tests/cli/create-move-video-storage-job.ts | |||
@@ -0,0 +1,114 @@ | |||
1 | /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ | ||
2 | |||
3 | import 'mocha' | ||
4 | |||
5 | import { | ||
6 | areObjectStorageTestsDisabled, | ||
7 | cleanupTests, | ||
8 | createMultipleServers, | ||
9 | doubleFollow, | ||
10 | expectStartWith, | ||
11 | makeRawRequest, | ||
12 | ObjectStorageCommand, | ||
13 | PeerTubeServer, | ||
14 | setAccessTokensToServers, | ||
15 | waitJobs | ||
16 | } from '@shared/extra-utils' | ||
17 | import { HttpStatusCode, VideoDetails } from '@shared/models' | ||
18 | |||
19 | async function checkFiles (origin: PeerTubeServer, video: VideoDetails, inObjectStorage: boolean) { | ||
20 | for (const file of video.files) { | ||
21 | const start = inObjectStorage | ||
22 | ? ObjectStorageCommand.getWebTorrentBaseUrl() | ||
23 | : origin.url | ||
24 | |||
25 | expectStartWith(file.fileUrl, start) | ||
26 | |||
27 | await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) | ||
28 | } | ||
29 | |||
30 | const start = inObjectStorage | ||
31 | ? ObjectStorageCommand.getPlaylistBaseUrl() | ||
32 | : origin.url | ||
33 | |||
34 | const hls = video.streamingPlaylists[0] | ||
35 | expectStartWith(hls.playlistUrl, start) | ||
36 | expectStartWith(hls.segmentsSha256Url, start) | ||
37 | |||
38 | for (const file of hls.files) { | ||
39 | expectStartWith(file.fileUrl, start) | ||
40 | |||
41 | await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) | ||
42 | } | ||
43 | } | ||
44 | |||
45 | describe('Test create move video storage job', function () { | ||
46 | if (areObjectStorageTestsDisabled()) return | ||
47 | |||
48 | let servers: PeerTubeServer[] = [] | ||
49 | const uuids: string[] = [] | ||
50 | |||
51 | before(async function () { | ||
52 | this.timeout(360000) | ||
53 | |||
54 | // Run server 2 to have transcoding enabled | ||
55 | servers = await createMultipleServers(2) | ||
56 | await setAccessTokensToServers(servers) | ||
57 | |||
58 | await doubleFollow(servers[0], servers[1]) | ||
59 | |||
60 | await ObjectStorageCommand.prepareDefaultBuckets() | ||
61 | |||
62 | await servers[0].config.enableTranscoding() | ||
63 | |||
64 | for (let i = 0; i < 3; i++) { | ||
65 | const { uuid } = await servers[0].videos.upload({ attributes: { name: 'video' + i } }) | ||
66 | uuids.push(uuid) | ||
67 | } | ||
68 | |||
69 | await waitJobs(servers) | ||
70 | |||
71 | await servers[0].kill() | ||
72 | await servers[0].run(ObjectStorageCommand.getDefaultConfig()) | ||
73 | }) | ||
74 | |||
75 | it('Should move only one file', async function () { | ||
76 | this.timeout(120000) | ||
77 | |||
78 | const command = `npm run create-move-video-storage-job -- --to-object-storage -v ${uuids[1]}` | ||
79 | await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig()) | ||
80 | await waitJobs(servers) | ||
81 | |||
82 | for (const server of servers) { | ||
83 | const video = await server.videos.get({ id: uuids[1] }) | ||
84 | |||
85 | await checkFiles(servers[0], video, true) | ||
86 | |||
87 | for (const id of [ uuids[0], uuids[2] ]) { | ||
88 | const video = await server.videos.get({ id }) | ||
89 | |||
90 | await checkFiles(servers[0], video, false) | ||
91 | } | ||
92 | } | ||
93 | }) | ||
94 | |||
95 | it('Should move all files', async function () { | ||
96 | this.timeout(120000) | ||
97 | |||
98 | const command = `npm run create-move-video-storage-job -- --to-object-storage --all-videos` | ||
99 | await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig()) | ||
100 | await waitJobs(servers) | ||
101 | |||
102 | for (const server of servers) { | ||
103 | for (const id of [ uuids[0], uuids[2] ]) { | ||
104 | const video = await server.videos.get({ id }) | ||
105 | |||
106 | await checkFiles(servers[0], video, true) | ||
107 | } | ||
108 | } | ||
109 | }) | ||
110 | |||
111 | after(async function () { | ||
112 | await cleanupTests(servers) | ||
113 | }) | ||
114 | }) | ||
diff --git a/server/tests/cli/index.ts b/server/tests/cli/index.ts index c6dd0581a..6e0cbe58b 100644 --- a/server/tests/cli/index.ts +++ b/server/tests/cli/index.ts | |||
@@ -1,6 +1,7 @@ | |||
1 | // Order of the tests we want to execute | 1 | // Order of the tests we want to execute |
2 | import './create-import-video-file-job' | 2 | import './create-import-video-file-job' |
3 | import './create-transcoding-job' | 3 | import './create-transcoding-job' |
4 | import './create-move-video-storage-job' | ||
4 | import './peertube' | 5 | import './peertube' |
5 | import './plugins' | 6 | import './plugins' |
6 | import './print-transcode-command' | 7 | import './print-transcode-command' |