diff options
Diffstat (limited to 'packages/peertube-runner/server')
-rw-r--r-- | packages/peertube-runner/server/process/shared/process-live.ts | 10 | ||||
-rw-r--r-- | packages/peertube-runner/server/server.ts | 17 |
2 files changed, 19 insertions, 8 deletions
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts index 5a3b596a2..b17b51c7c 100644 --- a/packages/peertube-runner/server/process/shared/process-live.ts +++ b/packages/peertube-runner/server/process/shared/process-live.ts | |||
@@ -204,8 +204,8 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
204 | 204 | ||
205 | // --------------------------------------------------------------------------- | 205 | // --------------------------------------------------------------------------- |
206 | 206 | ||
207 | private sendDeletedChunkUpdate (deletedChunk: string) { | 207 | private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> { |
208 | if (this.ended) return | 208 | if (this.ended) return Promise.resolve() |
209 | 209 | ||
210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) | 210 | logger.debug(`Sending removed live chunk ${deletedChunk} update`) |
211 | 211 | ||
@@ -230,8 +230,8 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
230 | return this.updateWithRetry(payload) | 230 | return this.updateWithRetry(payload) |
231 | } | 231 | } |
232 | 232 | ||
233 | private sendAddedChunkUpdate (addedChunk: string) { | 233 | private sendAddedChunkUpdate (addedChunk: string): Promise<any> { |
234 | if (this.ended) return | 234 | if (this.ended) return Promise.resolve() |
235 | 235 | ||
236 | logger.debug(`Sending added live chunk ${addedChunk} update`) | 236 | logger.debug(`Sending added live chunk ${addedChunk} update`) |
237 | 237 | ||
@@ -257,7 +257,7 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
257 | return this.updateWithRetry(payload) | 257 | return this.updateWithRetry(payload) |
258 | } | 258 | } |
259 | 259 | ||
260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) { | 260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { |
261 | if (this.ended || this.errored) return | 261 | if (this.ended || this.errored) return |
262 | 262 | ||
263 | try { | 263 | try { |
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index 724f359bd..e851dfc7c 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts | |||
@@ -23,6 +23,8 @@ export class RunnerServer { | |||
23 | 23 | ||
24 | private checkingAvailableJobs = false | 24 | private checkingAvailableJobs = false |
25 | 25 | ||
26 | private cleaningUp = false | ||
27 | |||
26 | private readonly sockets = new Map<PeerTubeServer, Socket>() | 28 | private readonly sockets = new Map<PeerTubeServer, Socket>() |
27 | 29 | ||
28 | private constructor () {} | 30 | private constructor () {} |
@@ -45,13 +47,17 @@ export class RunnerServer { | |||
45 | try { | 47 | try { |
46 | await ipcServer.run(this) | 48 | await ipcServer.run(this) |
47 | } catch (err) { | 49 | } catch (err) { |
48 | console.error('Cannot start local socket for IPC communication', err) | 50 | logger.error('Cannot start local socket for IPC communication', err) |
49 | process.exit(-1) | 51 | process.exit(-1) |
50 | } | 52 | } |
51 | 53 | ||
52 | // Cleanup on exit | 54 | // Cleanup on exit |
53 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { | 55 | for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { |
54 | process.on(code, async () => { | 56 | process.on(code, async (err, origin) => { |
57 | if (code === 'uncaughtException') { | ||
58 | logger.error({ err, origin }, 'uncaughtException') | ||
59 | } | ||
60 | |||
55 | await this.onExit() | 61 | await this.onExit() |
56 | }) | 62 | }) |
57 | } | 63 | } |
@@ -244,6 +250,11 @@ export class RunnerServer { | |||
244 | } | 250 | } |
245 | 251 | ||
246 | private async onExit () { | 252 | private async onExit () { |
253 | if (this.cleaningUp) return | ||
254 | this.cleaningUp = true | ||
255 | |||
256 | logger.info('Cleaning up after program exit') | ||
257 | |||
247 | try { | 258 | try { |
248 | for (const { server, job } of this.processingJobs) { | 259 | for (const { server, job } of this.processingJobs) { |
249 | await server.runnerJobs.abort({ | 260 | await server.runnerJobs.abort({ |
@@ -256,7 +267,7 @@ export class RunnerServer { | |||
256 | 267 | ||
257 | await this.cleanupTMP() | 268 | await this.cleanupTMP() |
258 | } catch (err) { | 269 | } catch (err) { |
259 | console.error(err) | 270 | logger.error(err) |
260 | process.exit(-1) | 271 | process.exit(-1) |
261 | } | 272 | } |
262 | 273 | ||