]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/hls.ts
Use bullmq job dependency
[github/Chocobozzz/PeerTube.git] / server / lib / hls.ts
1 import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra'
2 import { flatten, uniq } from 'lodash'
3 import PQueue from 'p-queue'
4 import { basename, dirname, join } from 'path'
5 import { MStreamingPlaylist, MStreamingPlaylistFilesVideo, MVideo } from '@server/types/models'
6 import { sha256 } from '@shared/extra-utils'
7 import { VideoStorage } from '@shared/models'
8 import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamDimensionsInfo } from '../helpers/ffmpeg'
9 import { logger } from '../helpers/logger'
10 import { doRequest, doRequestAndSaveToFile } from '../helpers/requests'
11 import { generateRandomString } from '../helpers/utils'
12 import { CONFIG } from '../initializers/config'
13 import { P2P_MEDIA_LOADER_PEER_VERSION, REQUEST_TIMEOUTS } from '../initializers/constants'
14 import { sequelizeTypescript } from '../initializers/database'
15 import { VideoFileModel } from '../models/video/video-file'
16 import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist'
17 import { storeHLSFile } from './object-storage'
18 import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths'
19 import { VideoPathManager } from './video-path-manager'
20
21 async function updateStreamingPlaylistsInfohashesIfNeeded () {
22 const playlistsToUpdate = await VideoStreamingPlaylistModel.listByIncorrectPeerVersion()
23
24 // Use separate SQL queries, because we could have many videos to update
25 for (const playlist of playlistsToUpdate) {
26 await sequelizeTypescript.transaction(async t => {
27 const videoFiles = await VideoFileModel.listByStreamingPlaylist(playlist.id, t)
28
29 playlist.assignP2PMediaLoaderInfoHashes(playlist.Video, videoFiles)
30 playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION
31
32 await playlist.save({ transaction: t })
33 })
34 }
35 }
36
37 async function updatePlaylistAfterFileChange (video: MVideo, playlist: MStreamingPlaylist) {
38 let playlistWithFiles = await updateMasterHLSPlaylist(video, playlist)
39 playlistWithFiles = await updateSha256VODSegments(video, playlist)
40
41 // Refresh playlist, operations can take some time
42 playlistWithFiles = await VideoStreamingPlaylistModel.loadWithVideoAndFiles(playlist.id)
43 playlistWithFiles.assignP2PMediaLoaderInfoHashes(video, playlistWithFiles.VideoFiles)
44 await playlistWithFiles.save()
45
46 video.setHLSPlaylist(playlistWithFiles)
47 }
48
49 // ---------------------------------------------------------------------------
50
51 // Avoid concurrency issues when updating streaming playlist files
52 const playlistFilesQueue = new PQueue({ concurrency: 1 })
53
54 function updateMasterHLSPlaylist (video: MVideo, playlistArg: MStreamingPlaylist): Promise<MStreamingPlaylistFilesVideo> {
55 return playlistFilesQueue.add(async () => {
56 const playlist = await VideoStreamingPlaylistModel.loadWithVideoAndFiles(playlistArg.id)
57
58 const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ]
59
60 for (const file of playlist.VideoFiles) {
61 const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
62
63 await VideoPathManager.Instance.makeAvailableVideoFile(file.withVideoOrPlaylist(playlist), async videoFilePath => {
64 const size = await getVideoStreamDimensionsInfo(videoFilePath)
65
66 const bandwidth = 'BANDWIDTH=' + video.getBandwidthBits(file)
67 const resolution = `RESOLUTION=${size?.width || 0}x${size?.height || 0}`
68
69 let line = `#EXT-X-STREAM-INF:${bandwidth},${resolution}`
70 if (file.fps) line += ',FRAME-RATE=' + file.fps
71
72 const codecs = await Promise.all([
73 getVideoStreamCodec(videoFilePath),
74 getAudioStreamCodec(videoFilePath)
75 ])
76
77 line += `,CODECS="${codecs.filter(c => !!c).join(',')}"`
78
79 masterPlaylists.push(line)
80 masterPlaylists.push(playlistFilename)
81 })
82 }
83
84 if (playlist.playlistFilename) {
85 await video.removeStreamingPlaylistFile(playlist, playlist.playlistFilename)
86 }
87 playlist.playlistFilename = generateHLSMasterPlaylistFilename(video.isLive)
88
89 const masterPlaylistPath = VideoPathManager.Instance.getFSHLSOutputPath(video, playlist.playlistFilename)
90 await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n')
91
92 if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
93 playlist.playlistUrl = await storeHLSFile(playlist, playlist.playlistFilename)
94 await remove(masterPlaylistPath)
95 }
96
97 return playlist.save()
98 })
99 }
100
101 // ---------------------------------------------------------------------------
102
103 function updateSha256VODSegments (video: MVideo, playlistArg: MStreamingPlaylist): Promise<MStreamingPlaylistFilesVideo> {
104 return playlistFilesQueue.add(async () => {
105 const json: { [filename: string]: { [range: string]: string } } = {}
106
107 const playlist = await VideoStreamingPlaylistModel.loadWithVideoAndFiles(playlistArg.id)
108
109 // For all the resolutions available for this video
110 for (const file of playlist.VideoFiles) {
111 const rangeHashes: { [range: string]: string } = {}
112 const fileWithPlaylist = file.withVideoOrPlaylist(playlist)
113
114 await VideoPathManager.Instance.makeAvailableVideoFile(fileWithPlaylist, videoPath => {
115
116 return VideoPathManager.Instance.makeAvailableResolutionPlaylistFile(fileWithPlaylist, async resolutionPlaylistPath => {
117 const playlistContent = await readFile(resolutionPlaylistPath)
118 const ranges = getRangesFromPlaylist(playlistContent.toString())
119
120 const fd = await open(videoPath, 'r')
121 for (const range of ranges) {
122 const buf = Buffer.alloc(range.length)
123 await read(fd, buf, 0, range.length, range.offset)
124
125 rangeHashes[`${range.offset}-${range.offset + range.length - 1}`] = sha256(buf)
126 }
127 await close(fd)
128
129 const videoFilename = file.filename
130 json[videoFilename] = rangeHashes
131 })
132 })
133 }
134
135 if (playlist.segmentsSha256Filename) {
136 await video.removeStreamingPlaylistFile(playlist, playlist.segmentsSha256Filename)
137 }
138 playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(video.isLive)
139
140 const outputPath = VideoPathManager.Instance.getFSHLSOutputPath(video, playlist.segmentsSha256Filename)
141 await outputJSON(outputPath, json)
142
143 if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
144 playlist.segmentsSha256Url = await storeHLSFile(playlist, playlist.segmentsSha256Filename)
145 await remove(outputPath)
146 }
147
148 return playlist.save()
149 })
150 }
151
152 // ---------------------------------------------------------------------------
153
154 async function buildSha256Segment (segmentPath: string) {
155 const buf = await readFile(segmentPath)
156 return sha256(buf)
157 }
158
159 function downloadPlaylistSegments (playlistUrl: string, destinationDir: string, timeout: number, bodyKBLimit: number) {
160 let timer
161 let remainingBodyKBLimit = bodyKBLimit
162
163 logger.info('Importing HLS playlist %s', playlistUrl)
164
165 return new Promise<void>(async (res, rej) => {
166 const tmpDirectory = join(CONFIG.STORAGE.TMP_DIR, await generateRandomString(10))
167
168 await ensureDir(tmpDirectory)
169
170 timer = setTimeout(() => {
171 deleteTmpDirectory(tmpDirectory)
172
173 return rej(new Error('HLS download timeout.'))
174 }, timeout)
175
176 try {
177 // Fetch master playlist
178 const subPlaylistUrls = await fetchUniqUrls(playlistUrl)
179
180 const subRequests = subPlaylistUrls.map(u => fetchUniqUrls(u))
181 const fileUrls = uniq(flatten(await Promise.all(subRequests)))
182
183 logger.debug('Will download %d HLS files.', fileUrls.length, { fileUrls })
184
185 for (const fileUrl of fileUrls) {
186 const destPath = join(tmpDirectory, basename(fileUrl))
187
188 await doRequestAndSaveToFile(fileUrl, destPath, { bodyKBLimit: remainingBodyKBLimit, timeout: REQUEST_TIMEOUTS.REDUNDANCY })
189
190 const { size } = await stat(destPath)
191 remainingBodyKBLimit -= (size / 1000)
192
193 logger.debug('Downloaded HLS playlist file %s with %d kB remained limit.', fileUrl, Math.floor(remainingBodyKBLimit))
194 }
195
196 clearTimeout(timer)
197
198 await move(tmpDirectory, destinationDir, { overwrite: true })
199
200 return res()
201 } catch (err) {
202 deleteTmpDirectory(tmpDirectory)
203
204 return rej(err)
205 }
206 })
207
208 function deleteTmpDirectory (directory: string) {
209 remove(directory)
210 .catch(err => logger.error('Cannot delete path on HLS download error.', { err }))
211 }
212
213 async function fetchUniqUrls (playlistUrl: string) {
214 const { body } = await doRequest(playlistUrl)
215
216 if (!body) return []
217
218 const urls = body.split('\n')
219 .filter(line => line.endsWith('.m3u8') || line.endsWith('.mp4'))
220 .map(url => {
221 if (url.startsWith('http://') || url.startsWith('https://')) return url
222
223 return `${dirname(playlistUrl)}/${url}`
224 })
225
226 return uniq(urls)
227 }
228 }
229
230 // ---------------------------------------------------------------------------
231
232 export {
233 updateMasterHLSPlaylist,
234 updateSha256VODSegments,
235 buildSha256Segment,
236 downloadPlaylistSegments,
237 updateStreamingPlaylistsInfohashesIfNeeded,
238 updatePlaylistAfterFileChange
239 }
240
241 // ---------------------------------------------------------------------------
242
243 function getRangesFromPlaylist (playlistContent: string) {
244 const ranges: { offset: number, length: number }[] = []
245 const lines = playlistContent.split('\n')
246 const regex = /^#EXT-X-BYTERANGE:(\d+)@(\d+)$/
247
248 for (const line of lines) {
249 const captured = regex.exec(line)
250
251 if (captured) {
252 ranges.push({ length: parseInt(captured[1], 10), offset: parseInt(captured[2], 10) })
253 }
254 }
255
256 return ranges
257 }