]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Add watch messages if live has not started
authorChocobozzz <me@florianbigard.com>
Fri, 25 Sep 2020 08:04:21 +0000 (10:04 +0200)
committerChocobozzz <chocobozzz@cpy.re>
Mon, 9 Nov 2020 14:33:04 +0000 (15:33 +0100)
41 files changed:
client/src/app/+admin/system/jobs/jobs.component.ts
client/src/app/+my-account/my-account-settings/my-account-notification-preferences/my-account-notification-preferences.component.ts
client/src/app/+videos/+video-edit/shared/video-edit.component.html
client/src/app/+videos/+video-edit/shared/video-edit.component.ts
client/src/app/+videos/+video-edit/video-add-components/video-go-live.component.html
client/src/app/+videos/+video-edit/video-add-components/video-go-live.component.ts
client/src/app/+videos/+video-edit/video-update.component.html
client/src/app/+videos/+video-edit/video-update.component.ts
client/src/app/+videos/+video-edit/video-update.resolver.ts
client/src/app/+videos/+video-watch/video-watch.component.html
client/src/app/+videos/+video-watch/video-watch.component.scss
client/src/app/+videos/+video-watch/video-watch.component.ts
client/src/app/core/core.module.ts
client/src/app/core/notification/index.ts
client/src/app/core/notification/peertube-socket.service.ts [new file with mode: 0644]
client/src/app/core/notification/user-notification-socket.service.ts [deleted file]
client/src/app/menu/avatar-notification.component.ts
client/src/app/shared/shared-main/shared-main.module.ts
client/src/app/shared/shared-main/users/user-notification.service.ts
client/src/app/shared/shared-main/video/index.ts
client/src/app/shared/shared-main/video/live-video.service.ts [moved from client/src/app/shared/shared-main/video/video-live.service.ts with 74% similarity]
client/src/app/shared/shared-share-modal/video-share.component.html
client/src/app/shared/shared-video-miniature/video-actions-dropdown.component.ts
config/default.yaml
config/test.yaml
server/initializers/constants.ts
server/lib/activitypub/videos.ts
server/lib/job-queue/handlers/video-live-ending.ts [new file with mode: 0644]
server/lib/job-queue/job-queue.ts
server/lib/live-manager.ts
server/lib/peertube-socket.ts
server/lib/video-blacklist.ts
server/models/video/video-live.ts
server/models/video/video-streaming-playlist.ts
server/models/video/video.ts
shared/models/server/job.model.ts
shared/models/videos/index.ts
shared/models/videos/live/index.ts [new file with mode: 0644]
shared/models/videos/live/live-video-event-payload.model.ts [new file with mode: 0644]
shared/models/videos/live/live-video-event.type.ts [new file with mode: 0644]
shared/models/videos/live/live-video.model.ts [moved from shared/models/videos/video-live.model.ts with 57% similarity]

index 25d75aed255e55149eb9b90468b00a05fe2f8dd4..602362fe91694420fd5b0221ef74ef45d6aeb27c 100644 (file)
@@ -32,6 +32,7 @@ export class JobsComponent extends RestTable implements OnInit {
     'video-import',
     'videos-views',
     'activitypub-refresher',
+    'video-live-ending',
     'video-redundancy'
   ]
 
