aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/peertube-socket.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 14:55:10 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch)
tree226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/peertube-socket.ts
parent6bcb854cdea8688a32240bc5719c7d139806e00b (diff)
downloadPeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/peertube-socket.ts')
-rw-r--r--server/lib/peertube-socket.ts32
1 files changed, 30 insertions, 2 deletions
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts
index 0398ca61d..ded7e9743 100644
--- a/server/lib/peertube-socket.ts
+++ b/server/lib/peertube-socket.ts
@@ -2,10 +2,12 @@ import { Server as HTTPServer } from 'http'
2import { Namespace, Server as SocketServer, Socket } from 'socket.io' 2import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3import { isIdValid } from '@server/helpers/custom-validators/misc' 3import { isIdValid } from '@server/helpers/custom-validators/misc'
4import { MVideo, MVideoImmutable } from '@server/types/models' 4import { MVideo, MVideoImmutable } from '@server/types/models'
5import { MRunner } from '@server/types/models/runners'
5import { UserNotificationModelForApi } from '@server/types/models/user' 6import { UserNotificationModelForApi } from '@server/types/models/user'
6import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' 7import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
7import { logger } from '../helpers/logger' 8import { logger } from '../helpers/logger'
8import { authenticateSocket } from '../middlewares' 9import { authenticateRunnerSocket, authenticateSocket } from '../middlewares'
10import { Debounce } from '@server/helpers/debounce'
9 11
10class PeerTubeSocket { 12class PeerTubeSocket {
11 13
@@ -13,6 +15,7 @@ class PeerTubeSocket {
13 15
14 private userNotificationSockets: { [ userId: number ]: Socket[] } = {} 16 private userNotificationSockets: { [ userId: number ]: Socket[] } = {}
15 private liveVideosNamespace: Namespace 17 private liveVideosNamespace: Namespace
18 private readonly runnerSockets = new Set<Socket>()
16 19
17 private constructor () {} 20 private constructor () {}
18 21
@@ -24,7 +27,7 @@ class PeerTubeSocket {
24 .on('connection', socket => { 27 .on('connection', socket => {
25 const userId = socket.handshake.auth.user.id 28 const userId = socket.handshake.auth.user.id
26 29
27 logger.debug('User %d connected on the notification system.', userId) 30 logger.debug('User %d connected to the notification system.', userId)
28 31
29 if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] 32 if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = []
30 33
@@ -53,6 +56,22 @@ class PeerTubeSocket {
53 socket.leave(videoId) 56 socket.leave(videoId)
54 }) 57 })
55 }) 58 })
59
60 io.of('/runners')
61 .use(authenticateRunnerSocket)
62 .on('connection', socket => {
63 const runner: MRunner = socket.handshake.auth.runner
64
65 logger.debug(`New runner "${runner.name}" connected to the notification system.`)
66
67 this.runnerSockets.add(socket)
68
69 socket.on('disconnect', () => {
70 logger.debug(`Runner "${runner.name}" disconnected from the notification system.`)
71
72 this.runnerSockets.delete(socket)
73 })
74 })
56 } 75 }
57 76
58 sendNotification (userId: number, notification: UserNotificationModelForApi) { 77 sendNotification (userId: number, notification: UserNotificationModelForApi) {
@@ -89,6 +108,15 @@ class PeerTubeSocket {
89 .emit(type, data) 108 .emit(type, data)
90 } 109 }
91 110
111 @Debounce({ timeoutMS: 1000 })
112 sendAvailableJobsPingToRunners () {
113 logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)
114
115 for (const runners of this.runnerSockets) {
116 runners.emit('available-jobs')
117 }
118 }
119
92 static get Instance () { 120 static get Instance () {
93 return this.instance || (this.instance = new this()) 121 return this.instance || (this.instance = new this())
94 } 122 }