diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-refresher.ts | 41 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 9 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-views.ts | 15 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 8 |
5 files changed, 62 insertions, 15 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts new file mode 100644 index 000000000..671b0f487 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -0,0 +1,41 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { fetchVideoByUrl } from '../../../helpers/video' | ||
4 | import { refreshVideoIfNeeded } from '../../activitypub' | ||
5 | |||
6 | export type RefreshPayload = { | ||
7 | videoUrl: string | ||
8 | type: 'video' | ||
9 | } | ||
10 | |||
11 | async function refreshAPObject (job: Bull.Job) { | ||
12 | const payload = job.data as RefreshPayload | ||
13 | |||
14 | logger.info('Processing AP refresher in job %d for video %s.', job.id, payload.videoUrl) | ||
15 | |||
16 | if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) | ||
17 | } | ||
18 | |||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | export { | ||
22 | refreshAPObject | ||
23 | } | ||
24 | |||
25 | // --------------------------------------------------------------------------- | ||
26 | |||
27 | async function refreshAPVideo (videoUrl: string) { | ||
28 | const fetchType = 'all' as 'all' | ||
29 | const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } | ||
30 | |||
31 | const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType) | ||
32 | if (videoFromDatabase) { | ||
33 | const refreshOptions = { | ||
34 | video: videoFromDatabase, | ||
35 | fetchedType: fetchType, | ||
36 | syncParam | ||
37 | } | ||
38 | |||
39 | await refreshVideoIfNeeded(refreshOptions) | ||
40 | } | ||
41 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index adc0a2a15..ddbf6d1c2 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { VideoResolution, VideoState } from '../../../../shared' | 2 | import { VideoResolution, VideoState, Job } from '../../../../shared' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { VideoModel } from '../../../models/video/video' | 4 | import { VideoModel } from '../../../models/video/video' |
5 | import { JobQueue } from '../job-queue' | 5 | import { JobQueue } from '../job-queue' |
@@ -111,7 +111,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole | |||
111 | ) | 111 | ) |
112 | 112 | ||
113 | if (resolutionsEnabled.length !== 0) { | 113 | if (resolutionsEnabled.length !== 0) { |
114 | const tasks: Bluebird<any>[] = [] | 114 | const tasks: Bluebird<Bull.Job<any>>[] = [] |
115 | 115 | ||
116 | for (const resolution of resolutionsEnabled) { | 116 | for (const resolution of resolutionsEnabled) { |
117 | const dataInput = { | 117 | const dataInput = { |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4de901c0c..51a0b5faf 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -7,7 +7,7 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro | |||
7 | import { extname, join } from 'path' | 7 | import { extname, join } from 'path' |
8 | import { VideoFileModel } from '../../../models/video/video-file' | 8 | import { VideoFileModel } from '../../../models/video/video-file' |
9 | import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' | 9 | import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' |
10 | import { doRequestAndSaveToFile, downloadImage } from '../../../helpers/requests' | 10 | import { downloadImage } from '../../../helpers/requests' |
11 | import { VideoState } from '../../../../shared' | 11 | import { VideoState } from '../../../../shared' |
12 | import { JobQueue } from '../index' | 12 | import { JobQueue } from '../index' |
13 | import { federateVideoIfNeeded } from '../../activitypub' | 13 | import { federateVideoIfNeeded } from '../../activitypub' |
@@ -109,6 +109,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
109 | let tempVideoPath: string | 109 | let tempVideoPath: string |
110 | let videoDestFile: string | 110 | let videoDestFile: string |
111 | let videoFile: VideoFileModel | 111 | let videoFile: VideoFileModel |
112 | |||
112 | try { | 113 | try { |
113 | // Download video from youtubeDL | 114 | // Download video from youtubeDL |
114 | tempVideoPath = await downloader() | 115 | tempVideoPath = await downloader() |
@@ -144,8 +145,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
144 | // Process thumbnail | 145 | // Process thumbnail |
145 | if (options.downloadThumbnail) { | 146 | if (options.downloadThumbnail) { |
146 | if (options.thumbnailUrl) { | 147 | if (options.thumbnailUrl) { |
147 | const destThumbnailPath = join(CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName()) | 148 | await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE) |
148 | await downloadImage(options.thumbnailUrl, destThumbnailPath, THUMBNAILS_SIZE) | ||
149 | } else { | 149 | } else { |
150 | await videoImport.Video.createThumbnail(videoFile) | 150 | await videoImport.Video.createThumbnail(videoFile) |
151 | } | 151 | } |
@@ -156,8 +156,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
156 | // Process preview | 156 | // Process preview |
157 | if (options.downloadPreview) { | 157 | if (options.downloadPreview) { |
158 | if (options.thumbnailUrl) { | 158 | if (options.thumbnailUrl) { |
159 | const destPreviewPath = join(CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName()) | 159 | await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE) |
160 | await downloadImage(options.thumbnailUrl, destPreviewPath, PREVIEWS_SIZE) | ||
161 | } else { | 160 | } else { |
162 | await videoImport.Video.createPreview(videoFile) | 161 | await videoImport.Video.createPreview(videoFile) |
163 | } | 162 | } |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts index f44c3c727..fa1fd13b3 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -23,13 +23,9 @@ async function processVideosViews () { | |||
23 | for (const videoId of videoIds) { | 23 | for (const videoId of videoIds) { |
24 | try { | 24 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 25 | const views = await Redis.Instance.getVideoViews(videoId, hour) |
26 | if (isNaN(views)) { | 26 | if (views) { |
27 | logger.error('Cannot process videos views of video %d in hour %d: views number is NaN.', videoId, hour) | ||
28 | } else { | ||
29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 27 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) |
30 | 28 | ||
31 | await VideoModel.incrementViews(videoId, views) | ||
32 | |||
33 | try { | 29 | try { |
34 | await VideoViewModel.create({ | 30 | await VideoViewModel.create({ |
35 | startDate, | 31 | startDate, |
@@ -39,7 +35,14 @@ async function processVideosViews () { | |||
39 | }) | 35 | }) |
40 | 36 | ||
41 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | 37 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) |
42 | if (video.isOwned()) await federateVideoIfNeeded(video, false) | 38 | if (video.isOwned()) { |
39 | // If this is a remote video, the origin instance will send us an update | ||
40 | await VideoModel.incrementViews(videoId, views) | ||
41 | |||
42 | // Send video update | ||
43 | video.views += views | ||
44 | await federateVideoIfNeeded(video, false) | ||
45 | } | ||
43 | } catch (err) { | 46 | } catch (err) { |
44 | logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour) | 47 | logger.debug('Cannot create video views for video %d in hour %d. Maybe the video does not exist anymore?', videoId, hour) |
45 | } | 48 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 4cfd4d253..5862e178f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -11,6 +11,7 @@ import { processVideoFile, processVideoFileImport, VideoFileImportPayload, Video | |||
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' | 12 | import { processVideoImport, VideoImportPayload } from './handlers/video-import' |
13 | import { processVideosViews } from './handlers/video-views' | 13 | import { processVideosViews } from './handlers/video-views' |
14 | import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' | ||
14 | 15 | ||
15 | type CreateJobArgument = | 16 | type CreateJobArgument = |
16 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 17 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -21,6 +22,7 @@ type CreateJobArgument = | |||
21 | { type: 'video-file', payload: VideoFilePayload } | | 22 | { type: 'video-file', payload: VideoFilePayload } | |
22 | { type: 'email', payload: EmailPayload } | | 23 | { type: 'email', payload: EmailPayload } | |
23 | { type: 'video-import', payload: VideoImportPayload } | | 24 | { type: 'video-import', payload: VideoImportPayload } | |
25 | { type: 'activitypub-refresher', payload: RefreshPayload } | | ||
24 | { type: 'videos-views', payload: {} } | 26 | { type: 'videos-views', payload: {} } |
25 | 27 | ||
26 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | 28 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
@@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
32 | 'video-file': processVideoFile, | 34 | 'video-file': processVideoFile, |
33 | 'email': processEmail, | 35 | 'email': processEmail, |
34 | 'video-import': processVideoImport, | 36 | 'video-import': processVideoImport, |
35 | 'videos-views': processVideosViews | 37 | 'videos-views': processVideosViews, |
38 | 'activitypub-refresher': refreshAPObject | ||
36 | } | 39 | } |
37 | 40 | ||
38 | const jobTypes: JobType[] = [ | 41 | const jobTypes: JobType[] = [ |
@@ -44,7 +47,8 @@ const jobTypes: JobType[] = [ | |||
44 | 'video-file', | 47 | 'video-file', |
45 | 'video-file-import', | 48 | 'video-file-import', |
46 | 'video-import', | 49 | 'video-import', |
47 | 'videos-views' | 50 | 'videos-views', |
51 | 'activitypub-refresher' | ||
48 | ] | 52 | ] |
49 | 53 | ||
50 | class JobQueue { | 54 | class JobQueue { |