]>
Commit | Line | Data |
---|---|---|
1 | import { Server as HTTPServer } from 'http' | |
2 | import { Namespace, Server as SocketServer, Socket } from 'socket.io' | |
3 | import { isIdValid } from '@server/helpers/custom-validators/misc' | |
4 | import { MVideo, MVideoImmutable } from '@server/types/models' | |
5 | import { MRunner } from '@server/types/models/runners' | |
6 | import { UserNotificationModelForApi } from '@server/types/models/user' | |
7 | import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | |
8 | import { logger } from '../helpers/logger' | |
9 | import { authenticateRunnerSocket, authenticateSocket } from '../middlewares' | |
10 | import { Debounce } from '@server/helpers/debounce' | |
11 | ||
12 | class PeerTubeSocket { | |
13 | ||
14 | private static instance: PeerTubeSocket | |
15 | ||
16 | private userNotificationSockets: { [ userId: number ]: Socket[] } = {} | |
17 | private liveVideosNamespace: Namespace | |
18 | private readonly runnerSockets = new Set<Socket>() | |
19 | ||
20 | private constructor () {} | |
21 | ||
22 | init (server: HTTPServer) { | |
23 | const io = new SocketServer(server) | |
24 | ||
25 | io.of('/user-notifications') | |
26 | .use(authenticateSocket) | |
27 | .on('connection', socket => { | |
28 | const userId = socket.handshake.auth.user.id | |
29 | ||
30 | logger.debug('User %d connected to the notification system.', userId) | |
31 | ||
32 | if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] | |
33 | ||
34 | this.userNotificationSockets[userId].push(socket) | |
35 | ||
36 | socket.on('disconnect', () => { | |
37 | logger.debug('User %d disconnected from SocketIO notifications.', userId) | |
38 | ||
39 | this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) | |
40 | }) | |
41 | }) | |
42 | ||
43 | this.liveVideosNamespace = io.of('/live-videos') | |
44 | .on('connection', socket => { | |
45 | socket.on('subscribe', ({ videoId }) => { | |
46 | if (!isIdValid(videoId)) return | |
47 | ||
48 | /* eslint-disable @typescript-eslint/no-floating-promises */ | |
49 | socket.join(videoId) | |
50 | }) | |
51 | ||
52 | socket.on('unsubscribe', ({ videoId }) => { | |
53 | if (!isIdValid(videoId)) return | |
54 | ||
55 | /* eslint-disable @typescript-eslint/no-floating-promises */ | |
56 | socket.leave(videoId) | |
57 | }) | |
58 | }) | |
59 | ||
60 | io.of('/runners') | |
61 | .use(authenticateRunnerSocket) | |
62 | .on('connection', socket => { | |
63 | const runner: MRunner = socket.handshake.auth.runner | |
64 | ||
65 | logger.debug(`New runner "${runner.name}" connected to the notification system.`) | |
66 | ||
67 | this.runnerSockets.add(socket) | |
68 | ||
69 | socket.on('disconnect', () => { | |
70 | logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) | |
71 | ||
72 | this.runnerSockets.delete(socket) | |
73 | }) | |
74 | }) | |
75 | } | |
76 | ||
77 | sendNotification (userId: number, notification: UserNotificationModelForApi) { | |
78 | const sockets = this.userNotificationSockets[userId] | |
79 | if (!sockets) return | |
80 | ||
81 | logger.debug('Sending user notification to user %d.', userId) | |
82 | ||
83 | const notificationMessage = notification.toFormattedJSON() | |
84 | for (const socket of sockets) { | |
85 | socket.emit('new-notification', notificationMessage) | |
86 | } | |
87 | } | |
88 | ||
89 | sendVideoLiveNewState (video: MVideo) { | |
90 | const data: LiveVideoEventPayload = { state: video.state } | |
91 | const type: LiveVideoEventType = 'state-change' | |
92 | ||
93 | logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state }) | |
94 | ||
95 | this.liveVideosNamespace | |
96 | .in(video.id) | |
97 | .emit(type, data) | |
98 | } | |
99 | ||
100 | sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { | |
101 | const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers } | |
102 | const type: LiveVideoEventType = 'views-change' | |
103 | ||
104 | logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) | |
105 | ||
106 | this.liveVideosNamespace | |
107 | .in(video.id) | |
108 | .emit(type, data) | |
109 | } | |
110 | ||
111 | @Debounce({ timeoutMS: 1000 }) | |
112 | sendAvailableJobsPingToRunners () { | |
113 | logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) | |
114 | ||
115 | for (const runners of this.runnerSockets) { | |
116 | runners.emit('available-jobs') | |
117 | } | |
118 | } | |
119 | ||
120 | static get Instance () { | |
121 | return this.instance || (this.instance = new this()) | |
122 | } | |
123 | } | |
124 | ||
125 | // --------------------------------------------------------------------------- | |
126 | ||
127 | export { | |
128 | PeerTubeSocket | |
129 | } |