diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-02 13:51:06 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 3a0c2a77b1a6626699514ddaf8135f4397175443 (patch) | |
tree | 60bec7ef8e9cdc008b0a4a56aa403617d036bfab /server/lib | |
parent | 9a3db678f56eda37d222cf2d2232ae0ef5d533d2 (diff) | |
download | PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.gz PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.zst PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.zip |
Enable external plugins to test the PR
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 7 | ||||
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 40 | ||||
-rw-r--r-- | server/lib/runners/job-handlers/abstract-job-handler.ts | 22 |
3 files changed, 42 insertions, 27 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 21bf0f226..03f6fbea7 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -96,6 +96,7 @@ export type CreateJobArgument = | |||
96 | export type CreateJobOptions = { | 96 | export type CreateJobOptions = { |
97 | delay?: number | 97 | delay?: number |
98 | priority?: number | 98 | priority?: number |
99 | failParentOnFailure?: boolean | ||
99 | } | 100 | } |
100 | 101 | ||
101 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | 102 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { |
@@ -363,7 +364,11 @@ class JobQueue { | |||
363 | name: 'job', | 364 | name: 'job', |
364 | data: job.payload, | 365 | data: job.payload, |
365 | queueName: job.type, | 366 | queueName: job.type, |
366 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) | 367 | opts: { |
368 | failParentOnFailure: true, | ||
369 | |||
370 | ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ])) | ||
371 | } | ||
367 | } | 372 | } |
368 | } | 373 | } |
369 | 374 | ||
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index f3f8fc886..ef4ecb83e 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -79,9 +79,7 @@ class MuxingSession extends EventEmitter { | |||
79 | private streamingPlaylist: MStreamingPlaylistVideo | 79 | private streamingPlaylist: MStreamingPlaylistVideo |
80 | private liveSegmentShaStore: LiveSegmentShaStore | 80 | private liveSegmentShaStore: LiveSegmentShaStore |
81 | 81 | ||
82 | private tsWatcher: FSWatcher | 82 | private filesWatcher: FSWatcher |
83 | private masterWatcher: FSWatcher | ||
84 | private m3u8Watcher: FSWatcher | ||
85 | 83 | ||
86 | private masterPlaylistCreated = false | 84 | private masterPlaylistCreated = false |
87 | private liveReady = false | 85 | private liveReady = false |
@@ -149,6 +147,8 @@ class MuxingSession extends EventEmitter { | |||
149 | 147 | ||
150 | await this.transcodingWrapper.run() | 148 | await this.transcodingWrapper.run() |
151 | 149 | ||
150 | this.filesWatcher = watch(this.outDirectory, { depth: 0 }) | ||
151 | |||
152 | this.watchMasterFile() | 152 | this.watchMasterFile() |
153 | this.watchTSFiles() | 153 | this.watchTSFiles() |
154 | this.watchM3U8File() | 154 | this.watchM3U8File() |
@@ -168,9 +168,10 @@ class MuxingSession extends EventEmitter { | |||
168 | } | 168 | } |
169 | 169 | ||
170 | private watchMasterFile () { | 170 | private watchMasterFile () { |
171 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) | 171 | this.filesWatcher.on('add', async path => { |
172 | if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return | ||
173 | if (this.masterPlaylistCreated === true) return | ||
172 | 174 | ||
173 | this.masterWatcher.on('add', async () => { | ||
174 | try { | 175 | try { |
175 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | 176 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { |
176 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) | 177 | const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) |
@@ -188,20 +189,18 @@ class MuxingSession extends EventEmitter { | |||
188 | this.masterPlaylistCreated = true | 189 | this.masterPlaylistCreated = true |
189 | 190 | ||
190 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) | 191 | logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) |
191 | |||
192 | this.masterWatcher.close() | ||
193 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) | ||
194 | }) | 192 | }) |
195 | } | 193 | } |
196 | 194 | ||
197 | private watchM3U8File () { | 195 | private watchM3U8File () { |
198 | this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') | ||
199 | |||
200 | const sendQueues = new Map<string, PQueue>() | 196 | const sendQueues = new Map<string, PQueue>() |
201 | 197 | ||
202 | const onChangeOrAdd = async (m3u8Path: string) => { | 198 | const onChange = async (m3u8Path: string) => { |
199 | if (m3u8Path.endsWith('.m3u8') !== true) return | ||
203 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | 200 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return |
204 | 201 | ||
202 | logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) | ||
203 | |||
205 | try { | 204 | try { |
206 | if (!sendQueues.has(m3u8Path)) { | 205 | if (!sendQueues.has(m3u8Path)) { |
207 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | 206 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) |
@@ -214,18 +213,18 @@ class MuxingSession extends EventEmitter { | |||
214 | } | 213 | } |
215 | } | 214 | } |
216 | 215 | ||
217 | this.m3u8Watcher.on('change', onChangeOrAdd) | 216 | this.filesWatcher.on('change', onChange) |
218 | } | 217 | } |
219 | 218 | ||
220 | private watchTSFiles () { | 219 | private watchTSFiles () { |
221 | const startStreamDateTime = new Date().getTime() | 220 | const startStreamDateTime = new Date().getTime() |
222 | 221 | ||
223 | this.tsWatcher = watch(this.outDirectory + '/*.ts') | ||
224 | |||
225 | const playlistIdMatcher = /^([\d+])-/ | 222 | const playlistIdMatcher = /^([\d+])-/ |
226 | 223 | ||
227 | const addHandler = async (segmentPath: string) => { | 224 | const addHandler = async (segmentPath: string) => { |
228 | logger.debug('Live add handler of %s.', segmentPath, this.lTags()) | 225 | if (segmentPath.endsWith('.ts') !== true) return |
226 | |||
227 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) | ||
229 | 228 | ||
230 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] | 229 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] |
231 | 230 | ||
@@ -252,6 +251,10 @@ class MuxingSession extends EventEmitter { | |||
252 | } | 251 | } |
253 | 252 | ||
254 | const deleteHandler = async (segmentPath: string) => { | 253 | const deleteHandler = async (segmentPath: string) => { |
254 | if (segmentPath.endsWith('.ts') !== true) return | ||
255 | |||
256 | logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags()) | ||
257 | |||
255 | try { | 258 | try { |
256 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) | 259 | await this.liveSegmentShaStore.removeSegmentSha(segmentPath) |
257 | } catch (err) { | 260 | } catch (err) { |
@@ -267,8 +270,8 @@ class MuxingSession extends EventEmitter { | |||
267 | } | 270 | } |
268 | } | 271 | } |
269 | 272 | ||
270 | this.tsWatcher.on('add', p => addHandler(p)) | 273 | this.filesWatcher.on('add', p => addHandler(p)) |
271 | this.tsWatcher.on('unlink', p => deleteHandler(p)) | 274 | this.filesWatcher.on('unlink', p => deleteHandler(p)) |
272 | } | 275 | } |
273 | 276 | ||
274 | private async isQuotaExceeded (segmentPath: string) { | 277 | private async isQuotaExceeded (segmentPath: string) { |
@@ -371,7 +374,8 @@ class MuxingSession extends EventEmitter { | |||
371 | setTimeout(() => { | 374 | setTimeout(() => { |
372 | // Wait latest segments generation, and close watchers | 375 | // Wait latest segments generation, and close watchers |
373 | 376 | ||
374 | Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) | 377 | const promise = this.filesWatcher?.close() || Promise.resolve() |
378 | promise | ||
375 | .then(() => { | 379 | .then(() => { |
376 | // Process remaining segments hash | 380 | // Process remaining segments hash |
377 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | 381 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { |
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts index 73fc14574..74b455107 100644 --- a/server/lib/runners/job-handlers/abstract-job-handler.ts +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts | |||
@@ -21,6 +21,7 @@ import { | |||
21 | RunnerJobVODWebVideoTranscodingPayload, | 21 | RunnerJobVODWebVideoTranscodingPayload, |
22 | RunnerJobVODWebVideoTranscodingPrivatePayload | 22 | RunnerJobVODWebVideoTranscodingPrivatePayload |
23 | } from '@shared/models' | 23 | } from '@shared/models' |
24 | import { throttle } from 'lodash' | ||
24 | 25 | ||
25 | type CreateRunnerJobArg = | 26 | type CreateRunnerJobArg = |
26 | { | 27 | { |
@@ -48,6 +49,8 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S | |||
48 | 49 | ||
49 | protected readonly lTags = loggerTagsFactory('runner') | 50 | protected readonly lTags = loggerTagsFactory('runner') |
50 | 51 | ||
52 | static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000) | ||
53 | |||
51 | // --------------------------------------------------------------------------- | 54 | // --------------------------------------------------------------------------- |
52 | 55 | ||
53 | abstract create (options: C): Promise<MRunnerJob> | 56 | abstract create (options: C): Promise<MRunnerJob> |
@@ -102,16 +105,19 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S | |||
102 | 105 | ||
103 | if (progress) runnerJob.progress = progress | 106 | if (progress) runnerJob.progress = progress |
104 | 107 | ||
108 | if (!runnerJob.changed()) { | ||
109 | try { | ||
110 | await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id }) | ||
111 | } catch (err) { | ||
112 | logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | ||
113 | } | ||
114 | |||
115 | return | ||
116 | } | ||
117 | |||
105 | await retryTransactionWrapper(() => { | 118 | await retryTransactionWrapper(() => { |
106 | return sequelizeTypescript.transaction(async transaction => { | 119 | return sequelizeTypescript.transaction(async transaction => { |
107 | if (runnerJob.changed()) { | 120 | return runnerJob.save({ transaction }) |
108 | return runnerJob.save({ transaction }) | ||
109 | } | ||
110 | |||
111 | // Don't update the job too often | ||
112 | if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { | ||
113 | await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) | ||
114 | } | ||
115 | }) | 121 | }) |
116 | }) | 122 | }) |
117 | } | 123 | } |