private async checkAvailableJobs () {
if (this.checkingAvailableJobs) return
- logger.info('Checking available jobs')
-
this.checkingAvailableJobs = true
+ let hadAvailableJob = false
+
for (const server of this.servers) {
try {
+ logger.info('Checking available jobs on ' + server.url)
+
const job = await this.requestJob(server)
if (!job) continue
+ hadAvailableJob = true
+
await this.tryToExecuteJobAsync(server, job)
} catch (err) {
- if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
+ const code = (err.res?.body as PeerTubeProblemDocument)?.code
+
+ if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
+ logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
+ return
+ }
+
+ if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
await this.unregisterRunner({ url: server.url })
}
this.checkingAvailableJobs = false
+
+ if (hadAvailableJob && this.canProcessMoreJobs()) {
+ this.checkAvailableJobs()
+ .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
+ }
}
private async requestJob (server: PeerTubeServer) {
}
private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
- if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
+ if (!this.canProcessMoreJobs()) return
const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
return ConfigManager.Instance.setRegisteredInstances(data)
}
+ private canProcessMoreJobs () {
+ return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
+ }
+
// ---------------------------------------------------------------------------
private async cleanupTMP () {
RunnerJobType,
RunnerJobUpdateBody,
RunnerJobUpdatePayload,
+ ServerErrorCode,
UserRight,
VideoStudioTranscodingSuccess,
VODAudioMergeTranscodingSuccess,
if (runnerJob.state !== RunnerJobState.PENDING) {
res.fail({
+ type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
message: 'This job is not in pending state anymore',
status: HttpStatusCode.CONFLICT_409
})
it('Should enable open telemetry metrics', async function () {
this.timeout(120000)
- server = await createSingleServer(1, {
+ await server.run({
open_telemetry: {
metrics: {
enabled: true
it('Should disable http request duration metrics', async function () {
await server.kill()
- server = await createSingleServer(1, {
+ await server.run({
open_telemetry: {
metrics: {
enabled: true,
})
it('Should enable open telemetry metrics', async function () {
- server = await createSingleServer(1, {
+ await server.run({
open_telemetry: {
tracing: {
enabled: true,