From c46edbc2f6ca310b2f0331f979ac6caf27f6eb92 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 22 Nov 2017 11:27:40 +0100 Subject: [PATCH] Fetch outbox to grab old activities tests --- scripts/parse-log.ts | 4 +- server/controllers/activitypub/client.ts | 4 +- server/controllers/activitypub/outbox.ts | 2 +- server/controllers/api/server/follows.ts | 13 +++---- server/helpers/activitypub.ts | 9 ++++- server/initializers/constants.ts | 1 + .../lib/activitypub/process/process-accept.ts | 2 + server/lib/activitypub/process/process-add.ts | 2 +- .../activitypub-http-fetcher-handler.ts | 6 +-- server/models/video/video.ts | 38 ++++++++++++++----- server/tests/api/follows.ts | 30 ++++++++++++++- 11 files changed, 81 insertions(+), 30 deletions(-) diff --git a/scripts/parse-log.ts b/scripts/parse-log.ts index 24a09c885..e2c42bf4c 100755 --- a/scripts/parse-log.ts +++ b/scripts/parse-log.ts @@ -2,7 +2,6 @@ import { createReadStream } from 'fs' import { join } from 'path' import { createInterface } from 'readline' import * as winston from 'winston' -import { readFileBufferPromise } from '../server/helpers/core-utils' import { CONFIG } from '../server/initializers/constants' const label = CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT @@ -16,7 +15,8 @@ const logger = new winston.Logger({ humanReadableUnhandledException: true, json: false, colorize: true, - prettyPrint: true + prettyPrint: true, + stderrLevels: [] }) ], exitOnError: true diff --git a/server/controllers/activitypub/client.ts b/server/controllers/activitypub/client.ts index 24c8665a5..eee89e2fd 100644 --- a/server/controllers/activitypub/client.ts +++ b/server/controllers/activitypub/client.ts @@ -56,7 +56,7 @@ async function accountController (req: express.Request, res: express.Response, n async function accountFollowersController (req: express.Request, res: express.Response, next: express.NextFunction) { const account: AccountInstance = res.locals.account - const page = req.params.page || 1 + const page = req.query.page || 1 const { start, count } = pageToStartAndCount(page, ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE) const result = await db.AccountFollow.listAcceptedFollowerUrlsForApi([ account.id ], start, count) @@ -68,7 +68,7 @@ async function accountFollowersController (req: express.Request, res: express.Re async function accountFollowingController (req: express.Request, res: express.Response, next: express.NextFunction) { const account: AccountInstance = res.locals.account - const page = req.params.page || 1 + const page = req.query.page || 1 const { start, count } = pageToStartAndCount(page, ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE) const result = await db.AccountFollow.listAcceptedFollowingUrlsForApi([ account.id ], start, count) diff --git a/server/controllers/activitypub/outbox.ts b/server/controllers/activitypub/outbox.ts index 1a74bde33..74d399763 100644 --- a/server/controllers/activitypub/outbox.ts +++ b/server/controllers/activitypub/outbox.ts @@ -28,7 +28,7 @@ export { async function outboxController (req: express.Request, res: express.Response, next: express.NextFunction) { const account: AccountInstance = res.locals.account - const page = req.params.page || 1 + const page = req.query.page || 1 const { start, count } = pageToStartAndCount(page, ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE) const data = await db.Video.listAllAndSharedByAccountForOutbox(account.id, start, count) diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index 4b54afc8d..391f8bdca 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts @@ -1,11 +1,15 @@ import * as express from 'express' import { UserRight } from '../../../../shared/models/users/user-right.enum' import { getFormattedObjects } from '../../../helpers' +import { retryTransactionWrapper } from '../../../helpers/database-utils' import { logger } from '../../../helpers/logger' import { getServerAccount } from '../../../helpers/utils' import { getAccountFromWebfinger } from '../../../helpers/webfinger' import { SERVER_ACCOUNT_NAME } from '../../../initializers/constants' import { database as db } from '../../../initializers/database' +import { saveAccountAndServerIfNotExist } from '../../../lib/activitypub/account' +import { sendUndoFollow } from '../../../lib/activitypub/send/send-undo' +import { sendFollow } from '../../../lib/index' import { asyncMiddleware, paginationValidator, removeFollowingValidator, setFollowersSort, setPagination } from '../../../middlewares' import { authenticate } from '../../../middlewares/oauth' import { setBodyHostsPort } from '../../../middlewares/servers' @@ -13,13 +17,8 @@ import { setFollowingSort } from '../../../middlewares/sort' import { ensureUserHasRight } from '../../../middlewares/user-right' import { followValidator } from '../../../middlewares/validators/follows' import { followersSortValidator, followingSortValidator } from '../../../middlewares/validators/sort' -import { AccountFollowInstance } from '../../../models/index' -import { sendFollow } from '../../../lib/index' -import { sendUndoFollow } from '../../../lib/activitypub/send/send-undo' import { AccountInstance } from '../../../models/account/account-interface' -import { retryTransactionWrapper } from '../../../helpers/database-utils' -import { saveAccountAndServerIfNotExist } from '../../../lib/activitypub/account' -import { addFetchOutboxJob } from '../../../lib/activitypub/fetch' +import { AccountFollowInstance } from '../../../models/index' const serverFollowsRouter = express.Router() @@ -137,8 +136,6 @@ async function follow (fromAccount: AccountInstance, targetAccount: AccountInsta if (accountFollow.state === 'pending') { await sendFollow(accountFollow, t) } - - await addFetchOutboxJob(targetAccount, t) }) } catch (err) { // Reset target account diff --git a/server/helpers/activitypub.ts b/server/helpers/activitypub.ts index fb4a43a01..54c460200 100644 --- a/server/helpers/activitypub.ts +++ b/server/helpers/activitypub.ts @@ -24,12 +24,15 @@ function activityPubContextify (data: T) { }) } -function activityPubCollectionPagination (url: string, page: number, result: ResultList) { +function activityPubCollectionPagination (url: string, page: any, result: ResultList) { let next: string let prev: string + // Assert page is a number + page = parseInt(page, 10) + // There are more results - if (result.total > ((page + 1) * ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE)) { + if (result.total > page * ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE) { next = url + '?page=' + (page + 1) } @@ -53,6 +56,8 @@ function activityPubCollectionPagination (url: string, page: number, result: Res totalItems: result.total, first: orderedCollectionPagination }) + } else { + orderedCollectionPagination['totalItems'] = result.total } return orderedCollectionPagination diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 398691eba..9e61f01aa 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -328,6 +328,7 @@ if (isTestInstance() === true) { REMOTE_SCHEME.HTTP = 'http' REMOTE_SCHEME.WS = 'ws' STATIC_MAX_AGE = '0' + ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE = 2 } // --------------------------------------------------------------------------- diff --git a/server/lib/activitypub/process/process-accept.ts b/server/lib/activitypub/process/process-accept.ts index e159c41b5..73c6cb279 100644 --- a/server/lib/activitypub/process/process-accept.ts +++ b/server/lib/activitypub/process/process-accept.ts @@ -1,6 +1,7 @@ import { ActivityAccept } from '../../../../shared/models/activitypub/activity' import { database as db } from '../../../initializers' import { AccountInstance } from '../../../models/account/account-interface' +import { addFetchOutboxJob } from '../fetch' async function processAcceptActivity (activity: ActivityAccept, inboxAccount?: AccountInstance) { if (inboxAccount === undefined) throw new Error('Need to accept on explicit inbox.') @@ -24,4 +25,5 @@ async function processAccept (account: AccountInstance, targetAccount: AccountIn follow.set('state', 'accepted') await follow.save() + await addFetchOutboxJob(targetAccount, undefined) } diff --git a/server/lib/activitypub/process/process-add.ts b/server/lib/activitypub/process/process-add.ts index edc90dee5..332c18cc0 100644 --- a/server/lib/activitypub/process/process-add.ts +++ b/server/lib/activitypub/process/process-add.ts @@ -48,7 +48,7 @@ function addRemoteVideo (account: AccountInstance, activity: ActivityAdd, videoChannel: VideoChannelInstance, videoToCreateData: VideoTorrentObject) { - logger.debug('Adding remote video %s.', videoToCreateData.url) + logger.debug('Adding remote video %s.', videoToCreateData.id) return db.sequelize.transaction(async t => { const sequelizeOptions = { diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts index b8ead32a4..09efaa622 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts @@ -25,7 +25,7 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { const activities = firstBody.first.orderedItems - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) await processActivities(activities) } @@ -37,12 +37,12 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { options.uri = nextLink const { body } = await doRequest(options) - nextLink = body.nextLink + nextLink = body.next i++ if (Array.isArray(body.orderedItems)) { const activities = body.orderedItems - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) await processActivities(activities) } diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 3b7e83779..9b411a92e 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -46,6 +46,7 @@ import { TagInstance } from './tag-interface' import { VideoFileInstance, VideoFileModel } from './video-file-interface' import { VideoAttributes, VideoInstance, VideoMethods } from './video-interface' import { sendDeleteVideo } from '../../lib/index' +import * as Bluebird from 'bluebird' const Buffer = safeBuffer.Buffer @@ -786,14 +787,21 @@ list = function () { } listAllAndSharedByAccountForOutbox = function (accountId: number, start: number, count: number) { - const queryVideo = 'SELECT "Video"."id" FROM "Videos" AS "Video" ' + - 'INNER JOIN "VideoChannels" AS "VideoChannel" ON "VideoChannel"."id" = "Video"."channelId" ' + - 'WHERE "VideoChannel"."accountId" = ' + accountId - const queryVideoShare = 'SELECT "Video"."id" FROM "VideoShares" AS "VideoShare" ' + - 'INNER JOIN "Videos" AS "Video" ON "Video"."id" = "VideoShare"."videoId" ' + - 'INNER JOIN "VideoChannels" AS "VideoChannel" ON "VideoChannel"."id" = "Video"."channelId" ' + - 'WHERE "VideoShare"."accountId" = ' + accountId - const rawQuery = `(${queryVideo}) UNION (${queryVideoShare}) LIMIT ${count} OFFSET ${start}` + function getRawQuery (select: string) { + const queryVideo = 'SELECT ' + select + ' FROM "Videos" AS "Video" ' + + 'INNER JOIN "VideoChannels" AS "VideoChannel" ON "VideoChannel"."id" = "Video"."channelId" ' + + 'WHERE "VideoChannel"."accountId" = ' + accountId + const queryVideoShare = 'SELECT ' + select + ' FROM "VideoShares" AS "VideoShare" ' + + 'INNER JOIN "Videos" AS "Video" ON "Video"."id" = "VideoShare"."videoId" ' + + 'WHERE "VideoShare"."accountId" = ' + accountId + + let rawQuery = `(${queryVideo}) UNION (${queryVideoShare})` + + return rawQuery + } + + const rawQuery = getRawQuery('"Video"."id"') + const rawCountQuery = getRawQuery('COUNT("Video"."id") as "total"') const query = { distinct: true, @@ -825,10 +833,20 @@ listAllAndSharedByAccountForOutbox = function (accountId: number, start: number, ] } - return Video.findAndCountAll(query).then(({ rows, count }) => { + return Bluebird.all([ + Video.findAll(query), + Video['sequelize'].query(rawCountQuery, { type: Sequelize.QueryTypes.SELECT }) + ]).then(([ rows, totals ]) => { + // totals: totalVideos + totalVideoShares + let totalVideos = 0 + let totalVideoShares = 0 + if (totals[0]) totalVideos = parseInt(totals[0].total, 10) + if (totals[1]) totalVideoShares = parseInt(totals[1].total, 10) + + const total = totalVideos + totalVideoShares return { data: rows, - total: count + total: total } }) } diff --git a/server/tests/api/follows.ts b/server/tests/api/follows.ts index b2f53d3a7..875d814a7 100644 --- a/server/tests/api/follows.ts +++ b/server/tests/api/follows.ts @@ -22,7 +22,7 @@ describe('Test follows', function () { let server3Id: number before(async function () { - this.timeout(120000) + this.timeout(20000) servers = await flushAndRunMultipleServers(3) @@ -163,6 +163,34 @@ describe('Test follows', function () { expect(res.body.data[0].name).to.equal('server3') }) + it('Should propagate previous uploaded videos on a new following', async function () { + this.timeout(20000) + + await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-2' }) + await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-3' }) + await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-4' }) + await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-5' }) + await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-6' }) + + await wait(5000) + + // Server 1 follows server 3 + await follow(servers[0].url, [ servers[2].url ], servers[0].accessToken) + + await wait(7000) + + let res = await getVideosList(servers[0].url) + expect(res.body.total).to.equal(7) + + const video2 = res.body.data.find(v => v.name === 'server3-2') + const video4 = res.body.data.find(v => v.name === 'server3-4') + const video6 = res.body.data.find(v => v.name === 'server3-6') + + expect(video2).to.not.be.undefined + expect(video4).to.not.be.undefined + expect(video6).to.not.be.undefined + }) + after(async function () { killallServers(servers) -- 2.41.0