1 import { Server as HTTPServer } from 'http'
2 import { Namespace, Server as SocketServer, Socket } from 'socket.io'
3 import { isIdValid } from '@server/helpers/custom-validators/misc'
4 import { MVideo, MVideoImmutable } from '@server/types/models'
5 import { MRunner } from '@server/types/models/runners'
6 import { UserNotificationModelForApi } from '@server/types/models/user'
7 import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models'
8 import { logger } from '../helpers/logger'
9 import { authenticateRunnerSocket, authenticateSocket } from '../middlewares'
10 import { Debounce } from '@server/helpers/debounce'
/**
 * Singleton wrapper around the Socket.IO server: it keeps track of the sockets of
 * connected users and runners and exposes helpers to push events to them.
 * Obtain the instance through `PeerTubeSocket.Instance`.
 */
12 class PeerTubeSocket {
// Lazily created singleton, returned by the `Instance` getter
14 private static instance: PeerTubeSocket
// Open notification sockets indexed by user id (one entry per connection of that user)
16 private userNotificationSockets: { [ userId: number ]: Socket[] } = {}
// The '/live-videos' namespace, assigned by init()
17 private liveVideosNamespace: Namespace
// Sockets of the runners that are currently connected
18 private readonly runnerSockets = new Set<Socket>()
// Private so the class cannot be instantiated with `new` from the outside
20 private constructor () {}
/**
 * Attaches a Socket.IO server to the given HTTP server and registers the
 * connection handlers of the namespaces used by this class.
 *
 * @param server HTTP server the Socket.IO server is bound to
 */
22 init (server: HTTPServer) {
23 const io = new SocketServer(server)
// User notifications: connections go through the authenticateSocket middleware first
25 io.of('/user-notifications')
26 .use(authenticateSocket)
27 .on('connection', socket => {
// presumably `handshake.auth.user` is populated by authenticateSocket; verify against the middleware
28 const userId = socket.handshake.auth.user.id
30 logger.debug('User %d connected to the notification system.', userId)
// First connection of this user: create its socket list
32 if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = []
34 this.userNotificationSockets[userId].push(socket)
36 socket.on('disconnect', () => {
37 logger.debug('User %d disconnected from SocketIO notifications.', userId)
// Drop only this socket; the (possibly empty) array stays registered for the user
39 this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket)
// Live videos: keep the namespace so the send* methods can use it later
43 this.liveVideosNamespace = io.of('/live-videos')
44 .on('connection', socket => {
45 socket.on('subscribe', ({ videoId }) => {
// Ignore requests whose videoId fails validation
46 if (!isIdValid(videoId)) return
// NOTE(review): the block form of eslint-disable stays active until a matching
// eslint-enable; `eslint-disable-next-line` may have been intended (confirm)
48 /* eslint-disable @typescript-eslint/no-floating-promises */
52 socket.on('unsubscribe', ({ videoId }) => {
// Ignore requests whose videoId fails validation
53 if (!isIdValid(videoId)) return
55 /* eslint-disable @typescript-eslint/no-floating-promises */
// Runners: connections go through the authenticateRunnerSocket middleware first
61 .use(authenticateRunnerSocket)
62 .on('connection', socket => {
// presumably `handshake.auth.runner` is populated by authenticateRunnerSocket; verify against the middleware
63 const runner: MRunner = socket.handshake.auth.runner
65 logger.debug(`New runner "${runner.name}" connected to the notification system.`)
// Track the socket so sendAvailableJobsPingToRunners() can reach it
67 this.runnerSockets.add(socket)
69 socket.on('disconnect', () => {
70 logger.debug(`Runner "${runner.name}" disconnected from the notification system.`)
72 this.runnerSockets.delete(socket)
/**
 * Emits a 'new-notification' event on every socket registered for the user.
 *
 * @param userId id of the user to notify
 * @param notification notification model, sent in its `toFormattedJSON()` form
 */
77 sendNotification (userId: number, notification: UserNotificationModelForApi) {
// Sockets registered for this user by the '/user-notifications' connection handler
78 const sockets = this.userNotificationSockets[userId]
81 logger.debug('Sending user notification to user %d.', userId)
// Format once and reuse the same payload for every socket
83 const notificationMessage = notification.toFormattedJSON()
84 for (const socket of sockets) {
85 socket.emit('new-notification', notificationMessage)
/**
 * Prepares a 'state-change' live event carrying the current state of the video
 * and hands it to the '/live-videos' namespace.
 *
 * @param video video whose `state` is sent
 */
89 sendVideoLiveNewState (video: MVideo) {
90 const data: LiveVideoEventPayload = { state: video.state }
91 const type: LiveVideoEventType = 'state-change'
93 logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state })
// presumably targets the sockets subscribed to this video; confirm against the emit chain
95 this.liveVideosNamespace
/**
 * Prepares a 'views-change' live event carrying the viewer count of the video
 * and hands it to the '/live-videos' namespace.
 *
 * @param video video the update is about
 * @param numViewers value sent in both the `viewers` and `views` payload fields
 */
100 sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
// NOTE(review): `views` duplicates `viewers`; presumably kept for older clients (confirm)
101 const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers }
102 const type: LiveVideoEventType = 'views-change'
104 logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
// presumably targets the sockets subscribed to this video; confirm against the emit chain
106 this.liveVideosNamespace
/**
 * Emits an 'available-jobs' event, without payload, on every connected runner socket.
 *
 * NOTE(review): presumably the Debounce decorator coalesces calls made within
 * 1000 ms; confirm against '@server/helpers/debounce'.
 */
111 @Debounce({ timeoutMS: 1000 })
112 sendAvailableJobsPingToRunners () {
113 logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)
// Despite its plural name, `runners` is a single runner socket on each iteration
115 for (const runners of this.runnerSockets) {
116 runners.emit('available-jobs')
/**
 * Returns the shared PeerTubeSocket, creating it on first access.
 */
120 static get Instance () {
// In a static accessor `this` is the class itself, so `new this()` calls the private constructor
121 return this.instance || (this.instance = new this())
125 // ---------------------------------------------------------------------------