aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/job-queue/handlers/video-import.ts3
-rw-r--r--server/lib/live-manager.ts89
-rw-r--r--server/lib/user.ts72
3 files changed, 140 insertions, 24 deletions
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 9b5f2bb2b..9210aec54 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -4,6 +4,7 @@ import { extname } from 'path'
4import { addOptimizeOrMergeAudioJob } from '@server/helpers/video' 4import { addOptimizeOrMergeAudioJob } from '@server/helpers/video'
5import { isPostImportVideoAccepted } from '@server/lib/moderation' 5import { isPostImportVideoAccepted } from '@server/lib/moderation'
6import { Hooks } from '@server/lib/plugins/hooks' 6import { Hooks } from '@server/lib/plugins/hooks'
7import { isAbleToUploadVideo } from '@server/lib/user'
7import { getVideoFilePath } from '@server/lib/video-paths' 8import { getVideoFilePath } from '@server/lib/video-paths'
8import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 9import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
9import { 10import {
@@ -108,7 +109,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
108 109
109 // Get information about this video 110 // Get information about this video
110 const stats = await stat(tempVideoPath) 111 const stats = await stat(tempVideoPath)
111 const isAble = await videoImport.User.isAbleToUploadVideo({ size: stats.size }) 112 const isAble = await isAbleToUploadVideo(videoImport.User.id, stats.size)
112 if (isAble === false) { 113 if (isAble === false) {
113 throw new Error('The user video quota is exceeded with this video to import.') 114 throw new Error('The user video quota is exceeded with this video to import.')
114 } 115 }
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index 41176d197..3ff2434ff 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -2,24 +2,27 @@
2import { AsyncQueue, queue } from 'async' 2import { AsyncQueue, queue } from 'async'
3import * as chokidar from 'chokidar' 3import * as chokidar from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { ensureDir } from 'fs-extra' 5import { ensureDir, stat } from 'fs-extra'
6import { basename } from 'path' 6import { basename } from 'path'
7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' 7import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
8import { logger } from '@server/helpers/logger' 8import { logger } from '@server/helpers/logger'
9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 9import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
10import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
11import { UserModel } from '@server/models/account/user'
11import { VideoModel } from '@server/models/video/video' 12import { VideoModel } from '@server/models/video/video'
12import { VideoFileModel } from '@server/models/video/video-file' 13import { VideoFileModel } from '@server/models/video/video-file'
13import { VideoLiveModel } from '@server/models/video/video-live' 14import { VideoLiveModel } from '@server/models/video/video-live'
14import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 15import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
15import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models' 16import { MStreamingPlaylist, MUser, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
16import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 17import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
17import { federateVideoIfNeeded } from './activitypub/videos' 18import { federateVideoIfNeeded } from './activitypub/videos'
18import { buildSha256Segment } from './hls' 19import { buildSha256Segment } from './hls'
19import { JobQueue } from './job-queue' 20import { JobQueue } from './job-queue'
20import { PeerTubeSocket } from './peertube-socket' 21import { PeerTubeSocket } from './peertube-socket'
22import { isAbleToUploadVideo } from './user'
21import { getHLSDirectory } from './video-paths' 23import { getHLSDirectory } from './video-paths'
22 24
25import memoizee = require('memoizee')
23const NodeRtmpServer = require('node-media-server/node_rtmp_server') 26const NodeRtmpServer = require('node-media-server/node_rtmp_server')
24const context = require('node-media-server/node_core_ctx') 27const context = require('node-media-server/node_core_ctx')
25const nodeMediaServerLogger = require('node-media-server/node_core_logger') 28const nodeMediaServerLogger = require('node-media-server/node_core_logger')
@@ -53,6 +56,11 @@ class LiveManager {
53 private readonly transSessions = new Map<string, FfmpegCommand>() 56 private readonly transSessions = new Map<string, FfmpegCommand>()
54 private readonly videoSessions = new Map<number, string>() 57 private readonly videoSessions = new Map<number, string>()
55 private readonly segmentsSha256 = new Map<string, Map<string, string>>() 58 private readonly segmentsSha256 = new Map<string, Map<string, string>>()
59 private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
60
61 private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
62 return isAbleToUploadVideo(userId, 1000)
63 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
56 64
57 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> 65 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
58 private rtmpServer: any 66 private rtmpServer: any
@@ -127,7 +135,7 @@ class LiveManager {
127 135
128 this.abortSession(sessionId) 136 this.abortSession(sessionId)
129 137
130 this.onEndTransmuxing(videoId) 138 this.onEndTransmuxing(videoId, true)
131 .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err })) 139 .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
132 } 140 }
133 141
@@ -196,8 +204,18 @@ class LiveManager {
196 originalResolution: number 204 originalResolution: number
197 }) { 205 }) {
198 const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options 206 const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options
207 const startStreamDateTime = new Date().getTime()
199 const allResolutions = resolutionsEnabled.concat([ originalResolution ]) 208 const allResolutions = resolutionsEnabled.concat([ originalResolution ])
200 209
210 const user = await UserModel.loadByLiveId(videoLive.id)
211 if (!this.livesPerUser.has(user.id)) {
212 this.livesPerUser.set(user.id, [])
213 }
214
215 const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
216 const livesOfUser = this.livesPerUser.get(user.id)
217 livesOfUser.push(currentUserLive)
218
201 for (let i = 0; i < allResolutions.length; i++) { 219 for (let i = 0; i < allResolutions.length; i++) {
202 const resolution = allResolutions[i] 220 const resolution = allResolutions[i]
203 221
@@ -216,26 +234,47 @@ class LiveManager {
216 const outPath = getHLSDirectory(videoLive.Video) 234 const outPath = getHLSDirectory(videoLive.Video)
217 await ensureDir(outPath) 235 await ensureDir(outPath)
218 236
237 const deleteSegments = videoLive.saveReplay === false
238
219 const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath 239 const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
220 const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED 240 const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
221 ? runLiveTranscoding(rtmpUrl, outPath, allResolutions) 241 ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, deleteSegments)
222 : runLiveMuxing(rtmpUrl, outPath) 242 : runLiveMuxing(rtmpUrl, outPath, deleteSegments)
223 243
224 logger.info('Running live muxing/transcoding.') 244 logger.info('Running live muxing/transcoding.')
225
226 this.transSessions.set(sessionId, ffmpegExec) 245 this.transSessions.set(sessionId, ffmpegExec)
227 246
228 const videoUUID = videoLive.Video.uuid 247 const videoUUID = videoLive.Video.uuid
229 const tsWatcher = chokidar.watch(outPath + '/*.ts') 248 const tsWatcher = chokidar.watch(outPath + '/*.ts')
230 249
231 const updateHandler = segmentPath => { 250 const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
232 this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) 251
252 const addHandler = segmentPath => {
253 updateSegment(segmentPath)
254
255 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
256 this.stopSessionOf(videoLive.videoId)
257 }
258
259 if (videoLive.saveReplay === true) {
260 stat(segmentPath)
261 .then(segmentStat => {
262 currentUserLive.size += segmentStat.size
263 })
264 .then(() => this.isQuotaConstraintValid(user, videoLive))
265 .then(quotaValid => {
266 if (quotaValid !== true) {
267 this.stopSessionOf(videoLive.videoId)
268 }
269 })
270 .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
271 }
233 } 272 }
234 273
235 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) 274 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
236 275
237 tsWatcher.on('add', p => updateHandler(p)) 276 tsWatcher.on('add', p => addHandler(p))
238 tsWatcher.on('change', p => updateHandler(p)) 277 tsWatcher.on('change', p => updateSegment(p))
239 tsWatcher.on('unlink', p => deleteHandler(p)) 278 tsWatcher.on('unlink', p => deleteHandler(p))
240 279
241 const masterWatcher = chokidar.watch(outPath + '/master.m3u8') 280 const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
@@ -280,7 +319,14 @@ class LiveManager {
280 ffmpegExec.on('end', () => onFFmpegEnded()) 319 ffmpegExec.on('end', () => onFFmpegEnded())
281 } 320 }
282 321
283 private async onEndTransmuxing (videoId: number) { 322 getLiveQuotaUsedByUser (userId: number) {
323 const currentLives = this.livesPerUser.get(userId)
324 if (!currentLives) return 0
325
326 return currentLives.reduce((sum, obj) => sum + obj.size, 0)
327 }
328
329 private async onEndTransmuxing (videoId: number, cleanupNow = false) {
284 try { 330 try {
285 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 331 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
286 if (!fullVideo) return 332 if (!fullVideo) return
@@ -290,7 +336,7 @@ class LiveManager {
290 payload: { 336 payload: {
291 videoId: fullVideo.id 337 videoId: fullVideo.id
292 } 338 }
293 }, { delay: VIDEO_LIVE.CLEANUP_DELAY }) 339 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
294 340
295 // FIXME: use end 341 // FIXME: use end
296 fullVideo.state = VideoState.WAITING_FOR_LIVE 342 fullVideo.state = VideoState.WAITING_FOR_LIVE
@@ -337,6 +383,23 @@ class LiveManager {
337 filesMap.delete(segmentName) 383 filesMap.delete(segmentName)
338 } 384 }
339 385
386 private isDurationConstraintValid (streamingStartTime: number) {
387 const maxDuration = CONFIG.LIVE.MAX_DURATION
388 // No limit
389 if (maxDuration === null) return true
390
391 const now = new Date().getTime()
392 const max = streamingStartTime + maxDuration
393
394 return now <= max
395 }
396
397 private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
398 if (live.saveReplay !== true) return true
399
400 return this.isAbleToUploadVideoWithCache(user.id)
401 }
402
340 static get Instance () { 403 static get Instance () {
341 return this.instance || (this.instance = new this()) 404 return this.instance || (this.instance = new this())
342 } 405 }
diff --git a/server/lib/user.ts b/server/lib/user.ts
index aa14f0b54..d3338f329 100644
--- a/server/lib/user.ts
+++ b/server/lib/user.ts
@@ -1,20 +1,24 @@
1import { Transaction } from 'sequelize/types'
1import { v4 as uuidv4 } from 'uuid' 2import { v4 as uuidv4 } from 'uuid'
3import { UserModel } from '@server/models/account/user'
2import { ActivityPubActorType } from '../../shared/models/activitypub' 4import { ActivityPubActorType } from '../../shared/models/activitypub'
5import { UserNotificationSetting, UserNotificationSettingValue } from '../../shared/models/users'
3import { SERVER_ACTOR_NAME, WEBSERVER } from '../initializers/constants' 6import { SERVER_ACTOR_NAME, WEBSERVER } from '../initializers/constants'
7import { sequelizeTypescript } from '../initializers/database'
4import { AccountModel } from '../models/account/account' 8import { AccountModel } from '../models/account/account'
5import { buildActorInstance, setAsyncActorKeys } from './activitypub/actor'
6import { createLocalVideoChannel } from './video-channel'
7import { ActorModel } from '../models/activitypub/actor'
8import { UserNotificationSettingModel } from '../models/account/user-notification-setting' 9import { UserNotificationSettingModel } from '../models/account/user-notification-setting'
9import { UserNotificationSetting, UserNotificationSettingValue } from '../../shared/models/users' 10import { ActorModel } from '../models/activitypub/actor'
10import { createWatchLaterPlaylist } from './video-playlist'
11import { sequelizeTypescript } from '../initializers/database'
12import { Transaction } from 'sequelize/types'
13import { Redis } from './redis'
14import { Emailer } from './emailer'
15import { MAccountDefault, MActorDefault, MChannelActor } from '../types/models' 11import { MAccountDefault, MActorDefault, MChannelActor } from '../types/models'
16import { MUser, MUserDefault, MUserId } from '../types/models/user' 12import { MUser, MUserDefault, MUserId } from '../types/models/user'
13import { buildActorInstance, setAsyncActorKeys } from './activitypub/actor'
17import { getAccountActivityPubUrl } from './activitypub/url' 14import { getAccountActivityPubUrl } from './activitypub/url'
15import { Emailer } from './emailer'
16import { LiveManager } from './live-manager'
17import { Redis } from './redis'
18import { createLocalVideoChannel } from './video-channel'
19import { createWatchLaterPlaylist } from './video-playlist'
20
21import memoizee = require('memoizee')
18 22
19type ChannelNames = { name: string, displayName: string } 23type ChannelNames = { name: string, displayName: string }
20 24
@@ -116,13 +120,61 @@ async function sendVerifyUserEmail (user: MUser, isPendingEmail = false) {
116 await Emailer.Instance.addVerifyEmailJob(username, email, url) 120 await Emailer.Instance.addVerifyEmailJob(username, email, url)
117} 121}
118 122
123async function getOriginalVideoFileTotalFromUser (user: MUserId) {
124 // Don't use sequelize because we need to use a sub query
125 const query = UserModel.generateUserQuotaBaseSQL({
126 withSelect: true,
127 whereUserId: '$userId'
128 })
129
130 const base = await UserModel.getTotalRawQuery(query, user.id)
131
132 return base + LiveManager.Instance.getLiveQuotaUsedByUser(user.id)
133}
134
135// Returns cumulative size of all video files uploaded in the last 24 hours.
136async function getOriginalVideoFileTotalDailyFromUser (user: MUserId) {
137 // Don't use sequelize because we need to use a sub query
138 const query = UserModel.generateUserQuotaBaseSQL({
139 withSelect: true,
140 whereUserId: '$userId',
141 where: '"video"."createdAt" > now() - interval \'24 hours\''
142 })
143
144 const base = await UserModel.getTotalRawQuery(query, user.id)
145
146 return base + LiveManager.Instance.getLiveQuotaUsedByUser(user.id)
147}
148
149async function isAbleToUploadVideo (userId: number, size: number) {
150 const user = await UserModel.loadById(userId)
151
152 if (user.videoQuota === -1 && user.videoQuotaDaily === -1) return Promise.resolve(true)
153
154 const [ totalBytes, totalBytesDaily ] = await Promise.all([
155 getOriginalVideoFileTotalFromUser(user.id),
156 getOriginalVideoFileTotalDailyFromUser(user.id)
157 ])
158
159 const uploadedTotal = size + totalBytes
160 const uploadedDaily = size + totalBytesDaily
161
162 if (user.videoQuotaDaily === -1) return uploadedTotal < user.videoQuota
163 if (user.videoQuota === -1) return uploadedDaily < user.videoQuotaDaily
164
165 return uploadedTotal < user.videoQuota && uploadedDaily < user.videoQuotaDaily
166}
167
119// --------------------------------------------------------------------------- 168// ---------------------------------------------------------------------------
120 169
121export { 170export {
171 getOriginalVideoFileTotalFromUser,
172 getOriginalVideoFileTotalDailyFromUser,
122 createApplicationActor, 173 createApplicationActor,
123 createUserAccountAndChannelAndPlaylist, 174 createUserAccountAndChannelAndPlaylist,
124 createLocalAccountWithoutKeys, 175 createLocalAccountWithoutKeys,
125 sendVerifyUserEmail 176 sendVerifyUserEmail,
177 isAbleToUploadVideo
126} 178}
127 179
128// --------------------------------------------------------------------------- 180// ---------------------------------------------------------------------------