diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/peertube-socket.ts | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/peertube-socket.ts')
-rw-r--r-- | server/lib/peertube-socket.ts | 32 |
1 files changed, 30 insertions, 2 deletions
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 0398ca61d..ded7e9743 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts | |||
@@ -2,10 +2,12 @@ import { Server as HTTPServer } from 'http' | |||
2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' | 2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' |
3 | import { isIdValid } from '@server/helpers/custom-validators/misc' | 3 | import { isIdValid } from '@server/helpers/custom-validators/misc' |
4 | import { MVideo, MVideoImmutable } from '@server/types/models' | 4 | import { MVideo, MVideoImmutable } from '@server/types/models' |
5 | import { MRunner } from '@server/types/models/runners' | ||
5 | import { UserNotificationModelForApi } from '@server/types/models/user' | 6 | import { UserNotificationModelForApi } from '@server/types/models/user' |
6 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | 7 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' |
7 | import { logger } from '../helpers/logger' | 8 | import { logger } from '../helpers/logger' |
8 | import { authenticateSocket } from '../middlewares' | 9 | import { authenticateRunnerSocket, authenticateSocket } from '../middlewares' |
10 | import { Debounce } from '@server/helpers/debounce' | ||
9 | 11 | ||
10 | class PeerTubeSocket { | 12 | class PeerTubeSocket { |
11 | 13 | ||
@@ -13,6 +15,7 @@ class PeerTubeSocket { | |||
13 | 15 | ||
14 | private userNotificationSockets: { [ userId: number ]: Socket[] } = {} | 16 | private userNotificationSockets: { [ userId: number ]: Socket[] } = {} |
15 | private liveVideosNamespace: Namespace | 17 | private liveVideosNamespace: Namespace |
18 | private readonly runnerSockets = new Set<Socket>() | ||
16 | 19 | ||
17 | private constructor () {} | 20 | private constructor () {} |
18 | 21 | ||
@@ -24,7 +27,7 @@ class PeerTubeSocket { | |||
24 | .on('connection', socket => { | 27 | .on('connection', socket => { |
25 | const userId = socket.handshake.auth.user.id | 28 | const userId = socket.handshake.auth.user.id |
26 | 29 | ||
27 | logger.debug('User %d connected on the notification system.', userId) | 30 | logger.debug('User %d connected to the notification system.', userId) |
28 | 31 | ||
29 | if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] | 32 | if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] |
30 | 33 | ||
@@ -53,6 +56,22 @@ class PeerTubeSocket { | |||
53 | socket.leave(videoId) | 56 | socket.leave(videoId) |
54 | }) | 57 | }) |
55 | }) | 58 | }) |
59 | |||
60 | io.of('/runners') | ||
61 | .use(authenticateRunnerSocket) | ||
62 | .on('connection', socket => { | ||
63 | const runner: MRunner = socket.handshake.auth.runner | ||
64 | |||
65 | logger.debug(`New runner "${runner.name}" connected to the notification system.`) | ||
66 | |||
67 | this.runnerSockets.add(socket) | ||
68 | |||
69 | socket.on('disconnect', () => { | ||
70 | logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) | ||
71 | |||
72 | this.runnerSockets.delete(socket) | ||
73 | }) | ||
74 | }) | ||
56 | } | 75 | } |
57 | 76 | ||
58 | sendNotification (userId: number, notification: UserNotificationModelForApi) { | 77 | sendNotification (userId: number, notification: UserNotificationModelForApi) { |
@@ -89,6 +108,15 @@ class PeerTubeSocket { | |||
89 | .emit(type, data) | 108 | .emit(type, data) |
90 | } | 109 | } |
91 | 110 | ||
111 | @Debounce({ timeoutMS: 1000 }) | ||
112 | sendAvailableJobsPingToRunners () { | ||
113 | logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) | ||
114 | |||
115 | for (const runners of this.runnerSockets) { | ||
116 | runners.emit('available-jobs') | ||
117 | } | ||
118 | } | ||
119 | |||
92 | static get Instance () { | 120 | static get Instance () { |
93 | return this.instance || (this.instance = new this()) | 121 | return this.instance || (this.instance = new this()) |
94 | } | 122 | } |