From e1ab52d7ec7370a6f9f5937192d6003206af1ac0 Mon Sep 17 00:00:00 2001 From: kontrollanten <6680299+kontrollanten@users.noreply.github.com> Date: Tue, 9 Nov 2021 11:05:35 +0100 Subject: Add migrate-to-object-storage script (#4481) * add migrate-to-object-storage-script closes #4467 * add migrate-to-unique-playlist-filenames script * fix(migrate-to-unique-playlist-filenames): update master/segments256 run updateMasterHLSPlaylist and updateSha256VODSegments after file rename. * Improve move to object storage scripts * PR remarks Co-authored-by: Chocobozzz --- server/lib/hls.ts | 6 +- server/lib/job-queue/job-queue.ts | 8 +- server/lib/video-state.ts | 38 +++++--- server/models/video/video.ts | 9 +- server/tests/cli/create-move-video-storage-job.ts | 114 ++++++++++++++++++++++ server/tests/cli/index.ts | 1 + 6 files changed, 155 insertions(+), 21 deletions(-) create mode 100644 server/tests/cli/create-move-video-storage-job.ts (limited to 'server') diff --git a/server/lib/hls.ts b/server/lib/hls.ts index 0828a2d0f..8160e7949 100644 --- a/server/lib/hls.ts +++ b/server/lib/hls.ts @@ -1,7 +1,7 @@ import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra' import { flatten, uniq } from 'lodash' import { basename, dirname, join } from 'path' -import { MStreamingPlaylistFilesVideo, MVideoWithFile } from '@server/types/models' +import { MStreamingPlaylistFilesVideo, MVideo, MVideoUUID } from '@server/types/models' import { sha256 } from '../helpers/core-utils' import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamSize } from '../helpers/ffprobe-utils' import { logger } from '../helpers/logger' @@ -31,7 +31,7 @@ async function updateStreamingPlaylistsInfohashesIfNeeded () { } } -async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { +async function updateMasterHLSPlaylist (video: MVideo, playlist: MStreamingPlaylistFilesVideo) { const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ] for (const file of playlist.VideoFiles) { @@ -63,7 +63,7 @@ async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStream }) } -async function updateSha256VODSegments (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { +async function updateSha256VODSegments (video: MVideoUUID, playlist: MStreamingPlaylistFilesVideo) { const json: { [filename: string]: { [range: string]: string } } = {} // For all the resolutions available for this video diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 53d6b6a9c..0eab720d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -108,7 +108,7 @@ class JobQueue { private constructor () { } - init () { + init (produceOnly = false) { // Already initialized if (this.initialized === true) return this.initialized = true @@ -124,6 +124,12 @@ class JobQueue { for (const handlerName of (Object.keys(handlers) as JobType[])) { const queue = new Bull(handlerName, queueOptions) + + if (produceOnly) { + queue.pause(true) + .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) + } + const handler = handlers[handlerName] queue.process(this.getJobConcurrency(handlerName), handler) diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts index 9352a67d1..d5bbbec43 100644 --- a/server/lib/video-state.ts +++ b/server/lib/video-state.ts @@ -57,10 +57,33 @@ function moveToNextState (video: MVideoUUID, isNewVideo = true) { }) } +async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) { + const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction) + const pendingTranscode = videoJobInfo?.pendingTranscode || 0 + + // We want to wait all transcoding jobs before moving the video on an external storage + if (pendingTranscode !== 0) return false + + await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction) + + logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) + + try { + await addMoveToObjectStorageJob(video, isNewVideo) + + return true + } catch (err) { + logger.error('Cannot add move to object storage job', { err }) + + return false + } +} + // --------------------------------------------------------------------------- export { buildNextVideoState, + moveToExternalStorageState, moveToNextState } @@ -82,18 +105,3 @@ async function moveToPublishedState (video: MVideoFullLight, isNewVideo: boolean Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(video) } } - -async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) { - const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction) - const pendingTranscode = videoJobInfo?.pendingTranscode || 0 - - // We want to wait all transcoding jobs before moving the video on an external storage - if (pendingTranscode !== 0) return - - await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction) - - logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) - - addMoveToObjectStorageJob(video, isNewVideo) - .catch(err => logger.error('Cannot add move to object storage job', { err })) -} diff --git a/server/models/video/video.ts b/server/models/video/video.ts index aef4fd20a..3eed1b58d 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -805,14 +805,17 @@ export class VideoModel extends Model>> { await Promise.all(tasks) } - static listLocal (): Promise { + static listLocalIds (): Promise { const query = { + attributes: [ 'id' ], + raw: true, where: { remote: false } } return VideoModel.findAll(query) + .then(rows => rows.map(r => r.id)) } static listAllAndSharedByActorForOutbox (actorId: number, start: number, count: number) { @@ -1674,6 +1677,8 @@ export class VideoModel extends Model>> { if (!this.VideoStreamingPlaylists) return undefined const playlist = this.VideoStreamingPlaylists.find(p => p.type === VideoStreamingPlaylistType.HLS) + if (!playlist) return undefined + playlist.Video = this return playlist @@ -1785,7 +1790,7 @@ export class VideoModel extends Model>> { await this.save({ transaction }) } - getBandwidthBits (videoFile: MVideoFile) { + getBandwidthBits (this: MVideo, videoFile: MVideoFile) { return Math.ceil((videoFile.size * 8) / this.duration) } diff --git a/server/tests/cli/create-move-video-storage-job.ts b/server/tests/cli/create-move-video-storage-job.ts new file mode 100644 index 000000000..b598c8359 --- /dev/null +++ b/server/tests/cli/create-move-video-storage-job.ts @@ -0,0 +1,114 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' + +import { + areObjectStorageTestsDisabled, + cleanupTests, + createMultipleServers, + doubleFollow, + expectStartWith, + makeRawRequest, + ObjectStorageCommand, + PeerTubeServer, + setAccessTokensToServers, + waitJobs +} from '@shared/extra-utils' +import { HttpStatusCode, VideoDetails } from '@shared/models' + +async function checkFiles (origin: PeerTubeServer, video: VideoDetails, inObjectStorage: boolean) { + for (const file of video.files) { + const start = inObjectStorage + ? ObjectStorageCommand.getWebTorrentBaseUrl() + : origin.url + + expectStartWith(file.fileUrl, start) + + await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) + } + + const start = inObjectStorage + ? ObjectStorageCommand.getPlaylistBaseUrl() + : origin.url + + const hls = video.streamingPlaylists[0] + expectStartWith(hls.playlistUrl, start) + expectStartWith(hls.segmentsSha256Url, start) + + for (const file of hls.files) { + expectStartWith(file.fileUrl, start) + + await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) + } +} + +describe('Test create move video storage job', function () { + if (areObjectStorageTestsDisabled()) return + + let servers: PeerTubeServer[] = [] + const uuids: string[] = [] + + before(async function () { + this.timeout(360000) + + // Run server 2 to have transcoding enabled + servers = await createMultipleServers(2) + await setAccessTokensToServers(servers) + + await doubleFollow(servers[0], servers[1]) + + await ObjectStorageCommand.prepareDefaultBuckets() + + await servers[0].config.enableTranscoding() + + for (let i = 0; i < 3; i++) { + const { uuid } = await servers[0].videos.upload({ attributes: { name: 'video' + i } }) + uuids.push(uuid) + } + + await waitJobs(servers) + + await servers[0].kill() + await servers[0].run(ObjectStorageCommand.getDefaultConfig()) + }) + + it('Should move only one file', async function () { + this.timeout(120000) + + const command = `npm run create-move-video-storage-job -- --to-object-storage -v ${uuids[1]}` + await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig()) + await waitJobs(servers) + + for (const server of servers) { + const video = await server.videos.get({ id: uuids[1] }) + + await checkFiles(servers[0], video, true) + + for (const id of [ uuids[0], uuids[2] ]) { + const video = await server.videos.get({ id }) + + await checkFiles(servers[0], video, false) + } + } + }) + + it('Should move all files', async function () { + this.timeout(120000) + + const command = `npm run create-move-video-storage-job -- --to-object-storage --all-videos` + await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig()) + await waitJobs(servers) + + for (const server of servers) { + for (const id of [ uuids[0], uuids[2] ]) { + const video = await server.videos.get({ id }) + + await checkFiles(servers[0], video, true) + } + } + }) + + after(async function () { + await cleanupTests(servers) + }) +}) diff --git a/server/tests/cli/index.ts b/server/tests/cli/index.ts index c6dd0581a..6e0cbe58b 100644 --- a/server/tests/cli/index.ts +++ b/server/tests/cli/index.ts @@ -1,6 +1,7 @@ // Order of the tests we want to execute import './create-import-video-file-job' import './create-transcoding-job' +import './create-move-video-storage-job' import './peertube' import './plugins' import './print-transcode-command' -- cgit v1.2.3