]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/peertube-socket.ts
Increase test timeouts
[github/Chocobozzz/PeerTube.git] / server / lib / peertube-socket.ts
1 import { Server as HTTPServer } from 'http'
2 import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3 import { isIdValid } from '@server/helpers/custom-validators/misc'
4 import { MVideo, MVideoImmutable } from '@server/types/models'
5 import { MRunner } from '@server/types/models/runners'
6 import { UserNotificationModelForApi } from '@server/types/models/user'
7 import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
8 import { logger } from '../helpers/logger'
9 import { authenticateRunnerSocket, authenticateSocket } from '../middlewares'
10 import { Debounce } from '@server/helpers/debounce'
11
12 class PeerTubeSocket {
13
14 private static instance: PeerTubeSocket
15
16 private userNotificationSockets: { [ userId: number ]: Socket[] } = {}
17 private liveVideosNamespace: Namespace
18 private readonly runnerSockets = new Set<Socket>()
19
20 private constructor () {}
21
22 init (server: HTTPServer) {
23 const io = new SocketServer(server)
24
25 io.of('/user-notifications')
26 .use(authenticateSocket)
27 .on('connection', socket => {
28 const userId = socket.handshake.auth.user.id
29
30 logger.debug('User %d connected to the notification system.', userId)
31
32 if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = []
33
34 this.userNotificationSockets[userId].push(socket)
35
36 socket.on('disconnect', () => {
37 logger.debug('User %d disconnected from SocketIO notifications.', userId)
38
39 this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket)
40 })
41 })
42
43 this.liveVideosNamespace = io.of('/live-videos')
44 .on('connection', socket => {
45 socket.on('subscribe', ({ videoId }) => {
46 if (!isIdValid(videoId)) return
47
48 /* eslint-disable @typescript-eslint/no-floating-promises */
49 socket.join(videoId)
50 })
51
52 socket.on('unsubscribe', ({ videoId }) => {
53 if (!isIdValid(videoId)) return
54
55 /* eslint-disable @typescript-eslint/no-floating-promises */
56 socket.leave(videoId)
57 })
58 })
59
60 io.of('/runners')
61 .use(authenticateRunnerSocket)
62 .on('connection', socket => {
63 const runner: MRunner = socket.handshake.auth.runner
64
65 logger.debug(`New runner "${runner.name}" connected to the notification system.`)
66
67 this.runnerSockets.add(socket)
68
69 socket.on('disconnect', () => {
70 logger.debug(`Runner "${runner.name}" disconnected from the notification system.`)
71
72 this.runnerSockets.delete(socket)
73 })
74 })
75 }
76
77 sendNotification (userId: number, notification: UserNotificationModelForApi) {
78 const sockets = this.userNotificationSockets[userId]
79 if (!sockets) return
80
81 logger.debug('Sending user notification to user %d.', userId)
82
83 const notificationMessage = notification.toFormattedJSON()
84 for (const socket of sockets) {
85 socket.emit('new-notification', notificationMessage)
86 }
87 }
88
89 sendVideoLiveNewState (video: MVideo) {
90 const data: LiveVideoEventPayload = { state: video.state }
91 const type: LiveVideoEventType = 'state-change'
92
93 logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state })
94
95 this.liveVideosNamespace
96 .in(video.id)
97 .emit(type, data)
98 }
99
100 sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
101 const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
102 const type: LiveVideoEventType = 'views-change'
103
104 logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
105
106 this.liveVideosNamespace
107 .in(video.id)
108 .emit(type, data)
109 }
110
111 @Debounce({ timeoutMS: 1000 })
112 sendAvailableJobsPingToRunners () {
113 logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)
114
115 for (const runners of this.runnerSockets) {
116 runners.emit('available-jobs')
117 }
118 }
119
120 static get Instance () {
121 return this.instance || (this.instance = new this())
122 }
123 }
124
125 // ---------------------------------------------------------------------------
126
127 export {
128 PeerTubeSocket
129 }