]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/peertube-socket.ts
Implement remote runner jobs in server
[github/Chocobozzz/PeerTube.git] / server / lib / peertube-socket.ts
index 0398ca61dbf8f71e68c52be44cbed4fe2023dd7b..ded7e97433a8a547767981e71d13963a33e16690 100644 (file)
@@ -2,10 +2,12 @@ import { Server as HTTPServer } from 'http'
 import { Namespace, Server as SocketServer, Socket } from 'socket.io'
 import { isIdValid } from '@server/helpers/custom-validators/misc'
 import { MVideo, MVideoImmutable } from '@server/types/models'
+import { MRunner } from '@server/types/models/runners'
 import { UserNotificationModelForApi } from '@server/types/models/user'
 import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
 import { logger } from '../helpers/logger'
-import { authenticateSocket } from '../middlewares'
+import { authenticateRunnerSocket, authenticateSocket } from '../middlewares'
+import { Debounce } from '@server/helpers/debounce'
 
 class PeerTubeSocket {
 
@@ -13,6 +15,7 @@ class PeerTubeSocket {
 
   private userNotificationSockets: { [ userId: number ]: Socket[] } = {}
   private liveVideosNamespace: Namespace
+  private readonly runnerSockets = new Set<Socket>()
 
   private constructor () {}
 
@@ -24,7 +27,7 @@ class PeerTubeSocket {
       .on('connection', socket => {
         const userId = socket.handshake.auth.user.id
 
-        logger.debug('User %d connected on the notification system.', userId)
+        logger.debug('User %d connected to the notification system.', userId)
 
         if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = []
 
@@ -53,6 +56,22 @@ class PeerTubeSocket {
           socket.leave(videoId)
         })
       })
+
+    io.of('/runners')
+      .use(authenticateRunnerSocket)
+      .on('connection', socket => {
+        const runner: MRunner = socket.handshake.auth.runner
+
+        logger.debug(`New runner "${runner.name}" connected to the notification system.`)
+
+        this.runnerSockets.add(socket)
+
+        socket.on('disconnect', () => {
+          logger.debug(`Runner "${runner.name}" disconnected from the notification system.`)
+
+          this.runnerSockets.delete(socket)
+        })
+      })
   }
 
   sendNotification (userId: number, notification: UserNotificationModelForApi) {
@@ -89,6 +108,15 @@ class PeerTubeSocket {
       .emit(type, data)
   }
 
+  @Debounce({ timeoutMS: 1000 })
+  sendAvailableJobsPingToRunners () {
+    logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)
+
+    for (const runners of this.runnerSockets) {
+      runners.emit('available-jobs')
+    }
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }