1 import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg'
2 import { buildAbsoluteFixturePath, wait } from '@shared/core-utils'
3 import { VideoDetails, VideoInclude, VideoPrivacy } from '@shared/models'
4 import { PeerTubeServer } from '../server/server'
6 function sendRTMPStream (options: {
9 fixtureName?: string // default video_short.mp4
10 copyCodecs?: boolean // default false
12 const { rtmpBaseUrl, streamKey, fixtureName = 'video_short.mp4', copyCodecs = false } = options
14 const fixture = buildAbsoluteFixturePath(fixtureName)
16 const command = ffmpeg(fixture)
17 command.inputOption('-stream_loop -1')
18 command.inputOption('-re')
21 command.outputOption('-c copy')
23 command.outputOption('-c:v libx264')
24 command.outputOption('-g 50')
25 command.outputOption('-keyint_min 2')
26 command.outputOption('-r 60')
29 command.outputOption('-f flv')
31 const rtmpUrl = rtmpBaseUrl + '/' + streamKey
32 command.output(rtmpUrl)
34 command.on('error', err => {
35 if (err?.message?.includes('Exiting normally')) return
37 if (process.env.DEBUG) console.error(err)
40 if (process.env.DEBUG) {
41 command.on('stderr', data => console.log(data))
42 command.on('stdout', data => console.log(data))
50 function waitFfmpegUntilError (command: FfmpegCommand, successAfterMS = 10000) {
51 return new Promise<void>((res, rej) => {
52 command.on('error', err => {
62 async function testFfmpegStreamError (command: FfmpegCommand, shouldHaveError: boolean) {
66 await waitFfmpegUntilError(command, 35000)
71 await stopFfmpeg(command)
73 if (shouldHaveError && !error) throw new Error('Ffmpeg did not have an error')
74 if (!shouldHaveError && error) throw error
77 async function stopFfmpeg (command: FfmpegCommand) {
78 command.kill('SIGINT')
83 async function waitUntilLivePublishedOnAllServers (servers: PeerTubeServer[], videoId: string) {
84 for (const server of servers) {
85 await server.live.waitUntilPublished({ videoId })
89 async function waitUntilLiveWaitingOnAllServers (servers: PeerTubeServer[], videoId: string) {
90 for (const server of servers) {
91 await server.live.waitUntilWaiting({ videoId })
95 async function waitUntilLiveReplacedByReplayOnAllServers (servers: PeerTubeServer[], videoId: string) {
96 for (const server of servers) {
97 await server.live.waitUntilReplacedByReplay({ videoId })
101 async function findExternalSavedVideo (server: PeerTubeServer, liveDetails: VideoDetails) {
102 const include = VideoInclude.BLACKLISTED
103 const privacyOneOf = [ VideoPrivacy.INTERNAL, VideoPrivacy.PRIVATE, VideoPrivacy.PUBLIC, VideoPrivacy.UNLISTED ]
105 const { data } = await server.videos.list({ token: server.accessToken, sort: '-publishedAt', include, privacyOneOf })
107 return data.find(v => v.name === liveDetails.name + ' - ' + new Date(liveDetails.publishedAt).toLocaleString())
112 waitFfmpegUntilError,
113 testFfmpegStreamError,
116 waitUntilLivePublishedOnAllServers,
117 waitUntilLiveReplacedByReplayOnAllServers,
118 waitUntilLiveWaitingOnAllServers,
120 findExternalSavedVideo