index bcbea7fad1e608bacdf49089d7156d3373cca8d5..ad7497f45f644fe3ffdce6ae94600149ceddf436 100644 (file)
@@ -86,7 +86,7 @@ export class MyAccountNotificationPreferencesComponent implements OnInit {
   }
 
   private savePreferencesImpl () {
-    this.userNotificationService.updateNotificationSettings(this.user, this.user.notificationSettings)
+    this.userNotificationService.updateNotificationSettings(this.user.notificationSettings)
       .subscribe(
         () => {
           this.notifier.success($localize`Preferences saved`, undefined, 2000)
index c444dd8d3f106e248d01301a78f6046016398674..0802e906dbfc86259a57ca0192da69e325337f72 100644 (file)
       </ng-template>
     </ng-container>
 
-    <ng-container ngbNavItem *ngIf="videoLive">
+    <ng-container ngbNavItem *ngIf="liveVideo">
       <a ngbNavLink i18n>Live settings</a>
 
       <ng-template ngbNavContent>
           <div class="col-md-12">
 
             <div class="form-group">
-              <label for="videoLiveRTMPUrl" i18n>Live RTMP Url</label>
-              <my-input-readonly-copy id="videoLiveRTMPUrl" [value]="videoLive.rtmpUrl"></my-input-readonly-copy>
+              <label for="liveVideoRTMPUrl" i18n>Live RTMP Url</label>
+              <my-input-readonly-copy id="liveVideoRTMPUrl" [value]="liveVideo.rtmpUrl"></my-input-readonly-copy>
             </div>
 
             <div class="form-group">
-              <label for="videoLiveStreamKey" i18n>Live stream key</label>
-              <my-input-readonly-copy id="videoLiveStreamKey" [value]="videoLive.streamKey"></my-input-readonly-copy>
+              <label for="liveVideoStreamKey" i18n>Live stream key</label>
+              <my-input-readonly-copy id="liveVideoStreamKey" [value]="liveVideo.streamKey"></my-input-readonly-copy>
             </div>
           </div>
         </div>
index bee65184b0820674023f0a9382def6ab72fa1224..304bf7ed0286db89bfd993ff71367cc107f07ab0 100644 (file)
@@ -20,7 +20,7 @@ import {
 import { FormReactiveValidationMessages, FormValidatorService, SelectChannelItem } from '@app/shared/shared-forms'
 import { InstanceService } from '@app/shared/shared-instance'
 import { VideoCaptionEdit, VideoEdit, VideoService } from '@app/shared/shared-main'
-import { ServerConfig, VideoConstant, VideoLive, VideoPrivacy } from '@shared/models'
+import { ServerConfig, VideoConstant, LiveVideo, VideoPrivacy } from '@shared/models'
 import { RegisterClientFormFieldOptions, RegisterClientVideoFieldOptions } from '@shared/models/plugins/register-client-form-field.model'
 import { I18nPrimengCalendarService } from './i18n-primeng-calendar.service'
 import { VideoCaptionAddModalComponent } from './video-caption-add-modal.component'
@@ -42,7 +42,7 @@ export class VideoEditComponent implements OnInit, OnDestroy {
   @Input() videoCaptions: (VideoCaptionEdit & { captionPath?: string })[] = []
   @Input() waitTranscodingEnabled = true
   @Input() type: VideoEditType
-  @Input() videoLive: VideoLive
+  @Input() liveVideo: LiveVideo
 
   @ViewChild('videoCaptionAddModal', { static: true }) videoCaptionAddModal: VideoCaptionAddModalComponent
 
index 6997f53885e646ed405994fb1712fbaa708f8260..8fae4044ac25cd7eab988935456d5006e54f0462 100644 (file)
@@ -31,7 +31,7 @@
 <form [hidden]="!isInUpdateForm" novalidate [formGroup]="form">
   <my-video-edit
     [form]="form" [formErrors]="formErrors" [videoCaptions]="videoCaptions" [schedulePublicationPossible]="false"
-    [validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels" [videoLive]="videoLive"
+    [validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels" [liveVideo]="liveVideo"
     type="go-live"
   ></my-video-edit>
 
index 64fd4c4d4e145184c6e1317731ea045c04132dbc..0a9efc693818acecd6ad1b808b3d51cbed220452 100644 (file)
@@ -4,9 +4,9 @@ import { Router } from '@angular/router'
 import { AuthService, CanComponentDeactivate, Notifier, ServerService } from '@app/core'
 import { scrollToTop } from '@app/helpers'
 import { FormValidatorService } from '@app/shared/shared-forms'
-import { VideoCaptionService, VideoEdit, VideoService, VideoLiveService } from '@app/shared/shared-main'
+import { LiveVideoService, VideoCaptionService, VideoEdit, VideoService } from '@app/shared/shared-main'
 import { LoadingBarService } from '@ngx-loading-bar/core'
-import { VideoCreate, VideoLive, VideoPrivacy } from '@shared/models'
+import { LiveVideo, VideoCreate, VideoPrivacy } from '@shared/models'
 import { VideoSend } from './video-send'
 
 @Component({
@@ -23,7 +23,7 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon
 
   isInUpdateForm = false
 
-  videoLive: VideoLive
+  liveVideo: LiveVideo
   videoId: number
   videoUUID: string
   error: string
@@ -38,7 +38,7 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon
     protected serverService: ServerService,
     protected videoService: VideoService,
     protected videoCaptionService: VideoCaptionService,
-    private videoLiveService: VideoLiveService,
+    private liveVideoService: LiveVideoService,
     private router: Router
     ) {
     super()
@@ -69,7 +69,7 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon
     const toPatch = Object.assign({}, video, { privacy: this.firstStepPrivacyId })
     this.form.patchValue(toPatch)
 
-    this.videoLiveService.goLive(video).subscribe(
+    this.liveVideoService.goLive(video).subscribe(
       res => {
         this.videoId = res.video.id
         this.videoUUID = res.video.uuid
@@ -114,10 +114,10 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon
   }
 
   private fetchVideoLive () {
-    this.videoLiveService.getVideoLive(this.videoId)
+    this.liveVideoService.getVideoLive(this.videoId)
       .subscribe(
-        videoLive => {
-          this.videoLive = videoLive
+        liveVideo => {
+          this.liveVideo = liveVideo
         },
 
         err => {
index 5f50ddc742f1c8ddc865c0042d101f3fc4c1130c..f290fd136934631ef74e8321590128c55773a5bb 100644 (file)
@@ -11,7 +11,7 @@
       [validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels"
       [videoCaptions]="videoCaptions" [waitTranscodingEnabled]="waitTranscodingEnabled"
       type="update" (pluginFieldsAdded)="hydratePluginFieldsFromVideo()"
-      [videoLive]="videoLive"
+      [liveVideo]="liveVideo"
     ></my-video-edit>
 
     <div class="submit-container">
index c0f46acd2f65dcff781de6e556abfda106743e46..ec1305a33c8df558c7b8e95f57a266a3daadcd54 100644 (file)
@@ -5,7 +5,7 @@ import { Notifier } from '@app/core'
 import { FormReactive, FormValidatorService, SelectChannelItem } from '@app/shared/shared-forms'
 import { VideoCaptionEdit, VideoCaptionService, VideoDetails, VideoEdit, VideoService } from '@app/shared/shared-main'
 import { LoadingBarService } from '@ngx-loading-bar/core'
-import { VideoPrivacy, VideoLive } from '@shared/models'
+import { LiveVideo, VideoPrivacy } from '@shared/models'
 import { hydrateFormFromVideo } from './shared/video-edit-utils'
 
 @Component({
@@ -17,7 +17,7 @@ export class VideoUpdateComponent extends FormReactive implements OnInit {
   video: VideoEdit
   userVideoChannels: SelectChannelItem[] = []
   videoCaptions: VideoCaptionEdit[] = []
-  videoLive: VideoLive
+  liveVideo: LiveVideo
 
   isUpdatingVideo = false
   schedulePublicationPossible = false
@@ -42,11 +42,11 @@ export class VideoUpdateComponent extends FormReactive implements OnInit {
 
     this.route.data
         .pipe(map(data => data.videoData))
-        .subscribe(({ video, videoChannels, videoCaptions, videoLive }) => {
+        .subscribe(({ video, videoChannels, videoCaptions, liveVideo }) => {
           this.video = new VideoEdit(video)
           this.userVideoChannels = videoChannels
           this.videoCaptions = videoCaptions
-          this.videoLive = videoLive
+          this.liveVideo = liveVideo
 
           this.schedulePublicationPossible = this.video.privacy === VideoPrivacy.PRIVATE
 
index 3a82324c3ea5477bca546b5034aab85f6a430894..b7ec22dd57d95d023f0621bff54c4da78c653ab5 100644 (file)
@@ -2,13 +2,13 @@ import { forkJoin, of } from 'rxjs'
 import { map, switchMap } from 'rxjs/operators'
 import { Injectable } from '@angular/core'
 import { ActivatedRouteSnapshot, Resolve } from '@angular/router'
-import { VideoCaptionService, VideoChannelService, VideoDetails, VideoLiveService, VideoService } from '@app/shared/shared-main'
+import { VideoCaptionService, VideoChannelService, VideoDetails, LiveVideoService, VideoService } from '@app/shared/shared-main'
 
 @Injectable()
 export class VideoUpdateResolver implements Resolve<any> {
   constructor (
     private videoService: VideoService,
-    private videoLiveService: VideoLiveService,
+    private liveVideoService: LiveVideoService,
     private videoChannelService: VideoChannelService,
     private videoCaptionService: VideoCaptionService
   ) {
@@ -49,7 +49,7 @@ export class VideoUpdateResolver implements Resolve<any> {
         ),
 
       video.isLive
-          ? this.videoLiveService.getVideoLive(video.id)
+          ? this.liveVideoService.getVideoLive(video.id)
           : of(undefined)
     ]
   }
index 0d1768aa989387ae8276fe436baca9f3a83514f2..13242a2bcaa1c29118c70ee62608a22e09b27819 100644 (file)
       This video will be published on {{ video.scheduledUpdate.updateAt | date: 'full' }}.
     </div>
 
+    <div i18n class="col-md-12 alert alert-info" *ngIf="isWaitingForLive()">
+      This live has not started yet.
+    </div>
+
+    <div i18n class="col-md-12 alert alert-info" *ngIf="isLiveEnded()">
+      This live is finished.
+    </div>
+
     <div class="col-md-12 alert alert-danger" *ngIf="video?.blacklisted">
       <div class="blocked-label" i18n>This video is blocked.</div>
       {{ video.blockedReason }}
                     </div>
                   </div>
 
-                  <ng-container *ngIf="!isUserLoggedIn()">
+                  <ng-container *ngIf="!isUserLoggedIn() && !isLive()">
                     <button
                       *ngIf="isVideoDownloadable()" class="action-button action-button-save"
                       (click)="showDownloadModal()" (keydown.enter)="showDownloadModal()"
index b2bd04cf30fc64fe7556b8e256e4eb8de820bb3d..4bf5ff8088886cf80c9dca09545cf9c7bd765030 100644 (file)
@@ -50,6 +50,8 @@ $video-info-margin-left: 44px;
 }
 
 #video-wrapper {
+  $video-height: 66vh;
+
   background-color: #000;
   display: flex;
   justify-content: center;
@@ -58,6 +60,7 @@ $video-info-margin-left: 44px;
     display: flex;
     justify-content: center;
     flex-grow: 1;
+    height: $video-height;
   }
 
   .remote-server-down {
@@ -84,7 +87,7 @@ $video-info-margin-left: 44px;
   ::ng-deep .video-js {
     width: 100%;
     max-width: getPlayerWidth(66vh);
-    height: 66vh;
+    height: $video-height;
 
     // VideoJS create an inner video player
     video {
index fde32dc7401bc78c121858b8a2a911a18e80057f..e4edb42fb893391aac6a73a671faa012ed345dab 100644 (file)
@@ -4,7 +4,17 @@ import { catchError } from 'rxjs/operators'
 import { PlatformLocation } from '@angular/common'
 import { ChangeDetectorRef, Component, ElementRef, Inject, LOCALE_ID, NgZone, OnDestroy, OnInit, ViewChild } from '@angular/core'
 import { ActivatedRoute, Router } from '@angular/router'
-import { AuthService, AuthUser, ConfirmService, MarkdownService, Notifier, RestExtractor, ServerService, UserService } from '@app/core'
+import {
+  AuthService,
+  AuthUser,
+  ConfirmService,
+  MarkdownService,
+  Notifier,
+  PeerTubeSocket,
+  RestExtractor,
+  ServerService,
+  UserService
+} from '@app/core'
 import { HooksService } from '@app/core/plugins/hooks.service'
 import { RedirectService } from '@app/core/routing/redirect.service'
 import { isXPercentInViewport, scrollToTop } from '@app/helpers'
@@ -30,6 +40,8 @@ import { environment } from '../../../environments/environment'
 import { VideoSupportComponent } from './modal/video-support.component'
 import { VideoWatchPlaylistComponent } from './video-watch-playlist.component'
 
+type URLOptions = CustomizationOptions & { playerMode: PlayerMode }
+
 @Component({
   selector: 'my-video-watch',
   templateUrl: './video-watch.component.html',
@@ -76,6 +88,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
   private paramsSub: Subscription
   private queryParamsSub: Subscription
   private configSub: Subscription
+  private liveVideosSub: Subscription
 
   private serverConfig: ServerConfig
 
@@ -99,6 +112,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
     private videoCaptionService: VideoCaptionService,
     private hotkeysService: HotkeysService,
     private hooks: HooksService,
+    private peertubeSocket: PeerTubeSocket,
     private location: PlatformLocation,
     @Inject(LOCALE_ID) private localeId: string
   ) {
@@ -165,6 +179,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
     if (this.paramsSub) this.paramsSub.unsubscribe()
     if (this.queryParamsSub) this.queryParamsSub.unsubscribe()
     if (this.configSub) this.configSub.unsubscribe()
+    if (this.liveVideosSub) this.liveVideosSub.unsubscribe()
 
     // Unbind hotkeys
     this.hotkeysService.remove(this.hotkeys)
@@ -306,6 +321,18 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
     return this.video && this.video.scheduledUpdate !== undefined
   }
 
+  isLive () {
+    return !!(this.video?.isLive)
+  }
+
+  isWaitingForLive () {
+    return this.video?.state.id === VideoState.WAITING_FOR_LIVE
+  }
+
+  isLiveEnded () {
+    return this.video?.state.id === VideoState.LIVE_ENDED
+  }
+
   isVideoBlur (video: Video) {
     return video.isVideoNSFWForUser(this.user, this.serverConfig)
   }
@@ -470,8 +497,10 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
   private async onVideoFetched (
     video: VideoDetails,
     videoCaptions: VideoCaption[],
-    urlOptions: CustomizationOptions & { playerMode: PlayerMode }
+    urlOptions: URLOptions
   ) {
+    this.subscribeToLiveEventsIfNeeded(this.video, video)
+
     this.video = video
     this.videoCaptions = videoCaptions
 
@@ -489,6 +518,9 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
       if (res === false) return this.location.back()
     }
 
+    const videoState = this.video.state.id
+    if (videoState === VideoState.LIVE_ENDED || videoState === VideoState.WAITING_FOR_LIVE) return
+
     // Flush old player if needed
     this.flushPlayer()
 
@@ -794,6 +826,29 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
     return !this.player.paused()
   }
 
+  private async subscribeToLiveEventsIfNeeded (oldVideo: VideoDetails, newVideo: VideoDetails) {
+    if (!this.liveVideosSub) {
+      this.liveVideosSub = this.peertubeSocket.getLiveVideosObservable()
+        .subscribe(({ payload }) => {
+          if (payload.state !== VideoState.PUBLISHED || this.video.state.id !== VideoState.WAITING_FOR_LIVE) return
+
+          const videoUUID = this.video.uuid
+
+          // Reset to refetch the video
+          this.video = undefined
+          this.loadVideo(videoUUID)
+        })
+    }
+
+    if (oldVideo && oldVideo.id !== newVideo.id) {
+      await this.peertubeSocket.unsubscribeLiveVideos(oldVideo.id)
+    }
+
+    if (!newVideo.isLive) return
+
+    await this.peertubeSocket.subscribeToLiveVideosSocket(newVideo.id)
+  }
+
   private initHotkeys () {
     this.hotkeys = [
       // These hotkeys are managed by the player
index 22896e2e96ec04d7f136751da234ccc5fa5a909d..6c0a2245d2263f4a528093423de0554182cf47d6 100644 (file)
@@ -4,7 +4,7 @@ import { ToastModule } from 'primeng/toast'
 import { CommonModule } from '@angular/common'
 import { NgModule, Optional, SkipSelf } from '@angular/core'
 import { BrowserAnimationsModule } from '@angular/platform-browser/animations'
-import { UserNotificationSocket } from '@app/core/notification/user-notification-socket.service'
+import { PeerTubeSocket } from '@app/core/notification/peertube-socket.service'
 import { HooksService } from '@app/core/plugins/hooks.service'
 import { PluginService } from '@app/core/plugins/plugin.service'
 import { UnloggedGuard } from '@app/core/routing/unlogged-guard.service'
@@ -84,7 +84,7 @@ import { LocalStorageService, ScreenService, SessionStorageService } from './wra
     RedirectService,
     Notifier,
     MessageService,
-    UserNotificationSocket,
+    PeerTubeSocket,
     ServerConfigResolver,
     CanDeactivateGuard
   ]
index 3e8d9ea650dd0fcd6479c66dd0b6221d0a1e7869..cd9634c8e875eecdaff70ad818389e9f7a408cde 100644 (file)
@@ -1,2 +1,2 @@
 export * from './notifier.service'
-export * from './user-notification-socket.service'
+export * from './peertube-socket.service'
diff --git a/client/src/app/core/notification/peertube-socket.service.ts b/client/src/app/core/notification/peertube-socket.service.ts
new file mode 100644 (file)
index 0000000..8668c44
--- /dev/null
@@ -0,0 +1,86 @@
+import { Subject } from 'rxjs'
+import { Injectable, NgZone } from '@angular/core'
+import { LiveVideoEventPayload, LiveVideoEventType, UserNotification as UserNotificationServer } from '@shared/models'
+import { environment } from '../../../environments/environment'
+import { AuthService } from '../auth'
+
+export type NotificationEvent = 'new' | 'read' | 'read-all'
+
+@Injectable()
+export class PeerTubeSocket {
+  private io: typeof import ('socket.io-client')
+
+  private notificationSubject = new Subject<{ type: NotificationEvent, notification?: UserNotificationServer }>()
+  private liveVideosSubject = new Subject<{ type: LiveVideoEventType, payload: LiveVideoEventPayload }>()
+
+  private notificationSocket: SocketIOClient.Socket
+  private liveVideosSocket: SocketIOClient.Socket
+
+  constructor (
+    private auth: AuthService,
+    private ngZone: NgZone
+  ) {}
+
+  async getMyNotificationsSocket () {
+    await this.initNotificationSocket()
+
+    return this.notificationSubject.asObservable()
+  }
+
+  getLiveVideosObservable () {
+    return this.liveVideosSubject.asObservable()
+  }
+
+  async subscribeToLiveVideosSocket (videoId: number) {
+    await this.initLiveVideosSocket()
+
+    this.liveVideosSocket.emit('subscribe', { videoId })
+  }
+
+  async unsubscribeLiveVideos (videoId: number) {
+    if (!this.liveVideosSocket) return
+
+    this.liveVideosSocket.emit('unsubscribe', { videoId })
+  }
+
+  dispatchNotificationEvent (type: NotificationEvent, notification?: UserNotificationServer) {
+    this.notificationSubject.next({ type, notification })
+  }
+
+  private async initNotificationSocket () {
+    if (this.notificationSocket) return
+
+    await this.importIOIfNeeded()
+
+    this.ngZone.runOutsideAngular(() => {
+      this.notificationSocket = this.io(environment.apiUrl + '/user-notifications', {
+        query: { accessToken: this.auth.getAccessToken() }
+      })
+
+      this.notificationSocket.on('new-notification', (n: UserNotificationServer) => this.dispatchNotificationEvent('new', n))
+    })
+  }
+
+  private async initLiveVideosSocket () {
+    if (this.liveVideosSocket) return
+
+    await this.importIOIfNeeded()
+
+    this.ngZone.runOutsideAngular(() => {
+      this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos')
+
+      const type: LiveVideoEventType = 'state-change'
+      this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => this.dispatchLiveVideoEvent(type, payload))
+    })
+  }
+
+  private async importIOIfNeeded () {
+    if (this.io) return
+
+    this.io = (await import('socket.io-client') as any).default
+  }
+
+  private dispatchLiveVideoEvent (type: LiveVideoEventType, payload: LiveVideoEventPayload) {
+    this.liveVideosSubject.next({ type, payload })
+  }
+}
diff --git a/client/src/app/core/notification/user-notification-socket.service.ts b/client/src/app/core/notification/user-notification-socket.service.ts
deleted file mode 100644 (file)
index 37f0bc3..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-import { Subject } from 'rxjs'
-import { Injectable, NgZone } from '@angular/core'
-import { UserNotification as UserNotificationServer } from '@shared/models'
-import { environment } from '../../../environments/environment'
-import { AuthService } from '../auth'
-
-export type NotificationEvent = 'new' | 'read' | 'read-all'
-
-@Injectable()
-export class UserNotificationSocket {
-  private notificationSubject = new Subject<{ type: NotificationEvent, notification?: UserNotificationServer }>()
-
-  private socket: SocketIOClient.Socket
-
-  constructor (
-    private auth: AuthService,
-    private ngZone: NgZone
-  ) {}
-
-  dispatch (type: NotificationEvent, notification?: UserNotificationServer) {
-    this.notificationSubject.next({ type, notification })
-  }
-
-  async getMyNotificationsSocket () {
-    await this.initSocket()
-
-    return this.notificationSubject.asObservable()
-  }
-
-  private async initSocket () {
-    if (this.socket) return
-
-    // FIXME: import('..') returns a struct module, containing a "default" field corresponding to our sanitizeHtml function
-    const io: typeof import ('socket.io-client') = (await import('socket.io-client') as any).default
-
-    this.ngZone.runOutsideAngular(() => {
-      this.socket = io(environment.apiUrl + '/user-notifications', {
-        query: { accessToken: this.auth.getAccessToken() }
-      })
-
-      this.socket.on('new-notification', (n: UserNotificationServer) => this.dispatch('new', n))
-    })
-  }
-}
index 8b99550699b701d223d0d9bac08146052af92d56..ed3ffc2d8e7f97943d21a0f3903c4261af849637 100644 (file)
@@ -2,7 +2,7 @@ import { Subject, Subscription } from 'rxjs'
 import { filter } from 'rxjs/operators'
 import { Component, EventEmitter, Input, Output, OnDestroy, OnInit, ViewChild } from '@angular/core'
 import { NavigationEnd, Router } from '@angular/router'
-import { Notifier, User, UserNotificationSocket } from '@app/core'
+import { Notifier, User, PeerTubeSocket } from '@app/core'
 import { UserNotificationService } from '@app/shared/shared-main'
 import { NgbPopover } from '@ng-bootstrap/ng-bootstrap'
 
@@ -27,7 +27,7 @@ export class AvatarNotificationComponent implements OnInit, OnDestroy {
 
   constructor (
     private userNotificationService: UserNotificationService,
-    private userNotificationSocket: UserNotificationSocket,
+    private peertubeSocket: PeerTubeSocket,
     private notifier: Notifier,
     private router: Router
   ) {
@@ -75,7 +75,7 @@ export class AvatarNotificationComponent implements OnInit, OnDestroy {
   }
 
   private async subscribeToNotifications () {
-    const obs = await this.userNotificationSocket.getMyNotificationsSocket()
+    const obs = await this.peertubeSocket.getMyNotificationsSocket()
 
     this.notificationSub = obs.subscribe(data => {
       if (data.type === 'new') return this.unreadNotifications++
index bca67b193275b3b0bbbc497ca00c6528c779072b..0580872f46ecf4460eda1e179d09d2a5ca3261ec 100644 (file)
@@ -23,7 +23,7 @@ import { FeedComponent } from './feeds'
 import { LoaderComponent, SmallLoaderComponent } from './loaders'
 import { HelpComponent, ListOverflowComponent, TopMenuDropdownComponent } from './misc'
 import { UserHistoryService, UserNotificationsComponent, UserNotificationService, UserQuotaComponent } from './users'
-import { RedundancyService, VideoImportService, VideoOwnershipService, VideoService, VideoLiveService } from './video'
+import { RedundancyService, VideoImportService, VideoOwnershipService, VideoService, LiveVideoService } from './video'
 import { VideoCaptionService } from './video-caption'
 import { VideoChannelService } from './video-channel'
 
@@ -142,7 +142,7 @@ import { VideoChannelService } from './video-channel'
     RedundancyService,
     VideoImportService,
     VideoOwnershipService,
-    VideoLiveService,
+    LiveVideoService,
     VideoService,
 
     VideoCaptionService,
index 7b9dc34be417e9e09bfd1fceed60e062edf17daf..9014b48a8d894ae43dbf5e829254956fa25da6fd 100644 (file)
@@ -1,7 +1,7 @@
 import { catchError, map, tap } from 'rxjs/operators'
 import { HttpClient, HttpParams } from '@angular/common/http'
 import { Injectable } from '@angular/core'
-import { ComponentPaginationLight, RestExtractor, RestService, User, UserNotificationSocket, AuthService } from '@app/core'
+import { ComponentPaginationLight, RestExtractor, RestService, User, PeerTubeSocket, AuthService } from '@app/core'
 import { ResultList, UserNotification as UserNotificationServer, UserNotificationSetting } from '@shared/models'
 import { environment } from '../../../../environments/environment'
 import { UserNotification } from './user-notification.model'
@@ -17,7 +17,7 @@ export class UserNotificationService {
     private auth: AuthService,
     private restExtractor: RestExtractor,
     private restService: RestService,
-    private userNotificationSocket: UserNotificationSocket
+    private peertubeSocket: PeerTubeSocket
   ) {}
 
   listMyNotifications (parameters: {
@@ -57,7 +57,7 @@ export class UserNotificationService {
     return this.authHttp.post(url, body, { headers })
                .pipe(
                  map(this.restExtractor.extractDataBool),
-                 tap(() => this.userNotificationSocket.dispatch('read')),
+                 tap(() => this.peertubeSocket.dispatchNotificationEvent('read')),
                  catchError(res => this.restExtractor.handleError(res))
                )
   }
@@ -69,12 +69,12 @@ export class UserNotificationService {
     return this.authHttp.post(url, {}, { headers })
                .pipe(
                  map(this.restExtractor.extractDataBool),
-                 tap(() => this.userNotificationSocket.dispatch('read-all')),
+                 tap(() => this.peertubeSocket.dispatchNotificationEvent('read-all')),
                  catchError(res => this.restExtractor.handleError(res))
                )
   }
 
-  updateNotificationSettings (user: User, settings: UserNotificationSetting) {
+  updateNotificationSettings (settings: UserNotificationSetting) {
     const url = UserNotificationService.BASE_NOTIFICATION_SETTINGS
 
     return this.authHttp.put(url, settings)
index 121635a30d45bbf01064a6c371ed02425739436b..f69089517def940a00ce7f3b6b4eb726576a149b 100644 (file)
@@ -1,8 +1,8 @@
+export * from './live-video.service'
 export * from './redundancy.service'
 export * from './video-details.model'
 export * from './video-edit.model'
 export * from './video-import.service'
-export * from './video-live.service'
 export * from './video-ownership.service'
 export * from './video.model'
 export * from './video.service'
similarity index 74%
rename from client/src/app/shared/shared-main/video/video-live.service.ts
rename to client/src/app/shared/shared-main/video/live-video.service.ts
index 12daff7560f8f4459540b0ea309840c4600769fd..2cd1c66a5f7acd2763e9f11749198d5932427e6c 100644 (file)
@@ -2,11 +2,11 @@ import { catchError } from 'rxjs/operators'
 import { HttpClient } from '@angular/common/http'
 import { Injectable } from '@angular/core'
 import { RestExtractor } from '@app/core'
-import { VideoCreate, VideoLive } from '@shared/models'
+import { VideoCreate, LiveVideo } from '@shared/models'
 import { environment } from '../../../../environments/environment'
 
 @Injectable()
-export class VideoLiveService {
+export class LiveVideoService {
   static BASE_VIDEO_LIVE_URL = environment.apiUrl + '/api/v1/videos/live/'
 
   constructor (
@@ -16,13 +16,13 @@ export class VideoLiveService {
 
   goLive (video: VideoCreate) {
     return this.authHttp
-               .post<{ video: { id: number, uuid: string } }>(VideoLiveService.BASE_VIDEO_LIVE_URL, video)
+               .post<{ video: { id: number, uuid: string } }>(LiveVideoService.BASE_VIDEO_LIVE_URL, video)
                .pipe(catchError(err => this.restExtractor.handleError(err)))
   }
 
   getVideoLive (videoId: number | string) {
     return this.authHttp
-               .get<VideoLive>(VideoLiveService.BASE_VIDEO_LIVE_URL + videoId)
+               .get<LiveVideo>(LiveVideoService.BASE_VIDEO_LIVE_URL + videoId)
                .pipe(catchError(err => this.restExtractor.handleError(err)))
   }
 }
index 3222dc5a688451fc8570bac420d3a98554d1ebe0..80b4e446a40b0aae8a7473913df5b7d764b5049a 100644 (file)
 
       <div class="filters">
         <div>
-          <div class="form-group start-at">
+          <div class="form-group start-at" *ngIf="!video.isLive">
             <my-peertube-checkbox
               inputName="startAt" [(ngModel)]="customizations.startAtCheckbox"
               i18n-labelText labelText="Start at"
 
         <div class="advanced-filters collapse-transition" [ngbCollapse]="isAdvancedCustomizationCollapsed">
           <div>
-            <div class="form-group stop-at">
+            <div class="form-group stop-at" *ngIf="!video.isLive">
               <my-peertube-checkbox
                 inputName="stopAt" [(ngModel)]="customizations.stopAtCheckbox"
                 i18n-labelText labelText="Stop at"
               ></my-peertube-checkbox>
             </div>
 
-            <div class="form-group">
+            <div class="form-group" *ngIf="!video.isLive">
               <my-peertube-checkbox
                 inputName="loop" [(ngModel)]="customizations.loop"
                 i18n-labelText labelText="Loop"
index 39358e08b7817ef81bc192f9cf165457a41141eb..4ef17bfe3c6f040f85e8db77eab79d6806d12e56 100644 (file)
@@ -146,7 +146,10 @@ export class VideoActionsDropdownComponent implements OnChanges {
   }
 
   isVideoDownloadable () {
-    return this.video && this.video instanceof VideoDetails && this.video.downloadEnabled
+    return this.video &&
+      this.video.isLive !== true &&
+      this.video instanceof VideoDetails &&
+      this.video.downloadEnabled
   }
 
   canVideoBeDuplicated () {
index 7efaeb5a2faf959869821bbac6a7caa56bed4c73..d0937bfc882990a294acea2e2e6b3a546b3db216 100644 (file)
@@ -246,9 +246,20 @@ transcoding:
 live:
   enabled: false
 
+  # Limit lives duration
+  # Set null to disable duration limit
+  max_duration: 5 hours
+
+  # Allow your users to save a replay of their live
+  # PeerTube will transcode segments in a video file
+  # If the user daily/total quota is reached, PeerTube will stop the live
+  # /!\ transcoding.enabled (and not live.transcoding.enabled) has to be true to create a replay
+  allow_replay: true
+
   rtmp:
     port: 1935
 
+  # Allow to transcode the live streaming in multiple live resolutions
   transcoding:
     enabled: false
     threads: 2
index b9279b5e68a7e0f6c8e245c0f63b8e7dfdfca5e4..865ed540087e1dc3259f1452f33d25c856bc1b7d 100644 (file)
@@ -89,7 +89,7 @@ live:
     port: 1935
 
   transcoding:
-    enabled: false
+    enabled: true
     threads: 2
 
     resolutions:
index 606eeba2d5c1812e5c7f67400871d0d91a16591d..82d04a94e462dcd61b5e2840c1c42341a22d4869 100644 (file)
@@ -139,7 +139,8 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
   'email': 5,
   'videos-views': 1,
   'activitypub-refresher': 1,
-  'video-redundancy': 1
+  'video-redundancy': 1,
+  'video-live-ending': 1
 }
 const JOB_CONCURRENCY: { [id in JobType]: number } = {
   'activitypub-http-broadcast': 1,
@@ -152,7 +153,8 @@ const JOB_CONCURRENCY: { [id in JobType]: number } = {
   'email': 5,
   'videos-views': 1,
   'activitypub-refresher': 1,
-  'video-redundancy': 1
+  'video-redundancy': 1,
+  'video-live-ending': 1
 }
 const JOB_TTL: { [id in JobType]: number } = {
   'activitypub-http-broadcast': 60000 * 10, // 10 minutes
@@ -165,7 +167,8 @@ const JOB_TTL: { [id in JobType]: number } = {
   'email': 60000 * 10, // 10 minutes
   'videos-views': undefined, // Unlimited
   'activitypub-refresher': 60000 * 10, // 10 minutes
-  'video-redundancy': 1000 * 3600 * 3 // 3 hours
+  'video-redundancy': 1000 * 3600 * 3, // 3 hours
+  'video-live-ending': 1000 * 60 * 10 // 10 minutes
 }
 const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
   'videos-views': {
@@ -605,6 +608,7 @@ const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls')
 
 const VIDEO_LIVE = {
   EXTENSION: '.ts',
+  CLEANUP_DELAY: 1000 * 60 * 5, // 5 mintues
   RTMP: {
     CHUNK_SIZE: 60000,
     GOP_CACHE: true,
index 049e06cff86ce5375bc9242258cc499916cc740b..ab23ff507d6aec4f4758c23bcc1a7e3d19218e03 100644 (file)
@@ -66,6 +66,7 @@ import { FilteredModelAttributes } from '../../types/sequelize'
 import { ActorFollowScoreCache } from '../files-cache'
 import { JobQueue } from '../job-queue'
 import { Notifier } from '../notifier'
+import { PeerTubeSocket } from '../peertube-socket'
 import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail'
 import { setVideoTags } from '../video'
 import { autoBlacklistVideoIfNeeded } from '../video-blacklist'
@@ -348,6 +349,7 @@ async function updateVideoFromAP (options: {
       video.privacy = videoData.privacy
       video.channelId = videoData.channelId
       video.views = videoData.views
+      video.isLive = videoData.isLive
 
       const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight
 
@@ -434,6 +436,7 @@ async function updateVideoFromAP (options: {
     })
 
     if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users?
+    if (videoUpdated.isLive) PeerTubeSocket.Instance.sendVideoLiveNewState(video)
 
     logger.info('Remote video with uuid %s updated', videoObject.uuid)
 
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
new file mode 100644 (file)
index 0000000..1a58a9f
--- /dev/null
@@ -0,0 +1,47 @@
+import * as Bull from 'bull'
+import { readdir, remove } from 'fs-extra'
+import { join } from 'path'
+import { getHLSDirectory } from '@server/lib/video-paths'
+import { VideoModel } from '@server/models/video/video'
+import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
+import { VideoLiveEndingPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+async function processVideoLiveEnding (job: Bull.Job) {
+  const payload = job.data as VideoLiveEndingPayload
+
+  const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId)
+  if (!video) {
+    logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId)
+    return
+  }
+
+  const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
+  const hlsDirectory = getHLSDirectory(video, false)
+
+  const files = await readdir(hlsDirectory)
+
+  for (const filename of files) {
+    if (
+      filename.endsWith('.ts') ||
+      filename.endsWith('.m3u8') ||
+      filename.endsWith('.mpd') ||
+      filename.endsWith('.m4s') ||
+      filename.endsWith('.tmp')
+    ) {
+      const p = join(hlsDirectory, filename)
+
+      remove(p)
+        .catch(err => logger.error('Cannot remove %s.', p, { err }))
+    }
+  }
+
+  streamingPlaylist.destroy()
+    .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  processVideoLiveEnding
+}
index 14e181835a381bf8cde10dffcf5091f297bcb471..8d97434ac59b954d33510dd32780a051516ec955 100644 (file)
@@ -10,6 +10,7 @@ import {
   RefreshPayload,
   VideoFileImportPayload,
   VideoImportPayload,
+  VideoLiveEndingPayload,
   VideoRedundancyPayload,
   VideoTranscodingPayload
 } from '../../../shared/models'
@@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views'
 import { refreshAPObject } from './handlers/activitypub-refresher'
 import { processVideoFileImport } from './handlers/video-file-import'
 import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
+import { processVideoLiveEnding } from './handlers/video-live-ending'
 
 type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -39,8 +41,13 @@ type CreateJobArgument =
   { type: 'video-import', payload: VideoImportPayload } |
   { type: 'activitypub-refresher', payload: RefreshPayload } |
   { type: 'videos-views', payload: {} } |
+  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
   { type: 'video-redundancy', payload: VideoRedundancyPayload }
 
+type CreateJobOptions = {
+  delay?: number
+}
+
 const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
   'activitypub-http-unicast': processActivityPubHttpUnicast,
@@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
   'video-import': processVideoImport,
   'videos-views': processVideosViews,
   'activitypub-refresher': refreshAPObject,
+  'video-live-ending': processVideoLiveEnding,
   'video-redundancy': processVideoRedundancy
 }
 
@@ -66,7 +74,8 @@ const jobTypes: JobType[] = [
   'video-import',
   'videos-views',
   'activitypub-refresher',
-  'video-redundancy'
+  'video-redundancy',
+  'video-live-ending'
 ]
 
 class JobQueue {
@@ -122,12 +131,12 @@ class JobQueue {
     }
   }
 
-  createJob (obj: CreateJobArgument): void {
-    this.createJobWithPromise(obj)
+  createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
+    this.createJobWithPromise(obj, options)
         .catch(err => logger.error('Cannot create job.', { err, obj }))
   }
 
-  createJobWithPromise (obj: CreateJobArgument) {
+  createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
     const queue = this.queues[obj.type]
     if (queue === undefined) {
       logger.error('Unknown queue %s: cannot create job.', obj.type)
@@ -137,7 +146,8 @@ class JobQueue {
     const jobArgs: Bull.JobOptions = {
       backoff: { delay: 60 * 1000, type: 'exponential' },
       attempts: JOB_ATTEMPTS[obj.type],
-      timeout: JOB_TTL[obj.type]
+      timeout: JOB_TTL[obj.type],
+      delay: options.delay
     }
 
     return queue.add(obj.payload, jobArgs)
index f602bfb6da36fd97684b0b45abc2c9ec1242cea8..41176d19742b64b5e26a5cefb0323e638dddd963 100644 (file)
@@ -2,18 +2,22 @@
 import { AsyncQueue, queue } from 'async'
 import * as chokidar from 'chokidar'
 import { FfmpegCommand } from 'fluent-ffmpeg'
-import { ensureDir, readdir, remove } from 'fs-extra'
-import { basename, join } from 'path'
+import { ensureDir } from 'fs-extra'
+import { basename } from 'path'
 import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils'
 import { logger } from '@server/helpers/logger'
 import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
 import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
+import { VideoModel } from '@server/models/video/video'
 import { VideoFileModel } from '@server/models/video/video-file'
 import { VideoLiveModel } from '@server/models/video/video-live'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
-import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models'
+import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models'
 import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
+import { federateVideoIfNeeded } from './activitypub/videos'
 import { buildSha256Segment } from './hls'
+import { JobQueue } from './job-queue'
+import { PeerTubeSocket } from './peertube-socket'
 import { getHLSDirectory } from './video-paths'
 
 const NodeRtmpServer = require('node-media-server/node_rtmp_server')
@@ -47,6 +51,7 @@ class LiveManager {
   private static instance: LiveManager
 
   private readonly transSessions = new Map<string, FfmpegCommand>()
+  private readonly videoSessions = new Map<number, string>()
   private readonly segmentsSha256 = new Map<string, Map<string, string>>()
 
   private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
@@ -56,7 +61,8 @@ class LiveManager {
   }
 
   init () {
-    this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => {
+    const events = this.getContext().nodeEvent
+    events.on('postPublish', (sessionId: string, streamPath: string) => {
       logger.debug('RTMP received stream', { id: sessionId, streamPath })
 
       const splittedPath = streamPath.split('/')
@@ -69,7 +75,7 @@ class LiveManager {
         .catch(err => logger.error('Cannot handle sessions.', { err }))
     })
 
-    this.getContext().nodeEvent.on('donePublish', sessionId => {
+    events.on('donePublish', sessionId => {
       this.abortSession(sessionId)
     })
 
@@ -115,6 +121,16 @@ class LiveManager {
     return this.segmentsSha256.get(videoUUID)
   }
 
+  stopSessionOf (videoId: number) {
+    const sessionId = this.videoSessions.get(videoId)
+    if (!sessionId) return
+
+    this.abortSession(sessionId)
+
+    this.onEndTransmuxing(videoId)
+      .catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
+  }
+
   private getContext () {
     return context
   }
@@ -135,6 +151,13 @@ class LiveManager {
     }
 
     const video = videoLive.Video
+    if (video.isBlacklisted()) {
+      logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
+      return this.abortSession(sessionId)
+    }
+
+    this.videoSessions.set(video.id, sessionId)
+
     const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
 
     const session = this.getContext().sessions.get(sessionId)
@@ -154,11 +177,6 @@ class LiveManager {
       type: VideoStreamingPlaylistType.HLS
     }, { returning: true }) as [ MStreamingPlaylist, boolean ]
 
-    video.state = VideoState.PUBLISHED
-    await video.save()
-
-    // FIXME: federation?
-
     return this.runMuxing({
       sessionId,
       videoLive,
@@ -207,11 +225,46 @@ class LiveManager {
 
     this.transSessions.set(sessionId, ffmpegExec)
 
+    const videoUUID = videoLive.Video.uuid
+    const tsWatcher = chokidar.watch(outPath + '/*.ts')
+
+    const updateHandler = segmentPath => {
+      this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
+    }
+
+    const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
+
+    tsWatcher.on('add', p => updateHandler(p))
+    tsWatcher.on('change', p => updateHandler(p))
+    tsWatcher.on('unlink', p => deleteHandler(p))
+
+    const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
+    masterWatcher.on('add', async () => {
+      try {
+        const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
+
+        video.state = VideoState.PUBLISHED
+        await video.save()
+        videoLive.Video = video
+
+        await federateVideoIfNeeded(video, false)
+
+        PeerTubeSocket.Instance.sendVideoLiveNewState(video)
+      } catch (err) {
+        logger.error('Cannot federate video %d.', videoLive.videoId, { err })
+      } finally {
+        masterWatcher.close()
+          .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
+      }
+    })
+
     const onFFmpegEnded = () => {
-      watcher.close()
-        .catch(err => logger.error('Cannot close watcher of %s.', outPath, { err }))
+      logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath)
 
-      this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath)
+      Promise.all([ tsWatcher.close(), masterWatcher.close() ])
+        .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
+
+      this.onEndTransmuxing(videoLive.Video.id)
         .catch(err => logger.error('Error in closed transmuxing.', { err }))
     }
 
@@ -225,44 +278,30 @@ class LiveManager {
     })
 
     ffmpegExec.on('end', () => onFFmpegEnded())
-
-    const videoUUID = videoLive.Video.uuid
-    const watcher = chokidar.watch(outPath + '/*.ts')
-
-    const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID })
-    const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID })
-
-    watcher.on('add', p => updateHandler(p))
-    watcher.on('change', p => updateHandler(p))
-    watcher.on('unlink', p => deleteHandler(p))
   }
 
-  private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) {
-    logger.info('RTMP transmuxing for %s ended.', streamPath)
+  private async onEndTransmuxing (videoId: number) {
+    try {
+      const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+      if (!fullVideo) return
 
-    const files = await readdir(outPath)
+      JobQueue.Instance.createJob({
+        type: 'video-live-ending',
+        payload: {
+          videoId: fullVideo.id
+        }
+      }, { delay: VIDEO_LIVE.CLEANUP_DELAY })
 
-    for (const filename of files) {
-      if (
-        filename.endsWith('.ts') ||
-        filename.endsWith('.m3u8') ||
-        filename.endsWith('.mpd') ||
-        filename.endsWith('.m4s') ||
-        filename.endsWith('.tmp')
-      ) {
-        const p = join(outPath, filename)
+      // FIXME: use end
+      fullVideo.state = VideoState.WAITING_FOR_LIVE
+      await fullVideo.save()
 
-        remove(p)
-          .catch(err => logger.error('Cannot remove %s.', p, { err }))
-      }
-    }
+      PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
 
-    playlist.destroy()
-      .catch(err => logger.error('Cannot remove live streaming playlist.', { err }))
-
-    video.state = VideoState.LIVE_ENDED
-    video.save()
-      .catch(err => logger.error('Cannot save new video state of live streaming.', { err }))
+      await federateVideoIfNeeded(fullVideo, false)
+    } catch (err) {
+      logger.error('Cannot save/federate new video state of live streaming.', { err })
+    }
   }
 
   private async addSegmentSha (options: SegmentSha256QueueParam) {
index 2e4b15b380e16f5f772e6e3c8629569a6f32d6b1..c918a8685d05856627c1b58b13471a2381d30309 100644 (file)
@@ -1,14 +1,18 @@
-import * as SocketIO from 'socket.io'
-import { authenticateSocket } from '../middlewares'
-import { logger } from '../helpers/logger'
+import { Socket } from 'dgram'
 import { Server } from 'http'
+import * as SocketIO from 'socket.io'
+import { MVideo } from '@server/types/models'
 import { UserNotificationModelForApi } from '@server/types/models/user'
+import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
+import { logger } from '../helpers/logger'
+import { authenticateSocket } from '../middlewares'
 
 class PeerTubeSocket {
 
   private static instance: PeerTubeSocket
 
   private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {}
+  private liveVideosNamespace: SocketIO.Namespace
 
   private constructor () {}
 
@@ -32,19 +36,37 @@ class PeerTubeSocket {
           this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket)
         })
       })
+
+    this.liveVideosNamespace = io.of('/live-videos')
+      .on('connection', socket => {
+        socket.on('subscribe', ({ videoId }) => socket.join(videoId))
+        socket.on('unsubscribe', ({ videoId }) => socket.leave(videoId))
+      })
   }
 
   sendNotification (userId: number, notification: UserNotificationModelForApi) {
     const sockets = this.userNotificationSockets[userId]
-
     if (!sockets) return
 
+    logger.debug('Sending user notification to user %d.', userId)
+
     const notificationMessage = notification.toFormattedJSON()
     for (const socket of sockets) {
       socket.emit('new-notification', notificationMessage)
     }
   }
 
+  sendVideoLiveNewState (video: MVideo) {
+    const data: LiveVideoEventPayload = { state: video.state }
+    const type: LiveVideoEventType = 'state-change'
+
+    logger.debug('Sending video live new state notification of %s.', video.url)
+
+    this.liveVideosNamespace
+      .in(video.id)
+      .emit(type, data)
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }
index bdbcffda668c242262d1d33915dfb7de331de312..f6c66b6dd8a658d3391440927f0c3537c64aa6c6 100644 (file)
@@ -17,6 +17,7 @@ import { sendDeleteVideo } from './activitypub/send'
 import { federateVideoIfNeeded } from './activitypub/videos'
 import { Notifier } from './notifier'
 import { Hooks } from './plugins/hooks'
+import { LiveManager } from './live-manager'
 
 async function autoBlacklistVideoIfNeeded (parameters: {
   video: MVideoWithBlacklistLight
@@ -73,6 +74,10 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video
     await sendDeleteVideo(videoInstance, undefined)
   }
 
+  if (videoInstance.isLive) {
+    LiveManager.Instance.stopSessionOf(videoInstance.id)
+  }
+
   Notifier.Instance.notifyOnVideoBlacklist(blacklist)
 }
 
index 6929b96886cd48270c4c1aee80a4d393e2b140e5..8608bc84ca9f654ff2b262904ef4072ff177b15d 100644 (file)
@@ -1,14 +1,21 @@
 import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript'
 import { WEBSERVER } from '@server/initializers/constants'
 import { MVideoLive, MVideoLiveVideo } from '@server/types/models'
-import { VideoLive } from '@shared/models/videos/video-live.model'
+import { LiveVideo, VideoState } from '@shared/models'
 import { VideoModel } from './video'
+import { VideoBlacklistModel } from './video-blacklist'
 
 @DefaultScope(() => ({
   include: [
     {
       model: VideoModel,
-      required: true
+      required: true,
+      include: [
+        {
+          model: VideoBlacklistModel,
+          required: false
+        }
+      ]
     }
   ]
 }))
@@ -49,7 +56,22 @@ export class VideoLiveModel extends Model<VideoLiveModel> {
     const query = {
       where: {
         streamKey
-      }
+      },
+      include: [
+        {
+          model: VideoModel.unscoped(),
+          required: true,
+          where: {
+            state: VideoState.WAITING_FOR_LIVE
+          },
+          include: [
+            {
+              model: VideoBlacklistModel.unscoped(),
+              required: false
+            }
+          ]
+        }
+      ]
     }
 
     return VideoLiveModel.findOne<MVideoLiveVideo>(query)
@@ -65,7 +87,7 @@ export class VideoLiveModel extends Model<VideoLiveModel> {
     return VideoLiveModel.findOne<MVideoLive>(query)
   }
 
-  toFormattedJSON (): VideoLive {
+  toFormattedJSON (): LiveVideo {
     return {
       rtmpUrl: WEBSERVER.RTMP_URL,
       streamKey: this.streamKey
index b8dc7c450340920a9de516b0cd8f83eb91463835..73bd898445a5a2d8be54f38bf28d0595494e26d0 100644 (file)
@@ -153,6 +153,17 @@ export class VideoStreamingPlaylistModel extends Model<VideoStreamingPlaylistMod
     return VideoStreamingPlaylistModel.findByPk(id, options)
   }
 
+  static loadHLSPlaylistByVideo (videoId: number) {
+    const options = {
+      where: {
+        type: VideoStreamingPlaylistType.HLS,
+        videoId
+      }
+    }
+
+    return VideoStreamingPlaylistModel.findOne(options)
+  }
+
   static getHlsPlaylistFilename (resolution: number) {
     return resolution + '.m3u8'
   }
index a3e3b6cfe4c8e96a1337d4f6ac600d1dce637c11..8493ab802bc8841205543d0d7bf129aa6f8bb772 100644 (file)
@@ -127,6 +127,7 @@ import { VideoShareModel } from './video-share'
 import { VideoStreamingPlaylistModel } from './video-streaming-playlist'
 import { VideoTagModel } from './video-tag'
 import { VideoViewModel } from './video-view'
+import { LiveManager } from '@server/lib/live-manager'
 
 export enum ScopeNames {
   AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS',
@@ -799,6 +800,13 @@ export class VideoModel extends Model<VideoModel> {
     return undefined
   }
 
+  @BeforeDestroy
+  static stopLiveIfNeeded (instance: VideoModel) {
+    if (!instance.isLive) return
+
+    return LiveManager.Instance.stopSessionOf(instance.id)
+  }
+
   @BeforeDestroy
   static invalidateCache (instance: VideoModel) {
     ModelCache.Instance.invalidateCache('video', instance.id)
index c643e60179832ebd2bd7c8755b9b36cdd0883fa2..346b25607624158bc8d2e5b5aab404ad8dbd12c4 100644 (file)
@@ -16,6 +16,7 @@ export type JobType =
   | 'videos-views'
   | 'activitypub-refresher'
   | 'video-redundancy'
+  | 'video-live-ending'
 
 export interface Job {
   id: number
@@ -126,3 +127,7 @@ export type VideoTranscodingPayload =
   | NewResolutionTranscodingPayload
   | OptimizeTranscodingPayload
   | MergeAudioTranscodingPayload
+
+export interface VideoLiveEndingPayload {
+  videoId: number
+}
index 2a032a456d08197fe50027fc3ad7cbe1bb7c00f7..abf144f23f3ebaacf3ae32287e44f32998c417b0 100644 (file)
@@ -1,6 +1,7 @@
 export * from './blacklist'
 export * from './caption'
 export * from './channel'
+export * from './live'
 export * from './import'
 export * from './playlist'
 export * from './rate'
@@ -19,7 +20,7 @@ export * from './video-create.model'
 export * from './video-file-metadata'
 export * from './video-file.model'
 
-export * from './video-live.model'
+export * from './live/live-video.model'
 
 export * from './video-privacy.enum'
 export * from './video-query.type'
diff --git a/shared/models/videos/live/index.ts b/shared/models/videos/live/index.ts
new file mode 100644 (file)
index 0000000..4f33173
--- /dev/null
@@ -0,0 +1,3 @@
+export * from './live-video-event-payload.model'
+export * from './live-video-event.type'
+export * from './live-video.model'
diff --git a/shared/models/videos/live/live-video-event-payload.model.ts b/shared/models/videos/live/live-video-event-payload.model.ts
new file mode 100644 (file)
index 0000000..f9038f4
--- /dev/null
@@ -0,0 +1,5 @@
+import { VideoState } from '../video-state.enum'
+
+export interface LiveVideoEventPayload {
+  state: VideoState
+}
diff --git a/shared/models/videos/live/live-video-event.type.ts b/shared/models/videos/live/live-video-event.type.ts
new file mode 100644 (file)
index 0000000..4d15899
--- /dev/null
@@ -0,0 +1 @@
+export type LiveVideoEventType = 'state-change'
similarity index 57%
rename from shared/models/videos/video-live.model.ts
rename to shared/models/videos/live/live-video.model.ts
index 2a834dc9157410cbfc709aa19247aa0cbc9fe7d7..74abee96e799c982427edce8e77868c52a1ad84e 100644 (file)
@@ -1,4 +1,4 @@
-export interface VideoLive {
+export interface LiveVideo {
   rtmpUrl: string
   streamKey: string
 }