aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-02 13:51:06 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit3a0c2a77b1a6626699514ddaf8135f4397175443 (patch)
tree60bec7ef8e9cdc008b0a4a56aa403617d036bfab /packages/peertube-runner/server
parent9a3db678f56eda37d222cf2d2232ae0ef5d533d2 (diff)
downloadPeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.gz
PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.tar.zst
PeerTube-3a0c2a77b1a6626699514ddaf8135f4397175443.zip
Enable external plugins to test the PR
Diffstat (limited to 'packages/peertube-runner/server')
-rw-r--r--packages/peertube-runner/server/process/shared/process-live.ts10
-rw-r--r--packages/peertube-runner/server/server.ts17
2 files changed, 19 insertions, 8 deletions
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts
index 5a3b596a2..b17b51c7c 100644
--- a/packages/peertube-runner/server/process/shared/process-live.ts
+++ b/packages/peertube-runner/server/process/shared/process-live.ts
@@ -204,8 +204,8 @@ export class ProcessLiveRTMPHLSTranscoding {
204 204
205 // --------------------------------------------------------------------------- 205 // ---------------------------------------------------------------------------
206 206
207 private sendDeletedChunkUpdate (deletedChunk: string) { 207 private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
208 if (this.ended) return 208 if (this.ended) return Promise.resolve()
209 209
210 logger.debug(`Sending removed live chunk ${deletedChunk} update`) 210 logger.debug(`Sending removed live chunk ${deletedChunk} update`)
211 211
@@ -230,8 +230,8 @@ export class ProcessLiveRTMPHLSTranscoding {
230 return this.updateWithRetry(payload) 230 return this.updateWithRetry(payload)
231 } 231 }
232 232
233 private sendAddedChunkUpdate (addedChunk: string) { 233 private sendAddedChunkUpdate (addedChunk: string): Promise<any> {
234 if (this.ended) return 234 if (this.ended) return Promise.resolve()
235 235
236 logger.debug(`Sending added live chunk ${addedChunk} update`) 236 logger.debug(`Sending added live chunk ${addedChunk} update`)
237 237
@@ -257,7 +257,7 @@ export class ProcessLiveRTMPHLSTranscoding {
257 return this.updateWithRetry(payload) 257 return this.updateWithRetry(payload)
258 } 258 }
259 259
260 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1) { 260 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
261 if (this.ended || this.errored) return 261 if (this.ended || this.errored) return
262 262
263 try { 263 try {
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
index 724f359bd..e851dfc7c 100644
--- a/packages/peertube-runner/server/server.ts
+++ b/packages/peertube-runner/server/server.ts
@@ -23,6 +23,8 @@ export class RunnerServer {
23 23
24 private checkingAvailableJobs = false 24 private checkingAvailableJobs = false
25 25
26 private cleaningUp = false
27
26 private readonly sockets = new Map<PeerTubeServer, Socket>() 28 private readonly sockets = new Map<PeerTubeServer, Socket>()
27 29
28 private constructor () {} 30 private constructor () {}
@@ -45,13 +47,17 @@ export class RunnerServer {
45 try { 47 try {
46 await ipcServer.run(this) 48 await ipcServer.run(this)
47 } catch (err) { 49 } catch (err) {
48 console.error('Cannot start local socket for IPC communication', err) 50 logger.error('Cannot start local socket for IPC communication', err)
49 process.exit(-1) 51 process.exit(-1)
50 } 52 }
51 53
52 // Cleanup on exit 54 // Cleanup on exit
53 for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) { 55 for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
54 process.on(code, async () => { 56 process.on(code, async (err, origin) => {
57 if (code === 'uncaughtException') {
58 logger.error({ err, origin }, 'uncaughtException')
59 }
60
55 await this.onExit() 61 await this.onExit()
56 }) 62 })
57 } 63 }
@@ -244,6 +250,11 @@ export class RunnerServer {
244 } 250 }
245 251
246 private async onExit () { 252 private async onExit () {
253 if (this.cleaningUp) return
254 this.cleaningUp = true
255
256 logger.info('Cleaning up after program exit')
257
247 try { 258 try {
248 for (const { server, job } of this.processingJobs) { 259 for (const { server, job } of this.processingJobs) {
249 await server.runnerJobs.abort({ 260 await server.runnerJobs.abort({
@@ -256,7 +267,7 @@ export class RunnerServer {
256 267
257 await this.cleanupTMP() 268 await this.cleanupTMP()
258 } catch (err) { 269 } catch (err) {
259 console.error(err) 270 logger.error(err)
260 process.exit(-1) 271 process.exit(-1)
261 } 272 }
262 273