]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/peertube-socket.ts
Merge remote-tracking branch 'weblate/develop' into develop
[github/Chocobozzz/PeerTube.git] / server / lib / peertube-socket.ts
index 26ced351f3333cf1dfe756dc333ad95653c9ff88..ded7e97433a8a547767981e71d13963a33e16690 100644 (file)
@@ -1,26 +1,33 @@
-import * as SocketIO from 'socket.io'
-import { authenticateSocket } from '../middlewares'
+import { Server as HTTPServer } from 'http'
+import { Namespace, Server as SocketServer, Socket } from 'socket.io'
+import { isIdValid } from '@server/helpers/custom-validators/misc'
+import { MVideo, MVideoImmutable } from '@server/types/models'
+import { MRunner } from '@server/types/models/runners'
+import { UserNotificationModelForApi } from '@server/types/models/user'
+import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
 import { logger } from '../helpers/logger'
-import { Server } from 'http'
-import { UserNotificationModelForApi } from '@server/typings/models/user'
+import { authenticateRunnerSocket, authenticateSocket } from '../middlewares'
+import { Debounce } from '@server/helpers/debounce'
 
 class PeerTubeSocket {
 
   private static instance: PeerTubeSocket
 
-  private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {}
+  private userNotificationSockets: { [ userId: number ]: Socket[] } = {}
+  private liveVideosNamespace: Namespace
+  private readonly runnerSockets = new Set<Socket>()
 
   private constructor () {}
 
-  init (server: Server) {
-    const io = SocketIO(server)
+  init (server: HTTPServer) {
+    const io = new SocketServer(server)
 
     io.of('/user-notifications')
       .use(authenticateSocket)
       .on('connection', socket => {
-        const userId = socket.handshake.query.user.id
+        const userId = socket.handshake.auth.user.id
 
-        logger.debug('User %d connected on the notification system.', userId)
+        logger.debug('User %d connected to the notification system.', userId)
 
         if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = []
 
@@ -32,19 +39,84 @@ class PeerTubeSocket {
           this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket)
         })
       })
+
+    this.liveVideosNamespace = io.of('/live-videos')
+      .on('connection', socket => {
+        socket.on('subscribe', ({ videoId }) => {
+          if (!isIdValid(videoId)) return
+
+          /* eslint-disable @typescript-eslint/no-floating-promises */
+          socket.join(videoId)
+        })
+
+        socket.on('unsubscribe', ({ videoId }) => {
+          if (!isIdValid(videoId)) return
+
+          /* eslint-disable @typescript-eslint/no-floating-promises */
+          socket.leave(videoId)
+        })
+      })
+
+    io.of('/runners')
+      .use(authenticateRunnerSocket)
+      .on('connection', socket => {
+        const runner: MRunner = socket.handshake.auth.runner
+
+        logger.debug(`New runner "${runner.name}" connected to the notification system.`)
+
+        this.runnerSockets.add(socket)
+
+        socket.on('disconnect', () => {
+          logger.debug(`Runner "${runner.name}" disconnected from the notification system.`)
+
+          this.runnerSockets.delete(socket)
+        })
+      })
   }
 
   sendNotification (userId: number, notification: UserNotificationModelForApi) {
     const sockets = this.userNotificationSockets[userId]
-
     if (!sockets) return
 
+    logger.debug('Sending user notification to user %d.', userId)
+
     const notificationMessage = notification.toFormattedJSON()
     for (const socket of sockets) {
       socket.emit('new-notification', notificationMessage)
     }
   }
 
+  sendVideoLiveNewState (video: MVideo) {
+    const data: LiveVideoEventPayload = { state: video.state }
+    const type: LiveVideoEventType = 'state-change'
+
+    logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state })
+
+    this.liveVideosNamespace
+      .in(video.id)
+      .emit(type, data)
+  }
+
+  sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
+    const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
+    const type: LiveVideoEventType = 'views-change'
+
+    logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
+
+    this.liveVideosNamespace
+      .in(video.id)
+      .emit(type, data)
+  }
+
+  @Debounce({ timeoutMS: 1000 })
+  sendAvailableJobsPingToRunners () {
+    logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)
+
+    for (const runners of this.runnerSockets) {
+      runners.emit('available-jobs')
+    }
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }