diff options
Diffstat (limited to 'server/lib/peertube-socket.ts')
-rw-r--r-- | server/lib/peertube-socket.ts | 129 |
1 files changed, 0 insertions, 129 deletions
diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts deleted file mode 100644 index 3e41a2def..000000000 --- a/server/lib/peertube-socket.ts +++ /dev/null | |||
@@ -1,129 +0,0 @@ | |||
1 | import { Server as HTTPServer } from 'http' | ||
2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' | ||
3 | import { isIdValid } from '@server/helpers/custom-validators/misc' | ||
4 | import { Debounce } from '@server/helpers/debounce' | ||
5 | import { MVideo, MVideoImmutable } from '@server/types/models' | ||
6 | import { MRunner } from '@server/types/models/runners' | ||
7 | import { UserNotificationModelForApi } from '@server/types/models/user' | ||
8 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | ||
9 | import { logger } from '../helpers/logger' | ||
10 | import { authenticateRunnerSocket, authenticateSocket } from '../middlewares' | ||
11 | |||
12 | class PeerTubeSocket { | ||
13 | |||
14 | private static instance: PeerTubeSocket | ||
15 | |||
16 | private userNotificationSockets: { [ userId: number ]: Socket[] } = {} | ||
17 | private liveVideosNamespace: Namespace | ||
18 | private readonly runnerSockets = new Set<Socket>() | ||
19 | |||
20 | private constructor () {} | ||
21 | |||
22 | init (server: HTTPServer) { | ||
23 | const io = new SocketServer(server) | ||
24 | |||
25 | io.of('/user-notifications') | ||
26 | .use(authenticateSocket) | ||
27 | .on('connection', socket => { | ||
28 | const userId = socket.handshake.auth.user.id | ||
29 | |||
30 | logger.debug('User %d connected to the notification system.', userId) | ||
31 | |||
32 | if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] | ||
33 | |||
34 | this.userNotificationSockets[userId].push(socket) | ||
35 | |||
36 | socket.on('disconnect', () => { | ||
37 | logger.debug('User %d disconnected from SocketIO notifications.', userId) | ||
38 | |||
39 | this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) | ||
40 | }) | ||
41 | }) | ||
42 | |||
43 | this.liveVideosNamespace = io.of('/live-videos') | ||
44 | .on('connection', socket => { | ||
45 | socket.on('subscribe', ({ videoId }) => { | ||
46 | if (!isIdValid(videoId)) return | ||
47 | |||
48 | /* eslint-disable @typescript-eslint/no-floating-promises */ | ||
49 | socket.join(videoId) | ||
50 | }) | ||
51 | |||
52 | socket.on('unsubscribe', ({ videoId }) => { | ||
53 | if (!isIdValid(videoId)) return | ||
54 | |||
55 | /* eslint-disable @typescript-eslint/no-floating-promises */ | ||
56 | socket.leave(videoId) | ||
57 | }) | ||
58 | }) | ||
59 | |||
60 | io.of('/runners') | ||
61 | .use(authenticateRunnerSocket) | ||
62 | .on('connection', socket => { | ||
63 | const runner: MRunner = socket.handshake.auth.runner | ||
64 | |||
65 | logger.debug(`New runner "${runner.name}" connected to the notification system.`) | ||
66 | |||
67 | this.runnerSockets.add(socket) | ||
68 | |||
69 | socket.on('disconnect', () => { | ||
70 | logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) | ||
71 | |||
72 | this.runnerSockets.delete(socket) | ||
73 | }) | ||
74 | }) | ||
75 | } | ||
76 | |||
77 | sendNotification (userId: number, notification: UserNotificationModelForApi) { | ||
78 | const sockets = this.userNotificationSockets[userId] | ||
79 | if (!sockets) return | ||
80 | |||
81 | logger.debug('Sending user notification to user %d.', userId) | ||
82 | |||
83 | const notificationMessage = notification.toFormattedJSON() | ||
84 | for (const socket of sockets) { | ||
85 | socket.emit('new-notification', notificationMessage) | ||
86 | } | ||
87 | } | ||
88 | |||
89 | sendVideoLiveNewState (video: MVideo) { | ||
90 | const data: LiveVideoEventPayload = { state: video.state } | ||
91 | const type: LiveVideoEventType = 'state-change' | ||
92 | |||
93 | logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state }) | ||
94 | |||
95 | this.liveVideosNamespace | ||
96 | .in(video.id) | ||
97 | .emit(type, data) | ||
98 | } | ||
99 | |||
100 | sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { | ||
101 | const data: LiveVideoEventPayload = { viewers: numViewers } | ||
102 | const type: LiveVideoEventType = 'views-change' | ||
103 | |||
104 | logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) | ||
105 | |||
106 | this.liveVideosNamespace | ||
107 | .in(video.id) | ||
108 | .emit(type, data) | ||
109 | } | ||
110 | |||
111 | @Debounce({ timeoutMS: 1000 }) | ||
112 | sendAvailableJobsPingToRunners () { | ||
113 | logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) | ||
114 | |||
115 | for (const runners of this.runnerSockets) { | ||
116 | runners.emit('available-jobs') | ||
117 | } | ||
118 | } | ||
119 | |||
120 | static get Instance () { | ||
121 | return this.instance || (this.instance = new this()) | ||
122 | } | ||
123 | } | ||
124 | |||
125 | // --------------------------------------------------------------------------- | ||
126 | |||
127 | export { | ||
128 | PeerTubeSocket | ||
129 | } | ||