From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- server/lib/peertube-socket.ts | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) (limited to 'server/lib/peertube-socket.ts') diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 0398ca61d..ded7e9743 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts @@ -2,10 +2,12 @@ import { Server as HTTPServer } from 'http' import { Namespace, Server as SocketServer, Socket } from 'socket.io' import { isIdValid } from '@server/helpers/custom-validators/misc' import { MVideo, MVideoImmutable } from '@server/types/models' +import { MRunner } from '@server/types/models/runners' import { UserNotificationModelForApi } from '@server/types/models/user' import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' import { logger } from '../helpers/logger' -import { authenticateSocket } from '../middlewares' +import { authenticateRunnerSocket, authenticateSocket } from '../middlewares' +import { Debounce } from '@server/helpers/debounce' class PeerTubeSocket { @@ -13,6 +15,7 @@ class PeerTubeSocket { private userNotificationSockets: { [ userId: number ]: Socket[] } = {} private liveVideosNamespace: Namespace + private readonly runnerSockets = new Set() private constructor () {} @@ -24,7 +27,7 @@ class PeerTubeSocket { .on('connection', socket => { const userId = socket.handshake.auth.user.id - logger.debug('User %d connected on the notification system.', userId) + logger.debug('User %d connected to the notification system.', userId) if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] @@ -53,6 +56,22 @@ class PeerTubeSocket { socket.leave(videoId) }) }) + + io.of('/runners') + .use(authenticateRunnerSocket) + .on('connection', socket => { + const runner: MRunner = socket.handshake.auth.runner + + logger.debug(`New runner "${runner.name}" connected to the notification system.`) + + this.runnerSockets.add(socket) + + socket.on('disconnect', () => { + logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) + + this.runnerSockets.delete(socket) + }) + }) } sendNotification (userId: number, notification: UserNotificationModelForApi) { @@ -89,6 +108,15 @@ class PeerTubeSocket { .emit(type, data) } + @Debounce({ timeoutMS: 1000 }) + sendAvailableJobsPingToRunners () { + logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) + + for (const runners of this.runnerSockets) { + runners.emit('available-jobs') + } + } + static get Instance () { return this.instance || (this.instance = new this()) } -- cgit v1.2.3