From 04b8c3fba614efc3827f583096c78b08cb668470 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 20 Nov 2018 10:05:51 +0100 Subject: Delete invalid or deleted remote videos --- server/controllers/api/videos/index.ts | 7 +- server/initializers/constants.ts | 15 ++- server/lib/activitypub/process/process-update.ts | 1 - server/lib/activitypub/videos.ts | 113 +++++++++++---------- .../job-queue/handlers/activitypub-refresher.ts | 40 ++++++++ server/lib/job-queue/job-queue.ts | 8 +- server/models/video/video.ts | 6 ++ server/tests/api/activitypub/index.ts | 1 + server/tests/api/activitypub/refresher.ts | 84 +++++++++++++++ 9 files changed, 210 insertions(+), 65 deletions(-) create mode 100644 server/lib/job-queue/handlers/activitypub-refresher.ts create mode 100644 server/tests/api/activitypub/refresher.ts (limited to 'server') diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index 89fd0432f..b659f53ed 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -387,6 +387,11 @@ async function updateVideo (req: express.Request, res: express.Response) { function getVideo (req: express.Request, res: express.Response) { const videoInstance = res.locals.video + if (videoInstance.isOutdated()) { + JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoInstance.url } }) + .catch(err => logger.error('Cannot create AP refresher job for video %s.', videoInstance.url, { err })) + } + return res.json(videoInstance.toFormattedDetailsJSON()) } @@ -429,7 +434,7 @@ async function getVideoDescription (req: express.Request, res: express.Response) return res.json({ description }) } -async function listVideos (req: express.Request, res: express.Response, next: express.NextFunction) { +async function listVideos (req: express.Request, res: express.Response) { const resultList = await VideoModel.listForApi({ start: req.query.start, count: req.query.count, diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index ae3d671bb..aa243859c 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -102,7 +102,8 @@ const JOB_ATTEMPTS: { [ id in JobType ]: number } = { 'video-file': 1, 'video-import': 1, 'email': 5, - 'videos-views': 1 + 'videos-views': 1, + 'activitypub-refresher': 1 } const JOB_CONCURRENCY: { [ id in JobType ]: number } = { 'activitypub-http-broadcast': 1, @@ -113,7 +114,8 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = { 'video-file': 1, 'video-import': 1, 'email': 5, - 'videos-views': 1 + 'videos-views': 1, + 'activitypub-refresher': 1 } const JOB_TTL: { [ id in JobType ]: number } = { 'activitypub-http-broadcast': 60000 * 10, // 10 minutes @@ -124,11 +126,12 @@ const JOB_TTL: { [ id in JobType ]: number } = { 'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long 'video-import': 1000 * 3600 * 2, // hours 'email': 60000 * 10, // 10 minutes - 'videos-views': undefined // Unlimited + 'videos-views': undefined, // Unlimited + 'activitypub-refresher': 60000 * 10 // 10 minutes } const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { 'videos-views': { - cron: '1 * * * *' // At 1 minutes past the hour + cron: '1 * * * *' // At 1 minute past the hour } } @@ -543,7 +546,7 @@ const HTTP_SIGNATURE = { // --------------------------------------------------------------------------- -const PRIVATE_RSA_KEY_SIZE = 2048 +let PRIVATE_RSA_KEY_SIZE = 2048 // Password encryption const BCRYPT_SALT_SIZE = 10 @@ -647,6 +650,8 @@ const TRACKER_RATE_LIMITS = { // Special constants for a test instance if (isTestInstance() === true) { + PRIVATE_RSA_KEY_SIZE = 1024 + ACTOR_FOLLOW_SCORE.BASE = 20 REMOTE_SCHEME.HTTP = 'http' diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts index bd4013555..03831a00e 100644 --- a/server/lib/activitypub/process/process-update.ts +++ b/server/lib/activitypub/process/process-update.ts @@ -59,7 +59,6 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate) videoObject, account: actor.Account, channel: channelActor.VideoChannel, - updateViews: true, overrideTo: activity.to } return updateVideoFromAP(updateOptions) diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 4cecf9345..998f90330 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -117,7 +117,7 @@ type SyncParam = { shares: boolean comments: boolean thumbnail: boolean - refreshVideo: boolean + refreshVideo?: boolean } async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) { logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid) @@ -158,13 +158,11 @@ async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: Vid async function getOrCreateVideoAndAccountAndChannel (options: { videoObject: VideoTorrentObject | string, syncParam?: SyncParam, - fetchType?: VideoFetchByUrlType, - refreshViews?: boolean + fetchType?: VideoFetchByUrlType }) { // Default params const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false } const fetchType = options.fetchType || 'all' - const refreshViews = options.refreshViews || false // Get video url const videoUrl = getAPUrl(options.videoObject) @@ -174,11 +172,11 @@ async function getOrCreateVideoAndAccountAndChannel (options: { const refreshOptions = { video: videoFromDatabase, fetchedType: fetchType, - syncParam, - refreshViews + syncParam } - const p = refreshVideoIfNeeded(refreshOptions) - if (syncParam.refreshVideo === true) videoFromDatabase = await p + + if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions) + else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoFromDatabase.url } }) return { video: videoFromDatabase } } @@ -199,7 +197,6 @@ async function updateVideoFromAP (options: { videoObject: VideoTorrentObject, account: AccountModel, channel: VideoChannelModel, - updateViews: boolean, overrideTo?: string[] }) { logger.debug('Updating remote video "%s".', options.videoObject.uuid) @@ -238,8 +235,8 @@ async function updateVideoFromAP (options: { options.video.set('publishedAt', videoData.publishedAt) options.video.set('privacy', videoData.privacy) options.video.set('channelId', videoData.channelId) + options.video.set('views', videoData.views) - if (options.updateViews === true) options.video.set('views', videoData.views) await options.video.save(sequelizeOptions) { @@ -297,8 +294,58 @@ async function updateVideoFromAP (options: { } } +async function refreshVideoIfNeeded (options: { + video: VideoModel, + fetchedType: VideoFetchByUrlType, + syncParam: SyncParam +}): Promise { + if (!options.video.isOutdated()) return options.video + + // We need more attributes if the argument video was fetched with not enough joints + const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url) + + try { + const { response, videoObject } = await fetchRemoteVideo(video.url) + if (response.statusCode === 404) { + logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url) + + // Video does not exist anymore + await video.destroy() + return undefined + } + + if (videoObject === undefined) { + logger.warn('Cannot refresh remote video %s: invalid body.', video.url) + + await video.setAsRefreshed() + return video + } + + const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject) + const account = await AccountModel.load(channelActor.VideoChannel.accountId) + + const updateOptions = { + video, + videoObject, + account, + channel: channelActor.VideoChannel + } + await retryTransactionWrapper(updateVideoFromAP, updateOptions) + await syncVideoExternalAttributes(video, videoObject, options.syncParam) + + return video + } catch (err) { + logger.warn('Cannot refresh video %s.', options.video.url, { err }) + + // Don't refresh in loop + await video.setAsRefreshed() + return video + } +} + export { updateVideoFromAP, + refreshVideoIfNeeded, federateVideoIfNeeded, fetchRemoteVideo, getOrCreateVideoAndAccountAndChannel, @@ -362,52 +409,6 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor return videoCreated } -async function refreshVideoIfNeeded (options: { - video: VideoModel, - fetchedType: VideoFetchByUrlType, - syncParam: SyncParam, - refreshViews: boolean -}): Promise { - if (!options.video.isOutdated()) return options.video - - // We need more attributes if the argument video was fetched with not enough joints - const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url) - - try { - const { response, videoObject } = await fetchRemoteVideo(video.url) - if (response.statusCode === 404) { - logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url) - - // Video does not exist anymore - await video.destroy() - return undefined - } - - if (videoObject === undefined) { - logger.warn('Cannot refresh remote video %s: invalid body.', video.url) - return video - } - - const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject) - const account = await AccountModel.load(channelActor.VideoChannel.accountId) - - const updateOptions = { - video, - videoObject, - account, - channel: channelActor.VideoChannel, - updateViews: options.refreshViews - } - await retryTransactionWrapper(updateVideoFromAP, updateOptions) - await syncVideoExternalAttributes(video, videoObject, options.syncParam) - - return video - } catch (err) { - logger.warn('Cannot refresh video %s.', options.video.url, { err }) - return video - } -} - async function videoActivityObjectToDBAttributes ( videoChannel: VideoChannelModel, videoObject: VideoTorrentObject, diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts new file mode 100644 index 000000000..7752b3b40 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts @@ -0,0 +1,40 @@ +import * as Bull from 'bull' +import { logger } from '../../../helpers/logger' +import { fetchVideoByUrl } from '../../../helpers/video' +import { refreshVideoIfNeeded } from '../../activitypub' + +export type RefreshPayload = { + videoUrl: string + type: 'video' +} + +async function refreshAPObject (job: Bull.Job) { + const payload = job.data as RefreshPayload + logger.info('Processing AP refresher in job %d.', job.id) + + if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) +} + +// --------------------------------------------------------------------------- + +export { + refreshAPObject +} + +// --------------------------------------------------------------------------- + +async function refreshAPVideo (videoUrl: string) { + const fetchType = 'all' as 'all' + const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } + + const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType) + if (videoFromDatabase) { + const refreshOptions = { + video: videoFromDatabase, + fetchedType: fetchType, + syncParam + } + + await refreshVideoIfNeeded(refreshOptions) + } +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 4cfd4d253..5862e178f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -11,6 +11,7 @@ import { processVideoFile, processVideoFileImport, VideoFileImportPayload, Video import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' import { processVideoImport, VideoImportPayload } from './handlers/video-import' import { processVideosViews } from './handlers/video-views' +import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -21,6 +22,7 @@ type CreateJobArgument = { type: 'video-file', payload: VideoFilePayload } | { type: 'email', payload: EmailPayload } | { type: 'video-import', payload: VideoImportPayload } | + { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views', payload: {} } const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { @@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'video-file': processVideoFile, 'email': processEmail, 'video-import': processVideoImport, - 'videos-views': processVideosViews + 'videos-views': processVideosViews, + 'activitypub-refresher': refreshAPObject } const jobTypes: JobType[] = [ @@ -44,7 +47,8 @@ const jobTypes: JobType[] = [ 'video-file', 'video-file-import', 'video-import', - 'videos-views' + 'videos-views', + 'activitypub-refresher' ] class JobQueue { diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 1e68b380c..0f18d9f0c 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -1561,6 +1561,12 @@ export class VideoModel extends Model { (now - updatedAtTime) > ACTIVITY_PUB.VIDEO_REFRESH_INTERVAL } + setAsRefreshed () { + this.changed('updatedAt', true) + + return this.save() + } + getBaseUrls () { let baseUrlHttp let baseUrlWs diff --git a/server/tests/api/activitypub/index.ts b/server/tests/api/activitypub/index.ts index e748f32e9..450053309 100644 --- a/server/tests/api/activitypub/index.ts +++ b/server/tests/api/activitypub/index.ts @@ -1,4 +1,5 @@ import './client' import './fetch' import './helpers' +import './refresher' import './security' diff --git a/server/tests/api/activitypub/refresher.ts b/server/tests/api/activitypub/refresher.ts new file mode 100644 index 000000000..67e04f79e --- /dev/null +++ b/server/tests/api/activitypub/refresher.ts @@ -0,0 +1,84 @@ +/* tslint:disable:no-unused-expression */ + +import 'mocha' +import { doubleFollow, getVideo, reRunServer } from '../../utils' +import { flushAndRunMultipleServers, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, wait } from '../../utils/index' +import { waitJobs } from '../../utils/server/jobs' +import { setVideoField } from '../../utils/miscs/sql' + +describe('Test AP refresher', function () { + let servers: ServerInfo[] = [] + let videoUUID1: string + let videoUUID2: string + let videoUUID3: string + + before(async function () { + this.timeout(30000) + + servers = await flushAndRunMultipleServers(2) + + // Get the access tokens + await setAccessTokensToServers(servers) + + { + const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video1' }) + videoUUID1 = res.body.video.uuid + } + + { + const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video2' }) + videoUUID2 = res.body.video.uuid + } + + { + const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video3' }) + videoUUID3 = res.body.video.uuid + } + + await doubleFollow(servers[0], servers[1]) + }) + + it('Should remove a deleted remote video', async function () { + this.timeout(60000) + + await wait(10000) + + // Change UUID so the remote server returns a 404 + await setVideoField(2, videoUUID1, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174f') + + await getVideo(servers[0].url, videoUUID1) + await getVideo(servers[0].url, videoUUID2) + + await waitJobs(servers) + + await getVideo(servers[0].url, videoUUID1, 404) + await getVideo(servers[0].url, videoUUID2, 200) + }) + + it('Should not update a remote video if the remote instance is down', async function () { + this.timeout(60000) + + killallServers([ servers[1] ]) + + await setVideoField(2, videoUUID3, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174e') + + // Video will need a refresh + await wait(10000) + + await getVideo(servers[0].url, videoUUID3) + // The refresh should fail + await waitJobs([ servers[0] ]) + + await reRunServer(servers[1]) + + // Should not refresh the video, even if the last refresh failed (to avoir a loop on dead instances) + await getVideo(servers[0].url, videoUUID3) + await waitJobs(servers) + + await getVideo(servers[0].url, videoUUID3, 200) + }) + + after(async function () { + killallServers(servers) + }) +}) -- cgit v1.2.3