aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/lib/hls.ts6
-rw-r--r--server/lib/job-queue/job-queue.ts8
-rw-r--r--server/lib/video-state.ts38
-rw-r--r--server/models/video/video.ts9
-rw-r--r--server/tests/cli/create-move-video-storage-job.ts114
-rw-r--r--server/tests/cli/index.ts1
6 files changed, 155 insertions, 21 deletions
diff --git a/server/lib/hls.ts b/server/lib/hls.ts
index 0828a2d0f..8160e7949 100644
--- a/server/lib/hls.ts
+++ b/server/lib/hls.ts
@@ -1,7 +1,7 @@
1import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra' 1import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra'
2import { flatten, uniq } from 'lodash' 2import { flatten, uniq } from 'lodash'
3import { basename, dirname, join } from 'path' 3import { basename, dirname, join } from 'path'
4import { MStreamingPlaylistFilesVideo, MVideoWithFile } from '@server/types/models' 4import { MStreamingPlaylistFilesVideo, MVideo, MVideoUUID } from '@server/types/models'
5import { sha256 } from '../helpers/core-utils' 5import { sha256 } from '../helpers/core-utils'
6import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamSize } from '../helpers/ffprobe-utils' 6import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamSize } from '../helpers/ffprobe-utils'
7import { logger } from '../helpers/logger' 7import { logger } from '../helpers/logger'
@@ -31,7 +31,7 @@ async function updateStreamingPlaylistsInfohashesIfNeeded () {
31 } 31 }
32} 32}
33 33
34async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { 34async function updateMasterHLSPlaylist (video: MVideo, playlist: MStreamingPlaylistFilesVideo) {
35 const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ] 35 const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ]
36 36
37 for (const file of playlist.VideoFiles) { 37 for (const file of playlist.VideoFiles) {
@@ -63,7 +63,7 @@ async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStream
63 }) 63 })
64} 64}
65 65
66async function updateSha256VODSegments (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { 66async function updateSha256VODSegments (video: MVideoUUID, playlist: MStreamingPlaylistFilesVideo) {
67 const json: { [filename: string]: { [range: string]: string } } = {} 67 const json: { [filename: string]: { [range: string]: string } } = {}
68 68
69 // For all the resolutions available for this video 69 // For all the resolutions available for this video
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 53d6b6a9c..0eab720d9 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -108,7 +108,7 @@ class JobQueue {
108 private constructor () { 108 private constructor () {
109 } 109 }
110 110
111 init () { 111 init (produceOnly = false) {
112 // Already initialized 112 // Already initialized
113 if (this.initialized === true) return 113 if (this.initialized === true) return
114 this.initialized = true 114 this.initialized = true
@@ -124,6 +124,12 @@ class JobQueue {
124 124
125 for (const handlerName of (Object.keys(handlers) as JobType[])) { 125 for (const handlerName of (Object.keys(handlers) as JobType[])) {
126 const queue = new Bull(handlerName, queueOptions) 126 const queue = new Bull(handlerName, queueOptions)
127
128 if (produceOnly) {
129 queue.pause(true)
130 .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
131 }
132
127 const handler = handlers[handlerName] 133 const handler = handlers[handlerName]
128 134
129 queue.process(this.getJobConcurrency(handlerName), handler) 135 queue.process(this.getJobConcurrency(handlerName), handler)
diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts
index 9352a67d1..d5bbbec43 100644
--- a/server/lib/video-state.ts
+++ b/server/lib/video-state.ts
@@ -57,10 +57,33 @@ function moveToNextState (video: MVideoUUID, isNewVideo = true) {
57 }) 57 })
58} 58}
59 59
60async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) {
61 const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction)
62 const pendingTranscode = videoJobInfo?.pendingTranscode || 0
63
64 // We want to wait all transcoding jobs before moving the video on an external storage
65 if (pendingTranscode !== 0) return false
66
67 await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction)
68
69 logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
70
71 try {
72 await addMoveToObjectStorageJob(video, isNewVideo)
73
74 return true
75 } catch (err) {
76 logger.error('Cannot add move to object storage job', { err })
77
78 return false
79 }
80}
81
60// --------------------------------------------------------------------------- 82// ---------------------------------------------------------------------------
61 83
62export { 84export {
63 buildNextVideoState, 85 buildNextVideoState,
86 moveToExternalStorageState,
64 moveToNextState 87 moveToNextState
65} 88}
66 89
@@ -82,18 +105,3 @@ async function moveToPublishedState (video: MVideoFullLight, isNewVideo: boolean
82 Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(video) 105 Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(video)
83 } 106 }
84} 107}
85
86async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) {
87 const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction)
88 const pendingTranscode = videoJobInfo?.pendingTranscode || 0
89
90 // We want to wait all transcoding jobs before moving the video on an external storage
91 if (pendingTranscode !== 0) return
92
93 await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction)
94
95 logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
96
97 addMoveToObjectStorageJob(video, isNewVideo)
98 .catch(err => logger.error('Cannot add move to object storage job', { err }))
99}
diff --git a/server/models/video/video.ts b/server/models/video/video.ts
index aef4fd20a..3eed1b58d 100644
--- a/server/models/video/video.ts
+++ b/server/models/video/video.ts
@@ -805,14 +805,17 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
805 await Promise.all(tasks) 805 await Promise.all(tasks)
806 } 806 }
807 807
808 static listLocal (): Promise<MVideo[]> { 808 static listLocalIds (): Promise<number[]> {
809 const query = { 809 const query = {
810 attributes: [ 'id' ],
811 raw: true,
810 where: { 812 where: {
811 remote: false 813 remote: false
812 } 814 }
813 } 815 }
814 816
815 return VideoModel.findAll(query) 817 return VideoModel.findAll(query)
818 .then(rows => rows.map(r => r.id))
816 } 819 }
817 820
818 static listAllAndSharedByActorForOutbox (actorId: number, start: number, count: number) { 821 static listAllAndSharedByActorForOutbox (actorId: number, start: number, count: number) {
@@ -1674,6 +1677,8 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
1674 if (!this.VideoStreamingPlaylists) return undefined 1677 if (!this.VideoStreamingPlaylists) return undefined
1675 1678
1676 const playlist = this.VideoStreamingPlaylists.find(p => p.type === VideoStreamingPlaylistType.HLS) 1679 const playlist = this.VideoStreamingPlaylists.find(p => p.type === VideoStreamingPlaylistType.HLS)
1680 if (!playlist) return undefined
1681
1677 playlist.Video = this 1682 playlist.Video = this
1678 1683
1679 return playlist 1684 return playlist
@@ -1785,7 +1790,7 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
1785 await this.save({ transaction }) 1790 await this.save({ transaction })
1786 } 1791 }
1787 1792
1788 getBandwidthBits (videoFile: MVideoFile) { 1793 getBandwidthBits (this: MVideo, videoFile: MVideoFile) {
1789 return Math.ceil((videoFile.size * 8) / this.duration) 1794 return Math.ceil((videoFile.size * 8) / this.duration)
1790 } 1795 }
1791 1796
diff --git a/server/tests/cli/create-move-video-storage-job.ts b/server/tests/cli/create-move-video-storage-job.ts
new file mode 100644
index 000000000..b598c8359
--- /dev/null
+++ b/server/tests/cli/create-move-video-storage-job.ts
@@ -0,0 +1,114 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import 'mocha'
4
5import {
6 areObjectStorageTestsDisabled,
7 cleanupTests,
8 createMultipleServers,
9 doubleFollow,
10 expectStartWith,
11 makeRawRequest,
12 ObjectStorageCommand,
13 PeerTubeServer,
14 setAccessTokensToServers,
15 waitJobs
16} from '@shared/extra-utils'
17import { HttpStatusCode, VideoDetails } from '@shared/models'
18
19async function checkFiles (origin: PeerTubeServer, video: VideoDetails, inObjectStorage: boolean) {
20 for (const file of video.files) {
21 const start = inObjectStorage
22 ? ObjectStorageCommand.getWebTorrentBaseUrl()
23 : origin.url
24
25 expectStartWith(file.fileUrl, start)
26
27 await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
28 }
29
30 const start = inObjectStorage
31 ? ObjectStorageCommand.getPlaylistBaseUrl()
32 : origin.url
33
34 const hls = video.streamingPlaylists[0]
35 expectStartWith(hls.playlistUrl, start)
36 expectStartWith(hls.segmentsSha256Url, start)
37
38 for (const file of hls.files) {
39 expectStartWith(file.fileUrl, start)
40
41 await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
42 }
43}
44
45describe('Test create move video storage job', function () {
46 if (areObjectStorageTestsDisabled()) return
47
48 let servers: PeerTubeServer[] = []
49 const uuids: string[] = []
50
51 before(async function () {
52 this.timeout(360000)
53
54 // Run server 2 to have transcoding enabled
55 servers = await createMultipleServers(2)
56 await setAccessTokensToServers(servers)
57
58 await doubleFollow(servers[0], servers[1])
59
60 await ObjectStorageCommand.prepareDefaultBuckets()
61
62 await servers[0].config.enableTranscoding()
63
64 for (let i = 0; i < 3; i++) {
65 const { uuid } = await servers[0].videos.upload({ attributes: { name: 'video' + i } })
66 uuids.push(uuid)
67 }
68
69 await waitJobs(servers)
70
71 await servers[0].kill()
72 await servers[0].run(ObjectStorageCommand.getDefaultConfig())
73 })
74
75 it('Should move only one file', async function () {
76 this.timeout(120000)
77
78 const command = `npm run create-move-video-storage-job -- --to-object-storage -v ${uuids[1]}`
79 await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig())
80 await waitJobs(servers)
81
82 for (const server of servers) {
83 const video = await server.videos.get({ id: uuids[1] })
84
85 await checkFiles(servers[0], video, true)
86
87 for (const id of [ uuids[0], uuids[2] ]) {
88 const video = await server.videos.get({ id })
89
90 await checkFiles(servers[0], video, false)
91 }
92 }
93 })
94
95 it('Should move all files', async function () {
96 this.timeout(120000)
97
98 const command = `npm run create-move-video-storage-job -- --to-object-storage --all-videos`
99 await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig())
100 await waitJobs(servers)
101
102 for (const server of servers) {
103 for (const id of [ uuids[0], uuids[2] ]) {
104 const video = await server.videos.get({ id })
105
106 await checkFiles(servers[0], video, true)
107 }
108 }
109 })
110
111 after(async function () {
112 await cleanupTests(servers)
113 })
114})
diff --git a/server/tests/cli/index.ts b/server/tests/cli/index.ts
index c6dd0581a..6e0cbe58b 100644
--- a/server/tests/cli/index.ts
+++ b/server/tests/cli/index.ts
@@ -1,6 +1,7 @@
1// Order of the tests we want to execute 1// Order of the tests we want to execute
2import './create-import-video-file-job' 2import './create-import-video-file-job'
3import './create-transcoding-job' 3import './create-transcoding-job'
4import './create-move-video-storage-job'
4import './peertube' 5import './peertube'
5import './plugins' 6import './plugins'
6import './print-transcode-command' 7import './print-transcode-command'