]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
prevent multiple post-process triggering of upload-resumable (#4175)
authorRigel Kent <sendmemail@rigelk.eu>
Mon, 25 Oct 2021 15:42:20 +0000 (17:42 +0200)
committerGitHub <noreply@github.com>
Mon, 25 Oct 2021 15:42:20 +0000 (17:42 +0200)
* prevent multiple post-process triggering of upload-resumable

* switch from 409 to 503 for upload being processed

* Improve resumable upload check

Co-authored-by: Chocobozzz <me@florianbigard.com>
client/src/app/+videos/+video-edit/video-add-components/video-upload.component.ts
server/controllers/api/videos/upload.ts
server/helpers/upload.ts
server/initializers/constants.ts
server/lib/job-queue/job-queue.ts
server/lib/redis.ts
server/middlewares/validators/videos/videos.ts
server/tests/api/videos/resumable-upload.ts
shared/models/server/job.model.ts
support/doc/api/openapi.yaml

index 6f72a07c4a856c50b730e3cf4b55614fdd3d3c49..91d89a5357d56b33bbf0f03be0d9ff7b843f793c 100644 (file)
@@ -82,9 +82,10 @@ export class VideoUploadComponent extends VideoSend implements OnInit, OnDestroy
       uploaderClass: UploaderXFormData,
       chunkSize,
       retryConfig: {
-        maxAttempts: 6,
-        shouldRetry: (code: number) => {
-          return code < 400 || code >= 501
+        maxAttempts: 30, // maximum attempts for 503 codes, otherwise set to 6, see below
+        maxDelay: 120_000, // 2 min
+        shouldRetry: (code: number, attempts: number) => {
+          return code === HttpStatusCode.SERVICE_UNAVAILABLE_503 || ((code < 400 || code > 500) && attempts < 6)
         }
       }
     }
index 55cb9cf201215159db7a559e0f3b9c090415453a..02aadd4260b3cf6b9d492ad28d37906259e92638 100644 (file)
@@ -7,6 +7,7 @@ import { uuidToShort } from '@server/helpers/uuid'
 import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
 import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
 import { generateWebTorrentVideoFilename } from '@server/lib/paths'
+import { Redis } from '@server/lib/redis'
 import {
   addMoveToObjectStorageJob,
   addOptimizeOrMergeAudioJob,
@@ -94,7 +95,7 @@ uploadRouter.delete('/upload-resumable',
 uploadRouter.put('/upload-resumable',
   openapiOperationDoc({ operationId: 'uploadResumable' }),
   authenticate,
-  uploadxMiddleware, // uploadx doesn't use call next() before the file upload completes
+  uploadxMiddleware, // uploadx doesn't next() before the file upload completes
   asyncMiddleware(videosAddResumableValidator),
   asyncMiddleware(addVideoResumable)
 )
@@ -122,15 +123,20 @@ export async function addVideoLegacy (req: express.Request, res: express.Respons
   const videoInfo: VideoCreate = req.body
   const files = req.files
 
-  return addVideo({ res, videoPhysicalFile, videoInfo, files })
+  const response = await addVideo({ res, videoPhysicalFile, videoInfo, files })
+
+  return res.json(response)
 }
 
-export async function addVideoResumable (_req: express.Request, res: express.Response) {
+export async function addVideoResumable (req: express.Request, res: express.Response) {
   const videoPhysicalFile = res.locals.videoFileResumable
   const videoInfo = videoPhysicalFile.metadata
   const files = { previewfile: videoInfo.previewfile }
 
-  return addVideo({ res, videoPhysicalFile, videoInfo, files })
+  const response = await addVideo({ res, videoPhysicalFile, videoInfo, files })
+  await Redis.Instance.setUploadSession(req.query.upload_id, response)
+
+  return res.json(response)
 }
 
 async function addVideo (options: {
@@ -225,13 +231,13 @@ async function addVideo (options: {
 
   Hooks.runAction('action:api.video.uploaded', { video: videoCreated })
 
-  return res.json({
+  return {
     video: {
       id: videoCreated.id,
       shortUUID: uuidToShort(videoCreated.uuid),
       uuid: videoCreated.uuid
     }
-  })
+  }
 }
 
 async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
index 3cb17edd0aeaafeea1eb5dc369e787326fb2d92f..c94c7ab82d316065b6294030186607dda1ae5e1a 100644 (file)
@@ -1,4 +1,5 @@
 import { join } from 'path'
+import { JobQueue } from '@server/lib/job-queue'
 import { RESUMABLE_UPLOAD_DIRECTORY } from '../initializers/constants'
 
 function getResumableUploadPath (filename?: string) {
@@ -7,8 +8,14 @@ function getResumableUploadPath (filename?: string) {
   return RESUMABLE_UPLOAD_DIRECTORY
 }
 
+function scheduleDeleteResumableUploadMetaFile (filepath: string) {
+  const payload = { filepath }
+  JobQueue.Instance.createJob({ type: 'delete-resumable-upload-meta-file', payload }, { delay: 900 * 1000 }) // executed in 15 min
+}
+
 // ---------------------------------------------------------------------------
 
 export {
-  getResumableUploadPath
+  getResumableUploadPath,
+  scheduleDeleteResumableUploadMetaFile
 }
index 87a74a32c566875f625a5d00892fd361300ccb4e..f6c19dab4197fa3f1837c06273d514c070099845 100644 (file)
@@ -665,6 +665,8 @@ const RESUMABLE_UPLOAD_DIRECTORY = join(CONFIG.STORAGE.TMP_DIR, 'resumable-uploa
 const HLS_STREAMING_PLAYLIST_DIRECTORY = join(CONFIG.STORAGE.STREAMING_PLAYLISTS_DIR, 'hls')
 const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls')
 
+const RESUMABLE_UPLOAD_SESSION_LIFETIME = SCHEDULER_INTERVALS_MS.REMOVE_DANGLING_RESUMABLE_UPLOADS
+
 const VIDEO_LIVE = {
   EXTENSION: '.ts',
   CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes
@@ -838,6 +840,7 @@ export {
   LAZY_STATIC_PATHS,
   SEARCH_INDEX,
   RESUMABLE_UPLOAD_DIRECTORY,
+  RESUMABLE_UPLOAD_SESSION_LIFETIME,
   HLS_REDUNDANCY_DIRECTORY,
   P2P_MEDIA_LOADER_PEER_VERSION,
   ACTOR_IMAGES_SIZE,
index 4cda12b5754c5ca411384656eaaf2f4edafc192b..53d6b6a9cfb8a34c46ee2f9b660f0aa388c6aea5 100644 (file)
@@ -8,6 +8,7 @@ import {
   ActivitypubHttpFetcherPayload,
   ActivitypubHttpUnicastPayload,
   ActorKeysPayload,
+  DeleteResumableUploadMetaFilePayload,
   EmailPayload,
   JobState,
   JobType,
@@ -52,6 +53,7 @@ type CreateJobArgument =
   { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
   { type: 'actor-keys', payload: ActorKeysPayload } |
   { type: 'video-redundancy', payload: VideoRedundancyPayload } |
+  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
   { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
 
 export type CreateJobOptions = {
index d1d88d853d22e4dcd4f5a6203effffc76aed6e5e..46617b07e4ac6405978d9249b1c1f8681b30f9c7 100644 (file)
@@ -9,7 +9,8 @@ import {
   USER_PASSWORD_CREATE_LIFETIME,
   VIEW_LIFETIME,
   WEBSERVER,
-  TRACKER_RATE_LIMITS
+  TRACKER_RATE_LIMITS,
+  RESUMABLE_UPLOAD_SESSION_LIFETIME
 } from '../initializers/constants'
 import { CONFIG } from '../initializers/config'
 
@@ -202,6 +203,30 @@ class Redis {
     ])
   }
 
+  /* ************ Resumable uploads final responses ************ */
+
+  setUploadSession (uploadId: string, response?: { video: { id: number, shortUUID: string, uuid: string } }) {
+    return this.setValue(
+      'resumable-upload-' + uploadId,
+      response
+        ? JSON.stringify(response)
+        : '',
+      RESUMABLE_UPLOAD_SESSION_LIFETIME
+    )
+  }
+
+  doesUploadSessionExist (uploadId: string) {
+    return this.exists('resumable-upload-' + uploadId)
+  }
+
+  async getUploadSession (uploadId: string) {
+    const value = await this.getValue('resumable-upload-' + uploadId)
+
+    return value
+      ? JSON.parse(value)
+      : ''
+  }
+
   /* ************ Keys generation ************ */
 
   generateCachedRouteKey (req: express.Request) {
index 23ee9778af71084fa8baa3a5bb6a8166d9a4c777..e486887a7e284f21657c6bb643935cc6272a47c6 100644 (file)
@@ -1,6 +1,8 @@
 import express from 'express'
 import { body, header, param, query, ValidationChain } from 'express-validator'
+import { isTestInstance } from '@server/helpers/core-utils'
 import { getResumableUploadPath } from '@server/helpers/upload'
+import { Redis } from '@server/lib/redis'
 import { isAbleToUploadVideo } from '@server/lib/user'
 import { getServerActor } from '@server/models/application/application'
 import { ExpressPromiseHandler } from '@server/types/express'
@@ -105,12 +107,34 @@ const videosAddLegacyValidator = getCommonVideoEditAttributes().concat([
 const videosAddResumableValidator = [
   async (req: express.Request, res: express.Response, next: express.NextFunction) => {
     const user = res.locals.oauth.token.User
-
     const body: express.CustomUploadXFile<express.UploadXFileMetadata> = req.body
     const file = { ...body, duration: undefined, path: getResumableUploadPath(body.id), filename: body.metadata.filename }
-
     const cleanup = () => deleteFileAndCatch(file.path)
 
+    const uploadId = req.query.upload_id
+    const sessionExists = await Redis.Instance.doesUploadSessionExist(uploadId)
+
+    if (sessionExists) {
+      const sessionResponse = await Redis.Instance.getUploadSession(uploadId)
+
+      if (!sessionResponse) {
+        res.setHeader('Retry-After', 300) // ask to retry after 5 min, knowing the upload_id is kept for up to 15 min after completion
+
+        return res.fail({
+          status: HttpStatusCode.SERVICE_UNAVAILABLE_503,
+          message: 'The upload is already being processed'
+        })
+      }
+
+      if (isTestInstance()) {
+        res.setHeader('x-resumable-upload-cached', 'true')
+      }
+
+      return res.json(sessionResponse)
+    }
+
+    await Redis.Instance.setUploadSession(uploadId)
+
     if (!await doesVideoChannelOfAccountExist(file.metadata.channelId, user, res)) return cleanup()
 
     try {
index 59970aa943ea57854b3387cd0d7a3fa0617908c9..6b5e0c09d79b5ce034cc46c51e85ae8b9026f578 100644 (file)
@@ -180,6 +180,21 @@ describe('Test resumable upload', function () {
       await sendChunks({ pathUploadId: uploadId, expectedStatus, contentRangeBuilder, contentLength: size })
       await checkFileSize(uploadId, 0)
     })
+
+    it('Should be able to accept 2 PUT requests', async function () {
+      const uploadId = await prepareUpload()
+
+      const result1 = await sendChunks({ pathUploadId: uploadId })
+      const result2 = await sendChunks({ pathUploadId: uploadId })
+
+      expect(result1.body.video.uuid).to.exist
+      expect(result1.body.video.uuid).to.equal(result2.body.video.uuid)
+
+      expect(result1.headers['x-resumable-upload-cached']).to.not.exist
+      expect(result2.headers['x-resumable-upload-cached']).to.equal('true')
+
+      await checkFileSize(uploadId, null)
+    })
   })
 
   after(async function () {
index ff96283a4f50996cb97502e12c6f321619739c2e..12e0fcf85ed91567130aca79e248e5f4dd68cf3a 100644 (file)
@@ -138,6 +138,10 @@ export interface ActorKeysPayload {
   actorId: number
 }
 
+export interface DeleteResumableUploadMetaFilePayload {
+  filepath: string
+}
+
 export interface MoveObjectStoragePayload {
   videoUUID: string
   isNewVideo: boolean
index d6f8c1ae0b772a8814a617bfab8636e2d4ea77a4..ef4e7d04d27f84ce6e049dd44d1e818bef0e9bce 100644 (file)
@@ -2081,6 +2081,13 @@ paths:
           description: video unreadable
         '429':
           description: too many concurrent requests
+        '503':
+          description: upload is already being processed
+          headers:
+            'Retry-After':
+              schema:
+                type: number
+                example: 300
     delete:
       summary: Cancel the resumable upload of a video, deleting any data uploaded so far
       description: Uses [a resumable protocol](https://github.com/kukhariev/node-uploadx/blob/master/proto.md) to cancel the upload of a video