"create-transcoding-job": "node ./dist/scripts/create-transcoding-job.js",
"regenerate-thumbnails": "node ./dist/scripts/regenerate-thumbnails.js",
"create-import-video-file-job": "node ./dist/scripts/create-import-video-file-job.js",
+ "create-move-video-storage-job": "node ./dist/scripts/create-move-video-storage-job.js",
"print-transcode-command": "node ./dist/scripts/print-transcode-command.js",
"test": "bash ./scripts/test.sh",
"help": "bash ./scripts/help.sh",
filePath: resolve(options.import)
}
- JobQueue.Instance.init()
+ JobQueue.Instance.init(true)
await JobQueue.Instance.createJobWithPromise({ type: 'video-file-import', payload: dataInput })
console.log('Import job for video %s created.', video.uuid)
}
--- /dev/null
+import { registerTSPaths } from '../server/helpers/register-ts-paths'
+registerTSPaths()
+
+import { program } from 'commander'
+import { VideoModel } from '@server/models/video/video'
+import { initDatabaseModels } from '@server/initializers/database'
+import { VideoStorage } from '@shared/models'
+import { moveToExternalStorageState } from '@server/lib/video-state'
+import { JobQueue } from '@server/lib/job-queue'
+import { CONFIG } from '@server/initializers/config'
+
+program
+ .description('Move videos to another storage.')
+ .option('-o, --to-object-storage', 'Move videos in object storage')
+ .option('-v, --video [videoUUID]', 'Move a specific video')
+ .option('-a, --all-videos', 'Migrate all videos')
+ .parse(process.argv)
+
+const options = program.opts()
+
+if (!options['toObjectStorage']) {
+ console.error('You need to choose where to send video files.')
+ process.exit(-1)
+}
+
+if (!options['video'] && !options['allVideos']) {
+ console.error('You need to choose which videos to move.')
+ process.exit(-1)
+}
+
+if (options['toObjectStorage'] && !CONFIG.OBJECT_STORAGE.ENABLED) {
+ console.error('Object storage is not enabled on this instance.')
+ process.exit(-1)
+}
+
+run()
+ .then(() => process.exit(0))
+ .catch(err => console.error(err))
+
+async function run () {
+ await initDatabaseModels(true)
+
+ JobQueue.Instance.init(true)
+
+ let ids: number[] = []
+
+ if (options['video']) {
+ const video = await VideoModel.load(options['video'])
+
+ if (!video) {
+ console.error('Unknown video ' + options['video'])
+ process.exit(-1)
+ }
+
+ if (video.remote === true) {
+ console.error('Cannot process a remote video')
+ process.exit(-1)
+ }
+
+ ids.push(video.id)
+ } else {
+ ids = await VideoModel.listLocalIds()
+ }
+
+ for (const id of ids) {
+ const videoFull = await VideoModel.loadAndPopulateAccountAndServerAndTags(id)
+
+ const files = videoFull.VideoFiles || []
+ const hls = videoFull.getHLSPlaylist()
+
+ if (files.some(f => f.storage === VideoStorage.FILE_SYSTEM) || hls?.storage === VideoStorage.FILE_SYSTEM) {
+ console.log('Processing video %s.', videoFull.name)
+
+ const success = await moveToExternalStorageState(videoFull, false, undefined)
+
+ if (!success) {
+ console.error(
+ 'Cannot create move job for %s: job creation may have failed or there may be pending transcoding jobs for this video',
+ videoFull.name
+ )
+ }
+ }
+
+ console.log(`Created move-to-object-storage job for ${videoFull.name}.`)
+ }
+}
}
}
- JobQueue.Instance.init()
+ JobQueue.Instance.init(true)
video.state = VideoState.TO_TRANSCODE
await video.save()
--- /dev/null
+import { registerTSPaths } from '../../server/helpers/register-ts-paths'
+registerTSPaths()
+
+import { join } from 'path'
+import { JobQueue } from '@server/lib/job-queue'
+import { initDatabaseModels } from '../../server/initializers/database'
+import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
+import { VideoPathManager } from '@server/lib/video-path-manager'
+import { VideoModel } from '@server/models/video/video'
+import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
+import { move, readFile, writeFile } from 'fs-extra'
+import Bluebird from 'bluebird'
+import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
+
+run()
+ .then(() => process.exit(0))
+ .catch(err => {
+ console.error(err)
+ process.exit(-1)
+ })
+
+async function run () {
+ console.log('Migrate old HLS paths to new format.')
+
+ await initDatabaseModels(true)
+
+ JobQueue.Instance.init(true)
+
+ const ids = await VideoModel.listLocalIds()
+
+ await Bluebird.map(ids, async id => {
+ try {
+ await processVideo(id)
+ } catch (err) {
+ console.error('Cannot process video %s.', { err })
+ }
+ }, { concurrency: 5 })
+
+ console.log('Migration finished!')
+}
+
+async function processVideo (videoId: number) {
+ const video = await VideoModel.loadWithFiles(videoId)
+
+ const hls = video.getHLSPlaylist()
+ if (!hls || hls.playlistFilename !== 'master.m3u8' || hls.VideoFiles.length === 0) {
+ return
+ }
+
+ console.log(`Renaming HLS playlist files of video ${video.name}.`)
+
+ const playlist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
+ const hlsDirPath = VideoPathManager.Instance.getFSHLSOutputPath(video)
+
+ const masterPlaylistPath = join(hlsDirPath, playlist.playlistFilename)
+ let masterPlaylistContent = await readFile(masterPlaylistPath, 'utf8')
+
+ for (const videoFile of hls.VideoFiles) {
+ const srcName = `${videoFile.resolution}.m3u8`
+ const dstName = getHlsResolutionPlaylistFilename(videoFile.filename)
+
+ const src = join(hlsDirPath, srcName)
+ const dst = join(hlsDirPath, dstName)
+
+ try {
+ await move(src, dst)
+
+ masterPlaylistContent = masterPlaylistContent.replace(new RegExp('^' + srcName + '$', 'm'), dstName)
+ } catch (err) {
+ console.error('Cannot move video file %s to %s.', src, dst, err)
+ }
+ }
+
+ await writeFile(masterPlaylistPath, masterPlaylistContent)
+
+ if (playlist.segmentsSha256Filename === 'segments-sha256.json') {
+ try {
+ const newName = generateHlsSha256SegmentsFilename(video.isLive)
+
+ const dst = join(hlsDirPath, newName)
+ await move(join(hlsDirPath, playlist.segmentsSha256Filename), dst)
+ playlist.segmentsSha256Filename = newName
+ } catch (err) {
+ console.error(`Cannot rename ${video.name} segments-sha256.json file to a new name`, err)
+ }
+ }
+
+ if (playlist.playlistFilename === 'master.m3u8') {
+ try {
+ const newName = generateHLSMasterPlaylistFilename(video.isLive)
+
+ const dst = join(hlsDirPath, newName)
+ await move(join(hlsDirPath, playlist.playlistFilename), dst)
+ playlist.playlistFilename = newName
+ } catch (err) {
+ console.error(`Cannot rename ${video.name} master.m3u8 file to a new name`, err)
+ }
+ }
+
+ // Everything worked, we can save the playlist now
+ await playlist.save()
+
+ const allVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id)
+ await federateVideoIfNeeded(allVideo, false)
+
+ console.log(`Successfully moved HLS files of ${video.name}.`)
+}
import { generateImageFilename, processImage } from '@server/helpers/image-utils'
import { THUMBNAILS_SIZE } from '@server/initializers/constants'
import { VideoModel } from '@server/models/video/video'
-import { MVideo } from '@server/types/models'
import { initDatabaseModels } from '@server/initializers/database'
program
async function run () {
await initDatabaseModels(true)
- const videos = await VideoModel.listLocal()
+ const ids = await VideoModel.listLocalIds()
- await map(videos, v => {
- return processVideo(v)
- .catch(err => console.error('Cannot process video %s.', v.url, err))
+ await map(ids, id => {
+ return processVideo(id)
+ .catch(err => console.error('Cannot process video %d.', id, err))
}, { concurrency: 20 })
}
-async function processVideo (videoArg: MVideo) {
- const video = await VideoModel.loadWithFiles(videoArg.id)
+async function processVideo (id: number) {
+ const video = await VideoModel.loadWithFiles(id)
console.log('Processing video %s.', video.name)
console.log('Updating video and torrent files.')
- const localVideos = await VideoModel.listLocal()
- for (const localVideo of localVideos) {
- const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(localVideo.id)
+ const ids = await VideoModel.listLocalIds()
+ for (const id of ids) {
+ const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(id)
console.log('Updating video ' + video.uuid)
import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra'
import { flatten, uniq } from 'lodash'
import { basename, dirname, join } from 'path'
-import { MStreamingPlaylistFilesVideo, MVideoWithFile } from '@server/types/models'
+import { MStreamingPlaylistFilesVideo, MVideo, MVideoUUID } from '@server/types/models'
import { sha256 } from '../helpers/core-utils'
import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamSize } from '../helpers/ffprobe-utils'
import { logger } from '../helpers/logger'
}
}
-async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) {
+async function updateMasterHLSPlaylist (video: MVideo, playlist: MStreamingPlaylistFilesVideo) {
const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ]
for (const file of playlist.VideoFiles) {
})
}
-async function updateSha256VODSegments (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) {
+async function updateSha256VODSegments (video: MVideoUUID, playlist: MStreamingPlaylistFilesVideo) {
const json: { [filename: string]: { [range: string]: string } } = {}
// For all the resolutions available for this video
private constructor () {
}
- init () {
+ init (produceOnly = false) {
// Already initialized
if (this.initialized === true) return
this.initialized = true
for (const handlerName of (Object.keys(handlers) as JobType[])) {
const queue = new Bull(handlerName, queueOptions)
+
+ if (produceOnly) {
+ queue.pause(true)
+ .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
+ }
+
const handler = handlers[handlerName]
queue.process(this.getJobConcurrency(handlerName), handler)
})
}
+async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) {
+ const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction)
+ const pendingTranscode = videoJobInfo?.pendingTranscode || 0
+
+ // We want to wait all transcoding jobs before moving the video on an external storage
+ if (pendingTranscode !== 0) return false
+
+ await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction)
+
+ logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
+
+ try {
+ await addMoveToObjectStorageJob(video, isNewVideo)
+
+ return true
+ } catch (err) {
+ logger.error('Cannot add move to object storage job', { err })
+
+ return false
+ }
+}
+
// ---------------------------------------------------------------------------
export {
buildNextVideoState,
+ moveToExternalStorageState,
moveToNextState
}
Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(video)
}
}
-
-async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) {
- const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction)
- const pendingTranscode = videoJobInfo?.pendingTranscode || 0
-
- // We want to wait all transcoding jobs before moving the video on an external storage
- if (pendingTranscode !== 0) return
-
- await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction)
-
- logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
-
- addMoveToObjectStorageJob(video, isNewVideo)
- .catch(err => logger.error('Cannot add move to object storage job', { err }))
-}
await Promise.all(tasks)
}
- static listLocal (): Promise<MVideo[]> {
+ static listLocalIds (): Promise<number[]> {
const query = {
+ attributes: [ 'id' ],
+ raw: true,
where: {
remote: false
}
}
return VideoModel.findAll(query)
+ .then(rows => rows.map(r => r.id))
}
static listAllAndSharedByActorForOutbox (actorId: number, start: number, count: number) {
if (!this.VideoStreamingPlaylists) return undefined
const playlist = this.VideoStreamingPlaylists.find(p => p.type === VideoStreamingPlaylistType.HLS)
+ if (!playlist) return undefined
+
playlist.Video = this
return playlist
await this.save({ transaction })
}
- getBandwidthBits (videoFile: MVideoFile) {
+ getBandwidthBits (this: MVideo, videoFile: MVideoFile) {
return Math.ceil((videoFile.size * 8) / this.duration)
}
--- /dev/null
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+
+import {
+ areObjectStorageTestsDisabled,
+ cleanupTests,
+ createMultipleServers,
+ doubleFollow,
+ expectStartWith,
+ makeRawRequest,
+ ObjectStorageCommand,
+ PeerTubeServer,
+ setAccessTokensToServers,
+ waitJobs
+} from '@shared/extra-utils'
+import { HttpStatusCode, VideoDetails } from '@shared/models'
+
+async function checkFiles (origin: PeerTubeServer, video: VideoDetails, inObjectStorage: boolean) {
+ for (const file of video.files) {
+ const start = inObjectStorage
+ ? ObjectStorageCommand.getWebTorrentBaseUrl()
+ : origin.url
+
+ expectStartWith(file.fileUrl, start)
+
+ await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
+ }
+
+ const start = inObjectStorage
+ ? ObjectStorageCommand.getPlaylistBaseUrl()
+ : origin.url
+
+ const hls = video.streamingPlaylists[0]
+ expectStartWith(hls.playlistUrl, start)
+ expectStartWith(hls.segmentsSha256Url, start)
+
+ for (const file of hls.files) {
+ expectStartWith(file.fileUrl, start)
+
+ await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
+ }
+}
+
+describe('Test create move video storage job', function () {
+ if (areObjectStorageTestsDisabled()) return
+
+ let servers: PeerTubeServer[] = []
+ const uuids: string[] = []
+
+ before(async function () {
+ this.timeout(360000)
+
+ // Run server 2 to have transcoding enabled
+ servers = await createMultipleServers(2)
+ await setAccessTokensToServers(servers)
+
+ await doubleFollow(servers[0], servers[1])
+
+ await ObjectStorageCommand.prepareDefaultBuckets()
+
+ await servers[0].config.enableTranscoding()
+
+ for (let i = 0; i < 3; i++) {
+ const { uuid } = await servers[0].videos.upload({ attributes: { name: 'video' + i } })
+ uuids.push(uuid)
+ }
+
+ await waitJobs(servers)
+
+ await servers[0].kill()
+ await servers[0].run(ObjectStorageCommand.getDefaultConfig())
+ })
+
+ it('Should move only one file', async function () {
+ this.timeout(120000)
+
+ const command = `npm run create-move-video-storage-job -- --to-object-storage -v ${uuids[1]}`
+ await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig())
+ await waitJobs(servers)
+
+ for (const server of servers) {
+ const video = await server.videos.get({ id: uuids[1] })
+
+ await checkFiles(servers[0], video, true)
+
+ for (const id of [ uuids[0], uuids[2] ]) {
+ const video = await server.videos.get({ id })
+
+ await checkFiles(servers[0], video, false)
+ }
+ }
+ })
+
+ it('Should move all files', async function () {
+ this.timeout(120000)
+
+ const command = `npm run create-move-video-storage-job -- --to-object-storage --all-videos`
+ await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig())
+ await waitJobs(servers)
+
+ for (const server of servers) {
+ for (const id of [ uuids[0], uuids[2] ]) {
+ const video = await server.videos.get({ id })
+
+ await checkFiles(servers[0], video, true)
+ }
+ }
+ })
+
+ after(async function () {
+ await cleanupTests(servers)
+ })
+})
// Order of the tests we want to execute
import './create-import-video-file-job'
import './create-transcoding-job'
+import './create-move-video-storage-job'
import './peertube'
import './plugins'
import './print-transcode-command'
return `NODE_ENV=test NODE_APP_INSTANCE=${this.server.internalServerNumber}`
}
- async execWithEnv (command: string) {
- return CLICommand.exec(`${this.getEnv()} ${command}`)
+ async execWithEnv (command: string, configOverride?: any) {
+ const prefix = configOverride
+ ? `NODE_CONFIG='${JSON.stringify(configOverride)}'`
+ : ''
+
+ return CLICommand.exec(`${prefix} ${this.getEnv()} ${command}`)
}
}
- [regenerate-thumbnails.js](#regenerate-thumbnailsjs)
- [create-transcoding-job.js](#create-transcoding-jobjs)
- [create-import-video-file-job.js](#create-import-video-file-jobjs)
+ - [create-move-video-storage-job.js](#create-move-video-storage-jobjs)
- [prune-storage.js](#prune-storagejs)
- [update-host.js](#update-hostjs)
- [reset-password.js](#reset-passwordjs)
$ docker-compose exec -u peertube peertube npm run create-import-video-file-job -- -v [videoUUID] -i [videoFile]
```
+### create-move-video-storage-job.js
+
+Use this script to move all video files or a specific video file to object storage.
+
+```bash
+$ # Basic installation
+$ cd /var/www/peertube/peertube-latest
+$ sudo -u peertube NODE_CONFIG_DIR=/var/www/peertube/config NODE_ENV=production npm run create-move-video-storage-job -- --to-object-storage -v [videoUUID]
+
+$ # Docker installation
+$ cd /var/www/peertube-docker
+$ docker-compose exec -u peertube peertube npm run create-move-video-storage-job -- --to-object-storage -v [videoUUID]
+```
+
+The script can also move all video files that are not already in object storage:
+
+```bash
+$ # Basic installation
+$ cd /var/www/peertube/peertube-latest
+$ sudo -u peertube NODE_CONFIG_DIR=/var/www/peertube/config NODE_ENV=production npm run create-move-video-storage-job -- --to-object-storage --all-videos
+
+$ # Docker installation
+$ cd /var/www/peertube-docker
+$ docker-compose exec -u peertube peertube npm run create-move-video-storage-job -- --to-object-storage --all-videos
+```
+
+
### prune-storage.js
Some transcoded videos or shutdown at a bad time can leave some unused files on your storage.