'video-import',
'videos-views',
'activitypub-refresher',
+ 'video-live-ending',
'video-redundancy'
]
}
private savePreferencesImpl () {
- this.userNotificationService.updateNotificationSettings(this.user, this.user.notificationSettings)
+ this.userNotificationService.updateNotificationSettings(this.user.notificationSettings)
.subscribe(
() => {
this.notifier.success($localize`Preferences saved`, undefined, 2000)
</ng-template>
</ng-container>
- <ng-container ngbNavItem *ngIf="videoLive">
+ <ng-container ngbNavItem *ngIf="liveVideo">
<a ngbNavLink i18n>Live settings</a>
<ng-template ngbNavContent>
<div class="col-md-12">
<div class="form-group">
- <label for="videoLiveRTMPUrl" i18n>Live RTMP Url</label>
- <my-input-readonly-copy id="videoLiveRTMPUrl" [value]="videoLive.rtmpUrl"></my-input-readonly-copy>
+ <label for="liveVideoRTMPUrl" i18n>Live RTMP Url</label>
+ <my-input-readonly-copy id="liveVideoRTMPUrl" [value]="liveVideo.rtmpUrl"></my-input-readonly-copy>
</div>
<div class="form-group">
- <label for="videoLiveStreamKey" i18n>Live stream key</label>
- <my-input-readonly-copy id="videoLiveStreamKey" [value]="videoLive.streamKey"></my-input-readonly-copy>
+ <label for="liveVideoStreamKey" i18n>Live stream key</label>
+ <my-input-readonly-copy id="liveVideoStreamKey" [value]="liveVideo.streamKey"></my-input-readonly-copy>
</div>
</div>
</div>
import { FormReactiveValidationMessages, FormValidatorService, SelectChannelItem } from '@app/shared/shared-forms'
import { InstanceService } from '@app/shared/shared-instance'
import { VideoCaptionEdit, VideoEdit, VideoService } from '@app/shared/shared-main'
-import { ServerConfig, VideoConstant, VideoLive, VideoPrivacy } from '@shared/models'
+import { ServerConfig, VideoConstant, LiveVideo, VideoPrivacy } from '@shared/models'
import { RegisterClientFormFieldOptions, RegisterClientVideoFieldOptions } from '@shared/models/plugins/register-client-form-field.model'
import { I18nPrimengCalendarService } from './i18n-primeng-calendar.service'
import { VideoCaptionAddModalComponent } from './video-caption-add-modal.component'
@Input() videoCaptions: (VideoCaptionEdit & { captionPath?: string })[] = []
@Input() waitTranscodingEnabled = true
@Input() type: VideoEditType
- @Input() videoLive: VideoLive
+ @Input() liveVideo: LiveVideo
@ViewChild('videoCaptionAddModal', { static: true }) videoCaptionAddModal: VideoCaptionAddModalComponent
<form [hidden]="!isInUpdateForm" novalidate [formGroup]="form">
<my-video-edit
[form]="form" [formErrors]="formErrors" [videoCaptions]="videoCaptions" [schedulePublicationPossible]="false"
- [validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels" [videoLive]="videoLive"
+ [validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels" [liveVideo]="liveVideo"
type="go-live"
></my-video-edit>
import { AuthService, CanComponentDeactivate, Notifier, ServerService } from '@app/core'
import { scrollToTop } from '@app/helpers'
import { FormValidatorService } from '@app/shared/shared-forms'
-import { VideoCaptionService, VideoEdit, VideoService, VideoLiveService } from '@app/shared/shared-main'
+import { LiveVideoService, VideoCaptionService, VideoEdit, VideoService } from '@app/shared/shared-main'
import { LoadingBarService } from '@ngx-loading-bar/core'
-import { VideoCreate, VideoLive, VideoPrivacy } from '@shared/models'
+import { LiveVideo, VideoCreate, VideoPrivacy } from '@shared/models'
import { VideoSend } from './video-send'
@Component({
isInUpdateForm = false
- videoLive: VideoLive
+ liveVideo: LiveVideo
videoId: number
videoUUID: string
error: string
protected serverService: ServerService,
protected videoService: VideoService,
protected videoCaptionService: VideoCaptionService,
- private videoLiveService: VideoLiveService,
+ private liveVideoService: LiveVideoService,
private router: Router
) {
super()
const toPatch = Object.assign({}, video, { privacy: this.firstStepPrivacyId })
this.form.patchValue(toPatch)
- this.videoLiveService.goLive(video).subscribe(
+ this.liveVideoService.goLive(video).subscribe(
res => {
this.videoId = res.video.id
this.videoUUID = res.video.uuid
}
private fetchVideoLive () {
- this.videoLiveService.getVideoLive(this.videoId)
+ this.liveVideoService.getVideoLive(this.videoId)
.subscribe(
- videoLive => {
- this.videoLive = videoLive
+ liveVideo => {
+ this.liveVideo = liveVideo
},
err => {
[validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels"
[videoCaptions]="videoCaptions" [waitTranscodingEnabled]="waitTranscodingEnabled"
type="update" (pluginFieldsAdded)="hydratePluginFieldsFromVideo()"
- [videoLive]="videoLive"
+ [liveVideo]="liveVideo"
></my-video-edit>
<div class="submit-container">
import { FormReactive, FormValidatorService, SelectChannelItem } from '@app/shared/shared-forms'
import { VideoCaptionEdit, VideoCaptionService, VideoDetails, VideoEdit, VideoService } from '@app/shared/shared-main'
import { LoadingBarService } from '@ngx-loading-bar/core'
-import { VideoPrivacy, VideoLive } from '@shared/models'
+import { LiveVideo, VideoPrivacy } from '@shared/models'
import { hydrateFormFromVideo } from './shared/video-edit-utils'
@Component({
video: VideoEdit
userVideoChannels: SelectChannelItem[] = []
videoCaptions: VideoCaptionEdit[] = []
- videoLive: VideoLive
+ liveVideo: LiveVideo
isUpdatingVideo = false
schedulePublicationPossible = false
this.route.data
.pipe(map(data => data.videoData))
- .subscribe(({ video, videoChannels, videoCaptions, videoLive }) => {
+ .subscribe(({ video, videoChannels, videoCaptions, liveVideo }) => {
this.video = new VideoEdit(video)
this.userVideoChannels = videoChannels
this.videoCaptions = videoCaptions
- this.videoLive = videoLive
+ this.liveVideo = liveVideo
this.schedulePublicationPossible = this.video.privacy === VideoPrivacy.PRIVATE
import { map, switchMap } from 'rxjs/operators'
import { Injectable } from '@angular/core'
import { ActivatedRouteSnapshot, Resolve } from '@angular/router'
-import { VideoCaptionService, VideoChannelService, VideoDetails, VideoLiveService, VideoService } from '@app/shared/shared-main'
+import { VideoCaptionService, VideoChannelService, VideoDetails, LiveVideoService, VideoService } from '@app/shared/shared-main'
@Injectable()
export class VideoUpdateResolver implements Resolve<any> {
constructor (
private videoService: VideoService,
- private videoLiveService: VideoLiveService,
+ private liveVideoService: LiveVideoService,
private videoChannelService: VideoChannelService,
private videoCaptionService: VideoCaptionService
) {
),
video.isLive
- ? this.videoLiveService.getVideoLive(video.id)
+ ? this.liveVideoService.getVideoLive(video.id)
: of(undefined)
]
}
This video will be published on {{ video.scheduledUpdate.updateAt | date: 'full' }}.
</div>
+ <div i18n class="col-md-12 alert alert-info" *ngIf="isWaitingForLive()">
+ This live has not started yet.
+ </div>
+
+ <div i18n class="col-md-12 alert alert-info" *ngIf="isLiveEnded()">
+ This live is finished.
+ </div>
+
<div class="col-md-12 alert alert-danger" *ngIf="video?.blacklisted">
<div class="blocked-label" i18n>This video is blocked.</div>
{{ video.blockedReason }}
</div>
</div>
- <ng-container *ngIf="!isUserLoggedIn()">
+ <ng-container *ngIf="!isUserLoggedIn() && !isLive()">
<button
*ngIf="isVideoDownloadable()" class="action-button action-button-save"
(click)="showDownloadModal()" (keydown.enter)="showDownloadModal()"
}
#video-wrapper {
+ $video-height: 66vh;
+
background-color: #000;
display: flex;
justify-content: center;
display: flex;
justify-content: center;
flex-grow: 1;
+ height: $video-height;
}
.remote-server-down {
::ng-deep .video-js {
width: 100%;
max-width: getPlayerWidth(66vh);
- height: 66vh;
+ height: $video-height;
// VideoJS create an inner video player
video {
import { PlatformLocation } from '@angular/common'
import { ChangeDetectorRef, Component, ElementRef, Inject, LOCALE_ID, NgZone, OnDestroy, OnInit, ViewChild } from '@angular/core'
import { ActivatedRoute, Router } from '@angular/router'
-import { AuthService, AuthUser, ConfirmService, MarkdownService, Notifier, RestExtractor, ServerService, UserService } from '@app/core'
+import {
+ AuthService,
+ AuthUser,
+ ConfirmService,
+ MarkdownService,
+ Notifier,
+ PeerTubeSocket,
+ RestExtractor,
+ ServerService,
+ UserService
+} from '@app/core'
import { HooksService } from '@app/core/plugins/hooks.service'
import { RedirectService } from '@app/core/routing/redirect.service'
import { isXPercentInViewport, scrollToTop } from '@app/helpers'
import { VideoSupportComponent } from './modal/video-support.component'
import { VideoWatchPlaylistComponent } from './video-watch-playlist.component'
+type URLOptions = CustomizationOptions & { playerMode: PlayerMode }
+
@Component({
selector: 'my-video-watch',
templateUrl: './video-watch.component.html',
private paramsSub: Subscription
private queryParamsSub: Subscription
private configSub: Subscription
+ private liveVideosSub: Subscription
private serverConfig: ServerConfig
private videoCaptionService: VideoCaptionService,
private hotkeysService: HotkeysService,
private hooks: HooksService,
+ private peertubeSocket: PeerTubeSocket,
private location: PlatformLocation,
@Inject(LOCALE_ID) private localeId: string
) {
if (this.paramsSub) this.paramsSub.unsubscribe()
if (this.queryParamsSub) this.queryParamsSub.unsubscribe()
if (this.configSub) this.configSub.unsubscribe()
+ if (this.liveVideosSub) this.liveVideosSub.unsubscribe()
// Unbind hotkeys
this.hotkeysService.remove(this.hotkeys)
return this.video && this.video.scheduledUpdate !== undefined
}
+ isLive () {
+ return !!(this.video?.isLive)
+ }
+
+ isWaitingForLive () {
+ return this.video?.state.id === VideoState.WAITING_FOR_LIVE
+ }
+
+ isLiveEnded () {
+ return this.video?.state.id === VideoState.LIVE_ENDED
+ }
+
isVideoBlur (video: Video) {
return video.isVideoNSFWForUser(this.user, this.serverConfig)
}
private async onVideoFetched (
video: VideoDetails,
videoCaptions: VideoCaption[],
- urlOptions: CustomizationOptions & { playerMode: PlayerMode }
+ urlOptions: URLOptions
) {
+ this.subscribeToLiveEventsIfNeeded(this.video, video)
+
this.video = video
this.videoCaptions = videoCaptions
if (res === false) return this.location.back()
}
+ const videoState = this.video.state.id
+ if (videoState === VideoState.LIVE_ENDED || videoState === VideoState.WAITING_FOR_LIVE) return
+
// Flush old player if needed
this.flushPlayer()
return !this.player.paused()
}
+ private async subscribeToLiveEventsIfNeeded (oldVideo: VideoDetails, newVideo: VideoDetails) {
+ if (!this.liveVideosSub) {
+ this.liveVideosSub = this.peertubeSocket.getLiveVideosObservable()
+ .subscribe(({ payload }) => {
+ if (payload.state !== VideoState.PUBLISHED || this.video.state.id !== VideoState.WAITING_FOR_LIVE) return
+
+ const videoUUID = this.video.uuid
+
+ // Reset to refetch the video
+ this.video = undefined
+ this.loadVideo(videoUUID)
+ })
+ }
+
+ if (oldVideo && oldVideo.id !== newVideo.id) {
+ await this.peertubeSocket.unsubscribeLiveVideos(oldVideo.id)
+ }
+
+ if (!newVideo.isLive) return
+
+ await this.peertubeSocket.subscribeToLiveVideosSocket(newVideo.id)
+ }
+
private initHotkeys () {
this.hotkeys = [
// These hotkeys are managed by the player
import { CommonModule } from '@angular/common'
import { NgModule, Optional, SkipSelf } from '@angular/core'
import { BrowserAnimationsModule } from '@angular/platform-browser/animations'
-import { UserNotificationSocket } from '@app/core/notification/user-notification-socket.service'
+import { PeerTubeSocket } from '@app/core/notification/peertube-socket.service'
import { HooksService } from '@app/core/plugins/hooks.service'
import { PluginService } from '@app/core/plugins/plugin.service'
import { UnloggedGuard } from '@app/core/routing/unlogged-guard.service'
RedirectService,
Notifier,
MessageService,
- UserNotificationSocket,
+ PeerTubeSocket,
ServerConfigResolver,
CanDeactivateGuard
]
export * from './notifier.service'
-export * from './user-notification-socket.service'
+export * from './peertube-socket.service'
--- /dev/null
+import { Subject } from 'rxjs'
+import { Injectable, NgZone } from '@angular/core'
+import { LiveVideoEventPayload, LiveVideoEventType, UserNotification as UserNotificationServer } from '@shared/models'
+import { environment } from '../../../environments/environment'
+import { AuthService } from '../auth'
+
+export type NotificationEvent = 'new' | 'read' | 'read-all'
+
+@Injectable()
+export class PeerTubeSocket {
+ private io: typeof import ('socket.io-client')
+
+ private notificationSubject = new Subject<{ type: NotificationEvent, notification?: UserNotificationServer }>()
+ private liveVideosSubject = new Subject<{ type: LiveVideoEventType, payload: LiveVideoEventPayload }>()
+
+ private notificationSocket: SocketIOClient.Socket
+ private liveVideosSocket: SocketIOClient.Socket
+
+ constructor (
+ private auth: AuthService,
+ private ngZone: NgZone
+ ) {}
+
+ async getMyNotificationsSocket () {
+ await this.initNotificationSocket()
+
+ return this.notificationSubject.asObservable()
+ }
+
+ getLiveVideosObservable () {
+ return this.liveVideosSubject.asObservable()
+ }
+
+ async subscribeToLiveVideosSocket (videoId: number) {
+ await this.initLiveVideosSocket()
+
+ this.liveVideosSocket.emit('subscribe', { videoId })
+ }
+
+ async unsubscribeLiveVideos (videoId: number) {
+ if (!this.liveVideosSocket) return
+
+ this.liveVideosSocket.emit('unsubscribe', { videoId })
+ }
+
+ dispatchNotificationEvent (type: NotificationEvent, notification?: UserNotificationServer) {
+ this.notificationSubject.next({ type, notification })
+ }
+
+ private async initNotificationSocket () {
+ if (this.notificationSocket) return
+
+ await this.importIOIfNeeded()
+
+ this.ngZone.runOutsideAngular(() => {
+ this.notificationSocket = this.io(environment.apiUrl + '/user-notifications', {
+ query: { accessToken: this.auth.getAccessToken() }
+ })
+
+ this.notificationSocket.on('new-notification', (n: UserNotificationServer) => this.dispatchNotificationEvent('new', n))
+ })
+ }
+
+ private async initLiveVideosSocket () {
+ if (this.liveVideosSocket) return
+
+ await this.importIOIfNeeded()
+
+ this.ngZone.runOutsideAngular(() => {
+ this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos')
+
+ const type: LiveVideoEventType = 'state-change'
+ this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => this.dispatchLiveVideoEvent(type, payload))
+ })
+ }
+
+ private async importIOIfNeeded () {
+ if (this.io) return
+
+ this.io = (await import('socket.io-client') as any).default
+ }
+
+ private dispatchLiveVideoEvent (type: LiveVideoEventType, payload: LiveVideoEventPayload) {
+ this.liveVideosSubject.next({ type, payload })
+ }
+}
+++ /dev/null
-import { Subject } from 'rxjs'
-import { Injectable, NgZone } from '@angular/core'
-import { UserNotification as UserNotificationServer } from '@shared/models'
-import { environment } from '../../../environments/environment'
-import { AuthService } from '../auth'
-
-export type NotificationEvent = 'new' | 'read' | 'read-all'
-
-@Injectable()
-export class UserNotificationSocket {
- private notificationSubject = new Subject<{ type: NotificationEvent, notification?: UserNotificationServer }>()
-
- private socket: SocketIOClient.Socket
-
- constructor (
- private auth: AuthService,
- private ngZone: NgZone
- ) {}
-
- dispatch (type: NotificationEvent, notification?: UserNotificationServer) {
- this.notificationSubject.next({ type, notification })
- }
-
- async getMyNotificationsSocket () {
- await this.initSocket()
-
- return this.notificationSubject.asObservable()
- }
-
- private async initSocket () {
- if (this.socket) return
-
- // FIXME: import('..') returns a struct module, containing a "default" field corresponding to our sanitizeHtml function
- const io: typeof import ('socket.io-client') = (await import('socket.io-client') as any).default
-
- this.ngZone.runOutsideAngular(() => {
- this.socket = io(environment.apiUrl + '/user-notifications', {
- query: { accessToken: this.auth.getAccessToken() }
- })
-
- this.socket.on('new-notification', (n: UserNotificationServer) => this.dispatch('new', n))
- })
- }
-}
import { filter } from 'rxjs/operators'
import { Component, EventEmitter, Input, Output, OnDestroy, OnInit, ViewChild } from '@angular/core'
import { NavigationEnd, Router } from '@angular/router'
-import { Notifier, User, UserNotificationSocket } from '@app/core'
+import { Notifier, User, PeerTubeSocket } from '@app/core'
import { UserNotificationService } from '@app/shared/shared-main'
import { NgbPopover } from '@ng-bootstrap/ng-bootstrap'
constructor (
private userNotificationService: UserNotificationService,
- private userNotificationSocket: UserNotificationSocket,
+ private peertubeSocket: PeerTubeSocket,
private notifier: Notifier,
private router: Router
) {
}
private async subscribeToNotifications () {
- const obs = await this.userNotificationSocket.getMyNotificationsSocket()
+ const obs = await this.peertubeSocket.getMyNotificationsSocket()
this.notificationSub = obs.subscribe(data => {
if (data.type === 'new') return this.unreadNotifications++
import { LoaderComponent, SmallLoaderComponent } from './loaders'
import { HelpComponent, ListOverflowComponent, TopMenuDropdownComponent } from './misc'
import { UserHistoryService, UserNotificationsComponent, UserNotificationService, UserQuotaComponent } from './users'
-import { RedundancyService, VideoImportService, VideoOwnershipService, VideoService, VideoLiveService } from './video'
+import { RedundancyService, VideoImportService, VideoOwnershipService, VideoService, LiveVideoService } from './video'
import { VideoCaptionService } from './video-caption'
import { VideoChannelService } from './video-channel'
RedundancyService,
VideoImportService,
VideoOwnershipService,
- VideoLiveService,
+ LiveVideoService,
VideoService,
VideoCaptionService,
import { catchError, map, tap } from 'rxjs/operators'
import { HttpClient, HttpParams } from '@angular/common/http'
import { Injectable } from '@angular/core'
-import { ComponentPaginationLight, RestExtractor, RestService, User, UserNotificationSocket, AuthService } from '@app/core'
+import { ComponentPaginationLight, RestExtractor, RestService, User, PeerTubeSocket, AuthService } from '@app/core'
import { ResultList, UserNotification as UserNotificationServer, UserNotificationSetting } from '@shared/models'
import { environment } from '../../../../environments/environment'
import { UserNotification } from './user-notification.model'
private auth: AuthService,
private restExtractor: RestExtractor,
private restService: RestService,
- private userNotificationSocket: UserNotificationSocket
+ private peertubeSocket: PeerTubeSocket
) {}
listMyNotifications (parameters: {
return this.authHttp.post(url, body, { headers })
.pipe(
map(this.restExtractor.extractDataBool),
- tap(() => this.userNotificationSocket.dispatch('read')),
+ tap(() => this.peertubeSocket.dispatchNotificationEvent('read')),
catchError(res => this.restExtractor.handleError(res))
)
}
return this.authHttp.post(url, {}, { headers })
.pipe(
map(this.restExtractor.extractDataBool),
- tap(() => this.userNotificationSocket.dispatch('read-all')),
+ tap(() => this.peertubeSocket.dispatchNotificationEvent('read-all')),
catchError(res => this.restExtractor.handleError(res))
)
}
- updateNotificationSettings (user: User, settings: UserNotificationSetting) {
+ updateNotificationSettings (settings: UserNotificationSetting) {
const url = UserNotificationService.BASE_NOTIFICATION_SETTINGS
return this.authHttp.put(url, settings)
+export * from './live-video.service'
export * from './redundancy.service'
export * from './video-details.model'
export * from './video-edit.model'
export * from './video-import.service'
-export * from './video-live.service'
export * from './video-ownership.service'
export * from './video.model'
export * from './video.service'
import { HttpClient } from '@angular/common/http'
import { Injectable } from '@angular/core'
import { RestExtractor } from '@app/core'
-import { VideoCreate, VideoLive } from '@shared/models'
+import { VideoCreate, LiveVideo } from '@shared/models'
import { environment } from '../../../../environments/environment'
@Injectable()
-export class VideoLiveService {
+export class LiveVideoService {
static BASE_VIDEO_LIVE_URL = environment.apiUrl + '/api/v1/videos/live/'
constructor (
goLive (video: VideoCreate) {
return this.authHttp
- .post<{ video: { id: number, uuid: string } }>(VideoLiveService.BASE_VIDEO_LIVE_URL, video)
+ .post<{ video: { id: number, uuid: string } }>(LiveVideoService.BASE_VIDEO_LIVE_URL, video)
.pipe(catchError(err => this.restExtractor.handleError(err)))
}
getVideoLive (videoId: number | string) {
return this.authHttp
- .get<VideoLive>(VideoLiveService.BASE_VIDEO_LIVE_URL + videoId)
+ .get<LiveVideo>(LiveVideoService.BASE_VIDEO_LIVE_URL + videoId)
.pipe(catchError(err => this.restExtractor.handleError(err)))
}
}
<div class="filters">
<div>
- <div class="form-group start-at">
+ <div class="form-group start-at" *ngIf="!video.isLive">
<my-peertube-checkbox
inputName="startAt" [(ngModel)]="customizations.startAtCheckbox"
i18n-labelText labelText="Start at"
<div class="advanced-filters collapse-transition" [ngbCollapse]="isAdvancedCustomizationCollapsed">
<div>
- <div class="form-group stop-at">
+ <div class="form-group stop-at" *ngIf="!video.isLive">
<my-peertube-checkbox
inputName="stopAt" [(ngModel)]="customizations.stopAtCheckbox"
i18n-labelText labelText="Stop at"
></my-peertube-checkbox>
</div>
- <div class="form-group">
+ <div class="form-group" *ngIf="!video.isLive">
<my-peertube-checkbox
inputName="loop" [(ngModel)]="customizations.loop"
i18n-labelText labelText="Loop"
}
isVideoDownloadable () {
- return this.video && this.video instanceof VideoDetails && this.video.downloadEnabled
+ return this.video &&
+ this.video.isLive !== true &&
+ this.video instanceof VideoDetails &&
+ this.video.downloadEnabled
}
canVideoBeDuplicated () {
live:
enabled: false
+ # Limit lives duration
+ # Set null to disable duration limit
+ max_duration: 5 hours
+
+ # Allow your users to save a replay of their live
+ # PeerTube will transcode segments in a video file
+ # If the user daily/total quota is reached, PeerTube will stop the live
+ # /!\ transcoding.enabled (and not live.transcoding.enabled) has to be true to create a replay
+ allow_replay: true
+
rtmp:
port: 1935
+ # Allow to transcode the live streaming in multiple live resolutions
transcoding:
enabled: false
threads: 2
port: 1935
transcoding:
- enabled: false
+ enabled: true
threads: 2
resolutions:
'email': 5,
'videos-views': 1,
'activitypub-refresher': 1,
- 'video-redundancy': 1
+ 'video-redundancy': 1,
+ 'video-live-ending': 1
}
const JOB_CONCURRENCY: { [id in JobType]: number } = {
'activitypub-http-broadcast': 1,
'email': 5,
'videos-views': 1,
'activitypub-refresher': 1,
- 'video-redundancy': 1
+ 'video-redundancy': 1,
+ 'video-live-ending': 1
}
const JOB_TTL: { [id in JobType]: number } = {
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
'email': 60000 * 10, // 10 minutes
'videos-views': undefined, // Unlimited
'activitypub-refresher': 60000 * 10, // 10 minutes
- 'video-redundancy': 1000 * 3600 * 3 // 3 hours
+ 'video-redundancy': 1000 * 3600 * 3, // 3 hours
+ 'video-live-ending': 1000 * 60 * 10 // 10 minutes
}
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
'videos-views': {
const VIDEO_LIVE = {
EXTENSION: '.ts',
+ CLEANUP_DELAY: 1000 * 60 * 5, // 5 mintues
RTMP: {
CHUNK_SIZE: 60000,
GOP_CACHE: true,
import { ActorFollowScoreCache } from '../files-cache'
import { JobQueue } from '../job-queue'
import { Notifier } from '../notifier'
+import { PeerTubeSocket } from '../peertube-socket'
import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail'
import { setVideoTags } from '../video'
import { autoBlacklistVideoIfNeeded } from '../video-blacklist'
video.privacy = videoData.privacy
video.channelId = videoData.channelId
video.views = videoData.views
+ video.isLive = videoData.isLive
const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight
})
if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users?
+ if (videoUpdated.isLive) PeerTubeSocket.Instance.sendVideoLiveNewState(video)
logger.info('Remote video with uuid %s updated', videoObject.uuid)
--- /dev/null
+import * as Bull from 'bull'
+import { readdir, remove } from 'fs-extra'
+import { join } from 'path'
+import { getHLSDirectory } from '@server/lib/video-paths'
+import { VideoModel } from '@server/models/video/video'
+import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
+import { VideoLiveEndingPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+async function processVideoLiveEnding (job: Bull.Job) {
+ const payload = job.data as VideoLiveEndingPayload
+
+ const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId)
+ if (!video) {
+ logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId)
+ return
+ }
+
+ const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
+ const hlsDirectory = getHLSDirectory(video, false)
+
+ const files = await readdir(hlsDirectory)
+
+ for (const filename of files) {
+ if (
+ filename.endsWith('.ts') ||
+ filename.endsWith('.m3u8') ||
+ filename.endsWith('.mpd') ||
+ filename.endsWith('.m4s') ||
+ filename.endsWith('.tmp')
+ ) {
+ const p = join(hlsDirectory, filename)
+
+ remove(p)
+ .catch(err => logger.error('Cannot remove %s.', p, { err }))
+ }
+ }
+
+ streamingPlaylist.destroy()
+ .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+ processVideoLiveEnding
+}
RefreshPayload,
VideoFileImportPayload,
VideoImportPayload,
+ VideoLiveEndingPayload,
VideoRedundancyPayload,
VideoTranscodingPayload
} from '../../../shared/models'
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
+import { processVideoLiveEnding } from './handlers/video-live-ending'
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'video-import', payload: VideoImportPayload } |
{ type: 'activitypub-refresher', payload: RefreshPayload } |
{ type: 'videos-views', payload: {} } |
+ { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
{ type: 'video-redundancy', payload: VideoRedundancyPayload }
+type CreateJobOptions = {
+ delay?: number
+}
+
const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'video-import': processVideoImport,
'videos-views': processVideosViews,
'activitypub-refresher': refreshAPObject,
+ 'video-live-ending': processVideoLiveEnding,
'video-redundancy': processVideoRedundancy
}
'video-import',
'videos-views',
'activitypub-refresher',
- 'video-redundancy'
+ 'video-redundancy',
+ 'video-live-ending'
]
class JobQueue {
}
}
- createJob (obj: CreateJobArgument): void {
- this.createJobWithPromise(obj)
+ createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
+ this.createJobWithPromise(obj, options)
.catch(err => logger.error('Cannot create job.', { err, obj }))
}
- createJobWithPromise (obj: CreateJobArgument) {
+ createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
const queue = this.queues[obj.type]
if (queue === undefined) {
logger.error('Unknown queue %s: cannot create job.', obj.type)
const jobArgs: Bull.JobOptions = {
backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[obj.type],
- timeout: JOB_TTL[obj.type]
+ timeout: JOB_TTL[obj.type],
+ delay: options.delay
}
return queue.add(obj.payload, jobArgs)
import { AsyncQueue, queue } from 'async'
import * as chokidar from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
-import { ensureDir, readdir, remove } from 'fs-extra'
-import { basename, join } from 'path'
+import { ensureDir } from 'fs-extra'
+import { basename } from 'path'
import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
import { logger } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
-import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models'
+import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
+import { federateVideoIfNeeded } from './activitypub/videos'
import { buildSha256Segment } from './hls'
+import { JobQueue } from './job-queue'
+import { PeerTubeSocket } from './peertube-socket'
import { getHLSDirectory } from './video-paths'
const NodeRtmpServer = require('node-media-server/node_rtmp_server')
private static instance: LiveManager
private readonly transSessions = new Map<string, FfmpegCommand>()
+ private readonly videoSessions = new Map<number, string>()
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
}
init () {
- this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => {
+ const events = this.getContext().nodeEvent
+ events.on('postPublish', (sessionId: string, streamPath: string) => {
logger.debug('RTMP received stream', { id: sessionId, streamPath })
const splittedPath = streamPath.split('/')
.catch(err => logger.error('Cannot handle sessions.', { err }))
})
- this.getContext().nodeEvent.on('donePublish', sessionId => {
+ events.on('donePublish', sessionId => {
this.abortSession(sessionId)
})
return this.segmentsSha256.get(videoUUID)
}
+ stopSessionOf (videoId: number) {
+ const sessionId = this.videoSessions.get(videoId)
+ if (!sessionId) return
+
+ this.abortSession(sessionId)
+
+ this.onEndTransmuxing(videoId)
+ .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
+ }
+
private getContext () {
return context
}
}
const video = videoLive.Video
+ if (video.isBlacklisted()) {
+ logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
+ return this.abortSession(sessionId)
+ }
+
+ this.videoSessions.set(video.id, sessionId)
+
const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
const session = this.getContext().sessions.get(sessionId)
type: VideoStreamingPlaylistType.HLS
}, { returning: true }) as [ MStreamingPlaylist, boolean ]
- video.state = VideoState.PUBLISHED
- await video.save()
-
- // FIXME: federation?
-
return this.runMuxing({
sessionId,
videoLive,
this.transSessions.set(sessionId, ffmpegExec)
+ const videoUUID = videoLive.Video.uuid
+ const tsWatcher = chokidar.watch(outPath + '/*.ts')
+
+ const updateHandler = segmentPath => {
+ this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+ }
+
+ const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
+
+ tsWatcher.on('add', p => updateHandler(p))
+ tsWatcher.on('change', p => updateHandler(p))
+ tsWatcher.on('unlink', p => deleteHandler(p))
+
+ const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
+ masterWatcher.on('add', async () => {
+ try {
+ const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
+
+ video.state = VideoState.PUBLISHED
+ await video.save()
+ videoLive.Video = video
+
+ await federateVideoIfNeeded(video, false)
+
+ PeerTubeSocket.Instance.sendVideoLiveNewState(video)
+ } catch (err) {
+ logger.error('Cannot federate video %d.', videoLive.videoId, { err })
+ } finally {
+ masterWatcher.close()
+ .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
+ }
+ })
+
const onFFmpegEnded = () => {
- watcher.close()
- .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err }))
+ logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath)
- this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath)
+ Promise.all([ tsWatcher.close(), masterWatcher.close() ])
+ .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
+
+ this.onEndTransmuxing(videoLive.Video.id)
.catch(err => logger.error('Error in closed transmuxing.', { err }))
}
})
ffmpegExec.on('end', () => onFFmpegEnded())
-
- const videoUUID = videoLive.Video.uuid
- const watcher = chokidar.watch(outPath + '/*.ts')
-
- const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
- const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
-
- watcher.on('add', p => updateHandler(p))
- watcher.on('change', p => updateHandler(p))
- watcher.on('unlink', p => deleteHandler(p))
}
- private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) {
- logger.info('RTMP transmuxing for %s ended.', streamPath)
+ private async onEndTransmuxing (videoId: number) {
+ try {
+ const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+ if (!fullVideo) return
- const files = await readdir(outPath)
+ JobQueue.Instance.createJob({
+ type: 'video-live-ending',
+ payload: {
+ videoId: fullVideo.id
+ }
+ }, { delay: VIDEO_LIVE.CLEANUP_DELAY })
- for (const filename of files) {
- if (
- filename.endsWith('.ts') ||
- filename.endsWith('.m3u8') ||
- filename.endsWith('.mpd') ||
- filename.endsWith('.m4s') ||
- filename.endsWith('.tmp')
- ) {
- const p = join(outPath, filename)
+ // FIXME: use end
+ fullVideo.state = VideoState.WAITING_FOR_LIVE
+ await fullVideo.save()
- remove(p)
- .catch(err => logger.error('Cannot remove %s.', p, { err }))
- }
- }
+ PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
- playlist.destroy()
- .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
-
- video.state = VideoState.LIVE_ENDED
- video.save()
- .catch(err => logger.error('Cannot save new video state of live streaming.', { err }))
+ await federateVideoIfNeeded(fullVideo, false)
+ } catch (err) {
+ logger.error('Cannot save/federate new video state of live streaming.', { err })
+ }
}
private async addSegmentSha (options: SegmentSha256QueueParam) {
-import * as SocketIO from 'socket.io'
-import { authenticateSocket } from '../middlewares'
-import { logger } from '../helpers/logger'
+import { Socket } from 'dgram'
import { Server } from 'http'
+import * as SocketIO from 'socket.io'
+import { MVideo } from '@server/types/models'
import { UserNotificationModelForApi } from '@server/types/models/user'
+import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
+import { logger } from '../helpers/logger'
+import { authenticateSocket } from '../middlewares'
class PeerTubeSocket {
private static instance: PeerTubeSocket
private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {}
+ private liveVideosNamespace: SocketIO.Namespace
private constructor () {}
this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket)
})
})
+
+ this.liveVideosNamespace = io.of('/live-videos')
+ .on('connection', socket => {
+ socket.on('subscribe', ({ videoId }) => socket.join(videoId))
+ socket.on('unsubscribe', ({ videoId }) => socket.leave(videoId))
+ })
}
sendNotification (userId: number, notification: UserNotificationModelForApi) {
const sockets = this.userNotificationSockets[userId]
-
if (!sockets) return
+ logger.debug('Sending user notification to user %d.', userId)
+
const notificationMessage = notification.toFormattedJSON()
for (const socket of sockets) {
socket.emit('new-notification', notificationMessage)
}
}
+ sendVideoLiveNewState (video: MVideo) {
+ const data: LiveVideoEventPayload = { state: video.state }
+ const type: LiveVideoEventType = 'state-change'
+
+ logger.debug('Sending video live new state notification of %s.', video.url)
+
+ this.liveVideosNamespace
+ .in(video.id)
+ .emit(type, data)
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}
import { federateVideoIfNeeded } from './activitypub/videos'
import { Notifier } from './notifier'
import { Hooks } from './plugins/hooks'
+import { LiveManager } from './live-manager'
async function autoBlacklistVideoIfNeeded (parameters: {
video: MVideoWithBlacklistLight
await sendDeleteVideo(videoInstance, undefined)
}
+ if (videoInstance.isLive) {
+ LiveManager.Instance.stopSessionOf(videoInstance.id)
+ }
+
Notifier.Instance.notifyOnVideoBlacklist(blacklist)
}
import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript'
import { WEBSERVER } from '@server/initializers/constants'
import { MVideoLive, MVideoLiveVideo } from '@server/types/models'
-import { VideoLive } from '@shared/models/videos/video-live.model'
+import { LiveVideo, VideoState } from '@shared/models'
import { VideoModel } from './video'
+import { VideoBlacklistModel } from './video-blacklist'
@DefaultScope(() => ({
include: [
{
model: VideoModel,
- required: true
+ required: true,
+ include: [
+ {
+ model: VideoBlacklistModel,
+ required: false
+ }
+ ]
}
]
}))
const query = {
where: {
streamKey
- }
+ },
+ include: [
+ {
+ model: VideoModel.unscoped(),
+ required: true,
+ where: {
+ state: VideoState.WAITING_FOR_LIVE
+ },
+ include: [
+ {
+ model: VideoBlacklistModel.unscoped(),
+ required: false
+ }
+ ]
+ }
+ ]
}
return VideoLiveModel.findOne<MVideoLiveVideo>(query)
return VideoLiveModel.findOne<MVideoLive>(query)
}
- toFormattedJSON (): VideoLive {
+ toFormattedJSON (): LiveVideo {
return {
rtmpUrl: WEBSERVER.RTMP_URL,
streamKey: this.streamKey
return VideoStreamingPlaylistModel.findByPk(id, options)
}
+ static loadHLSPlaylistByVideo (videoId: number) {
+ const options = {
+ where: {
+ type: VideoStreamingPlaylistType.HLS,
+ videoId
+ }
+ }
+
+ return VideoStreamingPlaylistModel.findOne(options)
+ }
+
static getHlsPlaylistFilename (resolution: number) {
return resolution + '.m3u8'
}
import { VideoStreamingPlaylistModel } from './video-streaming-playlist'
import { VideoTagModel } from './video-tag'
import { VideoViewModel } from './video-view'
+import { LiveManager } from '@server/lib/live-manager'
export enum ScopeNames {
AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS',
return undefined
}
+ @BeforeDestroy
+ static stopLiveIfNeeded (instance: VideoModel) {
+ if (!instance.isLive) return
+
+ return LiveManager.Instance.stopSessionOf(instance.id)
+ }
+
@BeforeDestroy
static invalidateCache (instance: VideoModel) {
ModelCache.Instance.invalidateCache('video', instance.id)
| 'videos-views'
| 'activitypub-refresher'
| 'video-redundancy'
+ | 'video-live-ending'
export interface Job {
id: number
| NewResolutionTranscodingPayload
| OptimizeTranscodingPayload
| MergeAudioTranscodingPayload
+
+export interface VideoLiveEndingPayload {
+ videoId: number
+}
export * from './blacklist'
export * from './caption'
export * from './channel'
+export * from './live'
export * from './import'
export * from './playlist'
export * from './rate'
export * from './video-file-metadata'
export * from './video-file.model'
-export * from './video-live.model'
+export * from './live/live-video.model'
export * from './video-privacy.enum'
export * from './video-query.type'
--- /dev/null
+export * from './live-video-event-payload.model'
+export * from './live-video-event.type'
+export * from './live-video.model'
--- /dev/null
+import { VideoState } from '../video-state.enum'
+
+export interface LiveVideoEventPayload {
+ state: VideoState
+}
--- /dev/null
+export type LiveVideoEventType = 'state-change'
-export interface VideoLive {
+export interface LiveVideo {
rtmpUrl: string
streamKey: string
}