uploaderClass: UploaderXFormData,
chunkSize,
retryConfig: {
- maxAttempts: 6,
- shouldRetry: (code: number) => {
- return code < 400 || code >= 501
+ maxAttempts: 30, // maximum attempts for 503 codes, otherwise set to 6, see below
+ maxDelay: 120_000, // 2 min
+ shouldRetry: (code: number, attempts: number) => {
+ return code === HttpStatusCode.SERVICE_UNAVAILABLE_503 || ((code < 400 || code > 500) && attempts < 6)
}
}
}
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
import { generateWebTorrentVideoFilename } from '@server/lib/paths'
+import { Redis } from '@server/lib/redis'
import {
addMoveToObjectStorageJob,
addOptimizeOrMergeAudioJob,
uploadRouter.put('/upload-resumable',
openapiOperationDoc({ operationId: 'uploadResumable' }),
authenticate,
- uploadxMiddleware, // uploadx doesn't use call next() before the file upload completes
+ uploadxMiddleware, // uploadx doesn't next() before the file upload completes
asyncMiddleware(videosAddResumableValidator),
asyncMiddleware(addVideoResumable)
)
const videoInfo: VideoCreate = req.body
const files = req.files
- return addVideo({ res, videoPhysicalFile, videoInfo, files })
+ const response = await addVideo({ res, videoPhysicalFile, videoInfo, files })
+
+ return res.json(response)
}
-export async function addVideoResumable (_req: express.Request, res: express.Response) {
+export async function addVideoResumable (req: express.Request, res: express.Response) {
const videoPhysicalFile = res.locals.videoFileResumable
const videoInfo = videoPhysicalFile.metadata
const files = { previewfile: videoInfo.previewfile }
- return addVideo({ res, videoPhysicalFile, videoInfo, files })
+ const response = await addVideo({ res, videoPhysicalFile, videoInfo, files })
+ await Redis.Instance.setUploadSession(req.query.upload_id, response)
+
+ return res.json(response)
}
async function addVideo (options: {
Hooks.runAction('action:api.video.uploaded', { video: videoCreated })
- return res.json({
+ return {
video: {
id: videoCreated.id,
shortUUID: uuidToShort(videoCreated.uuid),
uuid: videoCreated.uuid
}
- })
+ }
}
async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
import { join } from 'path'
+import { JobQueue } from '@server/lib/job-queue'
import { RESUMABLE_UPLOAD_DIRECTORY } from '../initializers/constants'
function getResumableUploadPath (filename?: string) {
return RESUMABLE_UPLOAD_DIRECTORY
}
+function scheduleDeleteResumableUploadMetaFile (filepath: string) {
+ const payload = { filepath }
+ JobQueue.Instance.createJob({ type: 'delete-resumable-upload-meta-file', payload }, { delay: 900 * 1000 }) // executed in 15 min
+}
+
// ---------------------------------------------------------------------------
export {
- getResumableUploadPath
+ getResumableUploadPath,
+ scheduleDeleteResumableUploadMetaFile
}
const HLS_STREAMING_PLAYLIST_DIRECTORY = join(CONFIG.STORAGE.STREAMING_PLAYLISTS_DIR, 'hls')
const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls')
+const RESUMABLE_UPLOAD_SESSION_LIFETIME = SCHEDULER_INTERVALS_MS.REMOVE_DANGLING_RESUMABLE_UPLOADS
+
const VIDEO_LIVE = {
EXTENSION: '.ts',
CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes
LAZY_STATIC_PATHS,
SEARCH_INDEX,
RESUMABLE_UPLOAD_DIRECTORY,
+ RESUMABLE_UPLOAD_SESSION_LIFETIME,
HLS_REDUNDANCY_DIRECTORY,
P2P_MEDIA_LOADER_PEER_VERSION,
ACTOR_IMAGES_SIZE,
ActivitypubHttpFetcherPayload,
ActivitypubHttpUnicastPayload,
ActorKeysPayload,
+ DeleteResumableUploadMetaFilePayload,
EmailPayload,
JobState,
JobType,
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } |
{ type: 'actor-keys', payload: ActorKeysPayload } |
{ type: 'video-redundancy', payload: VideoRedundancyPayload } |
+ { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
export type CreateJobOptions = {
USER_PASSWORD_CREATE_LIFETIME,
VIEW_LIFETIME,
WEBSERVER,
- TRACKER_RATE_LIMITS
+ TRACKER_RATE_LIMITS,
+ RESUMABLE_UPLOAD_SESSION_LIFETIME
} from '../initializers/constants'
import { CONFIG } from '../initializers/config'
])
}
+ /* ************ Resumable uploads final responses ************ */
+
+ setUploadSession (uploadId: string, response?: { video: { id: number, shortUUID: string, uuid: string } }) {
+ return this.setValue(
+ 'resumable-upload-' + uploadId,
+ response
+ ? JSON.stringify(response)
+ : '',
+ RESUMABLE_UPLOAD_SESSION_LIFETIME
+ )
+ }
+
+ doesUploadSessionExist (uploadId: string) {
+ return this.exists('resumable-upload-' + uploadId)
+ }
+
+ async getUploadSession (uploadId: string) {
+ const value = await this.getValue('resumable-upload-' + uploadId)
+
+ return value
+ ? JSON.parse(value)
+ : ''
+ }
+
/* ************ Keys generation ************ */
generateCachedRouteKey (req: express.Request) {
import express from 'express'
import { body, header, param, query, ValidationChain } from 'express-validator'
+import { isTestInstance } from '@server/helpers/core-utils'
import { getResumableUploadPath } from '@server/helpers/upload'
+import { Redis } from '@server/lib/redis'
import { isAbleToUploadVideo } from '@server/lib/user'
import { getServerActor } from '@server/models/application/application'
import { ExpressPromiseHandler } from '@server/types/express'
const videosAddResumableValidator = [
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
const user = res.locals.oauth.token.User
-
const body: express.CustomUploadXFile<express.UploadXFileMetadata> = req.body
const file = { ...body, duration: undefined, path: getResumableUploadPath(body.id), filename: body.metadata.filename }
-
const cleanup = () => deleteFileAndCatch(file.path)
+ const uploadId = req.query.upload_id
+ const sessionExists = await Redis.Instance.doesUploadSessionExist(uploadId)
+
+ if (sessionExists) {
+ const sessionResponse = await Redis.Instance.getUploadSession(uploadId)
+
+ if (!sessionResponse) {
+ res.setHeader('Retry-After', 300) // ask to retry after 5 min, knowing the upload_id is kept for up to 15 min after completion
+
+ return res.fail({
+ status: HttpStatusCode.SERVICE_UNAVAILABLE_503,
+ message: 'The upload is already being processed'
+ })
+ }
+
+ if (isTestInstance()) {
+ res.setHeader('x-resumable-upload-cached', 'true')
+ }
+
+ return res.json(sessionResponse)
+ }
+
+ await Redis.Instance.setUploadSession(uploadId)
+
if (!await doesVideoChannelOfAccountExist(file.metadata.channelId, user, res)) return cleanup()
try {
await sendChunks({ pathUploadId: uploadId, expectedStatus, contentRangeBuilder, contentLength: size })
await checkFileSize(uploadId, 0)
})
+
+ it('Should be able to accept 2 PUT requests', async function () {
+ const uploadId = await prepareUpload()
+
+ const result1 = await sendChunks({ pathUploadId: uploadId })
+ const result2 = await sendChunks({ pathUploadId: uploadId })
+
+ expect(result1.body.video.uuid).to.exist
+ expect(result1.body.video.uuid).to.equal(result2.body.video.uuid)
+
+ expect(result1.headers['x-resumable-upload-cached']).to.not.exist
+ expect(result2.headers['x-resumable-upload-cached']).to.equal('true')
+
+ await checkFileSize(uploadId, null)
+ })
})
after(async function () {
actorId: number
}
+export interface DeleteResumableUploadMetaFilePayload {
+ filepath: string
+}
+
export interface MoveObjectStoragePayload {
videoUUID: string
isNewVideo: boolean
description: video unreadable
'429':
description: too many concurrent requests
+ '503':
+ description: upload is already being processed
+ headers:
+ 'Retry-After':
+ schema:
+ type: number
+ example: 300
delete:
summary: Cancel the resumable upload of a video, deleting any data uploaded so far
description: Uses [a resumable protocol](https://github.com/kukhariev/node-uploadx/blob/master/proto.md) to cancel the upload of a video