From 97969c4edf51b37eee691adba43368bb0fbb729b Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 3 Nov 2020 15:33:30 +0100 Subject: [PATCH] Add check constraints live tests --- .../app/+admin/system/jobs/jobs.component.ts | 3 +- server/initializers/constants.ts | 1 + .../job-queue/handlers/video-live-ending.ts | 19 ++- server/lib/live-manager.ts | 14 +- server/lib/user.ts | 6 +- server/lib/video.ts | 15 ++- server/models/account/user.ts | 3 +- server/models/video/video-file.ts | 8 ++ server/models/video/video.ts | 2 +- server/tests/api/check-params/live.ts | 16 +++ server/tests/api/live/live.ts | 126 +++++++++++++++++- shared/extra-utils/videos/live.ts | 46 ++++++- shared/models/videos/video-create.model.ts | 5 +- 13 files changed, 228 insertions(+), 36 deletions(-) diff --git a/client/src/app/+admin/system/jobs/jobs.component.ts b/client/src/app/+admin/system/jobs/jobs.component.ts index 602362fe9..f8e12d1b6 100644 --- a/client/src/app/+admin/system/jobs/jobs.component.ts +++ b/client/src/app/+admin/system/jobs/jobs.component.ts @@ -33,7 +33,8 @@ export class JobsComponent extends RestTable implements OnInit { 'videos-views', 'activitypub-refresher', 'video-live-ending', - 'video-redundancy' + 'video-redundancy', + 'video-live-ending' ] jobs: Job[] = [] diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index f0d614112..f8380eaa0 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -733,6 +733,7 @@ if (isTestInstance() === true) { FILES_CACHE.VIDEO_CAPTIONS.MAX_AGE = 3000 MEMOIZE_TTL.OVERVIEWS_SAMPLE = 3000 + MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD = 3000 OVERVIEWS.VIDEOS.SAMPLE_THRESHOLD = 2 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 32eeff4d1..1e964726e 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -8,9 +8,10 @@ import { generateHlsPlaylist } from '@server/lib/video-transcoding' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' +import { MStreamingPlaylist, MVideo, MVideoLive, MVideoWithFile } from '@server/types/models' import { VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger } from '../../../helpers/logger' +import { VideoFileModel } from '@server/models/video/video-file' async function processVideoLiveEnding (job: Bull.Job) { const payload = job.data as VideoLiveEndingPayload @@ -60,6 +61,10 @@ async function saveLive (video: MVideo, live: MVideoLive) { const segmentFiles = files.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts')) await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpName) + for (const file of segmentFiles) { + await remove(join(hlsDirectory, file)) + } + if (!duration) { duration = await getDurationFromVideoFile(mp4TmpName) } @@ -77,8 +82,13 @@ async function saveLive (video: MVideo, live: MVideoLive) { await video.save() + // Remove old HLS playlist video files const videoWithFiles = await VideoModel.loadWithFiles(video.id) + const hlsPlaylist = videoWithFiles.getHLSPlaylist() + await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) + hlsPlaylist.VideoFiles = [] + for (const resolution of resolutions) { const videoInputPath = buildMP4TmpName(resolution) const { isPortraitMode } = await getVideoFileResolution(videoInputPath) @@ -90,12 +100,11 @@ async function saveLive (video: MVideo, live: MVideoLive) { copyCodecs: true, isPortraitMode }) - } - video.state = VideoState.PUBLISHED - await video.save() + await remove(join(hlsDirectory, videoInputPath)) + } - await publishAndFederateIfNeeded(video) + await publishAndFederateIfNeeded(video, true) } async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index e115d2d50..2d8f906e9 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -133,10 +133,8 @@ class LiveManager { const sessionId = this.videoSessions.get(videoId) if (!sessionId) return + this.videoSessions.delete(videoId) this.abortSession(sessionId) - - this.onEndTransmuxing(videoId, true) - .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err })) } private getContext () { @@ -259,9 +257,12 @@ class LiveManager { updateSegment(segmentPath) if (this.isDurationConstraintValid(startStreamDateTime) !== true) { + logger.info('Stopping session of %s: max duration exceeded.', videoUUID) + this.stopSessionOf(videoLive.videoId) } + // Check user quota if the user enabled replay saving if (videoLive.saveReplay === true) { stat(segmentPath) .then(segmentStat => { @@ -270,6 +271,8 @@ class LiveManager { .then(() => this.isQuotaConstraintValid(user, videoLive)) .then(quotaValid => { if (quotaValid !== true) { + logger.info('Stopping session of %s: user quota exceeded.', videoUUID) + this.stopSessionOf(videoLive.videoId) } }) @@ -319,7 +322,7 @@ class LiveManager { onFFmpegEnded() // Don't care that we killed the ffmpeg process - if (err?.message?.includes('SIGINT')) return + if (err?.message?.includes('Exiting normally')) return logger.error('Live transcoding error.', { err, stdout, stderr }) @@ -348,8 +351,7 @@ class LiveManager { } }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) - // FIXME: use end - fullVideo.state = VideoState.WAITING_FOR_LIVE + fullVideo.state = VideoState.LIVE_ENDED await fullVideo.save() PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) diff --git a/server/lib/user.ts b/server/lib/user.ts index d3338f329..7d6497302 100644 --- a/server/lib/user.ts +++ b/server/lib/user.ts @@ -18,8 +18,6 @@ import { Redis } from './redis' import { createLocalVideoChannel } from './video-channel' import { createWatchLaterPlaylist } from './video-playlist' -import memoizee = require('memoizee') - type ChannelNames = { name: string, displayName: string } async function createUserAccountAndChannelAndPlaylist (parameters: { @@ -152,8 +150,8 @@ async function isAbleToUploadVideo (userId: number, size: number) { if (user.videoQuota === -1 && user.videoQuotaDaily === -1) return Promise.resolve(true) const [ totalBytes, totalBytesDaily ] = await Promise.all([ - getOriginalVideoFileTotalFromUser(user.id), - getOriginalVideoFileTotalDailyFromUser(user.id) + getOriginalVideoFileTotalFromUser(user), + getOriginalVideoFileTotalDailyFromUser(user) ]) const uploadedTotal = size + totalBytes diff --git a/server/lib/video.ts b/server/lib/video.ts index 81b7c4159..8d9918b2d 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts @@ -4,7 +4,7 @@ import { TagModel } from '@server/models/video/tag' import { VideoModel } from '@server/models/video/video' import { FilteredModelAttributes } from '@server/types' import { MTag, MThumbnail, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' -import { ThumbnailType, VideoCreate, VideoPrivacy } from '@shared/models' +import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState } from '@shared/models' import { federateVideoIfNeeded } from './activitypub/videos' import { Notifier } from './notifier' import { createVideoMiniatureFromExisting } from './thumbnail' @@ -81,8 +81,8 @@ async function setVideoTags (options: { } } -async function publishAndFederateIfNeeded (video: MVideoUUID) { - const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { +async function publishAndFederateIfNeeded (video: MVideoUUID, wasLive = false) { + const result = await sequelizeTypescript.transaction(async t => { // Maybe the video changed in database, refresh it const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) // Video does not exist anymore @@ -92,14 +92,15 @@ async function publishAndFederateIfNeeded (video: MVideoUUID) { const videoPublished = await videoDatabase.publishIfNeededAndSave(t) // If the video was not published, we consider it is a new one for other instances - await federateVideoIfNeeded(videoDatabase, videoPublished, t) + // Live videos are always federated, so it's not a new video + await federateVideoIfNeeded(videoDatabase, !wasLive && videoPublished, t) return { videoDatabase, videoPublished } }) - if (videoPublished) { - Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) - Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) + if (result?.videoPublished) { + Notifier.Instance.notifyOnNewVideoIfNeeded(result.videoDatabase) + Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(result.videoDatabase) } } diff --git a/server/models/account/user.ts b/server/models/account/user.ts index f64568c54..2aa6469fb 100644 --- a/server/models/account/user.ts +++ b/server/models/account/user.ts @@ -26,7 +26,6 @@ import { MUser, MUserDefault, MUserFormattable, - MUserId, MUserNotifSettingChannelDefault, MUserWithNotificationSetting, MVideoFullLight @@ -68,10 +67,10 @@ import { getSort, throwIfNotValid } from '../utils' import { VideoModel } from '../video/video' import { VideoChannelModel } from '../video/video-channel' import { VideoImportModel } from '../video/video-import' +import { VideoLiveModel } from '../video/video-live' import { VideoPlaylistModel } from '../video/video-playlist' import { AccountModel } from './account' import { UserNotificationSettingModel } from './user-notification-setting' -import { VideoLiveModel } from '../video/video-live' enum ScopeNames { FOR_ME_API = 'FOR_ME_API', diff --git a/server/models/video/video-file.ts b/server/models/video/video-file.ts index 6a321917c..8c8fc0b51 100644 --- a/server/models/video/video-file.ts +++ b/server/models/video/video-file.ts @@ -311,6 +311,14 @@ export class VideoFileModel extends Model { return element.save({ transaction }) } + static removeHLSFilesOfVideoId (videoStreamingPlaylistId: number) { + const options = { + where: { videoStreamingPlaylistId } + } + + return VideoFileModel.destroy(options) + } + getVideoOrStreamingPlaylist (this: MVideoFileVideo | MVideoFileStreamingPlaylistVideo): MVideo | MStreamingPlaylistVideo { if (this.videoId) return (this as MVideoFileVideo).Video diff --git a/server/models/video/video.ts b/server/models/video/video.ts index aba8c8cf4..7e008f7ea 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -249,7 +249,7 @@ export type AvailableForListIDsOptions = { [ScopeNames.WITH_LIVE]: { include: [ { - model: VideoLiveModel, + model: VideoLiveModel.unscoped(), required: false } ] diff --git a/server/tests/api/check-params/live.ts b/server/tests/api/check-params/live.ts index 4134fca0c..3e97dffdc 100644 --- a/server/tests/api/check-params/live.ts +++ b/server/tests/api/check-params/live.ts @@ -18,6 +18,7 @@ import { ServerInfo, setAccessTokensToServers, stopFfmpeg, + testFfmpegStreamError, updateCustomSubConfig, updateLive, uploadVideoAndGetId, @@ -402,6 +403,21 @@ describe('Test video lives API validator', function () { await stopFfmpeg(command) }) + + it('Should fail to stream twice in the save live', async function () { + this.timeout(30000) + + const resLive = await getLive(server.url, server.accessToken, videoId) + const live: LiveVideo = resLive.body + + const command = sendRTMPStream(live.rtmpUrl, live.streamKey) + + await waitUntilLiveStarts(server.url, server.accessToken, videoId) + + await testFfmpegStreamError(server.url, server.accessToken, videoId, true) + + await stopFfmpeg(command) + }) }) after(async function () { diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index e66c0cb26..f351e9650 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts @@ -2,14 +2,15 @@ import 'mocha' import * as chai from 'chai' -import { LiveVideo, LiveVideoCreate, VideoDetails, VideoPrivacy } from '@shared/models' +import { LiveVideo, LiveVideoCreate, User, VideoDetails, VideoPrivacy } from '@shared/models' import { - acceptChangeOwnership, cleanupTests, createLive, + createUser, doubleFollow, flushAndRunMultipleServers, getLive, + getMyUserInformation, getVideo, getVideosList, makeRawRequest, @@ -17,9 +18,13 @@ import { ServerInfo, setAccessTokensToServers, setDefaultVideoChannel, + testFfmpegStreamError, testImage, updateCustomSubConfig, updateLive, + updateUser, + userLogin, + wait, waitJobs } from '../../../../shared/extra-utils' @@ -28,6 +33,9 @@ const expect = chai.expect describe('Test live', function () { let servers: ServerInfo[] = [] let liveVideoUUID: string + let userId: number + let userAccessToken: string + let userChannelId: number before(async function () { this.timeout(120000) @@ -45,6 +53,22 @@ describe('Test live', function () { } }) + { + const user = { username: 'user1', password: 'superpassword' } + const res = await createUser({ + url: servers[0].url, + accessToken: servers[0].accessToken, + username: user.username, + password: user.password + }) + userId = res.body.user.id + + userAccessToken = await userLogin(servers[0], user) + + const resMe = await getMyUserInformation(servers[0].url, userAccessToken) + userChannelId = (resMe.body as User).videoChannels[0].id + } + // Server 1 and server 2 follow each other await doubleFollow(servers[0], servers[1]) }) @@ -198,17 +222,111 @@ describe('Test live', function () { describe('Test live constraints', function () { + async function createLiveWrapper (saveReplay: boolean) { + const liveAttributes = { + name: 'user live', + channelId: userChannelId, + privacy: VideoPrivacy.PUBLIC, + saveReplay + } + + const res = await createLive(servers[0].url, userAccessToken, liveAttributes) + return res.body.video.uuid as string + } + + before(async function () { + await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true + } + }) + + await updateUser({ + url: servers[0].url, + userId, + accessToken: servers[0].accessToken, + videoQuota: 1, + videoQuotaDaily: -1 + }) + }) + it('Should not have size limit if save replay is disabled', async function () { + this.timeout(30000) + const userVideoLiveoId = await createLiveWrapper(false) + await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false) }) - it('Should have size limit if save replay is enabled', async function () { - // daily quota + total quota + it('Should have size limit depending on user global quota if save replay is enabled', async function () { + this.timeout(30000) + + const userVideoLiveoId = await createLiveWrapper(true) + await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) + + await waitJobs(servers) + + for (const server of servers) { + const res = await getVideo(server.url, userVideoLiveoId) + const video: VideoDetails = res.body + expect(video.isLive).to.be.false + expect(video.duration).to.be.greaterThan(0) + } + + // TODO: check stream correctly saved + cleaned + }) + + it('Should have size limit depending on user daily quota if save replay is enabled', async function () { + this.timeout(30000) + + await updateUser({ + url: servers[0].url, + userId, + accessToken: servers[0].accessToken, + videoQuota: -1, + videoQuotaDaily: 1 + }) + + const userVideoLiveoId = await createLiveWrapper(true) + await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) + + // TODO: check stream correctly saved + cleaned + }) + + it('Should succeed without quota limit', async function () { + this.timeout(30000) + + // Wait for user quota memoize cache invalidation + await wait(5000) + + await updateUser({ + url: servers[0].url, + userId, + accessToken: servers[0].accessToken, + videoQuota: 10 * 1000 * 1000, + videoQuotaDaily: -1 + }) + + const userVideoLiveoId = await createLiveWrapper(true) + await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false) }) it('Should have max duration limit', async function () { + this.timeout(30000) + + await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true, + maxDuration: 1 + } + }) + + const userVideoLiveoId = await createLiveWrapper(true) + await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) + // TODO: check stream correctly saved + cleaned }) }) diff --git a/shared/extra-utils/videos/live.ts b/shared/extra-utils/videos/live.ts index 65942db0a..a391565a4 100644 --- a/shared/extra-utils/videos/live.ts +++ b/shared/extra-utils/videos/live.ts @@ -1,9 +1,9 @@ import * as ffmpeg from 'fluent-ffmpeg' -import { LiveVideoCreate, LiveVideoUpdate, VideoDetails, VideoState } from '@shared/models' +import { omit } from 'lodash' +import { LiveVideo, LiveVideoCreate, LiveVideoUpdate, VideoDetails, VideoState } from '@shared/models' import { buildAbsoluteFixturePath, wait } from '../miscs/miscs' import { makeGetRequest, makePutBodyRequest, makeUploadRequest } from '../requests/requests' import { getVideoWithToken } from './videos' -import { omit } from 'lodash' function getLive (url: string, token: string, videoId: number | string, statusCodeExpected = 200) { const path = '/api/v1/videos/live' @@ -47,7 +47,14 @@ function createLive (url: string, token: string, fields: LiveVideoCreate, status }) } -function sendRTMPStream (rtmpBaseUrl: string, streamKey: string) { +async function sendRTMPStreamInVideo (url: string, token: string, videoId: number | string, onErrorCb?: Function) { + const res = await getLive(url, token, videoId) + const videoLive = res.body as LiveVideo + + return sendRTMPStream(videoLive.rtmpUrl, videoLive.streamKey, onErrorCb) +} + +function sendRTMPStream (rtmpBaseUrl: string, streamKey: string, onErrorCb?: Function) { const fixture = buildAbsoluteFixturePath('video_short.mp4') const command = ffmpeg(fixture) @@ -63,7 +70,7 @@ function sendRTMPStream (rtmpBaseUrl: string, streamKey: string) { command.on('error', err => { if (err?.message?.includes('Exiting normally')) return - console.error('Cannot send RTMP stream.', { err }) + if (onErrorCb) onErrorCb(err) }) if (process.env.DEBUG) { @@ -75,6 +82,34 @@ function sendRTMPStream (rtmpBaseUrl: string, streamKey: string) { return command } +function waitFfmpegUntilError (command: ffmpeg.FfmpegCommand, successAfterMS = 10000) { + return new Promise((res, rej) => { + command.on('error', err => { + return rej(err) + }) + + setTimeout(() => { + res() + }, successAfterMS) + }) +} + +async function testFfmpegStreamError (url: string, token: string, videoId: number | string, shouldHaveError: boolean) { + const command = await sendRTMPStreamInVideo(url, token, videoId) + let error: Error + + try { + await waitFfmpegUntilError(command, 10000) + } catch (err) { + error = err + } + + await stopFfmpeg(command) + + if (shouldHaveError && !error) throw new Error('Ffmpeg did not have an error') + if (!shouldHaveError && error) throw error +} + async function stopFfmpeg (command: ffmpeg.FfmpegCommand) { command.kill('SIGINT') @@ -99,6 +134,9 @@ export { updateLive, waitUntilLiveStarts, createLive, + testFfmpegStreamError, stopFfmpeg, + sendRTMPStreamInVideo, + waitFfmpegUntilError, sendRTMPStream } diff --git a/shared/models/videos/video-create.model.ts b/shared/models/videos/video-create.model.ts index 9e980529d..732d508d1 100644 --- a/shared/models/videos/video-create.model.ts +++ b/shared/models/videos/video-create.model.ts @@ -2,15 +2,16 @@ import { VideoPrivacy } from './video-privacy.enum' import { VideoScheduleUpdate } from './video-schedule-update.model' export interface VideoCreate { + name: string + channelId: number + category?: number licence?: number language?: string description?: string support?: string - channelId: number nsfw?: boolean waitTranscoding?: boolean - name: string tags?: string[] commentsEnabled?: boolean downloadEnabled?: boolean -- 2.41.0