]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Delete invalid or deleted remote videos
authorChocobozzz <me@florianbigard.com>
Tue, 20 Nov 2018 09:05:51 +0000 (10:05 +0100)
committerChocobozzz <me@florianbigard.com>
Tue, 20 Nov 2018 09:44:48 +0000 (10:44 +0100)
server/controllers/api/videos/index.ts
server/initializers/constants.ts
server/lib/activitypub/process/process-update.ts
server/lib/activitypub/videos.ts
server/lib/job-queue/handlers/activitypub-refresher.ts [new file with mode: 0644]
server/lib/job-queue/job-queue.ts
server/models/video/video.ts
server/tests/api/activitypub/index.ts
server/tests/api/activitypub/refresher.ts [new file with mode: 0644]
shared/models/server/job.model.ts

index 89fd0432fd5a4b0700673cf5b7e91bcfbe107004..b659f53ed350aa6107c8c2f992205f0aa40117bd 100644 (file)
@@ -387,6 +387,11 @@ async function updateVideo (req: express.Request, res: express.Response) {
 function getVideo (req: express.Request, res: express.Response) {
   const videoInstance = res.locals.video
 
+  if (videoInstance.isOutdated()) {
+    JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoInstance.url } })
+      .catch(err => logger.error('Cannot create AP refresher job for video %s.', videoInstance.url, { err }))
+  }
+
   return res.json(videoInstance.toFormattedDetailsJSON())
 }
 
@@ -429,7 +434,7 @@ async function getVideoDescription (req: express.Request, res: express.Response)
   return res.json({ description })
 }
 
-async function listVideos (req: express.Request, res: express.Response, next: express.NextFunction) {
+async function listVideos (req: express.Request, res: express.Response) {
   const resultList = await VideoModel.listForApi({
     start: req.query.start,
     count: req.query.count,
index ae3d671bb497cace16bc6534c51b403dcf8dbc0f..aa243859cc6a2bab9662d9de29f9bbc28a62cc57 100644 (file)
@@ -102,7 +102,8 @@ const JOB_ATTEMPTS: { [ id in JobType ]: number } = {
   'video-file': 1,
   'video-import': 1,
   'email': 5,
-  'videos-views': 1
+  'videos-views': 1,
+  'activitypub-refresher': 1
 }
 const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
   'activitypub-http-broadcast': 1,
@@ -113,7 +114,8 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
   'video-file': 1,
   'video-import': 1,
   'email': 5,
-  'videos-views': 1
+  'videos-views': 1,
+  'activitypub-refresher': 1
 }
 const JOB_TTL: { [ id in JobType ]: number } = {
   'activitypub-http-broadcast': 60000 * 10, // 10 minutes
@@ -124,11 +126,12 @@ const JOB_TTL: { [ id in JobType ]: number } = {
   'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long
   'video-import': 1000 * 3600 * 2, //  hours
   'email': 60000 * 10, // 10 minutes
-  'videos-views': undefined // Unlimited
+  'videos-views': undefined, // Unlimited
+  'activitypub-refresher': 60000 * 10 // 10 minutes
 }
 const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
   'videos-views': {
-    cron: '1 * * * *' // At 1 minutes past the hour
+    cron: '1 * * * *' // At 1 minute past the hour
   }
 }
 
@@ -543,7 +546,7 @@ const HTTP_SIGNATURE = {
 
 // ---------------------------------------------------------------------------
 
-const PRIVATE_RSA_KEY_SIZE = 2048
+let PRIVATE_RSA_KEY_SIZE = 2048
 
 // Password encryption
 const BCRYPT_SALT_SIZE = 10
@@ -647,6 +650,8 @@ const TRACKER_RATE_LIMITS = {
 
 // Special constants for a test instance
 if (isTestInstance() === true) {
+  PRIVATE_RSA_KEY_SIZE = 1024
+
   ACTOR_FOLLOW_SCORE.BASE = 20
 
   REMOTE_SCHEME.HTTP = 'http'
index bd4013555dc12f55457277d64764b2905fd57add..03831a00e108947e060dabd80ad24c6a72e5ffdc 100644 (file)
@@ -59,7 +59,6 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate)
     videoObject,
     account: actor.Account,
     channel: channelActor.VideoChannel,
-    updateViews: true,
     overrideTo: activity.to
   }
   return updateVideoFromAP(updateOptions)
index 4cecf9345912a6d08eb59b73d13a6de183b20036..998f903303137aa17040d6bb6c5cfc860a5a819f 100644 (file)
@@ -117,7 +117,7 @@ type SyncParam = {
   shares: boolean
   comments: boolean
   thumbnail: boolean
-  refreshVideo: boolean
+  refreshVideo?: boolean
 }
 async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) {
   logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)
@@ -158,13 +158,11 @@ async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: Vid
 async function getOrCreateVideoAndAccountAndChannel (options: {
   videoObject: VideoTorrentObject | string,
   syncParam?: SyncParam,
-  fetchType?: VideoFetchByUrlType,
-  refreshViews?: boolean
+  fetchType?: VideoFetchByUrlType
 }) {
   // Default params
   const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false }
   const fetchType = options.fetchType || 'all'
-  const refreshViews = options.refreshViews || false
 
   // Get video url
   const videoUrl = getAPUrl(options.videoObject)
@@ -174,11 +172,11 @@ async function getOrCreateVideoAndAccountAndChannel (options: {
     const refreshOptions = {
       video: videoFromDatabase,
       fetchedType: fetchType,
-      syncParam,
-      refreshViews
+      syncParam
     }
-    const p = refreshVideoIfNeeded(refreshOptions)
-    if (syncParam.refreshVideo === true) videoFromDatabase = await p
+
+    if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions)
+    else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoFromDatabase.url } })
 
     return { video: videoFromDatabase }
   }
