X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fpeertube-socket.ts;h=ded7e97433a8a547767981e71d13963a33e16690;hb=0c302acb3c358b4d4d8dee45aed1de1108ea37ea;hp=26ced351f3333cf1dfe756dc333ad95653c9ff88;hpb=8c559fad1e1c4c2ab7f1388c73200aa4c6256d74;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 26ced351f..ded7e9743 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts @@ -1,26 +1,33 @@ -import * as SocketIO from 'socket.io' -import { authenticateSocket } from '../middlewares' +import { Server as HTTPServer } from 'http' +import { Namespace, Server as SocketServer, Socket } from 'socket.io' +import { isIdValid } from '@server/helpers/custom-validators/misc' +import { MVideo, MVideoImmutable } from '@server/types/models' +import { MRunner } from '@server/types/models/runners' +import { UserNotificationModelForApi } from '@server/types/models/user' +import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' import { logger } from '../helpers/logger' -import { Server } from 'http' -import { UserNotificationModelForApi } from '@server/typings/models/user' +import { authenticateRunnerSocket, authenticateSocket } from '../middlewares' +import { Debounce } from '@server/helpers/debounce' class PeerTubeSocket { private static instance: PeerTubeSocket - private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {} + private userNotificationSockets: { [ userId: number ]: Socket[] } = {} + private liveVideosNamespace: Namespace + private readonly runnerSockets = new Set() private constructor () {} - init (server: Server) { - const io = SocketIO(server) + init (server: HTTPServer) { + const io = new SocketServer(server) io.of('/user-notifications') .use(authenticateSocket) .on('connection', socket => { - const userId = socket.handshake.query.user.id + const userId = socket.handshake.auth.user.id - logger.debug('User %d connected on the notification system.', userId) + logger.debug('User %d connected to the notification system.', userId) if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] @@ -32,19 +39,84 @@ class PeerTubeSocket { this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) }) }) + + this.liveVideosNamespace = io.of('/live-videos') + .on('connection', socket => { + socket.on('subscribe', ({ videoId }) => { + if (!isIdValid(videoId)) return + + /* eslint-disable @typescript-eslint/no-floating-promises */ + socket.join(videoId) + }) + + socket.on('unsubscribe', ({ videoId }) => { + if (!isIdValid(videoId)) return + + /* eslint-disable @typescript-eslint/no-floating-promises */ + socket.leave(videoId) + }) + }) + + io.of('/runners') + .use(authenticateRunnerSocket) + .on('connection', socket => { + const runner: MRunner = socket.handshake.auth.runner + + logger.debug(`New runner "${runner.name}" connected to the notification system.`) + + this.runnerSockets.add(socket) + + socket.on('disconnect', () => { + logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) + + this.runnerSockets.delete(socket) + }) + }) } sendNotification (userId: number, notification: UserNotificationModelForApi) { const sockets = this.userNotificationSockets[userId] - if (!sockets) return + logger.debug('Sending user notification to user %d.', userId) + const notificationMessage = notification.toFormattedJSON() for (const socket of sockets) { socket.emit('new-notification', notificationMessage) } } + sendVideoLiveNewState (video: MVideo) { + const data: LiveVideoEventPayload = { state: video.state } + const type: LiveVideoEventType = 'state-change' + + logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state }) + + this.liveVideosNamespace + .in(video.id) + .emit(type, data) + } + + sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { + const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers } + const type: LiveVideoEventType = 'views-change' + + logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) + + this.liveVideosNamespace + .in(video.id) + .emit(type, data) + } + + @Debounce({ timeoutMS: 1000 }) + sendAvailableJobsPingToRunners () { + logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) + + for (const runners of this.runnerSockets) { + runners.emit('available-jobs') + } + } + static get Instance () { return this.instance || (this.instance = new this()) }