aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-09-25 10:04:21 +0200
committerChocobozzz <chocobozzz@cpy.re>2020-11-09 15:33:04 +0100
commita5cf76afa378aae81af2a9b0ce548e5d2582f832 (patch)
tree58da320232bee7c9656774c5d6811e82bbf6c696 /server
parentde6310b2fcbb8a6b79c546b23dfa1920724faaa7 (diff)
downloadPeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.gz
PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.tar.zst
PeerTube-a5cf76afa378aae81af2a9b0ce548e5d2582f832.zip
Add watch messages if live has not started
Diffstat (limited to 'server')
-rw-r--r--server/initializers/constants.ts10
-rw-r--r--server/lib/activitypub/videos.ts3
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts47
-rw-r--r--server/lib/job-queue/job-queue.ts20
-rw-r--r--server/lib/live-manager.ts129
-rw-r--r--server/lib/peertube-socket.ts30
-rw-r--r--server/lib/video-blacklist.ts5
-rw-r--r--server/models/video/video-live.ts30
-rw-r--r--server/models/video/video-streaming-playlist.ts11
-rw-r--r--server/models/video/video.ts8
10 files changed, 232 insertions, 61 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 606eeba2d..82d04a94e 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -139,7 +139,8 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
139 'email': 5, 139 'email': 5,
140 'videos-views': 1, 140 'videos-views': 1,
141 'activitypub-refresher': 1, 141 'activitypub-refresher': 1,
142 'video-redundancy': 1 142 'video-redundancy': 1,
143 'video-live-ending': 1
143} 144}
144const JOB_CONCURRENCY: { [id in JobType]: number } = { 145const JOB_CONCURRENCY: { [id in JobType]: number } = {
145 'activitypub-http-broadcast': 1, 146 'activitypub-http-broadcast': 1,
@@ -152,7 +153,8 @@ const JOB_CONCURRENCY: { [id in JobType]: number } = {
152 'email': 5, 153 'email': 5,
153 'videos-views': 1, 154 'videos-views': 1,
154 'activitypub-refresher': 1, 155 'activitypub-refresher': 1,
155 'video-redundancy': 1 156 'video-redundancy': 1,
157 'video-live-ending': 1
156} 158}
157const JOB_TTL: { [id in JobType]: number } = { 159const JOB_TTL: { [id in JobType]: number } = {
158 'activitypub-http-broadcast': 60000 * 10, // 10 minutes 160 'activitypub-http-broadcast': 60000 * 10, // 10 minutes
@@ -165,7 +167,8 @@ const JOB_TTL: { [id in JobType]: number } = {
165 'email': 60000 * 10, // 10 minutes 167 'email': 60000 * 10, // 10 minutes
166 'videos-views': undefined, // Unlimited 168 'videos-views': undefined, // Unlimited
167 'activitypub-refresher': 60000 * 10, // 10 minutes 169 'activitypub-refresher': 60000 * 10, // 10 minutes
168 'video-redundancy': 1000 * 3600 * 3 // 3 hours 170 'video-redundancy': 1000 * 3600 * 3, // 3 hours
171 'video-live-ending': 1000 * 60 * 10 // 10 minutes
169} 172}
170const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { 173const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
171 'videos-views': { 174 'videos-views': {
@@ -605,6 +608,7 @@ const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls')
605 608
606const VIDEO_LIVE = { 609const VIDEO_LIVE = {
607 EXTENSION: '.ts', 610 EXTENSION: '.ts',
611 CLEANUP_DELAY: 1000 * 60 * 5, // 5 mintues
608 RTMP: { 612 RTMP: {
609 CHUNK_SIZE: 60000, 613 CHUNK_SIZE: 60000,
610 GOP_CACHE: true, 614 GOP_CACHE: true,
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts
index 049e06cff..ab23ff507 100644
--- a/server/lib/activitypub/videos.ts
+++ b/server/lib/activitypub/videos.ts
@@ -66,6 +66,7 @@ import { FilteredModelAttributes } from '../../types/sequelize'
66import { ActorFollowScoreCache } from '../files-cache' 66import { ActorFollowScoreCache } from '../files-cache'
67import { JobQueue } from '../job-queue' 67import { JobQueue } from '../job-queue'
68import { Notifier } from '../notifier' 68import { Notifier } from '../notifier'
69import { PeerTubeSocket } from '../peertube-socket'
69import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail' 70import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail'
70import { setVideoTags } from '../video' 71import { setVideoTags } from '../video'
71import { autoBlacklistVideoIfNeeded } from '../video-blacklist' 72import { autoBlacklistVideoIfNeeded } from '../video-blacklist'
@@ -348,6 +349,7 @@ async function updateVideoFromAP (options: {
348 video.privacy = videoData.privacy 349 video.privacy = videoData.privacy
349 video.channelId = videoData.channelId 350 video.channelId = videoData.channelId
350 video.views = videoData.views 351 video.views = videoData.views
352 video.isLive = videoData.isLive
351 353
352 const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight 354 const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight
353 355
@@ -434,6 +436,7 @@ async function updateVideoFromAP (options: {
434 }) 436 })
435 437
436 if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users? 438 if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users?
439 if (videoUpdated.isLive) PeerTubeSocket.Instance.sendVideoLiveNewState(video)
437 440
438 logger.info('Remote video with uuid %s updated', videoObject.uuid) 441 logger.info('Remote video with uuid %s updated', videoObject.uuid)
439 442
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
new file mode 100644
index 000000000..1a58a9f7e
--- /dev/null
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -0,0 +1,47 @@
1import * as Bull from 'bull'
2import { readdir, remove } from 'fs-extra'
3import { join } from 'path'
4import { getHLSDirectory } from '@server/lib/video-paths'
5import { VideoModel } from '@server/models/video/video'
6import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
7import { VideoLiveEndingPayload } from '@shared/models'
8import { logger } from '../../../helpers/logger'
9
10async function processVideoLiveEnding (job: Bull.Job) {
11 const payload = job.data as VideoLiveEndingPayload
12
13 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId)
14 if (!video) {
15 logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId)
16 return
17 }
18
19 const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
20 const hlsDirectory = getHLSDirectory(video, false)
21
22 const files = await readdir(hlsDirectory)
23
24 for (const filename of files) {
25 if (
26 filename.endsWith('.ts') ||
27 filename.endsWith('.m3u8') ||
28 filename.endsWith('.mpd') ||
29 filename.endsWith('.m4s') ||
30 filename.endsWith('.tmp')
31 ) {
32 const p = join(hlsDirectory, filename)
33
34 remove(p)
35 .catch(err => logger.error('Cannot remove %s.', p, { err }))
36 }
37 }
38
39 streamingPlaylist.destroy()
40 .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
41}
42
43// ---------------------------------------------------------------------------
44
45export {
46 processVideoLiveEnding
47}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 14e181835..8d97434ac 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -10,6 +10,7 @@ import {
10 RefreshPayload, 10 RefreshPayload,
11 VideoFileImportPayload, 11 VideoFileImportPayload,
12 VideoImportPayload, 12 VideoImportPayload,
13 VideoLiveEndingPayload,
13 VideoRedundancyPayload, 14 VideoRedundancyPayload,
14 VideoTranscodingPayload 15 VideoTranscodingPayload
15} from '../../../shared/models' 16} from '../../../shared/models'
@@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views'
27import { refreshAPObject } from './handlers/activitypub-refresher' 28import { refreshAPObject } from './handlers/activitypub-refresher'
28import { processVideoFileImport } from './handlers/video-file-import' 29import { processVideoFileImport } from './handlers/video-file-import'
29import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 30import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
31import { processVideoLiveEnding } from './handlers/video-live-ending'
30 32
31type CreateJobArgument = 33type CreateJobArgument =
32 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 34 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -39,8 +41,13 @@ type CreateJobArgument =
39 { type: 'video-import', payload: VideoImportPayload } | 41 { type: 'video-import', payload: VideoImportPayload } |
40 { type: 'activitypub-refresher', payload: RefreshPayload } | 42 { type: 'activitypub-refresher', payload: RefreshPayload } |
41 { type: 'videos-views', payload: {} } | 43 { type: 'videos-views', payload: {} } |
44 { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
42 { type: 'video-redundancy', payload: VideoRedundancyPayload } 45 { type: 'video-redundancy', payload: VideoRedundancyPayload }
43 46
47type CreateJobOptions = {
48 delay?: number
49}
50
44const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { 51const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
45 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 52 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
46 'activitypub-http-unicast': processActivityPubHttpUnicast, 53 'activitypub-http-unicast': processActivityPubHttpUnicast,
@@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
52 'video-import': processVideoImport, 59 'video-import': processVideoImport,
53 'videos-views': processVideosViews, 60 'videos-views': processVideosViews,
54 'activitypub-refresher': refreshAPObject, 61 'activitypub-refresher': refreshAPObject,
62 'video-live-ending': processVideoLiveEnding,
55 'video-redundancy': processVideoRedundancy 63 'video-redundancy': processVideoRedundancy
56} 64}
57 65
@@ -66,7 +74,8 @@ const jobTypes: JobType[] = [
66 'video-import', 74 'video-import',
67 'videos-views', 75 'videos-views',
68 'activitypub-refresher', 76 'activitypub-refresher',
69 'video-redundancy' 77 'video-redundancy',
78 'video-live-ending'
70] 79]
71 80
72class JobQueue { 81class JobQueue {
@@ -122,12 +131,12 @@ class JobQueue {
122 } 131 }
123 } 132 }
124 133
125 createJob (obj: CreateJobArgument): void { 134 createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
126 this.createJobWithPromise(obj) 135 this.createJobWithPromise(obj, options)
127 .catch(err => logger.error('Cannot create job.', { err, obj })) 136 .catch(err => logger.error('Cannot create job.', { err, obj }))
128 } 137 }
129 138
130 createJobWithPromise (obj: CreateJobArgument) { 139 createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
131 const queue = this.queues[obj.type] 140 const queue = this.queues[obj.type]
132 if (queue === undefined) { 141 if (queue === undefined) {
133 logger.error('Unknown queue %s: cannot create job.', obj.type) 142 logger.error('Unknown queue %s: cannot create job.', obj.type)
@@ -137,7 +146,8 @@ class JobQueue {
137 const jobArgs: Bull.JobOptions = { 146 const jobArgs: Bull.JobOptions = {
138 backoff: { delay: 60 * 1000, type: 'exponential' }, 147 backoff: { delay: 60 * 1000, type: 'exponential' },
139 attempts: JOB_ATTEMPTS[obj.type], 148 attempts: JOB_ATTEMPTS[obj.type],
140 timeout: JOB_TTL[obj.type] 149 timeout: JOB_TTL[obj.type],
150 delay: options.delay
141 } 151 }
142 152
143 return queue.add(obj.payload, jobArgs) 153 return queue.add(obj.payload, jobArgs)
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index f602bfb6d..41176d197 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -2,18 +2,22 @@
2import { AsyncQueue, queue } from 'async' 2import { AsyncQueue, queue } from 'async'
3import * as chokidar from 'chokidar' 3import * as chokidar from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { ensureDir, readdir, remove } from 'fs-extra' 5import { ensureDir } from 'fs-extra'
6import { basename, join } from 'path' 6import { basename } from 'path'
7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' 7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
8import { logger } from '@server/helpers/logger' 8import { logger } from '@server/helpers/logger'
9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
10import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' 10import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
11import { VideoModel } from '@server/models/video/video'
11import { VideoFileModel } from '@server/models/video/video-file' 12import { VideoFileModel } from '@server/models/video/video-file'
12import { VideoLiveModel } from '@server/models/video/video-live' 13import { VideoLiveModel } from '@server/models/video/video-live'
13import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 14import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
14import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models' 15import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models'
15import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 16import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
17import { federateVideoIfNeeded } from './activitypub/videos'
16import { buildSha256Segment } from './hls' 18import { buildSha256Segment } from './hls'
19import { JobQueue } from './job-queue'
20import { PeerTubeSocket } from './peertube-socket'
17import { getHLSDirectory } from './video-paths' 21import { getHLSDirectory } from './video-paths'
18 22
19const NodeRtmpServer = require('node-media-server/node_rtmp_server') 23const NodeRtmpServer = require('node-media-server/node_rtmp_server')
@@ -47,6 +51,7 @@ class LiveManager {
47 private static instance: LiveManager 51 private static instance: LiveManager
48 52
49 private readonly transSessions = new Map<string, FfmpegCommand>() 53 private readonly transSessions = new Map<string, FfmpegCommand>()
54 private readonly videoSessions = new Map<number, string>()
50 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 55 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
51 56
52 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> 57 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
@@ -56,7 +61,8 @@ class LiveManager {
56 } 61 }
57 62
58 init () { 63 init () {
59 this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => { 64 const events = this.getContext().nodeEvent
65 events.on('postPublish', (sessionId: string, streamPath: string) => {
60 logger.debug('RTMP received stream', { id: sessionId, streamPath }) 66 logger.debug('RTMP received stream', { id: sessionId, streamPath })
61 67
62 const splittedPath = streamPath.split('/') 68 const splittedPath = streamPath.split('/')
@@ -69,7 +75,7 @@ class LiveManager {
69 .catch(err => logger.error('Cannot handle sessions.', { err })) 75 .catch(err => logger.error('Cannot handle sessions.', { err }))
70 }) 76 })
71 77
72 this.getContext().nodeEvent.on('donePublish', sessionId => { 78 events.on('donePublish', sessionId => {
73 this.abortSession(sessionId) 79 this.abortSession(sessionId)
74 }) 80 })
75 81
@@ -115,6 +121,16 @@ class LiveManager {
115 return this.segmentsSha256.get(videoUUID) 121 return this.segmentsSha256.get(videoUUID)
116 } 122 }
117 123
124 stopSessionOf (videoId: number) {
125 const sessionId = this.videoSessions.get(videoId)
126 if (!sessionId) return
127
128 this.abortSession(sessionId)
129
130 this.onEndTransmuxing(videoId)
131 .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
132 }
133
118 private getContext () { 134 private getContext () {
119 return context 135 return context
120 } 136 }
@@ -135,6 +151,13 @@ class LiveManager {
135 } 151 }
136 152
137 const video = videoLive.Video 153 const video = videoLive.Video
154 if (video.isBlacklisted()) {
155 logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
156 return this.abortSession(sessionId)
157 }
158
159 this.videoSessions.set(video.id, sessionId)
160
138 const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) 161 const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
139 162
140 const session = this.getContext().sessions.get(sessionId) 163 const session = this.getContext().sessions.get(sessionId)
@@ -154,11 +177,6 @@ class LiveManager {
154 type: VideoStreamingPlaylistType.HLS 177 type: VideoStreamingPlaylistType.HLS
155 }, { returning: true }) as [ MStreamingPlaylist, boolean ] 178 }, { returning: true }) as [ MStreamingPlaylist, boolean ]
156 179
157 video.state = VideoState.PUBLISHED
158 await video.save()
159
160 // FIXME: federation?
161
162 return this.runMuxing({ 180 return this.runMuxing({
163 sessionId, 181 sessionId,
164 videoLive, 182 videoLive,
@@ -207,11 +225,46 @@ class LiveManager {
207 225
208 this.transSessions.set(sessionId, ffmpegExec) 226 this.transSessions.set(sessionId, ffmpegExec)
209 227
228 const videoUUID = videoLive.Video.uuid
229 const tsWatcher = chokidar.watch(outPath + '/*.ts')
230
231 const updateHandler = segmentPath => {
232 this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
233 }
234
235 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
236
237 tsWatcher.on('add', p => updateHandler(p))
238 tsWatcher.on('change', p => updateHandler(p))
239 tsWatcher.on('unlink', p => deleteHandler(p))
240
241 const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
242 masterWatcher.on('add', async () => {
243 try {
244 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
245
246 video.state = VideoState.PUBLISHED
247 await video.save()
248 videoLive.Video = video
249
250 await federateVideoIfNeeded(video, false)
251
252 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
253 } catch (err) {
254 logger.error('Cannot federate video %d.', videoLive.videoId, { err })
255 } finally {
256 masterWatcher.close()
257 .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
258 }
259 })
260
210 const onFFmpegEnded = () => { 261 const onFFmpegEnded = () => {
211 watcher.close() 262 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath)
212 .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err }))
213 263
214 this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath) 264 Promise.all([ tsWatcher.close(), masterWatcher.close() ])
265 .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
266
267 this.onEndTransmuxing(videoLive.Video.id)
215 .catch(err => logger.error('Error in closed transmuxing.', { err })) 268 .catch(err => logger.error('Error in closed transmuxing.', { err }))
216 } 269 }
217 270
@@ -225,44 +278,30 @@ class LiveManager {
225 }) 278 })
226 279
227 ffmpegExec.on('end', () => onFFmpegEnded()) 280 ffmpegExec.on('end', () => onFFmpegEnded())
228
229 const videoUUID = videoLive.Video.uuid
230 const watcher = chokidar.watch(outPath + '/*.ts')
231
232 const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
233 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
234
235 watcher.on('add', p => updateHandler(p))
236 watcher.on('change', p => updateHandler(p))
237 watcher.on('unlink', p => deleteHandler(p))
238 } 281 }
239 282
240 private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) { 283 private async onEndTransmuxing (videoId: number) {
241 logger.info('RTMP transmuxing for %s ended.', streamPath) 284 try {
285 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
286 if (!fullVideo) return
242 287
243 const files = await readdir(outPath) 288 JobQueue.Instance.createJob({
289 type: 'video-live-ending',
290 payload: {
291 videoId: fullVideo.id
292 }
293 }, { delay: VIDEO_LIVE.CLEANUP_DELAY })
244 294
245 for (const filename of files) { 295 // FIXME: use end
246 if ( 296 fullVideo.state = VideoState.WAITING_FOR_LIVE
247 filename.endsWith('.ts') || 297 await fullVideo.save()
248 filename.endsWith('.m3u8') ||
249 filename.endsWith('.mpd') ||
250 filename.endsWith('.m4s') ||
251 filename.endsWith('.tmp')
252 ) {
253 const p = join(outPath, filename)
254 298
255 remove(p) 299 PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
256 .catch(err => logger.error('Cannot remove %s.', p, { err }))
257 }
258 }
259 300
260 playlist.destroy() 301 await federateVideoIfNeeded(fullVideo, false)
261 .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) 302 } catch (err) {
262 303 logger.error('Cannot save/federate new video state of live streaming.', { err })
263 video.state = VideoState.LIVE_ENDED 304 }
264 video.save()
265 .catch(err => logger.error('Cannot save new video state of live streaming.', { err }))
266 } 305 }
267 306
268 private async addSegmentSha (options: SegmentSha256QueueParam) { 307 private async addSegmentSha (options: SegmentSha256QueueParam) {
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts
index 2e4b15b38..c918a8685 100644
--- a/server/lib/peertube-socket.ts
+++ b/server/lib/peertube-socket.ts
@@ -1,14 +1,18 @@
1import * as SocketIO from 'socket.io' 1import { Socket } from 'dgram'
2import { authenticateSocket } from '../middlewares'
3import { logger } from '../helpers/logger'
4import { Server } from 'http' 2import { Server } from 'http'
3import * as SocketIO from 'socket.io'
4import { MVideo } from '@server/types/models'
5import { UserNotificationModelForApi } from '@server/types/models/user' 5import { UserNotificationModelForApi } from '@server/types/models/user'
6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
7import { logger } from '../helpers/logger'
8import { authenticateSocket } from '../middlewares'
6 9
7class PeerTubeSocket { 10class PeerTubeSocket {
8 11
9 private static instance: PeerTubeSocket 12 private static instance: PeerTubeSocket
10 13
11 private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {} 14 private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {}
15 private liveVideosNamespace: SocketIO.Namespace
12 16
13 private constructor () {} 17 private constructor () {}
14 18
@@ -32,19 +36,37 @@ class PeerTubeSocket {
32 this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) 36 this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket)
33 }) 37 })
34 }) 38 })
39
40 this.liveVideosNamespace = io.of('/live-videos')
41 .on('connection', socket => {
42 socket.on('subscribe', ({ videoId }) => socket.join(videoId))
43 socket.on('unsubscribe', ({ videoId }) => socket.leave(videoId))
44 })
35 } 45 }
36 46
37 sendNotification (userId: number, notification: UserNotificationModelForApi) { 47 sendNotification (userId: number, notification: UserNotificationModelForApi) {
38 const sockets = this.userNotificationSockets[userId] 48 const sockets = this.userNotificationSockets[userId]
39
40 if (!sockets) return 49 if (!sockets) return
41 50
51 logger.debug('Sending user notification to user %d.', userId)
52
42 const notificationMessage = notification.toFormattedJSON() 53 const notificationMessage = notification.toFormattedJSON()
43 for (const socket of sockets) { 54 for (const socket of sockets) {
44 socket.emit('new-notification', notificationMessage) 55 socket.emit('new-notification', notificationMessage)
45 } 56 }
46 } 57 }
47 58
59 sendVideoLiveNewState (video: MVideo) {
60 const data: LiveVideoEventPayload = { state: video.state }
61 const type: LiveVideoEventType = 'state-change'
62
63 logger.debug('Sending video live new state notification of %s.', video.url)
64
65 this.liveVideosNamespace
66 .in(video.id)
67 .emit(type, data)
68 }
69
48 static get Instance () { 70 static get Instance () {
49 return this.instance || (this.instance = new this()) 71 return this.instance || (this.instance = new this())
50 } 72 }
diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts
index bdbcffda6..f6c66b6dd 100644
--- a/server/lib/video-blacklist.ts
+++ b/server/lib/video-blacklist.ts
@@ -17,6 +17,7 @@ import { sendDeleteVideo } from './activitypub/send'
17import { federateVideoIfNeeded } from './activitypub/videos' 17import { federateVideoIfNeeded } from './activitypub/videos'
18import { Notifier } from './notifier' 18import { Notifier } from './notifier'
19import { Hooks } from './plugins/hooks' 19import { Hooks } from './plugins/hooks'
20import { LiveManager } from './live-manager'
20 21
21async function autoBlacklistVideoIfNeeded (parameters: { 22async function autoBlacklistVideoIfNeeded (parameters: {
22 video: MVideoWithBlacklistLight 23 video: MVideoWithBlacklistLight
@@ -73,6 +74,10 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video
73 await sendDeleteVideo(videoInstance, undefined) 74 await sendDeleteVideo(videoInstance, undefined)
74 } 75 }
75 76
77 if (videoInstance.isLive) {
78 LiveManager.Instance.stopSessionOf(videoInstance.id)
79 }
80
76 Notifier.Instance.notifyOnVideoBlacklist(blacklist) 81 Notifier.Instance.notifyOnVideoBlacklist(blacklist)
77} 82}
78 83
diff --git a/server/models/video/video-live.ts b/server/models/video/video-live.ts
index 6929b9688..8608bc84c 100644
--- a/server/models/video/video-live.ts
+++ b/server/models/video/video-live.ts
@@ -1,14 +1,21 @@
1import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript' 1import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript'
2import { WEBSERVER } from '@server/initializers/constants' 2import { WEBSERVER } from '@server/initializers/constants'
3import { MVideoLive, MVideoLiveVideo } from '@server/types/models' 3import { MVideoLive, MVideoLiveVideo } from '@server/types/models'
4import { VideoLive } from '@shared/models/videos/video-live.model' 4import { LiveVideo, VideoState } from '@shared/models'
5import { VideoModel } from './video' 5import { VideoModel } from './video'
6import { VideoBlacklistModel } from './video-blacklist'
6 7
7@DefaultScope(() => ({ 8@DefaultScope(() => ({
8 include: [ 9 include: [
9 { 10 {
10 model: VideoModel, 11 model: VideoModel,
11 required: true 12 required: true,
13 include: [
14 {
15 model: VideoBlacklistModel,
16 required: false
17 }
18 ]
12 } 19 }
13 ] 20 ]
14})) 21}))
@@ -49,7 +56,22 @@ export class VideoLiveModel extends Model<VideoLiveModel> {
49 const query = { 56 const query = {
50 where: { 57 where: {
51 streamKey 58 streamKey
52 } 59 },
60 include: [
61 {
62 model: VideoModel.unscoped(),
63 required: true,
64 where: {
65 state: VideoState.WAITING_FOR_LIVE
66 },
67 include: [
68 {
69 model: VideoBlacklistModel.unscoped(),
70 required: false
71 }
72 ]
73 }
74 ]
53 } 75 }
54 76
55 return VideoLiveModel.findOne<MVideoLiveVideo>(query) 77 return VideoLiveModel.findOne<MVideoLiveVideo>(query)
@@ -65,7 +87,7 @@ export class VideoLiveModel extends Model<VideoLiveModel> {
65 return VideoLiveModel.findOne<MVideoLive>(query) 87 return VideoLiveModel.findOne<MVideoLive>(query)
66 } 88 }
67 89
68 toFormattedJSON (): VideoLive { 90 toFormattedJSON (): LiveVideo {
69 return { 91 return {
70 rtmpUrl: WEBSERVER.RTMP_URL, 92 rtmpUrl: WEBSERVER.RTMP_URL,
71 streamKey: this.streamKey 93 streamKey: this.streamKey
diff --git a/server/models/video/video-streaming-playlist.ts b/server/models/video/video-streaming-playlist.ts
index b8dc7c450..73bd89844 100644
--- a/server/models/video/video-streaming-playlist.ts
+++ b/server/models/video/video-streaming-playlist.ts
@@ -153,6 +153,17 @@ export class VideoStreamingPlaylistModel extends Model<VideoStreamingPlaylistMod
153 return VideoStreamingPlaylistModel.findByPk(id, options) 153 return VideoStreamingPlaylistModel.findByPk(id, options)
154 } 154 }
155 155
156 static loadHLSPlaylistByVideo (videoId: number) {
157 const options = {
158 where: {
159 type: VideoStreamingPlaylistType.HLS,
160 videoId
161 }
162 }
163
164 return VideoStreamingPlaylistModel.findOne(options)
165 }
166
156 static getHlsPlaylistFilename (resolution: number) { 167 static getHlsPlaylistFilename (resolution: number) {
157 return resolution + '.m3u8' 168 return resolution + '.m3u8'
158 } 169 }
diff --git a/server/models/video/video.ts b/server/models/video/video.ts
index a3e3b6cfe..8493ab802 100644
--- a/server/models/video/video.ts
+++ b/server/models/video/video.ts
@@ -127,6 +127,7 @@ import { VideoShareModel } from './video-share'
127import { VideoStreamingPlaylistModel } from './video-streaming-playlist' 127import { VideoStreamingPlaylistModel } from './video-streaming-playlist'
128import { VideoTagModel } from './video-tag' 128import { VideoTagModel } from './video-tag'
129import { VideoViewModel } from './video-view' 129import { VideoViewModel } from './video-view'
130import { LiveManager } from '@server/lib/live-manager'
130 131
131export enum ScopeNames { 132export enum ScopeNames {
132 AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS', 133 AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS',
@@ -800,6 +801,13 @@ export class VideoModel extends Model<VideoModel> {
800 } 801 }
801 802
802 @BeforeDestroy 803 @BeforeDestroy
804 static stopLiveIfNeeded (instance: VideoModel) {
805 if (!instance.isLive) return
806
807 return LiveManager.Instance.stopSessionOf(instance.id)
808 }
809
810 @BeforeDestroy
803 static invalidateCache (instance: VideoModel) { 811 static invalidateCache (instance: VideoModel) {
804 ModelCache.Instance.invalidateCache('video', instance.id) 812 ModelCache.Instance.invalidateCache('video', instance.id)
805 } 813 }