aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-11-20 10:05:51 +0100
committerChocobozzz <me@florianbigard.com>2018-11-20 10:44:48 +0100
commit04b8c3fba614efc3827f583096c78b08cb668470 (patch)
tree63172b40e4b029e4a14553c2fb39bd249d6cd0dd /server
parentf107470e50236e2a073f3f7dbab87c79e8364b56 (diff)
downloadPeerTube-04b8c3fba614efc3827f583096c78b08cb668470.tar.gz
PeerTube-04b8c3fba614efc3827f583096c78b08cb668470.tar.zst
PeerTube-04b8c3fba614efc3827f583096c78b08cb668470.zip
Delete invalid or deleted remote videos
Diffstat (limited to 'server')
-rw-r--r--server/controllers/api/videos/index.ts7
-rw-r--r--server/initializers/constants.ts15
-rw-r--r--server/lib/activitypub/process/process-update.ts1
-rw-r--r--server/lib/activitypub/videos.ts113
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts40
-rw-r--r--server/lib/job-queue/job-queue.ts8
-rw-r--r--server/models/video/video.ts6
-rw-r--r--server/tests/api/activitypub/index.ts1
-rw-r--r--server/tests/api/activitypub/refresher.ts84
9 files changed, 210 insertions, 65 deletions
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index 89fd0432f..b659f53ed 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -387,6 +387,11 @@ async function updateVideo (req: express.Request, res: express.Response) {
387function getVideo (req: express.Request, res: express.Response) { 387function getVideo (req: express.Request, res: express.Response) {
388 const videoInstance = res.locals.video 388 const videoInstance = res.locals.video
389 389
390 if (videoInstance.isOutdated()) {
391 JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoInstance.url } })
392 .catch(err => logger.error('Cannot create AP refresher job for video %s.', videoInstance.url, { err }))
393 }
394
390 return res.json(videoInstance.toFormattedDetailsJSON()) 395 return res.json(videoInstance.toFormattedDetailsJSON())
391} 396}
392 397
@@ -429,7 +434,7 @@ async function getVideoDescription (req: express.Request, res: express.Response)
429 return res.json({ description }) 434 return res.json({ description })
430} 435}
431 436
432async function listVideos (req: express.Request, res: express.Response, next: express.NextFunction) { 437async function listVideos (req: express.Request, res: express.Response) {
433 const resultList = await VideoModel.listForApi({ 438 const resultList = await VideoModel.listForApi({
434 start: req.query.start, 439 start: req.query.start,
435 count: req.query.count, 440 count: req.query.count,
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index ae3d671bb..aa243859c 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -102,7 +102,8 @@ const JOB_ATTEMPTS: { [ id in JobType ]: number } = {
102 'video-file': 1, 102 'video-file': 1,
103 'video-import': 1, 103 'video-import': 1,
104 'email': 5, 104 'email': 5,
105 'videos-views': 1 105 'videos-views': 1,
106 'activitypub-refresher': 1
106} 107}
107const JOB_CONCURRENCY: { [ id in JobType ]: number } = { 108const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
108 'activitypub-http-broadcast': 1, 109 'activitypub-http-broadcast': 1,
@@ -113,7 +114,8 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
113 'video-file': 1, 114 'video-file': 1,
114 'video-import': 1, 115 'video-import': 1,
115 'email': 5, 116 'email': 5,
116 'videos-views': 1 117 'videos-views': 1,
118 'activitypub-refresher': 1
117} 119}
118const JOB_TTL: { [ id in JobType ]: number } = { 120const JOB_TTL: { [ id in JobType ]: number } = {
119 'activitypub-http-broadcast': 60000 * 10, // 10 minutes 121 'activitypub-http-broadcast': 60000 * 10, // 10 minutes
@@ -124,11 +126,12 @@ const JOB_TTL: { [ id in JobType ]: number } = {
124 'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long 126 'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long
125 'video-import': 1000 * 3600 * 2, // hours 127 'video-import': 1000 * 3600 * 2, // hours
126 'email': 60000 * 10, // 10 minutes 128 'email': 60000 * 10, // 10 minutes
127 'videos-views': undefined // Unlimited 129 'videos-views': undefined, // Unlimited
130 'activitypub-refresher': 60000 * 10 // 10 minutes
128} 131}
129const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { 132const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
130 'videos-views': { 133 'videos-views': {
131 cron: '1 * * * *' // At 1 minutes past the hour 134 cron: '1 * * * *' // At 1 minute past the hour
132 } 135 }
133} 136}
134 137
@@ -543,7 +546,7 @@ const HTTP_SIGNATURE = {
543 546
544// --------------------------------------------------------------------------- 547// ---------------------------------------------------------------------------
545 548
546const PRIVATE_RSA_KEY_SIZE = 2048 549let PRIVATE_RSA_KEY_SIZE = 2048
547 550
548// Password encryption 551// Password encryption
549const BCRYPT_SALT_SIZE = 10 552const BCRYPT_SALT_SIZE = 10
@@ -647,6 +650,8 @@ const TRACKER_RATE_LIMITS = {
647 650
648// Special constants for a test instance 651// Special constants for a test instance
649if (isTestInstance() === true) { 652if (isTestInstance() === true) {
653 PRIVATE_RSA_KEY_SIZE = 1024
654
650 ACTOR_FOLLOW_SCORE.BASE = 20 655 ACTOR_FOLLOW_SCORE.BASE = 20
651 656
652 REMOTE_SCHEME.HTTP = 'http' 657 REMOTE_SCHEME.HTTP = 'http'
diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts
index bd4013555..03831a00e 100644
--- a/server/lib/activitypub/process/process-update.ts
+++ b/server/lib/activitypub/process/process-update.ts
@@ -59,7 +59,6 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate)
59 videoObject, 59 videoObject,
60 account: actor.Account, 60 account: actor.Account,
61 channel: channelActor.VideoChannel, 61 channel: channelActor.VideoChannel,
62 updateViews: true,
63 overrideTo: activity.to 62 overrideTo: activity.to
64 } 63 }
65 return updateVideoFromAP(updateOptions) 64 return updateVideoFromAP(updateOptions)
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts
index 4cecf9345..998f90330 100644
--- a/server/lib/activitypub/videos.ts
+++ b/server/lib/activitypub/videos.ts
@@ -117,7 +117,7 @@ type SyncParam = {
117 shares: boolean 117 shares: boolean
118 comments: boolean 118 comments: boolean
119 thumbnail: boolean 119 thumbnail: boolean
120 refreshVideo: boolean 120 refreshVideo?: boolean
121} 121}
122async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) { 122async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) {
123 logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid) 123 logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)
@@ -158,13 +158,11 @@ async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: Vid
158async function getOrCreateVideoAndAccountAndChannel (options: { 158async function getOrCreateVideoAndAccountAndChannel (options: {
159 videoObject: VideoTorrentObject | string, 159 videoObject: VideoTorrentObject | string,
160 syncParam?: SyncParam, 160 syncParam?: SyncParam,
161 fetchType?: VideoFetchByUrlType, 161 fetchType?: VideoFetchByUrlType
162 refreshViews?: boolean
163}) { 162}) {
164 // Default params 163 // Default params
165 const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false } 164 const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false }
166 const fetchType = options.fetchType || 'all' 165 const fetchType = options.fetchType || 'all'
167 const refreshViews = options.refreshViews || false
168 166
169 // Get video url 167 // Get video url
170 const videoUrl = getAPUrl(options.videoObject) 168 const videoUrl = getAPUrl(options.videoObject)
@@ -174,11 +172,11 @@ async function getOrCreateVideoAndAccountAndChannel (options: {
174 const refreshOptions = { 172 const refreshOptions = {
175 video: videoFromDatabase, 173 video: videoFromDatabase,
176 fetchedType: fetchType, 174 fetchedType: fetchType,
177 syncParam, 175 syncParam
178 refreshViews
179 } 176 }
180 const p = refreshVideoIfNeeded(refreshOptions) 177
181 if (syncParam.refreshVideo === true) videoFromDatabase = await p 178 if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions)
179 else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoFromDatabase.url } })
182 180
183 return { video: videoFromDatabase } 181 return { video: videoFromDatabase }
184 } 182 }
@@ -199,7 +197,6 @@ async function updateVideoFromAP (options: {
199 videoObject: VideoTorrentObject, 197 videoObject: VideoTorrentObject,
200 account: AccountModel, 198 account: AccountModel,
201 channel: VideoChannelModel, 199 channel: VideoChannelModel,
202 updateViews: boolean,
203 overrideTo?: string[] 200 overrideTo?: string[]
204}) { 201}) {
205 logger.debug('Updating remote video "%s".', options.videoObject.uuid) 202 logger.debug('Updating remote video "%s".', options.videoObject.uuid)
@@ -238,8 +235,8 @@ async function updateVideoFromAP (options: {
238 options.video.set('publishedAt', videoData.publishedAt) 235 options.video.set('publishedAt', videoData.publishedAt)
239 options.video.set('privacy', videoData.privacy) 236 options.video.set('privacy', videoData.privacy)
240 options.video.set('channelId', videoData.channelId) 237 options.video.set('channelId', videoData.channelId)
238 options.video.set('views', videoData.views)
241 239
242 if (options.updateViews === true) options.video.set('views', videoData.views)
243 await options.video.save(sequelizeOptions) 240 await options.video.save(sequelizeOptions)
244 241
245 { 242 {
@@ -297,8 +294,58 @@ async function updateVideoFromAP (options: {
297 } 294 }
298} 295}
299 296
297async function refreshVideoIfNeeded (options: {
298 video: VideoModel,
299 fetchedType: VideoFetchByUrlType,
300 syncParam: SyncParam
301}): Promise<VideoModel> {
302 if (!options.video.isOutdated()) return options.video
303
304 // We need more attributes if the argument video was fetched with not enough joints
305 const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
306
307 try {
308 const { response, videoObject } = await fetchRemoteVideo(video.url)
309 if (response.statusCode === 404) {
310 logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
311
312 // Video does not exist anymore
313 await video.destroy()
314 return undefined
315 }
316
317 if (videoObject === undefined) {
318 logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
319
320 await video.setAsRefreshed()
321 return video
322 }
323
324 const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
325 const account = await AccountModel.load(channelActor.VideoChannel.accountId)
326
327 const updateOptions = {
328 video,
329 videoObject,
330 account,
331 channel: channelActor.VideoChannel
332 }
333 await retryTransactionWrapper(updateVideoFromAP, updateOptions)
334 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
335
336 return video
337 } catch (err) {
338 logger.warn('Cannot refresh video %s.', options.video.url, { err })
339
340 // Don't refresh in loop
341 await video.setAsRefreshed()
342 return video
343 }
344}
345
300export { 346export {
301 updateVideoFromAP, 347 updateVideoFromAP,
348 refreshVideoIfNeeded,
302 federateVideoIfNeeded, 349 federateVideoIfNeeded,
303 fetchRemoteVideo, 350 fetchRemoteVideo,
304 getOrCreateVideoAndAccountAndChannel, 351 getOrCreateVideoAndAccountAndChannel,
@@ -362,52 +409,6 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor
362 return videoCreated 409 return videoCreated
363} 410}
364 411
365async function refreshVideoIfNeeded (options: {
366 video: VideoModel,
367 fetchedType: VideoFetchByUrlType,
368 syncParam: SyncParam,
369 refreshViews: boolean
370}): Promise<VideoModel> {
371 if (!options.video.isOutdated()) return options.video
372
373 // We need more attributes if the argument video was fetched with not enough joints
374 const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
375
376 try {
377 const { response, videoObject } = await fetchRemoteVideo(video.url)
378 if (response.statusCode === 404) {
379 logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
380
381 // Video does not exist anymore
382 await video.destroy()
383 return undefined
384 }
385
386 if (videoObject === undefined) {
387 logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
388 return video
389 }
390
391 const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
392 const account = await AccountModel.load(channelActor.VideoChannel.accountId)
393
394 const updateOptions = {
395 video,
396 videoObject,
397 account,
398 channel: channelActor.VideoChannel,
399 updateViews: options.refreshViews
400 }
401 await retryTransactionWrapper(updateVideoFromAP, updateOptions)
402 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
403
404 return video
405 } catch (err) {
406 logger.warn('Cannot refresh video %s.', options.video.url, { err })
407 return video
408 }
409}
410
411async function videoActivityObjectToDBAttributes ( 412async function videoActivityObjectToDBAttributes (
412 videoChannel: VideoChannelModel, 413 videoChannel: VideoChannelModel,
413 videoObject: VideoTorrentObject, 414 videoObject: VideoTorrentObject,
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
new file mode 100644
index 000000000..7752b3b40
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -0,0 +1,40 @@
1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger'
3import { fetchVideoByUrl } from '../../../helpers/video'
4import { refreshVideoIfNeeded } from '../../activitypub'
5
6export type RefreshPayload = {
7 videoUrl: string
8 type: 'video'
9}
10
11async function refreshAPObject (job: Bull.Job) {
12 const payload = job.data as RefreshPayload
13 logger.info('Processing AP refresher in job %d.', job.id)
14
15 if (payload.type === 'video') return refreshAPVideo(payload.videoUrl)
16}
17
18// ---------------------------------------------------------------------------
19
20export {
21 refreshAPObject
22}
23
24// ---------------------------------------------------------------------------
25
26async function refreshAPVideo (videoUrl: string) {
27 const fetchType = 'all' as 'all'
28 const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
29
30 const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
31 if (videoFromDatabase) {
32 const refreshOptions = {
33 video: videoFromDatabase,
34 fetchedType: fetchType,
35 syncParam
36 }
37
38 await refreshVideoIfNeeded(refreshOptions)
39 }
40}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 4cfd4d253..5862e178f 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -11,6 +11,7 @@ import { processVideoFile, processVideoFileImport, VideoFileImportPayload, Video
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12import { processVideoImport, VideoImportPayload } from './handlers/video-import' 12import { processVideoImport, VideoImportPayload } from './handlers/video-import'
13import { processVideosViews } from './handlers/video-views' 13import { processVideosViews } from './handlers/video-views'
14import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
14 15
15type CreateJobArgument = 16type CreateJobArgument =
16 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 17 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -21,6 +22,7 @@ type CreateJobArgument =
21 { type: 'video-file', payload: VideoFilePayload } | 22 { type: 'video-file', payload: VideoFilePayload } |
22 { type: 'email', payload: EmailPayload } | 23 { type: 'email', payload: EmailPayload } |
23 { type: 'video-import', payload: VideoImportPayload } | 24 { type: 'video-import', payload: VideoImportPayload } |
25 { type: 'activitypub-refresher', payload: RefreshPayload } |
24 { type: 'videos-views', payload: {} } 26 { type: 'videos-views', payload: {} }
25 27
26const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { 28const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
@@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
32 'video-file': processVideoFile, 34 'video-file': processVideoFile,
33 'email': processEmail, 35 'email': processEmail,
34 'video-import': processVideoImport, 36 'video-import': processVideoImport,
35 'videos-views': processVideosViews 37 'videos-views': processVideosViews,
38 'activitypub-refresher': refreshAPObject
36} 39}
37 40
38const jobTypes: JobType[] = [ 41const jobTypes: JobType[] = [
@@ -44,7 +47,8 @@ const jobTypes: JobType[] = [
44 'video-file', 47 'video-file',
45 'video-file-import', 48 'video-file-import',
46 'video-import', 49 'video-import',
47 'videos-views' 50 'videos-views',
51 'activitypub-refresher'
48] 52]
49 53
50class JobQueue { 54class JobQueue {
diff --git a/server/models/video/video.ts b/server/models/video/video.ts
index 1e68b380c..0f18d9f0c 100644
--- a/server/models/video/video.ts
+++ b/server/models/video/video.ts
@@ -1561,6 +1561,12 @@ export class VideoModel extends Model<VideoModel> {
1561 (now - updatedAtTime) > ACTIVITY_PUB.VIDEO_REFRESH_INTERVAL 1561 (now - updatedAtTime) > ACTIVITY_PUB.VIDEO_REFRESH_INTERVAL
1562 } 1562 }
1563 1563
1564 setAsRefreshed () {
1565 this.changed('updatedAt', true)
1566
1567 return this.save()
1568 }
1569
1564 getBaseUrls () { 1570 getBaseUrls () {
1565 let baseUrlHttp 1571 let baseUrlHttp
1566 let baseUrlWs 1572 let baseUrlWs
diff --git a/server/tests/api/activitypub/index.ts b/server/tests/api/activitypub/index.ts
index e748f32e9..450053309 100644
--- a/server/tests/api/activitypub/index.ts
+++ b/server/tests/api/activitypub/index.ts
@@ -1,4 +1,5 @@
1import './client' 1import './client'
2import './fetch' 2import './fetch'
3import './helpers' 3import './helpers'
4import './refresher'
4import './security' 5import './security'
diff --git a/server/tests/api/activitypub/refresher.ts b/server/tests/api/activitypub/refresher.ts
new file mode 100644
index 000000000..67e04f79e
--- /dev/null
+++ b/server/tests/api/activitypub/refresher.ts
@@ -0,0 +1,84 @@
1/* tslint:disable:no-unused-expression */
2
3import 'mocha'
4import { doubleFollow, getVideo, reRunServer } from '../../utils'
5import { flushAndRunMultipleServers, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, wait } from '../../utils/index'
6import { waitJobs } from '../../utils/server/jobs'
7import { setVideoField } from '../../utils/miscs/sql'
8
9describe('Test AP refresher', function () {
10 let servers: ServerInfo[] = []
11 let videoUUID1: string
12 let videoUUID2: string
13 let videoUUID3: string
14
15 before(async function () {
16 this.timeout(30000)
17
18 servers = await flushAndRunMultipleServers(2)
19
20 // Get the access tokens
21 await setAccessTokensToServers(servers)
22
23 {
24 const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video1' })
25 videoUUID1 = res.body.video.uuid
26 }
27
28 {
29 const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video2' })
30 videoUUID2 = res.body.video.uuid
31 }
32
33 {
34 const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video3' })
35 videoUUID3 = res.body.video.uuid
36 }
37
38 await doubleFollow(servers[0], servers[1])
39 })
40
41 it('Should remove a deleted remote video', async function () {
42 this.timeout(60000)
43
44 await wait(10000)
45
46 // Change UUID so the remote server returns a 404
47 await setVideoField(2, videoUUID1, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174f')
48
49 await getVideo(servers[0].url, videoUUID1)
50 await getVideo(servers[0].url, videoUUID2)
51
52 await waitJobs(servers)
53
54 await getVideo(servers[0].url, videoUUID1, 404)
55 await getVideo(servers[0].url, videoUUID2, 200)
56 })
57
58 it('Should not update a remote video if the remote instance is down', async function () {
59 this.timeout(60000)
60
61 killallServers([ servers[1] ])
62
63 await setVideoField(2, videoUUID3, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174e')
64
65 // Video will need a refresh
66 await wait(10000)
67
68 await getVideo(servers[0].url, videoUUID3)
69 // The refresh should fail
70 await waitJobs([ servers[0] ])
71
72 await reRunServer(servers[1])
73
74 // Should not refresh the video, even if the last refresh failed (to avoir a loop on dead instances)
75 await getVideo(servers[0].url, videoUUID3)
76 await waitJobs(servers)
77
78 await getVideo(servers[0].url, videoUUID3, 200)
79 })
80
81 after(async function () {
82 killallServers(servers)
83 })
84})