diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/initializers/constants.ts | 10 | ||||
-rw-r--r-- | server/lib/activitypub/videos.ts | 3 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-live-ending.ts | 47 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 20 | ||||
-rw-r--r-- | server/lib/live-manager.ts | 129 | ||||
-rw-r--r-- | server/lib/peertube-socket.ts | 30 | ||||
-rw-r--r-- | server/lib/video-blacklist.ts | 5 | ||||
-rw-r--r-- | server/models/video/video-live.ts | 30 | ||||
-rw-r--r-- | server/models/video/video-streaming-playlist.ts | 11 | ||||
-rw-r--r-- | server/models/video/video.ts | 8 |
10 files changed, 232 insertions, 61 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 606eeba2d..82d04a94e 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -139,7 +139,8 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
139 | 'email': 5, | 139 | 'email': 5, |
140 | 'videos-views': 1, | 140 | 'videos-views': 1, |
141 | 'activitypub-refresher': 1, | 141 | 'activitypub-refresher': 1, |
142 | 'video-redundancy': 1 | 142 | 'video-redundancy': 1, |
143 | 'video-live-ending': 1 | ||
143 | } | 144 | } |
144 | const JOB_CONCURRENCY: { [id in JobType]: number } = { | 145 | const JOB_CONCURRENCY: { [id in JobType]: number } = { |
145 | 'activitypub-http-broadcast': 1, | 146 | 'activitypub-http-broadcast': 1, |
@@ -152,7 +153,8 @@ const JOB_CONCURRENCY: { [id in JobType]: number } = { | |||
152 | 'email': 5, | 153 | 'email': 5, |
153 | 'videos-views': 1, | 154 | 'videos-views': 1, |
154 | 'activitypub-refresher': 1, | 155 | 'activitypub-refresher': 1, |
155 | 'video-redundancy': 1 | 156 | 'video-redundancy': 1, |
157 | 'video-live-ending': 1 | ||
156 | } | 158 | } |
157 | const JOB_TTL: { [id in JobType]: number } = { | 159 | const JOB_TTL: { [id in JobType]: number } = { |
158 | 'activitypub-http-broadcast': 60000 * 10, // 10 minutes | 160 | 'activitypub-http-broadcast': 60000 * 10, // 10 minutes |
@@ -165,7 +167,8 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
165 | 'email': 60000 * 10, // 10 minutes | 167 | 'email': 60000 * 10, // 10 minutes |
166 | 'videos-views': undefined, // Unlimited | 168 | 'videos-views': undefined, // Unlimited |
167 | 'activitypub-refresher': 60000 * 10, // 10 minutes | 169 | 'activitypub-refresher': 60000 * 10, // 10 minutes |
168 | 'video-redundancy': 1000 * 3600 * 3 // 3 hours | 170 | 'video-redundancy': 1000 * 3600 * 3, // 3 hours |
171 | 'video-live-ending': 1000 * 60 * 10 // 10 minutes | ||
169 | } | 172 | } |
170 | const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | 173 | const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { |
171 | 'videos-views': { | 174 | 'videos-views': { |
@@ -605,6 +608,7 @@ const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls') | |||
605 | 608 | ||
606 | const VIDEO_LIVE = { | 609 | const VIDEO_LIVE = { |
607 | EXTENSION: '.ts', | 610 | EXTENSION: '.ts', |
611 | CLEANUP_DELAY: 1000 * 60 * 5, // 5 mintues | ||
608 | RTMP: { | 612 | RTMP: { |
609 | CHUNK_SIZE: 60000, | 613 | CHUNK_SIZE: 60000, |
610 | GOP_CACHE: true, | 614 | GOP_CACHE: true, |
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 049e06cff..ab23ff507 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts | |||
@@ -66,6 +66,7 @@ import { FilteredModelAttributes } from '../../types/sequelize' | |||
66 | import { ActorFollowScoreCache } from '../files-cache' | 66 | import { ActorFollowScoreCache } from '../files-cache' |
67 | import { JobQueue } from '../job-queue' | 67 | import { JobQueue } from '../job-queue' |
68 | import { Notifier } from '../notifier' | 68 | import { Notifier } from '../notifier' |
69 | import { PeerTubeSocket } from '../peertube-socket' | ||
69 | import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail' | 70 | import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail' |
70 | import { setVideoTags } from '../video' | 71 | import { setVideoTags } from '../video' |
71 | import { autoBlacklistVideoIfNeeded } from '../video-blacklist' | 72 | import { autoBlacklistVideoIfNeeded } from '../video-blacklist' |
@@ -348,6 +349,7 @@ async function updateVideoFromAP (options: { | |||
348 | video.privacy = videoData.privacy | 349 | video.privacy = videoData.privacy |
349 | video.channelId = videoData.channelId | 350 | video.channelId = videoData.channelId |
350 | video.views = videoData.views | 351 | video.views = videoData.views |
352 | video.isLive = videoData.isLive | ||
351 | 353 | ||
352 | const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight | 354 | const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight |
353 | 355 | ||
@@ -434,6 +436,7 @@ async function updateVideoFromAP (options: { | |||
434 | }) | 436 | }) |
435 | 437 | ||
436 | if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users? | 438 | if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users? |
439 | if (videoUpdated.isLive) PeerTubeSocket.Instance.sendVideoLiveNewState(video) | ||
437 | 440 | ||
438 | logger.info('Remote video with uuid %s updated', videoObject.uuid) | 441 | logger.info('Remote video with uuid %s updated', videoObject.uuid) |
439 | 442 | ||
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts new file mode 100644 index 000000000..1a58a9f7e --- /dev/null +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -0,0 +1,47 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { readdir, remove } from 'fs-extra' | ||
3 | import { join } from 'path' | ||
4 | import { getHLSDirectory } from '@server/lib/video-paths' | ||
5 | import { VideoModel } from '@server/models/video/video' | ||
6 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | ||
7 | import { VideoLiveEndingPayload } from '@shared/models' | ||
8 | import { logger } from '../../../helpers/logger' | ||
9 | |||
10 | async function processVideoLiveEnding (job: Bull.Job) { | ||
11 | const payload = job.data as VideoLiveEndingPayload | ||
12 | |||
13 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) | ||
14 | if (!video) { | ||
15 | logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId) | ||
16 | return | ||
17 | } | ||
18 | |||
19 | const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | ||
20 | const hlsDirectory = getHLSDirectory(video, false) | ||
21 | |||
22 | const files = await readdir(hlsDirectory) | ||
23 | |||
24 | for (const filename of files) { | ||
25 | if ( | ||
26 | filename.endsWith('.ts') || | ||
27 | filename.endsWith('.m3u8') || | ||
28 | filename.endsWith('.mpd') || | ||
29 | filename.endsWith('.m4s') || | ||
30 | filename.endsWith('.tmp') | ||
31 | ) { | ||
32 | const p = join(hlsDirectory, filename) | ||
33 | |||
34 | remove(p) | ||
35 | .catch(err => logger.error('Cannot remove %s.', p, { err })) | ||
36 | } | ||
37 | } | ||
38 | |||
39 | streamingPlaylist.destroy() | ||
40 | .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) | ||
41 | } | ||
42 | |||
43 | // --------------------------------------------------------------------------- | ||
44 | |||
45 | export { | ||
46 | processVideoLiveEnding | ||
47 | } | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 14e181835..8d97434ac 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -10,6 +10,7 @@ import { | |||
10 | RefreshPayload, | 10 | RefreshPayload, |
11 | VideoFileImportPayload, | 11 | VideoFileImportPayload, |
12 | VideoImportPayload, | 12 | VideoImportPayload, |
13 | VideoLiveEndingPayload, | ||
13 | VideoRedundancyPayload, | 14 | VideoRedundancyPayload, |
14 | VideoTranscodingPayload | 15 | VideoTranscodingPayload |
15 | } from '../../../shared/models' | 16 | } from '../../../shared/models' |
@@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views' | |||
27 | import { refreshAPObject } from './handlers/activitypub-refresher' | 28 | import { refreshAPObject } from './handlers/activitypub-refresher' |
28 | import { processVideoFileImport } from './handlers/video-file-import' | 29 | import { processVideoFileImport } from './handlers/video-file-import' |
29 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 30 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
31 | import { processVideoLiveEnding } from './handlers/video-live-ending' | ||
30 | 32 | ||
31 | type CreateJobArgument = | 33 | type CreateJobArgument = |
32 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 34 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -39,8 +41,13 @@ type CreateJobArgument = | |||
39 | { type: 'video-import', payload: VideoImportPayload } | | 41 | { type: 'video-import', payload: VideoImportPayload } | |
40 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 42 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
41 | { type: 'videos-views', payload: {} } | | 43 | { type: 'videos-views', payload: {} } | |
44 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | ||
42 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | 45 | { type: 'video-redundancy', payload: VideoRedundancyPayload } |
43 | 46 | ||
47 | type CreateJobOptions = { | ||
48 | delay?: number | ||
49 | } | ||
50 | |||
44 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | 51 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
45 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 52 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
46 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 53 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
@@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
52 | 'video-import': processVideoImport, | 59 | 'video-import': processVideoImport, |
53 | 'videos-views': processVideosViews, | 60 | 'videos-views': processVideosViews, |
54 | 'activitypub-refresher': refreshAPObject, | 61 | 'activitypub-refresher': refreshAPObject, |
62 | 'video-live-ending': processVideoLiveEnding, | ||
55 | 'video-redundancy': processVideoRedundancy | 63 | 'video-redundancy': processVideoRedundancy |
56 | } | 64 | } |
57 | 65 | ||
@@ -66,7 +74,8 @@ const jobTypes: JobType[] = [ | |||
66 | 'video-import', | 74 | 'video-import', |
67 | 'videos-views', | 75 | 'videos-views', |
68 | 'activitypub-refresher', | 76 | 'activitypub-refresher', |
69 | 'video-redundancy' | 77 | 'video-redundancy', |
78 | 'video-live-ending' | ||
70 | ] | 79 | ] |
71 | 80 | ||
72 | class JobQueue { | 81 | class JobQueue { |
@@ -122,12 +131,12 @@ class JobQueue { | |||
122 | } | 131 | } |
123 | } | 132 | } |
124 | 133 | ||
125 | createJob (obj: CreateJobArgument): void { | 134 | createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { |
126 | this.createJobWithPromise(obj) | 135 | this.createJobWithPromise(obj, options) |
127 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 136 | .catch(err => logger.error('Cannot create job.', { err, obj })) |
128 | } | 137 | } |
129 | 138 | ||
130 | createJobWithPromise (obj: CreateJobArgument) { | 139 | createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { |
131 | const queue = this.queues[obj.type] | 140 | const queue = this.queues[obj.type] |
132 | if (queue === undefined) { | 141 | if (queue === undefined) { |
133 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 142 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
@@ -137,7 +146,8 @@ class JobQueue { | |||
137 | const jobArgs: Bull.JobOptions = { | 146 | const jobArgs: Bull.JobOptions = { |
138 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 147 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
139 | attempts: JOB_ATTEMPTS[obj.type], | 148 | attempts: JOB_ATTEMPTS[obj.type], |
140 | timeout: JOB_TTL[obj.type] | 149 | timeout: JOB_TTL[obj.type], |
150 | delay: options.delay | ||
141 | } | 151 | } |
142 | 152 | ||
143 | return queue.add(obj.payload, jobArgs) | 153 | return queue.add(obj.payload, jobArgs) |
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index f602bfb6d..41176d197 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts | |||
@@ -2,18 +2,22 @@ | |||
2 | import { AsyncQueue, queue } from 'async' | 2 | import { AsyncQueue, queue } from 'async' |
3 | import * as chokidar from 'chokidar' | 3 | import * as chokidar from 'chokidar' |
4 | import { FfmpegCommand } from 'fluent-ffmpeg' | 4 | import { FfmpegCommand } from 'fluent-ffmpeg' |
5 | import { ensureDir, readdir, remove } from 'fs-extra' | 5 | import { ensureDir } from 'fs-extra' |
6 | import { basename, join } from 'path' | 6 | import { basename } from 'path' |
7 | import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' | 7 | import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' |
8 | import { logger } from '@server/helpers/logger' | 8 | import { logger } from '@server/helpers/logger' |
9 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 9 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
10 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' | 10 | import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' |
11 | import { VideoModel } from '@server/models/video/video' | ||
11 | import { VideoFileModel } from '@server/models/video/video-file' | 12 | import { VideoFileModel } from '@server/models/video/video-file' |
12 | import { VideoLiveModel } from '@server/models/video/video-live' | 13 | import { VideoLiveModel } from '@server/models/video/video-live' |
13 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 14 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
14 | import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models' | 15 | import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models' |
15 | import { VideoState, VideoStreamingPlaylistType } from '@shared/models' | 16 | import { VideoState, VideoStreamingPlaylistType } from '@shared/models' |
17 | import { federateVideoIfNeeded } from './activitypub/videos' | ||
16 | import { buildSha256Segment } from './hls' | 18 | import { buildSha256Segment } from './hls' |
19 | import { JobQueue } from './job-queue' | ||
20 | import { PeerTubeSocket } from './peertube-socket' | ||
17 | import { getHLSDirectory } from './video-paths' | 21 | import { getHLSDirectory } from './video-paths' |
18 | 22 | ||
19 | const NodeRtmpServer = require('node-media-server/node_rtmp_server') | 23 | const NodeRtmpServer = require('node-media-server/node_rtmp_server') |
@@ -47,6 +51,7 @@ class LiveManager { | |||
47 | private static instance: LiveManager | 51 | private static instance: LiveManager |
48 | 52 | ||
49 | private readonly transSessions = new Map<string, FfmpegCommand>() | 53 | private readonly transSessions = new Map<string, FfmpegCommand>() |
54 | private readonly videoSessions = new Map<number, string>() | ||
50 | private readonly segmentsSha256 = new Map<string, Map<string, string>>() | 55 | private readonly segmentsSha256 = new Map<string, Map<string, string>>() |
51 | 56 | ||
52 | private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> | 57 | private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> |
@@ -56,7 +61,8 @@ class LiveManager { | |||
56 | } | 61 | } |
57 | 62 | ||
58 | init () { | 63 | init () { |
59 | this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => { | 64 | const events = this.getContext().nodeEvent |
65 | events.on('postPublish', (sessionId: string, streamPath: string) => { | ||
60 | logger.debug('RTMP received stream', { id: sessionId, streamPath }) | 66 | logger.debug('RTMP received stream', { id: sessionId, streamPath }) |
61 | 67 | ||
62 | const splittedPath = streamPath.split('/') | 68 | const splittedPath = streamPath.split('/') |
@@ -69,7 +75,7 @@ class LiveManager { | |||
69 | .catch(err => logger.error('Cannot handle sessions.', { err })) | 75 | .catch(err => logger.error('Cannot handle sessions.', { err })) |
70 | }) | 76 | }) |
71 | 77 | ||
72 | this.getContext().nodeEvent.on('donePublish', sessionId => { | 78 | events.on('donePublish', sessionId => { |
73 | this.abortSession(sessionId) | 79 | this.abortSession(sessionId) |
74 | }) | 80 | }) |
75 | 81 | ||
@@ -115,6 +121,16 @@ class LiveManager { | |||
115 | return this.segmentsSha256.get(videoUUID) | 121 | return this.segmentsSha256.get(videoUUID) |
116 | } | 122 | } |
117 | 123 | ||
124 | stopSessionOf (videoId: number) { | ||
125 | const sessionId = this.videoSessions.get(videoId) | ||
126 | if (!sessionId) return | ||
127 | |||
128 | this.abortSession(sessionId) | ||
129 | |||
130 | this.onEndTransmuxing(videoId) | ||
131 | .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err })) | ||
132 | } | ||
133 | |||
118 | private getContext () { | 134 | private getContext () { |
119 | return context | 135 | return context |
120 | } | 136 | } |
@@ -135,6 +151,13 @@ class LiveManager { | |||
135 | } | 151 | } |
136 | 152 | ||
137 | const video = videoLive.Video | 153 | const video = videoLive.Video |
154 | if (video.isBlacklisted()) { | ||
155 | logger.warn('Video is blacklisted. Refusing stream %s.', streamKey) | ||
156 | return this.abortSession(sessionId) | ||
157 | } | ||
158 | |||
159 | this.videoSessions.set(video.id, sessionId) | ||
160 | |||
138 | const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) | 161 | const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) |
139 | 162 | ||
140 | const session = this.getContext().sessions.get(sessionId) | 163 | const session = this.getContext().sessions.get(sessionId) |
@@ -154,11 +177,6 @@ class LiveManager { | |||
154 | type: VideoStreamingPlaylistType.HLS | 177 | type: VideoStreamingPlaylistType.HLS |
155 | }, { returning: true }) as [ MStreamingPlaylist, boolean ] | 178 | }, { returning: true }) as [ MStreamingPlaylist, boolean ] |
156 | 179 | ||
157 | video.state = VideoState.PUBLISHED | ||
158 | await video.save() | ||
159 | |||
160 | // FIXME: federation? | ||
161 | |||
162 | return this.runMuxing({ | 180 | return this.runMuxing({ |
163 | sessionId, | 181 | sessionId, |
164 | videoLive, | 182 | videoLive, |
@@ -207,11 +225,46 @@ class LiveManager { | |||
207 | 225 | ||
208 | this.transSessions.set(sessionId, ffmpegExec) | 226 | this.transSessions.set(sessionId, ffmpegExec) |
209 | 227 | ||
228 | const videoUUID = videoLive.Video.uuid | ||
229 | const tsWatcher = chokidar.watch(outPath + '/*.ts') | ||
230 | |||
231 | const updateHandler = segmentPath => { | ||
232 | this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) | ||
233 | } | ||
234 | |||
235 | const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) | ||
236 | |||
237 | tsWatcher.on('add', p => updateHandler(p)) | ||
238 | tsWatcher.on('change', p => updateHandler(p)) | ||
239 | tsWatcher.on('unlink', p => deleteHandler(p)) | ||
240 | |||
241 | const masterWatcher = chokidar.watch(outPath + '/master.m3u8') | ||
242 | masterWatcher.on('add', async () => { | ||
243 | try { | ||
244 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId) | ||
245 | |||
246 | video.state = VideoState.PUBLISHED | ||
247 | await video.save() | ||
248 | videoLive.Video = video | ||
249 | |||
250 | await federateVideoIfNeeded(video, false) | ||
251 | |||
252 | PeerTubeSocket.Instance.sendVideoLiveNewState(video) | ||
253 | } catch (err) { | ||
254 | logger.error('Cannot federate video %d.', videoLive.videoId, { err }) | ||
255 | } finally { | ||
256 | masterWatcher.close() | ||
257 | .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err })) | ||
258 | } | ||
259 | }) | ||
260 | |||
210 | const onFFmpegEnded = () => { | 261 | const onFFmpegEnded = () => { |
211 | watcher.close() | 262 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath) |
212 | .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err })) | ||
213 | 263 | ||
214 | this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath) | 264 | Promise.all([ tsWatcher.close(), masterWatcher.close() ]) |
265 | .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) | ||
266 | |||
267 | this.onEndTransmuxing(videoLive.Video.id) | ||
215 | .catch(err => logger.error('Error in closed transmuxing.', { err })) | 268 | .catch(err => logger.error('Error in closed transmuxing.', { err })) |
216 | } | 269 | } |
217 | 270 | ||
@@ -225,44 +278,30 @@ class LiveManager { | |||
225 | }) | 278 | }) |
226 | 279 | ||
227 | ffmpegExec.on('end', () => onFFmpegEnded()) | 280 | ffmpegExec.on('end', () => onFFmpegEnded()) |
228 | |||
229 | const videoUUID = videoLive.Video.uuid | ||
230 | const watcher = chokidar.watch(outPath + '/*.ts') | ||
231 | |||
232 | const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) | ||
233 | const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) | ||
234 | |||
235 | watcher.on('add', p => updateHandler(p)) | ||
236 | watcher.on('change', p => updateHandler(p)) | ||
237 | watcher.on('unlink', p => deleteHandler(p)) | ||
238 | } | 281 | } |
239 | 282 | ||
240 | private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) { | 283 | private async onEndTransmuxing (videoId: number) { |
241 | logger.info('RTMP transmuxing for %s ended.', streamPath) | 284 | try { |
285 | const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | ||
286 | if (!fullVideo) return | ||
242 | 287 | ||
243 | const files = await readdir(outPath) | 288 | JobQueue.Instance.createJob({ |
289 | type: 'video-live-ending', | ||
290 | payload: { | ||
291 | videoId: fullVideo.id | ||
292 | } | ||
293 | }, { delay: VIDEO_LIVE.CLEANUP_DELAY }) | ||
244 | 294 | ||
245 | for (const filename of files) { | 295 | // FIXME: use end |
246 | if ( | 296 | fullVideo.state = VideoState.WAITING_FOR_LIVE |
247 | filename.endsWith('.ts') || | 297 | await fullVideo.save() |
248 | filename.endsWith('.m3u8') || | ||
249 | filename.endsWith('.mpd') || | ||
250 | filename.endsWith('.m4s') || | ||
251 | filename.endsWith('.tmp') | ||
252 | ) { | ||
253 | const p = join(outPath, filename) | ||
254 | 298 | ||
255 | remove(p) | 299 | PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) |
256 | .catch(err => logger.error('Cannot remove %s.', p, { err })) | ||
257 | } | ||
258 | } | ||
259 | 300 | ||
260 | playlist.destroy() | 301 | await federateVideoIfNeeded(fullVideo, false) |
261 | .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) | 302 | } catch (err) { |
262 | 303 | logger.error('Cannot save/federate new video state of live streaming.', { err }) | |
263 | video.state = VideoState.LIVE_ENDED | 304 | } |
264 | video.save() | ||
265 | .catch(err => logger.error('Cannot save new video state of live streaming.', { err })) | ||
266 | } | 305 | } |
267 | 306 | ||
268 | private async addSegmentSha (options: SegmentSha256QueueParam) { | 307 | private async addSegmentSha (options: SegmentSha256QueueParam) { |
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 2e4b15b38..c918a8685 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts | |||
@@ -1,14 +1,18 @@ | |||
1 | import * as SocketIO from 'socket.io' | 1 | import { Socket } from 'dgram' |
2 | import { authenticateSocket } from '../middlewares' | ||
3 | import { logger } from '../helpers/logger' | ||
4 | import { Server } from 'http' | 2 | import { Server } from 'http' |
3 | import * as SocketIO from 'socket.io' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { UserNotificationModelForApi } from '@server/types/models/user' | 5 | import { UserNotificationModelForApi } from '@server/types/models/user' |
6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | ||
7 | import { logger } from '../helpers/logger' | ||
8 | import { authenticateSocket } from '../middlewares' | ||
6 | 9 | ||
7 | class PeerTubeSocket { | 10 | class PeerTubeSocket { |
8 | 11 | ||
9 | private static instance: PeerTubeSocket | 12 | private static instance: PeerTubeSocket |
10 | 13 | ||
11 | private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {} | 14 | private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {} |
15 | private liveVideosNamespace: SocketIO.Namespace | ||
12 | 16 | ||
13 | private constructor () {} | 17 | private constructor () {} |
14 | 18 | ||
@@ -32,19 +36,37 @@ class PeerTubeSocket { | |||
32 | this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) | 36 | this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) |
33 | }) | 37 | }) |
34 | }) | 38 | }) |
39 | |||
40 | this.liveVideosNamespace = io.of('/live-videos') | ||
41 | .on('connection', socket => { | ||
42 | socket.on('subscribe', ({ videoId }) => socket.join(videoId)) | ||
43 | socket.on('unsubscribe', ({ videoId }) => socket.leave(videoId)) | ||
44 | }) | ||
35 | } | 45 | } |
36 | 46 | ||
37 | sendNotification (userId: number, notification: UserNotificationModelForApi) { | 47 | sendNotification (userId: number, notification: UserNotificationModelForApi) { |
38 | const sockets = this.userNotificationSockets[userId] | 48 | const sockets = this.userNotificationSockets[userId] |
39 | |||
40 | if (!sockets) return | 49 | if (!sockets) return |
41 | 50 | ||
51 | logger.debug('Sending user notification to user %d.', userId) | ||
52 | |||
42 | const notificationMessage = notification.toFormattedJSON() | 53 | const notificationMessage = notification.toFormattedJSON() |
43 | for (const socket of sockets) { | 54 | for (const socket of sockets) { |
44 | socket.emit('new-notification', notificationMessage) | 55 | socket.emit('new-notification', notificationMessage) |
45 | } | 56 | } |
46 | } | 57 | } |
47 | 58 | ||
59 | sendVideoLiveNewState (video: MVideo) { | ||
60 | const data: LiveVideoEventPayload = { state: video.state } | ||
61 | const type: LiveVideoEventType = 'state-change' | ||
62 | |||
63 | logger.debug('Sending video live new state notification of %s.', video.url) | ||
64 | |||
65 | this.liveVideosNamespace | ||
66 | .in(video.id) | ||
67 | .emit(type, data) | ||
68 | } | ||
69 | |||
48 | static get Instance () { | 70 | static get Instance () { |
49 | return this.instance || (this.instance = new this()) | 71 | return this.instance || (this.instance = new this()) |
50 | } | 72 | } |
diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts index bdbcffda6..f6c66b6dd 100644 --- a/server/lib/video-blacklist.ts +++ b/server/lib/video-blacklist.ts | |||
@@ -17,6 +17,7 @@ import { sendDeleteVideo } from './activitypub/send' | |||
17 | import { federateVideoIfNeeded } from './activitypub/videos' | 17 | import { federateVideoIfNeeded } from './activitypub/videos' |
18 | import { Notifier } from './notifier' | 18 | import { Notifier } from './notifier' |
19 | import { Hooks } from './plugins/hooks' | 19 | import { Hooks } from './plugins/hooks' |
20 | import { LiveManager } from './live-manager' | ||
20 | 21 | ||
21 | async function autoBlacklistVideoIfNeeded (parameters: { | 22 | async function autoBlacklistVideoIfNeeded (parameters: { |
22 | video: MVideoWithBlacklistLight | 23 | video: MVideoWithBlacklistLight |
@@ -73,6 +74,10 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video | |||
73 | await sendDeleteVideo(videoInstance, undefined) | 74 | await sendDeleteVideo(videoInstance, undefined) |
74 | } | 75 | } |
75 | 76 | ||
77 | if (videoInstance.isLive) { | ||
78 | LiveManager.Instance.stopSessionOf(videoInstance.id) | ||
79 | } | ||
80 | |||
76 | Notifier.Instance.notifyOnVideoBlacklist(blacklist) | 81 | Notifier.Instance.notifyOnVideoBlacklist(blacklist) |
77 | } | 82 | } |
78 | 83 | ||
diff --git a/server/models/video/video-live.ts b/server/models/video/video-live.ts index 6929b9688..8608bc84c 100644 --- a/server/models/video/video-live.ts +++ b/server/models/video/video-live.ts | |||
@@ -1,14 +1,21 @@ | |||
1 | import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript' | 1 | import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript' |
2 | import { WEBSERVER } from '@server/initializers/constants' | 2 | import { WEBSERVER } from '@server/initializers/constants' |
3 | import { MVideoLive, MVideoLiveVideo } from '@server/types/models' | 3 | import { MVideoLive, MVideoLiveVideo } from '@server/types/models' |
4 | import { VideoLive } from '@shared/models/videos/video-live.model' | 4 | import { LiveVideo, VideoState } from '@shared/models' |
5 | import { VideoModel } from './video' | 5 | import { VideoModel } from './video' |
6 | import { VideoBlacklistModel } from './video-blacklist' | ||
6 | 7 | ||
7 | @DefaultScope(() => ({ | 8 | @DefaultScope(() => ({ |
8 | include: [ | 9 | include: [ |
9 | { | 10 | { |
10 | model: VideoModel, | 11 | model: VideoModel, |
11 | required: true | 12 | required: true, |
13 | include: [ | ||
14 | { | ||
15 | model: VideoBlacklistModel, | ||
16 | required: false | ||
17 | } | ||
18 | ] | ||
12 | } | 19 | } |
13 | ] | 20 | ] |
14 | })) | 21 | })) |
@@ -49,7 +56,22 @@ export class VideoLiveModel extends Model<VideoLiveModel> { | |||
49 | const query = { | 56 | const query = { |
50 | where: { | 57 | where: { |
51 | streamKey | 58 | streamKey |
52 | } | 59 | }, |
60 | include: [ | ||
61 | { | ||
62 | model: VideoModel.unscoped(), | ||
63 | required: true, | ||
64 | where: { | ||
65 | state: VideoState.WAITING_FOR_LIVE | ||
66 | }, | ||
67 | include: [ | ||
68 | { | ||
69 | model: VideoBlacklistModel.unscoped(), | ||
70 | required: false | ||
71 | } | ||
72 | ] | ||
73 | } | ||
74 | ] | ||
53 | } | 75 | } |
54 | 76 | ||
55 | return VideoLiveModel.findOne<MVideoLiveVideo>(query) | 77 | return VideoLiveModel.findOne<MVideoLiveVideo>(query) |
@@ -65,7 +87,7 @@ export class VideoLiveModel extends Model<VideoLiveModel> { | |||
65 | return VideoLiveModel.findOne<MVideoLive>(query) | 87 | return VideoLiveModel.findOne<MVideoLive>(query) |
66 | } | 88 | } |
67 | 89 | ||
68 | toFormattedJSON (): VideoLive { | 90 | toFormattedJSON (): LiveVideo { |
69 | return { | 91 | return { |
70 | rtmpUrl: WEBSERVER.RTMP_URL, | 92 | rtmpUrl: WEBSERVER.RTMP_URL, |
71 | streamKey: this.streamKey | 93 | streamKey: this.streamKey |
diff --git a/server/models/video/video-streaming-playlist.ts b/server/models/video/video-streaming-playlist.ts index b8dc7c450..73bd89844 100644 --- a/server/models/video/video-streaming-playlist.ts +++ b/server/models/video/video-streaming-playlist.ts | |||
@@ -153,6 +153,17 @@ export class VideoStreamingPlaylistModel extends Model<VideoStreamingPlaylistMod | |||
153 | return VideoStreamingPlaylistModel.findByPk(id, options) | 153 | return VideoStreamingPlaylistModel.findByPk(id, options) |
154 | } | 154 | } |
155 | 155 | ||
156 | static loadHLSPlaylistByVideo (videoId: number) { | ||
157 | const options = { | ||
158 | where: { | ||
159 | type: VideoStreamingPlaylistType.HLS, | ||
160 | videoId | ||
161 | } | ||
162 | } | ||
163 | |||
164 | return VideoStreamingPlaylistModel.findOne(options) | ||
165 | } | ||
166 | |||
156 | static getHlsPlaylistFilename (resolution: number) { | 167 | static getHlsPlaylistFilename (resolution: number) { |
157 | return resolution + '.m3u8' | 168 | return resolution + '.m3u8' |
158 | } | 169 | } |
diff --git a/server/models/video/video.ts b/server/models/video/video.ts index a3e3b6cfe..8493ab802 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts | |||
@@ -127,6 +127,7 @@ import { VideoShareModel } from './video-share' | |||
127 | import { VideoStreamingPlaylistModel } from './video-streaming-playlist' | 127 | import { VideoStreamingPlaylistModel } from './video-streaming-playlist' |
128 | import { VideoTagModel } from './video-tag' | 128 | import { VideoTagModel } from './video-tag' |
129 | import { VideoViewModel } from './video-view' | 129 | import { VideoViewModel } from './video-view' |
130 | import { LiveManager } from '@server/lib/live-manager' | ||
130 | 131 | ||
131 | export enum ScopeNames { | 132 | export enum ScopeNames { |
132 | AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS', | 133 | AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS', |
@@ -800,6 +801,13 @@ export class VideoModel extends Model<VideoModel> { | |||
800 | } | 801 | } |
801 | 802 | ||
802 | @BeforeDestroy | 803 | @BeforeDestroy |
804 | static stopLiveIfNeeded (instance: VideoModel) { | ||
805 | if (!instance.isLive) return | ||
806 | |||
807 | return LiveManager.Instance.stopSessionOf(instance.id) | ||
808 | } | ||
809 | |||
810 | @BeforeDestroy | ||
803 | static invalidateCache (instance: VideoModel) { | 811 | static invalidateCache (instance: VideoModel) { |
804 | ModelCache.Instance.invalidateCache('video', instance.id) | 812 | ModelCache.Instance.invalidateCache('video', instance.id) |
805 | } | 813 | } |