aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts41
-rw-r--r--server/lib/job-queue/handlers/video-file.ts4
-rw-r--r--server/lib/job-queue/handlers/video-import.ts9
-rw-r--r--server/lib/job-queue/handlers/video-views.ts15
-rw-r--r--server/lib/job-queue/job-queue.ts8
5 files changed, 62 insertions, 15 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
new file mode 100644
index 000000000..671b0f487
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -0,0 +1,41 @@
1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger'
3import { fetchVideoByUrl } from '../../../helpers/video'
4import { refreshVideoIfNeeded } from '../../activitypub'
5
6export type RefreshPayload = {
7 videoUrl: string
8 type: 'video'
9}
10
11async function refreshAPObject (job: Bull.Job) {
12 const payload = job.data as RefreshPayload
13
14 logger.info('Processing AP refresher in job %d for video %s.', job.id, payload.videoUrl)
15
16 if (payload.type === 'video') return refreshAPVideo(payload.videoUrl)
17}
18
19// ---------------------------------------------------------------------------
20
21export {
22 refreshAPObject
23}
24
25// ---------------------------------------------------------------------------
26
27async function refreshAPVideo (videoUrl: string) {
28 const fetchType = 'all' as 'all'
29 const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
30
31 const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
32 if (videoFromDatabase) {
33 const refreshOptions = {
34 video: videoFromDatabase,
35 fetchedType: fetchType,
36 syncParam
37 }
38
39 await refreshVideoIfNeeded(refreshOptions)
40 }
41}
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
index adc0a2a15..ddbf6d1c2 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -1,5 +1,5 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { VideoResolution, VideoState } from '../../../../shared' 2import { VideoResolution, VideoState, Job } from '../../../../shared'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video' 4import { VideoModel } from '../../../models/video/video'
5import { JobQueue } from '../job-queue' 5import { JobQueue } from '../job-queue'
@@ -111,7 +111,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
111 ) 111 )
112 112
113 if (resolutionsEnabled.length !== 0) { 113 if (resolutionsEnabled.length !== 0) {
114 const tasks: Bluebird<any>[] = [] 114 const tasks: Bluebird<Bull.Job<any>>[] = []
115 115
116 for (const resolution of resolutionsEnabled) { 116 for (const resolution of resolutionsEnabled) {
117 const dataInput = { 117 const dataInput = {
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 4de901c0c..51a0b5faf 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -7,7 +7,7 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro
7import { extname, join } from 'path' 7import { extname, join } from 'path'
8import { VideoFileModel } from '../../../models/video/video-file' 8import { VideoFileModel } from '../../../models/video/video-file'
9import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' 9import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers'
10import { doRequestAndSaveToFile, downloadImage } from '../../../helpers/requests' 10import { downloadImage } from '../../../helpers/requests'
11import { VideoState } from '../../../../shared' 11import { VideoState } from '../../../../shared'
12import { JobQueue } from '../index' 12import { JobQueue } from '../index'
13import { federateVideoIfNeeded } from '../../activitypub' 13import { federateVideoIfNeeded } from '../../activitypub'
@@ -109,6 +109,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
109 let tempVideoPath: string 109 let tempVideoPath: string
110 let videoDestFile: string 110 let videoDestFile: string
111 let videoFile: VideoFileModel 111 let videoFile: VideoFileModel
112
112 try { 113 try {
113 // Download video from youtubeDL 114 // Download video from youtubeDL
114 tempVideoPath = await downloader() 115 tempVideoPath = await downloader()
@@ -144,8 +145,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
144 // Process thumbnail 145 // Process thumbnail
145 if (options.downloadThumbnail) { 146 if (options.downloadThumbnail) {
146 if (options.thumbnailUrl) { 147 if (options.thumbnailUrl) {
147 const destThumbnailPath = join(CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName()) 148 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE)
148 await downloadImage(options.thumbnailUrl, destThumbnailPath, THUMBNAILS_SIZE)
149 } else { 149 } else {
150 await videoImport.Video.createThumbnail(videoFile) 150 await videoImport.Video.createThumbnail(videoFile)
151 } 151 }
@@ -156,8 +156,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
156 // Process preview 156 // Process preview
157 if (options.downloadPreview) { 157 if (options.downloadPreview) {
158 if (options.thumbnailUrl) { 158 if (options.thumbnailUrl) {
159 const destPreviewPath = join(CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName()) 159 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE)
160 await downloadImage(options.thumbnailUrl, destPreviewPath, PREVIEWS_SIZE)
161 } else { 160 } else {
162 await videoImport.Video.createPreview(videoFile) 161 await videoImport.Video.createPreview(videoFile)
163 } 162 }
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts
index f44c3c727..fa1fd13b3 100644
--- a/server/lib/job-queue/handlers/video-views.ts
+++ b/server/lib/job-queue/handlers/video-views.ts
@@ -23,13 +23,9 @@ async function processVideosViews () {
23 for (const videoId of videoIds) { 23 for (const videoId of videoIds) {
24 try { 24 try {
25 const views = await Redis.Instance.getVideoViews(videoId, hour) 25 const views = await Redis.Instance.getVideoViews(videoId, hour)
26 if (isNaN(views)) { 26 if (views) {
27 logger.error('Cannot process videos views of video %d in hour %d: views number is NaN.', videoId, hour)
28 } else {
29 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) 27 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
30 28
31 await VideoModel.incrementViews(videoId, views)
32
33 try { 29 try {
34 await VideoViewModel.create({ 30 await VideoViewModel.create({
35 startDate, 31 startDate,
@@ -39,7 +35,14 @@ async function processVideosViews () {
39 }) 35 })
40 36
41 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 37 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
42 if (video.isOwned()) await federateVideoIfNeeded(video, false) 38 if (video.isOwned()) {
39 // If this is a remote video, the origin instance will send us an update
40 await VideoModel.incrementViews(videoId, views)
41
42 // Send video update
43 video.views += views
44 await federateVideoIfNeeded(video, false)
45 }
43 } catch (err) { 46 } catch (err) {
44 logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour) 47 logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour)
45 } 48 }
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 4cfd4d253..5862e178f 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -11,6 +11,7 @@ import { processVideoFile, processVideoFileImport, VideoFileImportPayload, Video
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12import { processVideoImport, VideoImportPayload } from './handlers/video-import' 12import { processVideoImport, VideoImportPayload } from './handlers/video-import'
13import { processVideosViews } from './handlers/video-views' 13import { processVideosViews } from './handlers/video-views'
14import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
14 15
15type CreateJobArgument = 16type CreateJobArgument =
16 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 17 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -21,6 +22,7 @@ type CreateJobArgument =
21 { type: 'video-file', payload: VideoFilePayload } | 22 { type: 'video-file', payload: VideoFilePayload } |
22 { type: 'email', payload: EmailPayload } | 23 { type: 'email', payload: EmailPayload } |
23 { type: 'video-import', payload: VideoImportPayload } | 24 { type: 'video-import', payload: VideoImportPayload } |
25 { type: 'activitypub-refresher', payload: RefreshPayload } |
24 { type: 'videos-views', payload: {} } 26 { type: 'videos-views', payload: {} }
25 27
26const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { 28const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
@@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
32 'video-file': processVideoFile, 34 'video-file': processVideoFile,
33 'email': processEmail, 35 'email': processEmail,
34 'video-import': processVideoImport, 36 'video-import': processVideoImport,
35 'videos-views': processVideosViews 37 'videos-views': processVideosViews,
38 'activitypub-refresher': refreshAPObject
36} 39}
37 40
38const jobTypes: JobType[] = [ 41const jobTypes: JobType[] = [
@@ -44,7 +47,8 @@ const jobTypes: JobType[] = [
44 'video-file', 47 'video-file',
45 'video-file-import', 48 'video-file-import',
46 'video-import', 49 'video-import',
47 'videos-views' 50 'videos-views',
51 'activitypub-refresher'
48] 52]
49 53
50class JobQueue { 54class JobQueue {