import { IPCServer } from '../shared/ipc'
import { logger } from '../shared/logger'
import { JobWithToken, processJob } from './process'
+import { isJobSupported } from './shared'
type PeerTubeServer = PeerTubeServerCommand & {
runnerToken: string
private checkingAvailableJobs = false
+ private cleaningUp = false
+
private readonly sockets = new Map<PeerTubeServer, Socket>()
private constructor () {}
try {
await ipcServer.run(this)
} catch (err) {
- console.error('Cannot start local socket for IPC communication', err)
+ logger.error('Cannot start local socket for IPC communication', err)
process.exit(-1)
}
// Cleanup on exit
for (const code of [ 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'uncaughtException' ]) {
- process.on(code, async () => {
+ process.on(code, async (err, origin) => {
+ if (code === 'uncaughtException') {
+ logger.error({ err, origin }, 'uncaughtException')
+ }
+
await this.onExit()
})
}
private async checkAvailableJobs () {
if (this.checkingAvailableJobs) return
- logger.info('Checking available jobs')
-
this.checkingAvailableJobs = true
+ let hadAvailableJob = false
+
for (const server of this.servers) {
try {
+ logger.info('Checking available jobs on ' + server.url)
+
const job = await this.requestJob(server)
if (!job) continue
+ hadAvailableJob = true
+
await this.tryToExecuteJobAsync(server, job)
} catch (err) {
- if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
+ const code = (err.res?.body as PeerTubeProblemDocument)?.code
+
+ if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
+ logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
+ return
+ }
+
+ if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
await this.unregisterRunner({ url: server.url })
}
this.checkingAvailableJobs = false
+
+ if (hadAvailableJob && this.canProcessMoreJobs()) {
+ this.checkAvailableJobs()
+ .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
+ }
}
private async requestJob (server: PeerTubeServer) {
const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
- if (availableJobs.length === 0) {
+ const filtered = availableJobs.filter(j => isJobSupported(j))
+
+ if (filtered.length === 0) {
logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
return undefined
}
- return availableJobs[0]
+ return filtered[0]
}
private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
- if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
+ if (!this.canProcessMoreJobs()) return
const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
return ConfigManager.Instance.setRegisteredInstances(data)
}
+ private canProcessMoreJobs () {
+ return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
+ }
+
// ---------------------------------------------------------------------------
private async cleanupTMP () {
}
private async onExit () {
+ if (this.cleaningUp) return
+ this.cleaningUp = true
+
+ logger.info('Cleaning up after program exit')
+
try {
for (const { server, job } of this.processingJobs) {
await server.runnerJobs.abort({
await this.cleanupTMP()
} catch (err) {
- console.error(err)
+ logger.error(err)
process.exit(-1)
}