]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Add timeout and TTL to request jobs
authorChocobozzz <me@florianbigard.com>
Wed, 9 May 2018 07:08:22 +0000 (09:08 +0200)
committerChocobozzz <me@florianbigard.com>
Wed, 9 May 2018 07:08:22 +0000 (09:08 +0200)
server/initializers/constants.ts
server/lib/job-queue/handlers/activitypub-http-broadcast.ts
server/lib/job-queue/handlers/activitypub-http-fetcher.ts
server/lib/job-queue/handlers/activitypub-http-unicast.ts
server/lib/job-queue/job-queue.ts

index 365b8617df6ec2607634e2b61da50406ece5c1a2..6556aa168b52e330c81c2ce34a339245d2f4be79 100644 (file)
@@ -78,9 +78,10 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
   'video-file': 1,
   'email': 5
 }
-const BROADCAST_CONCURRENCY = 5 // How many requests in parallel we do in activitypub-http-broadcast job
-// 2 days
-const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2
+const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job
+const JOB_REQUEST_TIMEOUT = 3000 // 3 seconds
+const JOB_REQUEST_TTL = 60000 * 10 // 10 minutes
+const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days
 
 // 1 hour
 let SCHEDULER_INTERVAL = 60000 * 60
@@ -466,6 +467,8 @@ export {
   VIDEO_RATE_TYPES,
   VIDEO_MIMETYPE_EXT,
   VIDEO_TRANSCODING_FPS,
+  JOB_REQUEST_TIMEOUT,
+  JOB_REQUEST_TTL,
   USER_PASSWORD_RESET_LIFETIME,
   IMAGE_MIMETYPE_EXT,
   SCHEDULER_INTERVAL,
index 38b8393f4527ba290db69594588df50ee688c9c9..d8b8ec22206d439e7aa85b39221db97ed5cfa33e 100644 (file)
@@ -4,7 +4,7 @@ import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
 import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
 import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
-import { BROADCAST_CONCURRENCY } from '../../../initializers'
+import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
 
 export type ActivitypubHttpBroadcastPayload = {
   uris: string[]
@@ -24,7 +24,8 @@ async function processActivityPubHttpBroadcast (job: kue.Job) {
     method: 'POST',
     uri: '',
     json: body,
-    httpSignature: httpSignatureOptions
+    httpSignature: httpSignatureOptions,
+    timeout: JOB_REQUEST_TIMEOUT
   }
 
   const badUrls: string[] = []
index 062211c85b276ad36ee322fc41dc06d6690b977e..4683beb2f58f40e88425f5b06c4e30a37abc55fb 100644 (file)
@@ -1,7 +1,7 @@
 import * as kue from 'kue'
 import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
-import { ACTIVITY_PUB } from '../../../initializers'
+import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers'
 import { processActivities } from '../../activitypub/process'
 import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
 
@@ -18,7 +18,8 @@ async function processActivityPubHttpFetcher (job: kue.Job) {
     method: 'GET',
     uri: '',
     json: true,
-    activityPub: true
+    activityPub: true,
+    timeout: JOB_REQUEST_TIMEOUT
   }
 
   for (const uri of payload.uris) {
index e1e1824e50dc25578a3cad95297d6c84a9a9f88f..173f3bb522cdfaa9a5d78fd0f88082c76e6368c1 100644 (file)
@@ -3,6 +3,7 @@ import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
 import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
 import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
+import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
 
 export type ActivitypubHttpUnicastPayload = {
   uri: string
@@ -23,7 +24,8 @@ async function processActivityPubHttpUnicast (job: kue.Job) {
     method: 'POST',
     uri,
     json: body,
-    httpSignature: httpSignatureOptions
+    httpSignature: httpSignatureOptions,
+    timeout: JOB_REQUEST_TIMEOUT
   }
 
   try {
index bf40a9206877efa9538b58f814d6e63ae42c855e..acc69ef2459c24165628001251a45013e8be7fb0 100644 (file)
@@ -1,7 +1,7 @@
 import * as kue from 'kue'
 import { JobState, JobType } from '../../../shared/models'
 import { logger } from '../../helpers/logger'
-import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
+import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
 import { Redis } from '../redis'
 import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
 import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
@@ -27,6 +27,13 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
   'email': processEmail
 }
 
+const jobsWithTLL: JobType[] = [
+  'activitypub-http-broadcast',
+  'activitypub-http-unicast',
+  'activitypub-http-fetcher',
+  'activitypub-follow'
+]
+
 class JobQueue {
 
   private static instance: JobQueue
@@ -77,16 +84,21 @@ class JobQueue {
 
   createJob (obj: CreateJobArgument, priority = 'normal') {
     return new Promise((res, rej) => {
-      this.jobQueue
+      let job = this.jobQueue
         .create(obj.type, obj.payload)
         .priority(priority)
         .attempts(JOB_ATTEMPTS[obj.type])
         .backoff({ delay: 60 * 1000, type: 'exponential' })
-        .save(err => {
-          if (err) return rej(err)
 
-          return res()
-        })
+      if (jobsWithTLL.indexOf(obj.type) !== -1) {
+        job = job.ttl(JOB_REQUEST_TTL)
+      }
+
+      return job.save(err => {
+        if (err) return rej(err)
+
+        return res()
+      })
     })
   }