From a5cf76afa378aae81af2a9b0ce548e5d2582f832 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 25 Sep 2020 10:04:21 +0200 Subject: Add watch messages if live has not started --- server/initializers/constants.ts | 10 +- server/lib/activitypub/videos.ts | 3 + server/lib/job-queue/handlers/video-live-ending.ts | 47 ++++++++ server/lib/job-queue/job-queue.ts | 20 +++- server/lib/live-manager.ts | 129 ++++++++++++++------- server/lib/peertube-socket.ts | 30 ++++- server/lib/video-blacklist.ts | 5 + server/models/video/video-live.ts | 30 ++++- server/models/video/video-streaming-playlist.ts | 11 ++ server/models/video/video.ts | 8 ++ 10 files changed, 232 insertions(+), 61 deletions(-) create mode 100644 server/lib/job-queue/handlers/video-live-ending.ts (limited to 'server') diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 606eeba2d..82d04a94e 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -139,7 +139,8 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { 'email': 5, 'videos-views': 1, 'activitypub-refresher': 1, - 'video-redundancy': 1 + 'video-redundancy': 1, + 'video-live-ending': 1 } const JOB_CONCURRENCY: { [id in JobType]: number } = { 'activitypub-http-broadcast': 1, @@ -152,7 +153,8 @@ const JOB_CONCURRENCY: { [id in JobType]: number } = { 'email': 5, 'videos-views': 1, 'activitypub-refresher': 1, - 'video-redundancy': 1 + 'video-redundancy': 1, + 'video-live-ending': 1 } const JOB_TTL: { [id in JobType]: number } = { 'activitypub-http-broadcast': 60000 * 10, // 10 minutes @@ -165,7 +167,8 @@ const JOB_TTL: { [id in JobType]: number } = { 'email': 60000 * 10, // 10 minutes 'videos-views': undefined, // Unlimited 'activitypub-refresher': 60000 * 10, // 10 minutes - 'video-redundancy': 1000 * 3600 * 3 // 3 hours + 'video-redundancy': 1000 * 3600 * 3, // 3 hours + 'video-live-ending': 1000 * 60 * 10 // 10 minutes } const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { 'videos-views': { @@ -605,6 +608,7 @@ const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls') const VIDEO_LIVE = { EXTENSION: '.ts', + CLEANUP_DELAY: 1000 * 60 * 5, // 5 mintues RTMP: { CHUNK_SIZE: 60000, GOP_CACHE: true, diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 049e06cff..ab23ff507 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -66,6 +66,7 @@ import { FilteredModelAttributes } from '../../types/sequelize' import { ActorFollowScoreCache } from '../files-cache' import { JobQueue } from '../job-queue' import { Notifier } from '../notifier' +import { PeerTubeSocket } from '../peertube-socket' import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail' import { setVideoTags } from '../video' import { autoBlacklistVideoIfNeeded } from '../video-blacklist' @@ -348,6 +349,7 @@ async function updateVideoFromAP (options: { video.privacy = videoData.privacy video.channelId = videoData.channelId video.views = videoData.views + video.isLive = videoData.isLive const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight @@ -434,6 +436,7 @@ async function updateVideoFromAP (options: { }) if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users? + if (videoUpdated.isLive) PeerTubeSocket.Instance.sendVideoLiveNewState(video) logger.info('Remote video with uuid %s updated', videoObject.uuid) diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts new file mode 100644 index 000000000..1a58a9f7e --- /dev/null +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -0,0 +1,47 @@ +import * as Bull from 'bull' +import { readdir, remove } from 'fs-extra' +import { join } from 'path' +import { getHLSDirectory } from '@server/lib/video-paths' +import { VideoModel } from '@server/models/video/video' +import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' +import { VideoLiveEndingPayload } from '@shared/models' +import { logger } from '../../../helpers/logger' + +async function processVideoLiveEnding (job: Bull.Job) { + const payload = job.data as VideoLiveEndingPayload + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) + if (!video) { + logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId) + return + } + + const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) + const hlsDirectory = getHLSDirectory(video, false) + + const files = await readdir(hlsDirectory) + + for (const filename of files) { + if ( + filename.endsWith('.ts') || + filename.endsWith('.m3u8') || + filename.endsWith('.mpd') || + filename.endsWith('.m4s') || + filename.endsWith('.tmp') + ) { + const p = join(hlsDirectory, filename) + + remove(p) + .catch(err => logger.error('Cannot remove %s.', p, { err })) + } + } + + streamingPlaylist.destroy() + .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) +} + +// --------------------------------------------------------------------------- + +export { + processVideoLiveEnding +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 14e181835..8d97434ac 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -10,6 +10,7 @@ import { RefreshPayload, VideoFileImportPayload, VideoImportPayload, + VideoLiveEndingPayload, VideoRedundancyPayload, VideoTranscodingPayload } from '../../../shared/models' @@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views' import { refreshAPObject } from './handlers/activitypub-refresher' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' +import { processVideoLiveEnding } from './handlers/video-live-ending' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -39,8 +41,13 @@ type CreateJobArgument = { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views', payload: {} } | + { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } +type CreateJobOptions = { + delay?: number +} + const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, @@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'video-import': processVideoImport, 'videos-views': processVideosViews, 'activitypub-refresher': refreshAPObject, + 'video-live-ending': processVideoLiveEnding, 'video-redundancy': processVideoRedundancy } @@ -66,7 +74,8 @@ const jobTypes: JobType[] = [ 'video-import', 'videos-views', 'activitypub-refresher', - 'video-redundancy' + 'video-redundancy', + 'video-live-ending' ] class JobQueue { @@ -122,12 +131,12 @@ class JobQueue { } } - createJob (obj: CreateJobArgument): void { - this.createJobWithPromise(obj) + createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { + this.createJobWithPromise(obj, options) .catch(err => logger.error('Cannot create job.', { err, obj })) } - createJobWithPromise (obj: CreateJobArgument) { + createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { const queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) @@ -137,7 +146,8 @@ class JobQueue { const jobArgs: Bull.JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], - timeout: JOB_TTL[obj.type] + timeout: JOB_TTL[obj.type], + delay: options.delay } return queue.add(obj.payload, jobArgs) diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index f602bfb6d..41176d197 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -2,18 +2,22 @@ import { AsyncQueue, queue } from 'async' import * as chokidar from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' -import { ensureDir, readdir, remove } from 'fs-extra' -import { basename, join } from 'path' +import { ensureDir } from 'fs-extra' +import { basename } from 'path' import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' import { logger } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' +import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models' +import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models' import { VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { federateVideoIfNeeded } from './activitypub/videos' import { buildSha256Segment } from './hls' +import { JobQueue } from './job-queue' +import { PeerTubeSocket } from './peertube-socket' import { getHLSDirectory } from './video-paths' const NodeRtmpServer = require('node-media-server/node_rtmp_server') @@ -47,6 +51,7 @@ class LiveManager { private static instance: LiveManager private readonly transSessions = new Map() + private readonly videoSessions = new Map() private readonly segmentsSha256 = new Map>() private segmentsSha256Queue: AsyncQueue @@ -56,7 +61,8 @@ class LiveManager { } init () { - this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => { + const events = this.getContext().nodeEvent + events.on('postPublish', (sessionId: string, streamPath: string) => { logger.debug('RTMP received stream', { id: sessionId, streamPath }) const splittedPath = streamPath.split('/') @@ -69,7 +75,7 @@ class LiveManager { .catch(err => logger.error('Cannot handle sessions.', { err })) }) - this.getContext().nodeEvent.on('donePublish', sessionId => { + events.on('donePublish', sessionId => { this.abortSession(sessionId) }) @@ -115,6 +121,16 @@ class LiveManager { return this.segmentsSha256.get(videoUUID) } + stopSessionOf (videoId: number) { + const sessionId = this.videoSessions.get(videoId) + if (!sessionId) return + + this.abortSession(sessionId) + + this.onEndTransmuxing(videoId) + .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err })) + } + private getContext () { return context } @@ -135,6 +151,13 @@ class LiveManager { } const video = videoLive.Video + if (video.isBlacklisted()) { + logger.warn('Video is blacklisted. Refusing stream %s.', streamKey) + return this.abortSession(sessionId) + } + + this.videoSessions.set(video.id, sessionId) + const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) const session = this.getContext().sessions.get(sessionId) @@ -154,11 +177,6 @@ class LiveManager { type: VideoStreamingPlaylistType.HLS }, { returning: true }) as [ MStreamingPlaylist, boolean ] - video.state = VideoState.PUBLISHED - await video.save() - - // FIXME: federation? - return this.runMuxing({ sessionId, videoLive, @@ -207,11 +225,46 @@ class LiveManager { this.transSessions.set(sessionId, ffmpegExec) + const videoUUID = videoLive.Video.uuid + const tsWatcher = chokidar.watch(outPath + '/*.ts') + + const updateHandler = segmentPath => { + this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) + } + + const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) + + tsWatcher.on('add', p => updateHandler(p)) + tsWatcher.on('change', p => updateHandler(p)) + tsWatcher.on('unlink', p => deleteHandler(p)) + + const masterWatcher = chokidar.watch(outPath + '/master.m3u8') + masterWatcher.on('add', async () => { + try { + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId) + + video.state = VideoState.PUBLISHED + await video.save() + videoLive.Video = video + + await federateVideoIfNeeded(video, false) + + PeerTubeSocket.Instance.sendVideoLiveNewState(video) + } catch (err) { + logger.error('Cannot federate video %d.', videoLive.videoId, { err }) + } finally { + masterWatcher.close() + .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err })) + } + }) + const onFFmpegEnded = () => { - watcher.close() - .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err })) + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath) - this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath) + Promise.all([ tsWatcher.close(), masterWatcher.close() ]) + .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) + + this.onEndTransmuxing(videoLive.Video.id) .catch(err => logger.error('Error in closed transmuxing.', { err })) } @@ -225,44 +278,30 @@ class LiveManager { }) ffmpegExec.on('end', () => onFFmpegEnded()) - - const videoUUID = videoLive.Video.uuid - const watcher = chokidar.watch(outPath + '/*.ts') - - const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) - const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) - - watcher.on('add', p => updateHandler(p)) - watcher.on('change', p => updateHandler(p)) - watcher.on('unlink', p => deleteHandler(p)) } - private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) { - logger.info('RTMP transmuxing for %s ended.', streamPath) + private async onEndTransmuxing (videoId: number) { + try { + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + if (!fullVideo) return - const files = await readdir(outPath) + JobQueue.Instance.createJob({ + type: 'video-live-ending', + payload: { + videoId: fullVideo.id + } + }, { delay: VIDEO_LIVE.CLEANUP_DELAY }) - for (const filename of files) { - if ( - filename.endsWith('.ts') || - filename.endsWith('.m3u8') || - filename.endsWith('.mpd') || - filename.endsWith('.m4s') || - filename.endsWith('.tmp') - ) { - const p = join(outPath, filename) + // FIXME: use end + fullVideo.state = VideoState.WAITING_FOR_LIVE + await fullVideo.save() - remove(p) - .catch(err => logger.error('Cannot remove %s.', p, { err })) - } - } + PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) - playlist.destroy() - .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) - - video.state = VideoState.LIVE_ENDED - video.save() - .catch(err => logger.error('Cannot save new video state of live streaming.', { err })) + await federateVideoIfNeeded(fullVideo, false) + } catch (err) { + logger.error('Cannot save/federate new video state of live streaming.', { err }) + } } private async addSegmentSha (options: SegmentSha256QueueParam) { diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 2e4b15b38..c918a8685 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts @@ -1,14 +1,18 @@ -import * as SocketIO from 'socket.io' -import { authenticateSocket } from '../middlewares' -import { logger } from '../helpers/logger' +import { Socket } from 'dgram' import { Server } from 'http' +import * as SocketIO from 'socket.io' +import { MVideo } from '@server/types/models' import { UserNotificationModelForApi } from '@server/types/models/user' +import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' +import { logger } from '../helpers/logger' +import { authenticateSocket } from '../middlewares' class PeerTubeSocket { private static instance: PeerTubeSocket private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {} + private liveVideosNamespace: SocketIO.Namespace private constructor () {} @@ -32,19 +36,37 @@ class PeerTubeSocket { this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) }) }) + + this.liveVideosNamespace = io.of('/live-videos') + .on('connection', socket => { + socket.on('subscribe', ({ videoId }) => socket.join(videoId)) + socket.on('unsubscribe', ({ videoId }) => socket.leave(videoId)) + }) } sendNotification (userId: number, notification: UserNotificationModelForApi) { const sockets = this.userNotificationSockets[userId] - if (!sockets) return + logger.debug('Sending user notification to user %d.', userId) + const notificationMessage = notification.toFormattedJSON() for (const socket of sockets) { socket.emit('new-notification', notificationMessage) } } + sendVideoLiveNewState (video: MVideo) { + const data: LiveVideoEventPayload = { state: video.state } + const type: LiveVideoEventType = 'state-change' + + logger.debug('Sending video live new state notification of %s.', video.url) + + this.liveVideosNamespace + .in(video.id) + .emit(type, data) + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts index bdbcffda6..f6c66b6dd 100644 --- a/server/lib/video-blacklist.ts +++ b/server/lib/video-blacklist.ts @@ -17,6 +17,7 @@ import { sendDeleteVideo } from './activitypub/send' import { federateVideoIfNeeded } from './activitypub/videos' import { Notifier } from './notifier' import { Hooks } from './plugins/hooks' +import { LiveManager } from './live-manager' async function autoBlacklistVideoIfNeeded (parameters: { video: MVideoWithBlacklistLight @@ -73,6 +74,10 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video await sendDeleteVideo(videoInstance, undefined) } + if (videoInstance.isLive) { + LiveManager.Instance.stopSessionOf(videoInstance.id) + } + Notifier.Instance.notifyOnVideoBlacklist(blacklist) } diff --git a/server/models/video/video-live.ts b/server/models/video/video-live.ts index 6929b9688..8608bc84c 100644 --- a/server/models/video/video-live.ts +++ b/server/models/video/video-live.ts @@ -1,14 +1,21 @@ import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript' import { WEBSERVER } from '@server/initializers/constants' import { MVideoLive, MVideoLiveVideo } from '@server/types/models' -import { VideoLive } from '@shared/models/videos/video-live.model' +import { LiveVideo, VideoState } from '@shared/models' import { VideoModel } from './video' +import { VideoBlacklistModel } from './video-blacklist' @DefaultScope(() => ({ include: [ { model: VideoModel, - required: true + required: true, + include: [ + { + model: VideoBlacklistModel, + required: false + } + ] } ] })) @@ -49,7 +56,22 @@ export class VideoLiveModel extends Model { const query = { where: { streamKey - } + }, + include: [ + { + model: VideoModel.unscoped(), + required: true, + where: { + state: VideoState.WAITING_FOR_LIVE + }, + include: [ + { + model: VideoBlacklistModel.unscoped(), + required: false + } + ] + } + ] } return VideoLiveModel.findOne(query) @@ -65,7 +87,7 @@ export class VideoLiveModel extends Model { return VideoLiveModel.findOne(query) } - toFormattedJSON (): VideoLive { + toFormattedJSON (): LiveVideo { return { rtmpUrl: WEBSERVER.RTMP_URL, streamKey: this.streamKey diff --git a/server/models/video/video-streaming-playlist.ts b/server/models/video/video-streaming-playlist.ts index b8dc7c450..73bd89844 100644 --- a/server/models/video/video-streaming-playlist.ts +++ b/server/models/video/video-streaming-playlist.ts @@ -153,6 +153,17 @@ export class VideoStreamingPlaylistModel extends Model { return undefined } + @BeforeDestroy + static stopLiveIfNeeded (instance: VideoModel) { + if (!instance.isLive) return + + return LiveManager.Instance.stopSessionOf(instance.id) + } + @BeforeDestroy static invalidateCache (instance: VideoModel) { ModelCache.Instance.invalidateCache('video', instance.id) -- cgit v1.2.3