@@ -199,7 +197,6 @@ async function updateVideoFromAP (options: {
   videoObject: VideoTorrentObject,
   account: AccountModel,
   channel: VideoChannelModel,
-  updateViews: boolean,
   overrideTo?: string[]
 }) {
   logger.debug('Updating remote video "%s".', options.videoObject.uuid)
@@ -238,8 +235,8 @@ async function updateVideoFromAP (options: {
       options.video.set('publishedAt', videoData.publishedAt)
       options.video.set('privacy', videoData.privacy)
       options.video.set('channelId', videoData.channelId)
+      options.video.set('views', videoData.views)
 
-      if (options.updateViews === true) options.video.set('views', videoData.views)
       await options.video.save(sequelizeOptions)
 
       {
@@ -297,8 +294,58 @@ async function updateVideoFromAP (options: {
   }
 }
 
+async function refreshVideoIfNeeded (options: {
+  video: VideoModel,
+  fetchedType: VideoFetchByUrlType,
+  syncParam: SyncParam
+}): Promise<VideoModel> {
+  if (!options.video.isOutdated()) return options.video
+
+  // We need more attributes if the argument video was fetched with not enough joints
+  const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
+
+  try {
+    const { response, videoObject } = await fetchRemoteVideo(video.url)
+    if (response.statusCode === 404) {
+      logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
+
+      // Video does not exist anymore
+      await video.destroy()
+      return undefined
+    }
+
+    if (videoObject === undefined) {
+      logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
+
+      await video.setAsRefreshed()
+      return video
+    }
+
+    const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
+    const account = await AccountModel.load(channelActor.VideoChannel.accountId)
+
+    const updateOptions = {
+      video,
+      videoObject,
+      account,
+      channel: channelActor.VideoChannel
+    }
+    await retryTransactionWrapper(updateVideoFromAP, updateOptions)
+    await syncVideoExternalAttributes(video, videoObject, options.syncParam)
+
+    return video
+  } catch (err) {
+    logger.warn('Cannot refresh video %s.', options.video.url, { err })
+
+    // Don't refresh in loop
+    await video.setAsRefreshed()
+    return video
+  }
+}
+
 export {
   updateVideoFromAP,
+  refreshVideoIfNeeded,
   federateVideoIfNeeded,
   fetchRemoteVideo,
   getOrCreateVideoAndAccountAndChannel,
@@ -362,52 +409,6 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor
   return videoCreated
 }
 
-async function refreshVideoIfNeeded (options: {
-  video: VideoModel,
-  fetchedType: VideoFetchByUrlType,
-  syncParam: SyncParam,
-  refreshViews: boolean
-}): Promise<VideoModel> {
-  if (!options.video.isOutdated()) return options.video
-
-  // We need more attributes if the argument video was fetched with not enough joints
-  const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
-
-  try {
-    const { response, videoObject } = await fetchRemoteVideo(video.url)
-    if (response.statusCode === 404) {
-      logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
-
-      // Video does not exist anymore
-      await video.destroy()
-      return undefined
-    }
-
-    if (videoObject === undefined) {
-      logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
-      return video
-    }
-
-    const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
-    const account = await AccountModel.load(channelActor.VideoChannel.accountId)
-
-    const updateOptions = {
-      video,
-      videoObject,
-      account,
-      channel: channelActor.VideoChannel,
-      updateViews: options.refreshViews
-    }
-    await retryTransactionWrapper(updateVideoFromAP, updateOptions)
-    await syncVideoExternalAttributes(video, videoObject, options.syncParam)
-
-    return video
-  } catch (err) {
-    logger.warn('Cannot refresh video %s.', options.video.url, { err })
-    return video
-  }
-}
-
 async function videoActivityObjectToDBAttributes (
   videoChannel: VideoChannelModel,
   videoObject: VideoTorrentObject,
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
new file mode 100644 (file)
index 0000000..7752b3b
--- /dev/null
@@ -0,0 +1,40 @@
+import * as Bull from 'bull'
+import { logger } from '../../../helpers/logger'
+import { fetchVideoByUrl } from '../../../helpers/video'
+import { refreshVideoIfNeeded } from '../../activitypub'
+
+export type RefreshPayload = {
+  videoUrl: string
+  type: 'video'
+}
+
+async function refreshAPObject (job: Bull.Job) {
+  const payload = job.data as RefreshPayload
+  logger.info('Processing AP refresher in job %d.', job.id)
+
+  if (payload.type === 'video') return refreshAPVideo(payload.videoUrl)
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  refreshAPObject
+}
+
+// ---------------------------------------------------------------------------
+
+async function refreshAPVideo (videoUrl: string) {
+  const fetchType = 'all' as 'all'
+  const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
+
+  const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
+  if (videoFromDatabase) {
+    const refreshOptions = {
+      video: videoFromDatabase,
+      fetchedType: fetchType,
+      syncParam
+    }
+
+    await refreshVideoIfNeeded(refreshOptions)
+  }
+}
index 4cfd4d253fd18495d992485d40f5ab7efd20e15f..5862e178f81d8b18b4c10e373b2a4b54c290b256 100644 (file)
@@ -11,6 +11,7 @@ import { processVideoFile, processVideoFileImport, VideoFileImportPayload, Video
 import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
 import { processVideoImport, VideoImportPayload } from './handlers/video-import'
 import { processVideosViews } from './handlers/video-views'
+import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
 
 type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -21,6 +22,7 @@ type CreateJobArgument =
   { type: 'video-file', payload: VideoFilePayload } |
   { type: 'email', payload: EmailPayload } |
   { type: 'video-import', payload: VideoImportPayload } |
+  { type: 'activitypub-refresher', payload: RefreshPayload } |
   { type: 'videos-views', payload: {} }
 
 const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
@@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
   'video-file': processVideoFile,
   'email': processEmail,
   'video-import': processVideoImport,
-  'videos-views': processVideosViews
+  'videos-views': processVideosViews,
+  'activitypub-refresher': refreshAPObject
 }
 
 const jobTypes: JobType[] = [
@@ -44,7 +47,8 @@ const jobTypes: JobType[] = [
   'video-file',
   'video-file-import',
   'video-import',
-  'videos-views'
+  'videos-views',
+  'activitypub-refresher'
 ]
 
 class JobQueue {
index 1e68b380cea07c528ce189a63ca410e38b2e2de9..0f18d9f0c82e819371497c62691f6362d388f885 100644 (file)
@@ -1561,6 +1561,12 @@ export class VideoModel extends Model<VideoModel> {
       (now - updatedAtTime) > ACTIVITY_PUB.VIDEO_REFRESH_INTERVAL
   }
 
+  setAsRefreshed () {
+    this.changed('updatedAt', true)
+
+    return this.save()
+  }
+
   getBaseUrls () {
     let baseUrlHttp
     let baseUrlWs
index e748f32e9c1faad2c07b015269f036647e9e10d5..450053309d0ae6a20fa89b38e299216827b8fe5a 100644 (file)
@@ -1,4 +1,5 @@
 import './client'
 import './fetch'
 import './helpers'
+import './refresher'
 import './security'
diff --git a/server/tests/api/activitypub/refresher.ts b/server/tests/api/activitypub/refresher.ts
new file mode 100644 (file)
index 0000000..67e04f7
--- /dev/null
@@ -0,0 +1,84 @@
+/* tslint:disable:no-unused-expression */
+
+import 'mocha'
+import { doubleFollow, getVideo, reRunServer } from '../../utils'
+import { flushAndRunMultipleServers, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, wait } from '../../utils/index'
+import { waitJobs } from '../../utils/server/jobs'
+import { setVideoField } from '../../utils/miscs/sql'
+
+describe('Test AP refresher', function () {
+  let servers: ServerInfo[] = []
+  let videoUUID1: string
+  let videoUUID2: string
+  let videoUUID3: string
+
+  before(async function () {
+    this.timeout(30000)
+
+    servers = await flushAndRunMultipleServers(2)
+
+    // Get the access tokens
+    await setAccessTokensToServers(servers)
+
+    {
+      const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video1' })
+      videoUUID1 = res.body.video.uuid
+    }
+
+    {
+      const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video2' })
+      videoUUID2 = res.body.video.uuid
+    }
+
+    {
+      const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video3' })
+      videoUUID3 = res.body.video.uuid
+    }
+
+    await doubleFollow(servers[0], servers[1])
+  })
+
+  it('Should remove a deleted remote video', async function () {
+    this.timeout(60000)
+
+    await wait(10000)
+
+    // Change UUID so the remote server returns a 404
+    await setVideoField(2, videoUUID1, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174f')
+
+    await getVideo(servers[0].url, videoUUID1)
+    await getVideo(servers[0].url, videoUUID2)
+
+    await waitJobs(servers)
+
+    await getVideo(servers[0].url, videoUUID1, 404)
+    await getVideo(servers[0].url, videoUUID2, 200)
+  })
+
+  it('Should not update a remote video if the remote instance is down', async function () {
+    this.timeout(60000)
+
+    killallServers([ servers[1] ])
+
+    await setVideoField(2, videoUUID3, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174e')
+
+    // Video will need a refresh
+    await wait(10000)
+
+    await getVideo(servers[0].url, videoUUID3)
+    // The refresh should fail
+    await waitJobs([ servers[0] ])
+
+    await reRunServer(servers[1])
+
+    // Should not refresh the video, even if the last refresh failed (to avoir a loop on dead instances)
+    await getVideo(servers[0].url, videoUUID3)
+    await waitJobs(servers)
+
+    await getVideo(servers[0].url, videoUUID3, 200)
+  })
+
+  after(async function () {
+    killallServers(servers)
+  })
+})
index 4046297c47b1d6060b9196a68069b8b69cce5598..85bc9541b673eda5eddec677396a917e25c7bf45 100644 (file)
@@ -8,7 +8,8 @@ export type JobType = 'activitypub-http-unicast' |
   'video-file' |
   'email' |
   'video-import' |
-  'videos-views'
+  'videos-views' |
+  'activitypub-refresher'
 
 export interface Job {
   id: number