aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-01-21 16:57:21 +0100
committerChocobozzz <chocobozzz@cpy.re>2021-01-25 14:38:52 +0100
commit77d7e851dccf17dcc89e8fcc2db3f655d1e63f95 (patch)
treed09e045dfabe7ab1e170d1b0caa9decda8a7d39c
parent92c871b40554d5285232eb4392cebb63d127704a (diff)
downloadPeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.tar.gz
PeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.tar.zst
PeerTube-77d7e851dccf17dcc89e8fcc2db3f655d1e63f95.zip
Add priority to transcoding jobs
(1 = highest priority) 100 for new resolutions 10 for original file optimization Add a malus for transcoding jobs depending on how many uploads the user did in the last 7 days
-rw-r--r--client/src/app/+admin/system/jobs/jobs.component.html4
-rw-r--r--client/src/app/+admin/system/jobs/jobs.component.scss3
-rw-r--r--client/src/app/+admin/system/jobs/jobs.component.ts12
-rw-r--r--server/controllers/api/jobs.ts1
-rw-r--r--server/controllers/api/videos/index.ts7
-rw-r--r--server/helpers/database-utils.ts8
-rw-r--r--server/helpers/video.ts28
-rw-r--r--server/initializers/constants.ts7
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts5
-rw-r--r--server/lib/job-queue/handlers/video-import.ts4
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts91
-rw-r--r--server/lib/job-queue/job-queue.ts2
-rw-r--r--server/lib/video.ts44
-rw-r--r--server/models/video/video.ts34
-rw-r--r--shared/models/server/job.model.ts1
15 files changed, 172 insertions, 79 deletions
diff --git a/client/src/app/+admin/system/jobs/jobs.component.html b/client/src/app/+admin/system/jobs/jobs.component.html
index b6457a005..6b3fd9393 100644
--- a/client/src/app/+admin/system/jobs/jobs.component.html
+++ b/client/src/app/+admin/system/jobs/jobs.component.html
@@ -40,6 +40,7 @@
40 <th style="width: 40px"></th> 40 <th style="width: 40px"></th>
41 <th style="width: calc(100% - 390px)" class="job-id" i18n>ID</th> 41 <th style="width: calc(100% - 390px)" class="job-id" i18n>ID</th>
42 <th style="width: 200px" class="job-type" i18n>Type</th> 42 <th style="width: 200px" class="job-type" i18n>Type</th>
43 <th style="width: 200px" class="job-priority" i18n>Priority <small>(1 = highest priority)</small></th>
43 <th style="width: 200px" class="job-state" i18n *ngIf="jobState === 'all'">State</th> 44 <th style="width: 200px" class="job-state" i18n *ngIf="jobState === 'all'">State</th>
44 <th style="width: 100px" class="job-progress" i18n *ngIf="hasProgress()">Progress</th> 45 <th style="width: 100px" class="job-progress" i18n *ngIf="hasProgress()">Progress</th>
45 <th style="width: 150px" class="job-date" i18n pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th> 46 <th style="width: 150px" class="job-date" i18n pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th>
@@ -56,12 +57,13 @@
56 57
57 <td class="job-id c-hand" [pRowToggler]="job" [title]="job.id">{{ job.id }}</td> 58 <td class="job-id c-hand" [pRowToggler]="job" [title]="job.id">{{ job.id }}</td>
58 <td class="job-type c-hand" [pRowToggler]="job">{{ job.type }}</td> 59 <td class="job-type c-hand" [pRowToggler]="job">{{ job.type }}</td>
60 <td class="job-priority c-hand" [pRowToggler]="job">{{ job.priority }}</td>
59 61
60 <td class="job-state c-hand" [pRowToggler]="job" *ngIf="jobState === 'all'"> 62 <td class="job-state c-hand" [pRowToggler]="job" *ngIf="jobState === 'all'">
61 <span class="badge" [ngClass]="getJobStateClass(job.state)">{{ job.state }}</span> 63 <span class="badge" [ngClass]="getJobStateClass(job.state)">{{ job.state }}</span>
62 </td> 64 </td>
63 65
64 <td class="job-state" [pRowToggler]="job" *ngIf="hasProgress()"> 66 <td class="job-progress c-hand" [pRowToggler]="job" *ngIf="hasProgress()">
65 {{ getProgress(job) }} 67 {{ getProgress(job) }}
66 </td> 68 </td>
67 69
diff --git a/client/src/app/+admin/system/jobs/jobs.component.scss b/client/src/app/+admin/system/jobs/jobs.component.scss
index 9c6ae73e1..2add506e8 100644
--- a/client/src/app/+admin/system/jobs/jobs.component.scss
+++ b/client/src/app/+admin/system/jobs/jobs.component.scss
@@ -10,7 +10,8 @@
10} 10}
11 11
12.job-type, 12.job-type,
13.job-state { 13.job-state,
14.job-priority {
14 width: 150px !important; 15 width: 150px !important;
15} 16}
16 17
diff --git a/client/src/app/+admin/system/jobs/jobs.component.ts b/client/src/app/+admin/system/jobs/jobs.component.ts
index 5355d2c20..d08079f7e 100644
--- a/client/src/app/+admin/system/jobs/jobs.component.ts
+++ b/client/src/app/+admin/system/jobs/jobs.component.ts
@@ -21,19 +21,19 @@ export class JobsComponent extends RestTable implements OnInit {
21 21
22 jobType: JobTypeClient = 'all' 22 jobType: JobTypeClient = 'all'
23 jobTypes: JobTypeClient[] = [ 23 jobTypes: JobTypeClient[] = [
24 'all',
25 'activitypub-follow', 24 'activitypub-follow',
26 'activitypub-http-broadcast', 25 'activitypub-http-broadcast',
27 'activitypub-http-fetcher', 26 'activitypub-http-fetcher',
28 'activitypub-http-unicast', 27 'activitypub-http-unicast',
28 'activitypub-refresher',
29 'all',
29 'email', 30 'email',
30 'video-transcoding',
31 'video-file-import', 31 'video-file-import',
32 'video-import', 32 'video-import',
33 'videos-views', 33 'video-live-ending',
34 'activitypub-refresher',
35 'video-redundancy', 34 'video-redundancy',
36 'video-live-ending' 35 'video-transcoding',
36 'videos-views'
37 ] 37 ]
38 38
39 jobs: Job[] = [] 39 jobs: Job[] = []
@@ -75,7 +75,7 @@ export class JobsComponent extends RestTable implements OnInit {
75 getColspan () { 75 getColspan () {
76 if (this.jobState === 'all' && this.hasProgress()) return 6 76 if (this.jobState === 'all' && this.hasProgress()) return 6
77 77
78 if (this.jobState === 'all') return 5 78 if (this.jobState === 'all' || this.hasProgress()) return 5
79 79
80 return 4 80 return 4
81 } 81 }
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index 929140140..861cc22b9 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -69,6 +69,7 @@ async function formatJob (job: any, state?: JobState): Promise<Job> {
69 type: job.queue.name as JobType, 69 type: job.queue.name as JobType,
70 data: job.data, 70 data: job.data,
71 progress: await job.progress(), 71 progress: await job.progress(),
72 priority: job.opts.priority,
72 error, 73 error,
73 createdAt: new Date(job.timestamp), 74 createdAt: new Date(job.timestamp),
74 finishedOn: new Date(job.finishedOn), 75 finishedOn: new Date(job.finishedOn),
diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts
index e1c775180..c2c5eb640 100644
--- a/server/controllers/api/videos/index.ts
+++ b/server/controllers/api/videos/index.ts
@@ -2,16 +2,16 @@ import * as express from 'express'
2import { move } from 'fs-extra' 2import { move } from 'fs-extra'
3import { extname } from 'path' 3import { extname } from 'path'
4import toInt from 'validator/lib/toInt' 4import toInt from 'validator/lib/toInt'
5import { addOptimizeOrMergeAudioJob } from '@server/helpers/video'
6import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 5import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
7import { changeVideoChannelShare } from '@server/lib/activitypub/share' 6import { changeVideoChannelShare } from '@server/lib/activitypub/share'
8import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' 7import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
9import { LiveManager } from '@server/lib/live-manager' 8import { LiveManager } from '@server/lib/live-manager'
10import { buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' 9import { addOptimizeOrMergeAudioJob, buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
11import { getVideoFilePath } from '@server/lib/video-paths' 10import { getVideoFilePath } from '@server/lib/video-paths'
12import { getServerActor } from '@server/models/application/application' 11import { getServerActor } from '@server/models/application/application'
13import { MVideoFullLight } from '@server/types/models' 12import { MVideoFullLight } from '@server/types/models'
14import { VideoCreate, VideoState, VideoUpdate } from '../../../../shared' 13import { VideoCreate, VideoState, VideoUpdate } from '../../../../shared'
14import { HttpStatusCode } from '../../../../shared/core-utils/miscs/http-error-codes'
15import { VideoFilter } from '../../../../shared/models/videos/video-query.type' 15import { VideoFilter } from '../../../../shared/models/videos/video-query.type'
16import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' 16import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
17import { resetSequelizeInstance } from '../../../helpers/database-utils' 17import { resetSequelizeInstance } from '../../../helpers/database-utils'
@@ -66,7 +66,6 @@ import { liveRouter } from './live'
66import { ownershipVideoRouter } from './ownership' 66import { ownershipVideoRouter } from './ownership'
67import { rateVideoRouter } from './rate' 67import { rateVideoRouter } from './rate'
68import { watchingRouter } from './watching' 68import { watchingRouter } from './watching'
69import { HttpStatusCode } from '../../../../shared/core-utils/miscs/http-error-codes'
70 69
71const auditLogger = auditLoggerFactory('videos') 70const auditLogger = auditLoggerFactory('videos')
72const videosRouter = express.Router() 71const videosRouter = express.Router()
@@ -267,7 +266,7 @@ async function addVideo (req: express.Request, res: express.Response) {
267 Notifier.Instance.notifyOnNewVideoIfNeeded(videoCreated) 266 Notifier.Instance.notifyOnNewVideoIfNeeded(videoCreated)
268 267
269 if (video.state === VideoState.TO_TRANSCODE) { 268 if (video.state === VideoState.TO_TRANSCODE) {
270 await addOptimizeOrMergeAudioJob(videoCreated, videoFile) 269 await addOptimizeOrMergeAudioJob(videoCreated, videoFile, res.locals.oauth.token.User)
271 } 270 }
272 271
273 Hooks.runAction('action:api.video.uploaded', { video: videoCreated }) 272 Hooks.runAction('action:api.video.uploaded', { video: videoCreated })
diff --git a/server/helpers/database-utils.ts b/server/helpers/database-utils.ts
index 87f10f913..2b916efc2 100644
--- a/server/helpers/database-utils.ts
+++ b/server/helpers/database-utils.ts
@@ -4,6 +4,14 @@ import { Model } from 'sequelize-typescript'
4import { logger } from './logger' 4import { logger } from './logger'
5import { Transaction } from 'sequelize' 5import { Transaction } from 'sequelize'
6 6
7function retryTransactionWrapper <T, A, B, C, D> (
8 functionToRetry: (arg1: A, arg2: B, arg3: C, arg4: D) => Promise<T> | Bluebird<T>,
9 arg1: A,
10 arg2: B,
11 arg3: C,
12 arg4: D,
13): Promise<T>
14
7function retryTransactionWrapper <T, A, B, C> ( 15function retryTransactionWrapper <T, A, B, C> (
8 functionToRetry: (arg1: A, arg2: B, arg3: C) => Promise<T> | Bluebird<T>, 16 functionToRetry: (arg1: A, arg2: B, arg3: C) => Promise<T> | Bluebird<T>,
9 arg1: A, 17 arg1: A,
diff --git a/server/helpers/video.ts b/server/helpers/video.ts
index bfd5a9627..7c510f474 100644
--- a/server/helpers/video.ts
+++ b/server/helpers/video.ts
@@ -1,20 +1,17 @@
1import { Response } from 'express' 1import { Response } from 'express'
2import { CONFIG } from '@server/initializers/config' 2import { CONFIG } from '@server/initializers/config'
3import { DEFAULT_AUDIO_RESOLUTION } from '@server/initializers/constants'
4import { JobQueue } from '@server/lib/job-queue'
5import { 3import {
6 isStreamingPlaylist, 4 isStreamingPlaylist,
7 MStreamingPlaylistVideo, 5 MStreamingPlaylistVideo,
8 MVideo, 6 MVideo,
9 MVideoAccountLightBlacklistAllFiles, 7 MVideoAccountLightBlacklistAllFiles,
10 MVideoFile,
11 MVideoFullLight, 8 MVideoFullLight,
12 MVideoIdThumbnail, 9 MVideoIdThumbnail,
13 MVideoImmutable, 10 MVideoImmutable,
14 MVideoThumbnail, 11 MVideoThumbnail,
15 MVideoWithRights 12 MVideoWithRights
16} from '@server/types/models' 13} from '@server/types/models'
17import { VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' 14import { VideoPrivacy, VideoState } from '@shared/models'
18import { VideoModel } from '../models/video/video' 15import { VideoModel } from '../models/video/video'
19 16
20type VideoFetchType = 'all' | 'only-video' | 'only-video-with-rights' | 'id' | 'none' | 'only-immutable-attributes' 17type VideoFetchType = 'all' | 'only-video' | 'only-video-with-rights' | 'id' | 'none' | 'only-immutable-attributes'
@@ -69,27 +66,6 @@ function getVideoWithAttributes (res: Response) {
69 return res.locals.videoAll || res.locals.onlyVideo || res.locals.onlyVideoWithRights 66 return res.locals.videoAll || res.locals.onlyVideo || res.locals.onlyVideoWithRights
70} 67}
71 68
72function addOptimizeOrMergeAudioJob (video: MVideo, videoFile: MVideoFile) {
73 let dataInput: VideoTranscodingPayload
74
75 if (videoFile.isAudio()) {
76 dataInput = {
77 type: 'merge-audio-to-webtorrent',
78 resolution: DEFAULT_AUDIO_RESOLUTION,
79 videoUUID: video.uuid,
80 isNewVideo: true
81 }
82 } else {
83 dataInput = {
84 type: 'optimize-to-webtorrent',
85 videoUUID: video.uuid,
86 isNewVideo: true
87 }
88 }
89
90 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput })
91}
92
93function extractVideo (videoOrPlaylist: MVideo | MStreamingPlaylistVideo) { 69function extractVideo (videoOrPlaylist: MVideo | MStreamingPlaylistVideo) {
94 return isStreamingPlaylist(videoOrPlaylist) 70 return isStreamingPlaylist(videoOrPlaylist)
95 ? videoOrPlaylist.Video 71 ? videoOrPlaylist.Video
@@ -107,7 +83,6 @@ function isStateForFederation (state: VideoState) {
107 const castedState = parseInt(state + '', 10) 83 const castedState = parseInt(state + '', 10)
108 84
109 return castedState === VideoState.PUBLISHED || castedState === VideoState.WAITING_FOR_LIVE || castedState === VideoState.LIVE_ENDED 85 return castedState === VideoState.PUBLISHED || castedState === VideoState.WAITING_FOR_LIVE || castedState === VideoState.LIVE_ENDED
110
111} 86}
112 87
113function getPrivaciesForFederation () { 88function getPrivaciesForFederation () {
@@ -130,7 +105,6 @@ export {
130 fetchVideo, 105 fetchVideo,
131 getVideoWithAttributes, 106 getVideoWithAttributes,
132 fetchVideoByUrl, 107 fetchVideoByUrl,
133 addOptimizeOrMergeAudioJob,
134 extractVideo, 108 extractVideo,
135 getExtFromMimetype, 109 getExtFromMimetype,
136 isStateForFederation, 110 isStateForFederation,
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 182bdf9cc..453ca02ed 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -179,6 +179,12 @@ const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } =
179 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour 179 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
180 } 180 }
181} 181}
182const JOB_PRIORITY = {
183 TRANSCODING: {
184 OPTIMIZER: 10,
185 NEW_RESOLUTION: 100
186 }
187}
182 188
183const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job 189const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job
184const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) 190const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...)
@@ -851,6 +857,7 @@ export {
851 VIDEO_STATES, 857 VIDEO_STATES,
852 QUEUE_CONCURRENCY, 858 QUEUE_CONCURRENCY,
853 VIDEO_RATE_TYPES, 859 VIDEO_RATE_TYPES,
860 JOB_PRIORITY,
854 VIDEO_TRANSCODING_FPS, 861 VIDEO_TRANSCODING_FPS,
855 FFMPEG_NICE, 862 FFMPEG_NICE,
856 ABUSE_STATES, 863 ABUSE_STATES,
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 22e4d0cf1..582efea3a 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -3,6 +3,7 @@ import { copy, stat } from 'fs-extra'
3import { extname } from 'path' 3import { extname } from 'path'
4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 4import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
5import { getVideoFilePath } from '@server/lib/video-paths' 5import { getVideoFilePath } from '@server/lib/video-paths'
6import { UserModel } from '@server/models/account/user'
6import { MVideoFile, MVideoWithFile } from '@server/types/models' 7import { MVideoFile, MVideoWithFile } from '@server/types/models'
7import { VideoFileImportPayload } from '@shared/models' 8import { VideoFileImportPayload } from '@shared/models'
8import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' 9import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils'
@@ -24,7 +25,9 @@ async function processVideoFileImport (job: Bull.Job) {
24 25
25 await updateVideoFile(video, payload.filePath) 26 await updateVideoFile(video, payload.filePath)
26 27
27 await onNewWebTorrentFileResolution(video) 28 const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
29 await onNewWebTorrentFileResolution(video, user)
30
28 return video 31 return video
29} 32}
30 33
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index c1d1866b0..f61fd773a 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -1,10 +1,10 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { move, remove, stat } from 'fs-extra' 2import { move, remove, stat } from 'fs-extra'
3import { extname } from 'path' 3import { extname } from 'path'
4import { addOptimizeOrMergeAudioJob } from '@server/helpers/video'
5import { isPostImportVideoAccepted } from '@server/lib/moderation' 4import { isPostImportVideoAccepted } from '@server/lib/moderation'
6import { Hooks } from '@server/lib/plugins/hooks' 5import { Hooks } from '@server/lib/plugins/hooks'
7import { isAbleToUploadVideo } from '@server/lib/user' 6import { isAbleToUploadVideo } from '@server/lib/user'
7import { addOptimizeOrMergeAudioJob } from '@server/lib/video'
8import { getVideoFilePath } from '@server/lib/video-paths' 8import { getVideoFilePath } from '@server/lib/video-paths'
9import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 9import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
10import { 10import {
@@ -224,7 +224,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
224 224
225 // Create transcoding jobs? 225 // Create transcoding jobs?
226 if (video.state === VideoState.TO_TRANSCODE) { 226 if (video.state === VideoState.TO_TRANSCODE) {
227 await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile) 227 await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User)
228 } 228 }
229 229
230 } catch (err) { 230 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 0f6b3f753..ee241ad03 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,8 +1,9 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' 2import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
3import { publishAndFederateIfNeeded } from '@server/lib/video' 3import { JOB_PRIORITY } from '@server/initializers/constants'
4import { getJobTranscodingPriorityMalus, publishAndFederateIfNeeded } from '@server/lib/video'
4import { getVideoFilePath } from '@server/lib/video-paths' 5import { getVideoFilePath } from '@server/lib/video-paths'
5import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' 6import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models'
6import { 7import {
7 HLSTranscodingPayload, 8 HLSTranscodingPayload,
8 MergeAudioTranscodingPayload, 9 MergeAudioTranscodingPayload,
@@ -25,8 +26,11 @@ import {
25 transcodeNewWebTorrentResolution 26 transcodeNewWebTorrentResolution
26} from '../../video-transcoding' 27} from '../../video-transcoding'
27import { JobQueue } from '../job-queue' 28import { JobQueue } from '../job-queue'
29import { UserModel } from '@server/models/account/user'
28 30
29const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise<any> } = { 31type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<any>
32
33const handlers: { [ id: string ]: HandlerFunction } = {
30 // Deprecated, introduced in 3.1 34 // Deprecated, introduced in 3.1
31 'hls': handleHLSJob, 35 'hls': handleHLSJob,
32 'new-resolution-to-hls': handleHLSJob, 36 'new-resolution-to-hls': handleHLSJob,
@@ -55,13 +59,15 @@ async function processVideoTranscoding (job: Bull.Job) {
55 return undefined 59 return undefined
56 } 60 }
57 61
62 const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
63
58 const handler = handlers[payload.type] 64 const handler = handlers[payload.type]
59 65
60 if (!handler) { 66 if (!handler) {
61 throw new Error('Cannot find transcoding handler for ' + payload.type) 67 throw new Error('Cannot find transcoding handler for ' + payload.type)
62 } 68 }
63 69
64 await handler(job, payload, video) 70 await handler(job, payload, video, user)
65 71
66 return video 72 return video
67} 73}
@@ -90,22 +96,27 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide
90 await retryTransactionWrapper(onHlsPlaylistGeneration, video) 96 await retryTransactionWrapper(onHlsPlaylistGeneration, video)
91} 97}
92 98
93async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) { 99async function handleNewWebTorrentResolutionJob (
100 job: Bull.Job,
101 payload: NewResolutionTranscodingPayload,
102 video: MVideoFullLight,
103 user: MUserId
104) {
94 await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) 105 await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job)
95 106
96 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) 107 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload)
97} 108}
98 109
99async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) { 110async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
100 await mergeAudioVideofile(video, payload.resolution, job) 111 await mergeAudioVideofile(video, payload.resolution, job)
101 112
102 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) 113 await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload)
103} 114}
104 115
105async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) { 116async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
106 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) 117 const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
107 118
108 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType) 119 await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user)
109} 120}
110 121
111// --------------------------------------------------------------------------- 122// ---------------------------------------------------------------------------
@@ -129,7 +140,8 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight) {
129async function onVideoFileOptimizer ( 140async function onVideoFileOptimizer (
130 videoArg: MVideoWithFile, 141 videoArg: MVideoWithFile,
131 payload: OptimizeTranscodingPayload, 142 payload: OptimizeTranscodingPayload,
132 transcodeType: TranscodeOptionsType 143 transcodeType: TranscodeOptionsType,
144 user: MUserId
133) { 145) {
134 if (videoArg === undefined) return undefined 146 if (videoArg === undefined) return undefined
135 147
@@ -142,13 +154,6 @@ async function onVideoFileOptimizer (
142 // Video does not exist anymore 154 // Video does not exist anymore
143 if (!videoDatabase) return undefined 155 if (!videoDatabase) return undefined
144 156
145 // Create transcoding jobs if there are enabled resolutions
146 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
147 logger.info(
148 'Resolutions computed for video %s and origin file resolution of %d.', videoDatabase.uuid, videoFileResolution,
149 { resolutions: resolutionsEnabled }
150 )
151
152 let videoPublished = false 157 let videoPublished = false
153 158
154 // Generate HLS version of the original file 159 // Generate HLS version of the original file
@@ -158,9 +163,9 @@ async function onVideoFileOptimizer (
158 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues 163 // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
159 copyCodecs: transcodeType !== 'quick-transcode' 164 copyCodecs: transcodeType !== 'quick-transcode'
160 }) 165 })
161 createHlsJobIfEnabled(originalFileHLSPayload) 166 await createHlsJobIfEnabled(user, originalFileHLSPayload)
162 167
163 const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode) 168 const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode)
164 169
165 if (!hasNewResolutions) { 170 if (!hasNewResolutions) {
166 // No transcoding to do, it's now published 171 // No transcoding to do, it's now published
@@ -178,11 +183,12 @@ async function onVideoFileOptimizer (
178 183
179async function onNewWebTorrentFileResolution ( 184async function onNewWebTorrentFileResolution (
180 video: MVideoUUID, 185 video: MVideoUUID,
186 user: MUserId,
181 payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload 187 payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
182) { 188) {
183 await publishAndFederateIfNeeded(video) 189 await publishAndFederateIfNeeded(video)
184 190
185 createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) 191 await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true }))
186} 192}
187 193
188// --------------------------------------------------------------------------- 194// ---------------------------------------------------------------------------
@@ -194,22 +200,35 @@ export {
194 200
195// --------------------------------------------------------------------------- 201// ---------------------------------------------------------------------------
196 202
197function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { 203async function createHlsJobIfEnabled (user: MUserId, payload: {
198 // Generate HLS playlist? 204 videoUUID: string
199 if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { 205 resolution: number
200 const hlsTranscodingPayload: HLSTranscodingPayload = { 206 isPortraitMode?: boolean
201 type: 'new-resolution-to-hls', 207 copyCodecs: boolean
202 videoUUID: payload.videoUUID, 208}) {
203 resolution: payload.resolution, 209 if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return
204 isPortraitMode: payload.isPortraitMode, 210
205 copyCodecs: payload.copyCodecs 211 const jobOptions = {
206 } 212 priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user)
213 }
207 214
208 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) 215 const hlsTranscodingPayload: HLSTranscodingPayload = {
216 type: 'new-resolution-to-hls',
217 videoUUID: payload.videoUUID,
218 resolution: payload.resolution,
219 isPortraitMode: payload.isPortraitMode,
220 copyCodecs: payload.copyCodecs
209 } 221 }
222
223 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions)
210} 224}
211 225
212function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) { 226async function createLowerResolutionsJobs (
227 video: MVideoFullLight,
228 user: MUserId,
229 videoFileResolution: number,
230 isPortraitMode: boolean
231) {
213 // Create transcoding jobs if there are enabled resolutions 232 // Create transcoding jobs if there are enabled resolutions
214 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') 233 const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
215 logger.info( 234 logger.info(
@@ -244,7 +263,11 @@ function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution
244 } 263 }
245 } 264 }
246 265
247 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) 266 const jobOptions = {
267 priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user)
268 }
269
270 JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions)
248 } 271 }
249 272
250 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) 273 logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 5d0b797b0..38b1d6f1f 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -47,6 +47,7 @@ type CreateJobArgument =
47 47
48type CreateJobOptions = { 48type CreateJobOptions = {
49 delay?: number 49 delay?: number
50 priority?: number
50} 51}
51 52
52const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { 53const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
@@ -148,6 +149,7 @@ class JobQueue {
148 backoff: { delay: 60 * 1000, type: 'exponential' }, 149 backoff: { delay: 60 * 1000, type: 'exponential' },
149 attempts: JOB_ATTEMPTS[obj.type], 150 attempts: JOB_ATTEMPTS[obj.type],
150 timeout: JOB_TTL[obj.type], 151 timeout: JOB_TTL[obj.type],
152 priority: options.priority,
151 delay: options.delay 153 delay: options.delay
152 } 154 }
153 155
diff --git a/server/lib/video.ts b/server/lib/video.ts
index d03ab0452..6b75fadb0 100644
--- a/server/lib/video.ts
+++ b/server/lib/video.ts
@@ -1,11 +1,13 @@
1import { Transaction } from 'sequelize/types' 1import { Transaction } from 'sequelize/types'
2import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY } from '@server/initializers/constants'
2import { sequelizeTypescript } from '@server/initializers/database' 3import { sequelizeTypescript } from '@server/initializers/database'
3import { TagModel } from '@server/models/video/tag' 4import { TagModel } from '@server/models/video/tag'
4import { VideoModel } from '@server/models/video/video' 5import { VideoModel } from '@server/models/video/video'
5import { FilteredModelAttributes } from '@server/types' 6import { FilteredModelAttributes } from '@server/types'
6import { MTag, MThumbnail, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' 7import { MTag, MThumbnail, MUserId, MVideo, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
7import { ThumbnailType, VideoCreate, VideoPrivacy } from '@shared/models' 8import { ThumbnailType, VideoCreate, VideoPrivacy, VideoTranscodingPayload } from '@shared/models'
8import { federateVideoIfNeeded } from './activitypub/videos' 9import { federateVideoIfNeeded } from './activitypub/videos'
10import { JobQueue } from './job-queue/job-queue'
9import { Notifier } from './notifier' 11import { Notifier } from './notifier'
10import { createVideoMiniatureFromExisting } from './thumbnail' 12import { createVideoMiniatureFromExisting } from './thumbnail'
11 13
@@ -104,11 +106,47 @@ async function publishAndFederateIfNeeded (video: MVideoUUID, wasLive = false) {
104 } 106 }
105} 107}
106 108
109async function addOptimizeOrMergeAudioJob (video: MVideo, videoFile: MVideoFile, user: MUserId) {
110 let dataInput: VideoTranscodingPayload
111
112 if (videoFile.isAudio()) {
113 dataInput = {
114 type: 'merge-audio-to-webtorrent',
115 resolution: DEFAULT_AUDIO_RESOLUTION,
116 videoUUID: video.uuid,
117 isNewVideo: true
118 }
119 } else {
120 dataInput = {
121 type: 'optimize-to-webtorrent',
122 videoUUID: video.uuid,
123 isNewVideo: true
124 }
125 }
126
127 const jobOptions = {
128 priority: JOB_PRIORITY.TRANSCODING.OPTIMIZER + await getJobTranscodingPriorityMalus(user)
129 }
130
131 return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput }, jobOptions)
132}
133
134async function getJobTranscodingPriorityMalus (user: MUserId) {
135 const now = new Date()
136 const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7)
137
138 const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek)
139
140 return videoUploadedByUser
141}
142
107// --------------------------------------------------------------------------- 143// ---------------------------------------------------------------------------
108 144
109export { 145export {
110 buildLocalVideoFromReq, 146 buildLocalVideoFromReq,
111 publishAndFederateIfNeeded, 147 publishAndFederateIfNeeded,
112 buildVideoThumbnailsFromReq, 148 buildVideoThumbnailsFromReq,
113 setVideoTags 149 setVideoTags,
150 addOptimizeOrMergeAudioJob,
151 getJobTranscodingPriorityMalus
114} 152}
diff --git a/server/models/video/video.ts b/server/models/video/video.ts
index 2bfa704ec..720bfd829 100644
--- a/server/models/video/video.ts
+++ b/server/models/video/video.ts
@@ -129,6 +129,7 @@ import { VideoShareModel } from './video-share'
129import { VideoStreamingPlaylistModel } from './video-streaming-playlist' 129import { VideoStreamingPlaylistModel } from './video-streaming-playlist'
130import { VideoTagModel } from './video-tag' 130import { VideoTagModel } from './video-tag'
131import { VideoViewModel } from './video-view' 131import { VideoViewModel } from './video-view'
132import { UserModel } from '../account/user'
132 133
133export enum ScopeNames { 134export enum ScopeNames {
134 AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS', 135 AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS',
@@ -1198,6 +1199,39 @@ export class VideoModel extends Model {
1198 return VideoModel.count(options) 1199 return VideoModel.count(options)
1199 } 1200 }
1200 1201
1202 static countVideosUploadedByUserSince (userId: number, since: Date) {
1203 const options = {
1204 include: [
1205 {
1206 model: VideoChannelModel.unscoped(),
1207 required: true,
1208 include: [
1209 {
1210 model: AccountModel.unscoped(),
1211 required: true,
1212 include: [
1213 {
1214 model: UserModel.unscoped(),
1215 required: true,
1216 where: {
1217 id: userId
1218 }
1219 }
1220 ]
1221 }
1222 ]
1223 }
1224 ],
1225 where: {
1226 createdAt: {
1227 [Op.gte]: since
1228 }
1229 }
1230 }
1231
1232 return VideoModel.unscoped().count(options)
1233 }
1234
1201 static countLivesOfAccount (accountId: number) { 1235 static countLivesOfAccount (accountId: number) {
1202 const options = { 1236 const options = {
1203 where: { 1237 where: {
diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts
index d16ac1032..ddd678b91 100644
--- a/shared/models/server/job.model.ts
+++ b/shared/models/server/job.model.ts
@@ -23,6 +23,7 @@ export interface Job {
23 state: JobState 23 state: JobState
24 type: JobType 24 type: JobType
25 data: any 25 data: any
26 priority: number
26 progress: number 27 progress: number
27 error: any 28 error: any
28 createdAt: Date | string 29 createdAt: Date | string