'activitypub-http-fetcher',
'activitypub-http-unicast',
'activitypub-refresher',
+ 'activitypub-cleaner',
'actor-keys',
'email',
'video-file-import',
videos:
federate_unlisted: false
+ # Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments)
+ # It removes objects that do not exist anymore, and potentially fix their URLs
+ # This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted
+ # We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes
+ cleanup_remote_interactions: false
+
cache:
previews:
size: 500 # Max number of previews you want to cache
videos:
federate_unlisted: false
+ # Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments)
+ # It removes objects that do not exist anymore, and potentially fix their URLs
+ # This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted
+ # We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes
+ cleanup_remote_interactions: false
+
###############################################################################
#
import validator from 'validator'
import { Activity, ActivityType } from '../../../../shared/models/activitypub'
+import { exists } from '../misc'
import { sanitizeAndCheckActorObject } from './actor'
+import { isCacheFileObjectValid } from './cache-file'
+import { isFlagActivityValid } from './flag'
import { isActivityPubUrlValid, isBaseActivityValid, isObjectValid } from './misc'
-import { isDislikeActivityValid } from './rate'
+import { isPlaylistObjectValid } from './playlist'
+import { isDislikeActivityValid, isLikeActivityValid } from './rate'
+import { isShareActivityValid } from './share'
import { sanitizeAndCheckVideoCommentObject } from './video-comments'
import { sanitizeAndCheckVideoTorrentObject } from './videos'
import { isViewActivityValid } from './view'
-import { exists } from '../misc'
-import { isCacheFileObjectValid } from './cache-file'
-import { isFlagActivityValid } from './flag'
-import { isPlaylistObjectValid } from './playlist'
function isRootActivityValid (activity: any) {
return isCollection(activity) || isActivity(activity)
}
function checkDislikeActivity (activity: any) {
- return isBaseActivityValid(activity, 'Dislike') &&
- isDislikeActivityValid(activity)
+ return isDislikeActivityValid(activity)
+}
+
+function checkLikeActivity (activity: any) {
+ return isLikeActivityValid(activity)
}
function checkCreateActivity (activity: any) {
}
function checkAnnounceActivity (activity: any) {
- return isBaseActivityValid(activity, 'Announce') &&
- isObjectValid(activity.object)
+ return isShareActivityValid(activity)
}
function checkUndoActivity (activity: any) {
checkCreateActivity(activity.object)
)
}
-
-function checkLikeActivity (activity: any) {
- return isBaseActivityValid(activity, 'Like') &&
- isObjectValid(activity.object)
-}
-import { isActivityPubUrlValid, isObjectValid } from './misc'
+import { isBaseActivityValid, isObjectValid } from './misc'
+
+function isLikeActivityValid (activity: any) {
+ return isBaseActivityValid(activity, 'Like') &&
+ isObjectValid(activity.object)
+}
function isDislikeActivityValid (activity: any) {
- return activity.type === 'Dislike' &&
- isActivityPubUrlValid(activity.actor) &&
+ return isBaseActivityValid(activity, 'Dislike') &&
isObjectValid(activity.object)
}
// ---------------------------------------------------------------------------
export {
- isDislikeActivityValid
+ isDislikeActivityValid,
+ isLikeActivityValid
}
--- /dev/null
+import { isBaseActivityValid, isObjectValid } from './misc'
+
+function isShareActivityValid (activity: any) {
+ return isBaseActivityValid(activity, 'Announce') &&
+ isObjectValid(activity.object)
+}
+// ---------------------------------------------------------------------------
+
+export {
+ isShareActivityValid
+}
'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max',
'theme.default',
'remote_redundancy.videos.accept_from',
- 'federation.videos.federate_unlisted',
+ 'federation.videos.federate_unlisted', 'federation.videos.cleanup_remote_interactions',
'search.remote_uri.users', 'search.remote_uri.anonymous', 'search.search_index.enabled', 'search.search_index.url',
'search.search_index.disable_local_search', 'search.search_index.is_default_search',
'live.enabled', 'live.allow_replay', 'live.max_duration', 'live.max_user_lives', 'live.max_instance_lives',
},
FEDERATION: {
VIDEOS: {
- FEDERATE_UNLISTED: config.get<boolean>('federation.videos.federate_unlisted')
+ FEDERATE_UNLISTED: config.get<boolean>('federation.videos.federate_unlisted'),
+ CLEANUP_REMOTE_INTERACTIONS: config.get<boolean>('federation.videos.cleanup_remote_interactions')
}
},
ADMIN: {
'activitypub-http-unicast': 5,
'activitypub-http-fetcher': 5,
'activitypub-follow': 5,
+ 'activitypub-cleaner': 1,
'video-file-import': 1,
'video-transcoding': 1,
'video-import': 1,
'video-redundancy': 1,
'video-live-ending': 1
}
-const JOB_CONCURRENCY: { [id in JobType]?: number } = {
+// Excluded keys are jobs that can be configured by admins
+const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
'activitypub-http-broadcast': 1,
'activitypub-http-unicast': 5,
'activitypub-http-fetcher': 1,
+ 'activitypub-cleaner': 1,
'activitypub-follow': 1,
'video-file-import': 1,
'email': 5,
'activitypub-http-unicast': 60000 * 10, // 10 minutes
'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
'activitypub-follow': 60000 * 10, // 10 minutes
+ 'activitypub-cleaner': 1000 * 3600, // 1 hour
'video-file-import': 1000 * 3600, // 1 hour
'video-transcoding': 1000 * 3600 * 48, // 2 days, transcoding could be long
'video-import': 1000 * 3600 * 2, // 2 hours
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
'videos-views': {
cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
+ },
+ 'activitypub-cleaner': {
+ cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM
}
}
const JOB_PRIORITY = {
}
const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job
+const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job
const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...)
const JOB_REQUEST_TIMEOUT = 7000 // 7 seconds
const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days
SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000
SCHEDULER_INTERVALS_MS.updateInboxStats = 5000
REPEAT_JOBS['videos-views'] = { every: 5000 }
+ REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
REDUNDANCY,
JOB_CONCURRENCY,
JOB_ATTEMPTS,
+ AP_CLEANER_CONCURRENCY,
LAST_MIGRATION_VERSION,
OAUTH_LIFETIME,
CUSTOM_HTML_TAG_COMMENTS,
return await tryResolveThreadFromVideo(params)
}
} catch (err) {
- logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err })
+ logger.debug('Cannot resolve thread from video %s, maybe because it was not a video', url, { err })
}
- return resolveParentComment(params)
+ return resolveRemoteParentComment(params)
}
export {
return { video, comment: resultComment, commentCreated }
}
-async function resolveParentComment (params: ResolveThreadParams) {
+async function resolveRemoteParentComment (params: ResolveThreadParams) {
const { url, comments } = params
if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) {
})
if (sanitizeAndCheckVideoCommentObject(body) === false) {
- throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body))
+ throw new Error(`Remote video comment JSON ${url} is not valid:` + JSON.stringify(body))
}
const actorUrl = body.attributedTo
--- /dev/null
+import * as Bluebird from 'bluebird'
+import * as Bull from 'bull'
+import { checkUrlsSameHost } from '@server/helpers/activitypub'
+import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate'
+import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share'
+import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
+import { doRequest } from '@server/helpers/requests'
+import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants'
+import { VideoModel } from '@server/models/video/video'
+import { VideoCommentModel } from '@server/models/video/video-comment'
+import { VideoShareModel } from '@server/models/video/video-share'
+import { HttpStatusCode } from '@shared/core-utils'
+import { logger } from '../../../helpers/logger'
+import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
+
+// Job to clean remote interactions off local videos
+
+async function processActivityPubCleaner (_job: Bull.Job) {
+ logger.info('Processing ActivityPub cleaner.')
+
+ {
+ const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
+ const { bodyValidator, deleter, updater } = rateOptionsFactory()
+
+ await Bluebird.map(rateUrls, async rateUrl => {
+ try {
+ const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter)
+
+ if (result?.status === 'deleted') {
+ const { videoId, type } = result.data
+
+ await VideoModel.updateRatesOf(videoId, type, undefined)
+ }
+ } catch (err) {
+ logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
+ }
+ }, { concurrency: AP_CLEANER_CONCURRENCY })
+ }
+
+ {
+ const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
+ const { bodyValidator, deleter, updater } = shareOptionsFactory()
+
+ await Bluebird.map(shareUrls, async shareUrl => {
+ try {
+ await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter)
+ } catch (err) {
+ logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
+ }
+ }, { concurrency: AP_CLEANER_CONCURRENCY })
+ }
+
+ {
+ const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
+ const { bodyValidator, deleter, updater } = commentOptionsFactory()
+
+ await Bluebird.map(commentUrls, async commentUrl => {
+ try {
+ await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter)
+ } catch (err) {
+ logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
+ }
+ }, { concurrency: AP_CLEANER_CONCURRENCY })
+ }
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+ processActivityPubCleaner
+}
+
+// ---------------------------------------------------------------------------
+
+async function updateObjectIfNeeded <T> (
+ url: string,
+ bodyValidator: (body: any) => boolean,
+ updater: (url: string, newUrl: string) => Promise<T>,
+ deleter: (url: string) => Promise<T>
+): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
+ // Fetch url
+ const { response, body } = await doRequest<any>({
+ uri: url,
+ json: true,
+ activityPub: true
+ })
+
+ // Does not exist anymore, remove entry
+ if (response.statusCode === HttpStatusCode.NOT_FOUND_404) {
+ logger.info('Removing remote AP object %s.', url)
+ const data = await deleter(url)
+
+ return { status: 'deleted', data }
+ }
+
+ // If not same id, check same host and update
+ if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
+
+ if (body.type === 'Tombstone') {
+ logger.info('Removing remote AP object %s.', url)
+ const data = await deleter(url)
+
+ return { status: 'deleted', data }
+ }
+
+ const newUrl = body.id
+ if (newUrl !== url) {
+ if (checkUrlsSameHost(newUrl, url) !== true) {
+ throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
+ }
+
+ logger.info('Updating remote AP object %s.', url)
+ const data = await updater(url, newUrl)
+
+ return { status: 'updated', data }
+ }
+
+ return null
+}
+
+function rateOptionsFactory () {
+ return {
+ bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body),
+
+ updater: async (url: string, newUrl: string) => {
+ const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
+ rate.url = newUrl
+
+ const videoId = rate.videoId
+ const type = rate.type
+
+ await rate.save()
+
+ return { videoId, type }
+ },
+
+ deleter: async (url) => {
+ const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
+
+ const videoId = rate.videoId
+ const type = rate.type
+
+ await rate.destroy()
+
+ return { videoId, type }
+ }
+ }
+}
+
+function shareOptionsFactory () {
+ return {
+ bodyValidator: (body: any) => isShareActivityValid(body),
+
+ updater: async (url: string, newUrl: string) => {
+ const share = await VideoShareModel.loadByUrl(url, undefined)
+ share.url = newUrl
+
+ await share.save()
+
+ return undefined
+ },
+
+ deleter: async (url) => {
+ const share = await VideoShareModel.loadByUrl(url, undefined)
+
+ await share.destroy()
+
+ return undefined
+ }
+ }
+}
+
+function commentOptionsFactory () {
+ return {
+ bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body),
+
+ updater: async (url: string, newUrl: string) => {
+ const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
+ comment.url = newUrl
+
+ await comment.save()
+
+ return undefined
+ },
+
+ deleter: async (url) => {
+ const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
+
+ await comment.destroy()
+
+ return undefined
+ }
+ }
+}
async function processActorKeys (job: Bull.Job) {
const payload = job.data as ActorKeysPayload
- logger.info('Processing email in job %d.', job.id)
+ logger.info('Processing actor keys in job %d.', job.id)
const actor = await ActorModel.load(payload.actorId)
import { logger } from '../../helpers/logger'
import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
import { Redis } from '../redis'
+import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
+ { type: 'activitypub-http-cleaner', payload: {} } |
{ type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
{ type: 'video-file-import', payload: VideoFileImportPayload } |
{ type: 'video-transcoding', payload: VideoTranscodingPayload } |
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
+ 'activitypub-cleaner': processActivityPubCleaner,
'activitypub-follow': processActivityPubFollow,
'video-file-import': processVideoFileImport,
'video-transcoding': processVideoTranscoding,
'activitypub-http-broadcast',
'activitypub-http-fetcher',
'activitypub-http-unicast',
+ 'activitypub-cleaner',
'email',
'video-transcoding',
'video-file-import',
this.queues['videos-views'].add({}, {
repeat: REPEAT_JOBS['videos-views']
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
+
+ if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
+ this.queues['activitypub-cleaner'].add({}, {
+ repeat: REPEAT_JOBS['activitypub-cleaner']
+ }).catch(err => logger.error('Cannot add repeatable job.', { err }))
+ }
}
private filterJobTypes (jobType?: JobType) {
import * as express from 'express'
import { body, param, query } from 'express-validator'
-import { isIdOrUUIDValid } from '../../../helpers/custom-validators/misc'
+import { isIdOrUUIDValid, isIdValid } from '../../../helpers/custom-validators/misc'
import { isRatingValid } from '../../../helpers/custom-validators/video-rates'
import { isVideoRatingTypeValid } from '../../../helpers/custom-validators/videos'
import { logger } from '../../../helpers/logger'
const getAccountVideoRateValidatorFactory = function (rateType: VideoRateType) {
return [
param('name').custom(isAccountNameValid).withMessage('Should have a valid account name'),
- param('videoId').custom(isIdOrUUIDValid).not().isEmpty().withMessage('Should have a valid videoId'),
+ param('videoId').custom(isIdValid).not().isEmpty().withMessage('Should have a valid videoId'),
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking videoCommentGetValidator parameters.', { parameters: req.params })
if (areValidationErrors(req, res)) return
- const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, req.params.videoId)
+ const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, +req.params.videoId)
if (!rate) {
return res.status(HttpStatusCode.NOT_FOUND_404)
.json({ error: 'Video rate not found' })
return AccountVideoRateModel.findAndCountAll(query)
}
+ static listRemoteRateUrlsOfLocalVideos () {
+ const query = `SELECT "accountVideoRate".url FROM "accountVideoRate" ` +
+ `INNER JOIN account ON account.id = "accountVideoRate"."accountId" ` +
+ `INNER JOIN actor ON actor.id = account."actorId" AND actor."serverId" IS NOT NULL ` +
+ `INNER JOIN video ON video.id = "accountVideoRate"."videoId" AND video.remote IS FALSE`
+
+ return AccountVideoRateModel.sequelize.query<{ url: string }>(query, {
+ type: QueryTypes.SELECT,
+ raw: true
+ }).then(rows => rows.map(r => r.url))
+ }
+
static loadLocalAndPopulateVideo (
rateType: VideoRateType,
accountName: string,
- videoId: number | string,
+ videoId: number,
t?: Transaction
): Promise<MAccountVideoRateAccountVideo> {
const options: FindOptions = {
await AccountVideoRateModel.destroy(query)
- const field = type === 'like'
- ? 'likes'
- : 'dislikes'
-
- const rawQuery = `UPDATE "video" SET "${field}" = ` +
- '(' +
- 'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' +
- ') ' +
- 'WHERE "video"."id" = :videoId'
-
- return AccountVideoRateModel.sequelize.query(rawQuery, {
- transaction: t,
- replacements: { videoId, rateType: type },
- type: QueryTypes.UPDATE
- })
+ return VideoModel.updateRatesOf(videoId, type, t)
})
}
import { uniq } from 'lodash'
-import { FindAndCountOptions, FindOptions, Op, Order, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize'
+import { FindAndCountOptions, FindOptions, Op, Order, QueryTypes, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize'
import {
AllowNull,
BelongsTo,
}
}
+ static listRemoteCommentUrlsOfLocalVideos () {
+ const query = `SELECT "videoComment".url FROM "videoComment" ` +
+ `INNER JOIN account ON account.id = "videoComment"."accountId" ` +
+ `INNER JOIN actor ON actor.id = "account"."actorId" AND actor."serverId" IS NOT NULL ` +
+ `INNER JOIN video ON video.id = "videoComment"."videoId" AND video.remote IS FALSE`
+
+ return VideoCommentModel.sequelize.query<{ url: string }>(query, {
+ type: QueryTypes.SELECT,
+ raw: true
+ }).then(rows => rows.map(r => r.url))
+ }
+
static cleanOldCommentsOf (videoId: number, beforeUpdatedAt: Date) {
const query = {
where: {
-import { literal, Op, Transaction } from 'sequelize'
+import { literal, Op, QueryTypes, Transaction } from 'sequelize'
import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Scopes, Table, UpdatedAt } from 'sequelize-typescript'
import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc'
import { CONSTRAINTS_FIELDS } from '../../initializers/constants'
return VideoShareModel.findAndCountAll(query)
}
+ static listRemoteShareUrlsOfLocalVideos () {
+ const query = `SELECT "videoShare".url FROM "videoShare" ` +
+ `INNER JOIN actor ON actor.id = "videoShare"."actorId" AND actor."serverId" IS NOT NULL ` +
+ `INNER JOIN video ON video.id = "videoShare"."videoId" AND video.remote IS FALSE`
+
+ return VideoShareModel.sequelize.query<{ url: string }>(query, {
+ type: QueryTypes.SELECT,
+ raw: true
+ }).then(rows => rows.map(r => r.url))
+ }
+
static cleanOldSharesOf (videoId: number, beforeUpdatedAt: Date) {
const query = {
where: {
import { VideoFile } from '@shared/models/videos/video-file.model'
import { ResultList, UserRight, VideoPrivacy, VideoState } from '../../../shared'
import { VideoObject } from '../../../shared/models/activitypub/objects'
-import { Video, VideoDetails } from '../../../shared/models/videos'
+import { Video, VideoDetails, VideoRateType } from '../../../shared/models/videos'
import { ThumbnailType } from '../../../shared/models/videos/thumbnail.type'
import { VideoFilter } from '../../../shared/models/videos/video-query.type'
import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
})
}
+ static updateRatesOf (videoId: number, type: VideoRateType, t: Transaction) {
+ const field = type === 'like'
+ ? 'likes'
+ : 'dislikes'
+
+ const rawQuery = `UPDATE "video" SET "${field}" = ` +
+ '(' +
+ 'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' +
+ ') ' +
+ 'WHERE "video"."id" = :videoId'
+
+ return AccountVideoRateModel.sequelize.query(rawQuery, {
+ transaction: t,
+ replacements: { videoId, rateType: type },
+ type: QueryTypes.UPDATE
+ })
+ }
+
static checkVideoHasInstanceFollow (videoId: number, followerActorId: number) {
// Instances only share videos
const query = 'SELECT 1 FROM "videoShare" ' +
--- /dev/null
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+import * as chai from 'chai'
+import {
+ cleanupTests,
+ closeAllSequelize,
+ deleteAll,
+ doubleFollow,
+ getCount,
+ selectQuery,
+ setVideoField,
+ updateQuery,
+ wait
+} from '../../../../shared/extra-utils'
+import { flushAndRunMultipleServers, ServerInfo, setAccessTokensToServers } from '../../../../shared/extra-utils/index'
+import { waitJobs } from '../../../../shared/extra-utils/server/jobs'
+import { addVideoCommentThread, getVideoCommentThreads } from '../../../../shared/extra-utils/videos/video-comments'
+import { getVideo, rateVideo, uploadVideoAndGetId } from '../../../../shared/extra-utils/videos/videos'
+
+const expect = chai.expect
+
+describe('Test AP cleaner', function () {
+ let servers: ServerInfo[] = []
+ let videoUUID1: string
+ let videoUUID2: string
+ let videoUUID3: string
+
+ let videoUUIDs: string[]
+
+ before(async function () {
+ this.timeout(120000)
+
+ const config = {
+ federation: {
+ videos: { cleanup_remote_interactions: true }
+ }
+ }
+ servers = await flushAndRunMultipleServers(3, config)
+
+ // Get the access tokens
+ await setAccessTokensToServers(servers)
+
+ await Promise.all([
+ doubleFollow(servers[0], servers[1]),
+ doubleFollow(servers[1], servers[2]),
+ doubleFollow(servers[0], servers[2])
+ ])
+
+ // Update 1 local share, check 6 shares
+
+ // Create 1 comment per video
+ // Update 1 remote URL and 1 local URL on
+
+ videoUUID1 = (await uploadVideoAndGetId({ server: servers[0], videoName: 'server 1' })).uuid
+ videoUUID2 = (await uploadVideoAndGetId({ server: servers[1], videoName: 'server 2' })).uuid
+ videoUUID3 = (await uploadVideoAndGetId({ server: servers[2], videoName: 'server 3' })).uuid
+
+ videoUUIDs = [ videoUUID1, videoUUID2, videoUUID3 ]
+
+ await waitJobs(servers)
+
+ for (const server of servers) {
+ for (const uuid of videoUUIDs) {
+ await rateVideo(server.url, server.accessToken, uuid, 'like')
+ await addVideoCommentThread(server.url, server.accessToken, uuid, 'comment')
+ }
+ }
+
+ await waitJobs(servers)
+ })
+
+ it('Should have the correct likes', async function () {
+ for (const server of servers) {
+ for (const uuid of videoUUIDs) {
+ const res = await getVideo(server.url, uuid)
+ expect(res.body.likes).to.equal(3)
+ expect(res.body.dislikes).to.equal(0)
+ }
+ }
+ })
+
+ it('Should destroy server 3 internal likes and correctly clean them', async function () {
+ this.timeout(20000)
+
+ await deleteAll(servers[2].internalServerNumber, 'accountVideoRate')
+ for (const uuid of videoUUIDs) {
+ await setVideoField(servers[2].internalServerNumber, uuid, 'likes', '0')
+ }
+
+ await wait(5000)
+ await waitJobs(servers)
+
+ // Updated rates of my video
+ {
+ const res = await getVideo(servers[0].url, videoUUID1)
+ expect(res.body.likes).to.equal(2)
+ expect(res.body.dislikes).to.equal(0)
+ }
+
+ // Did not update rates of a remote video
+ {
+ const res = await getVideo(servers[0].url, videoUUID2)
+ expect(res.body.likes).to.equal(3)
+ expect(res.body.dislikes).to.equal(0)
+ }
+ })
+
+ it('Should update rates to dislikes', async function () {
+ this.timeout(20000)
+
+ for (const server of servers) {
+ for (const uuid of videoUUIDs) {
+ await rateVideo(server.url, server.accessToken, uuid, 'dislike')
+ }
+ }
+
+ await waitJobs(servers)
+
+ for (const server of servers) {
+ for (const uuid of videoUUIDs) {
+ const res = await getVideo(server.url, uuid)
+ expect(res.body.likes).to.equal(0)
+ expect(res.body.dislikes).to.equal(3)
+ }
+ }
+ })
+
+ it('Should destroy server 3 internal dislikes and correctly clean them', async function () {
+ this.timeout(20000)
+
+ await deleteAll(servers[2].internalServerNumber, 'accountVideoRate')
+
+ for (const uuid of videoUUIDs) {
+ await setVideoField(servers[2].internalServerNumber, uuid, 'dislikes', '0')
+ }
+
+ await wait(5000)
+ await waitJobs(servers)
+
+ // Updated rates of my video
+ {
+ const res = await getVideo(servers[0].url, videoUUID1)
+ expect(res.body.likes).to.equal(0)
+ expect(res.body.dislikes).to.equal(2)
+ }
+
+ // Did not update rates of a remote video
+ {
+ const res = await getVideo(servers[0].url, videoUUID2)
+ expect(res.body.likes).to.equal(0)
+ expect(res.body.dislikes).to.equal(3)
+ }
+ })
+
+ it('Should destroy server 3 internal shares and correctly clean them', async function () {
+ this.timeout(20000)
+
+ const preCount = await getCount(servers[0].internalServerNumber, 'videoShare')
+ expect(preCount).to.equal(6)
+
+ await deleteAll(servers[2].internalServerNumber, 'videoShare')
+ await wait(5000)
+ await waitJobs(servers)
+
+ // Still 6 because we don't have remote shares on local videos
+ const postCount = await getCount(servers[0].internalServerNumber, 'videoShare')
+ expect(postCount).to.equal(6)
+ })
+
+ it('Should destroy server 3 internal comments and correctly clean them', async function () {
+ this.timeout(20000)
+
+ {
+ const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5)
+ expect(res.body.total).to.equal(3)
+ }
+
+ await deleteAll(servers[2].internalServerNumber, 'videoComment')
+
+ await wait(5000)
+ await waitJobs(servers)
+
+ {
+ const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5)
+ expect(res.body.total).to.equal(2)
+ }
+ })
+
+ it('Should correctly update rate URLs', async function () {
+ this.timeout(30000)
+
+ async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') {
+ const query = `SELECT "videoId", "accountVideoRate".url FROM "accountVideoRate" ` +
+ `INNER JOIN video ON "accountVideoRate"."videoId" = video.id AND remote IS ${remote} WHERE "accountVideoRate"."url" LIKE '${like}'`
+ const res = await selectQuery(servers[0].internalServerNumber, query)
+
+ for (const rate of res) {
+ const matcher = new RegExp(`^${ofServerUrl}/accounts/root/dislikes/\\d+${urlSuffix}$`)
+ expect(rate.url).to.match(matcher)
+ }
+ }
+
+ async function checkLocal () {
+ const startsWith = 'http://' + servers[0].host + '%'
+ // On local videos
+ await check(startsWith, servers[0].url, '', 'false')
+ // On remote videos
+ await check(startsWith, servers[0].url, '', 'true')
+ }
+
+ async function checkRemote (suffix: string) {
+ const startsWith = 'http://' + servers[1].host + '%'
+ // On local videos
+ await check(startsWith, servers[1].url, suffix, 'false')
+ // On remote videos, we should not update URLs so no suffix
+ await check(startsWith, servers[1].url, '', 'true')
+ }
+
+ await checkLocal()
+ await checkRemote('')
+
+ {
+ const query = `UPDATE "accountVideoRate" SET url = url || 'stan'`
+ await updateQuery(servers[1].internalServerNumber, query)
+
+ await wait(5000)
+ await waitJobs(servers)
+ }
+
+ await checkLocal()
+ await checkRemote('stan')
+ })
+
+ it('Should correctly update comment URLs', async function () {
+ this.timeout(30000)
+
+ async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') {
+ const query = `SELECT "videoId", "videoComment".url, uuid as "videoUUID" FROM "videoComment" ` +
+ `INNER JOIN video ON "videoComment"."videoId" = video.id AND remote IS ${remote} WHERE "videoComment"."url" LIKE '${like}'`
+
+ const res = await selectQuery(servers[0].internalServerNumber, query)
+
+ for (const comment of res) {
+ const matcher = new RegExp(`${ofServerUrl}/videos/watch/${comment.videoUUID}/comments/\\d+${urlSuffix}`)
+ expect(comment.url).to.match(matcher)
+ }
+ }
+
+ async function checkLocal () {
+ const startsWith = 'http://' + servers[0].host + '%'
+ // On local videos
+ await check(startsWith, servers[0].url, '', 'false')
+ // On remote videos
+ await check(startsWith, servers[0].url, '', 'true')
+ }
+
+ async function checkRemote (suffix: string) {
+ const startsWith = 'http://' + servers[1].host + '%'
+ // On local videos
+ await check(startsWith, servers[1].url, suffix, 'false')
+ // On remote videos, we should not update URLs so no suffix
+ await check(startsWith, servers[1].url, '', 'true')
+ }
+
+ {
+ const query = `UPDATE "videoComment" SET url = url || 'kyle'`
+ await updateQuery(servers[1].internalServerNumber, query)
+
+ await wait(5000)
+ await waitJobs(servers)
+ }
+
+ await checkLocal()
+ await checkRemote('kyle')
+ })
+
+ after(async function () {
+ await cleanupTests(servers)
+
+ await closeAllSequelize(servers)
+ })
+})
+import './cleaner'
import './client'
import './fetch'
import './refresher'
return seq
}
+function deleteAll (internalServerNumber: number, table: string) {
+ const seq = getSequelize(internalServerNumber)
+
+ const options = { type: QueryTypes.DELETE }
+
+ return seq.query(`DELETE FROM "${table}"`, options)
+}
+
+async function getCount (internalServerNumber: number, table: string) {
+ const seq = getSequelize(internalServerNumber)
+
+ const options = { type: QueryTypes.SELECT as QueryTypes.SELECT }
+
+ const [ { total } ] = await seq.query<{ total: string }>(`SELECT COUNT(*) as total FROM "${table}"`, options)
+ if (total === null) return 0
+
+ return parseInt(total, 10)
+}
+
function setActorField (internalServerNumber: number, to: string, field: string, value: string) {
const seq = getSequelize(internalServerNumber)
return parseInt(total + '', 10)
}
+function selectQuery (internalServerNumber: number, query: string) {
+ const seq = getSequelize(internalServerNumber)
+ const options = { type: QueryTypes.SELECT as QueryTypes.SELECT }
+
+ return seq.query<any>(query, options)
+}
+
+function updateQuery (internalServerNumber: number, query: string) {
+ const seq = getSequelize(internalServerNumber)
+ const options = { type: QueryTypes.UPDATE as QueryTypes.UPDATE }
+
+ return seq.query(query, options)
+}
+
async function closeAllSequelize (servers: ServerInfo[]) {
for (const server of servers) {
if (sequelizes[server.internalServerNumber]) {
setActorField,
countVideoViewsOf,
setPluginVersion,
+ selectQuery,
+ deleteAll,
+ updateQuery,
setActorFollowScores,
- closeAllSequelize
+ closeAllSequelize,
+ getCount
}
else servers = serversArg as ServerInfo[]
const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
+ const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]
let pendingRequests: boolean
function tasksBuilder () {
count: 10,
sort: '-createdAt'
}).then(res => res.body.data)
- .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views'))
+ .then((jobs: Job[]) => jobs.filter(j => !repeatableJobs.includes(j.type)))
.then(jobs => {
if (jobs.length !== 0) {
pendingRequests = true
})
}
-function rateVideo (url: string, accessToken: string, id: number, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) {
+function rateVideo (url: string, accessToken: string, id: number | string, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) {
const path = '/api/v1/videos/' + id + '/rate'
return request(url)
| 'activitypub-http-unicast'
| 'activitypub-http-broadcast'
| 'activitypub-http-fetcher'
+ | 'activitypub-cleaner'
| 'activitypub-follow'
| 'video-file-import'
| 'video-transcoding'