aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2019-02-06 12:26:58 +0100
committerChocobozzz <me@florianbigard.com>2019-02-06 12:26:58 +0100
commit73471b1a52f242e86364ffb077ea6cadb3b07ae2 (patch)
tree43dbb7748e281f8d80f15326f489cdea10ec857d /server/lib/job-queue
parentc22419dd265c0c7185bf4197a1cb286eb3d8ebc0 (diff)
parentf5305c04aae14467d6f957b713c5a902275cbb89 (diff)
downloadPeerTube-73471b1a52f242e86364ffb077ea6cadb3b07ae2.tar.gz
PeerTube-73471b1a52f242e86364ffb077ea6cadb3b07ae2.tar.zst
PeerTube-73471b1a52f242e86364ffb077ea6cadb3b07ae2.zip
Merge branch 'release/v1.2.0'
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts9
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts3
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts6
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts26
-rw-r--r--server/lib/job-queue/handlers/email.ts3
-rw-r--r--server/lib/job-queue/handlers/video-file.ts47
-rw-r--r--server/lib/job-queue/handlers/video-import.ts23
-rw-r--r--server/lib/job-queue/handlers/video-views.ts4
-rw-r--r--server/lib/job-queue/job-queue.ts5
9 files changed, 82 insertions, 44 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 36d0f237b..b4d381062 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -8,6 +8,7 @@ import { getOrCreateActorAndServerAndModel } from '../../activitypub/actor'
8import { retryTransactionWrapper } from '../../../helpers/database-utils' 8import { retryTransactionWrapper } from '../../../helpers/database-utils'
9import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 9import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
10import { ActorModel } from '../../../models/activitypub/actor' 10import { ActorModel } from '../../../models/activitypub/actor'
11import { Notifier } from '../../notifier'
11 12
12export type ActivitypubFollowPayload = { 13export type ActivitypubFollowPayload = {
13 followerActorId: number 14 followerActorId: number
@@ -42,7 +43,7 @@ export {
42 43
43// --------------------------------------------------------------------------- 44// ---------------------------------------------------------------------------
44 45
45function follow (fromActor: ActorModel, targetActor: ActorModel) { 46async function follow (fromActor: ActorModel, targetActor: ActorModel) {
46 if (fromActor.id === targetActor.id) { 47 if (fromActor.id === targetActor.id) {
47 throw new Error('Follower is the same than target actor.') 48 throw new Error('Follower is the same than target actor.')
48 } 49 }
@@ -50,7 +51,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) {
50 // Same server, direct accept 51 // Same server, direct accept
51 const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' 52 const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending'
52 53
53 return sequelizeTypescript.transaction(async t => { 54 const actorFollow = await sequelizeTypescript.transaction(async t => {
54 const [ actorFollow ] = await ActorFollowModel.findOrCreate({ 55 const [ actorFollow ] = await ActorFollowModel.findOrCreate({
55 where: { 56 where: {
56 actorId: fromActor.id, 57 actorId: fromActor.id,
@@ -68,5 +69,9 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) {
68 69
69 // Send a notification to remote server if our follow is not already accepted 70 // Send a notification to remote server if our follow is not already accepted
70 if (actorFollow.state !== 'accepted') await sendFollow(actorFollow) 71 if (actorFollow.state !== 'accepted') await sendFollow(actorFollow)
72
73 return actorFollow
71 }) 74 })
75
76 if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewFollow(actorFollow)
72} 77}
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index abbd89b3b..9493945ff 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests'
5import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 5import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
7import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' 7import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
8import { ActorFollowScoreCache } from '../../cache'
8 9
9export type ActivitypubHttpBroadcastPayload = { 10export type ActivitypubHttpBroadcastPayload = {
10 uris: string[] 11 uris: string[]
@@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) {
38 .catch(() => badUrls.push(uri)) 39 .catch(() => badUrls.push(uri))
39 }, { concurrency: BROADCAST_CONCURRENCY }) 40 }, { concurrency: BROADCAST_CONCURRENCY })
40 41
41 return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) 42 return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
42} 43}
43 44
44// --------------------------------------------------------------------------- 45// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index d36479032..3973dcdc8 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,9 +1,9 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 4import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6import { JOB_REQUEST_TIMEOUT } from '../../../initializers' 5import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
6import { ActorFollowScoreCache } from '../../cache'
7 7
8export type ActivitypubHttpUnicastPayload = { 8export type ActivitypubHttpUnicastPayload = {
9 uri: string 9 uri: string
@@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) {
31 31
32 try { 32 try {
33 await doRequest(options) 33 await doRequest(options)
34 ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) 34 ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
35 } catch (err) { 35 } catch (err) {
36 ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) 36 ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
37 37
38 throw err 38 throw err
39 } 39 }
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 7752b3b40..454b975fe 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -1,29 +1,33 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { fetchVideoByUrl } from '../../../helpers/video' 3import { fetchVideoByUrl } from '../../../helpers/video'
4import { refreshVideoIfNeeded } from '../../activitypub' 4import { refreshVideoIfNeeded, refreshActorIfNeeded } from '../../activitypub'
5import { ActorModel } from '../../../models/activitypub/actor'
5 6
6export type RefreshPayload = { 7export type RefreshPayload = {
7 videoUrl: string 8 type: 'video' | 'actor'
8 type: 'video' 9 url: string
9} 10}
10 11
11async function refreshAPObject (job: Bull.Job) { 12async function refreshAPObject (job: Bull.Job) {
12 const payload = job.data as RefreshPayload 13 const payload = job.data as RefreshPayload
13 logger.info('Processing AP refresher in job %d.', job.id)
14 14
15 if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) 15 logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url)
16
17 if (payload.type === 'video') return refreshVideo(payload.url)
18 if (payload.type === 'actor') return refreshActor(payload.url)
16} 19}
17 20
18// --------------------------------------------------------------------------- 21// ---------------------------------------------------------------------------
19 22
20export { 23export {
24 refreshActor,
21 refreshAPObject 25 refreshAPObject
22} 26}
23 27
24// --------------------------------------------------------------------------- 28// ---------------------------------------------------------------------------
25 29
26async function refreshAPVideo (videoUrl: string) { 30async function refreshVideo (videoUrl: string) {
27 const fetchType = 'all' as 'all' 31 const fetchType = 'all' as 'all'
28 const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } 32 const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
29 33
@@ -38,3 +42,13 @@ async function refreshAPVideo (videoUrl: string) {
38 await refreshVideoIfNeeded(refreshOptions) 42 await refreshVideoIfNeeded(refreshOptions)
39 } 43 }
40} 44}
45
46async function refreshActor (actorUrl: string) {
47 const fetchType = 'all' as 'all'
48 const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl)
49
50 if (actor) {
51 await refreshActorIfNeeded(actor, fetchType)
52 }
53
54}
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 73d98ae54..220d0af32 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -6,13 +6,14 @@ export type EmailPayload = {
6 to: string[] 6 to: string[]
7 subject: string 7 subject: string
8 text: string 8 text: string
9 from?: string
9} 10}
10 11
11async function processEmail (job: Bull.Job) { 12async function processEmail (job: Bull.Job) {
12 const payload = job.data as EmailPayload 13 const payload = job.data as EmailPayload
13 logger.info('Processing email in job %d.', job.id) 14 logger.info('Processing email in job %d.', job.id)
14 15
15 return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text) 16 return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text, payload.from)
16} 17}
17 18
18// --------------------------------------------------------------------------- 19// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
index adc0a2a15..593e43cc5 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -8,7 +8,8 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils'
8import { sequelizeTypescript } from '../../../initializers' 8import { sequelizeTypescript } from '../../../initializers'
9import * as Bluebird from 'bluebird' 9import * as Bluebird from 'bluebird'
10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' 10import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
11import { importVideoFile, transcodeOriginalVideofile, optimizeVideofile } from '../../video-transcoding' 11import { importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding'
12import { Notifier } from '../../notifier'
12 13
13export type VideoFilePayload = { 14export type VideoFilePayload = {
14 videoUUID: string 15 videoUUID: string
@@ -67,17 +68,17 @@ async function processVideoFile (job: Bull.Job) {
67async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { 68async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) {
68 if (video === undefined) return undefined 69 if (video === undefined) return undefined
69 70
70 return sequelizeTypescript.transaction(async t => { 71 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
71 // Maybe the video changed in database, refresh it 72 // Maybe the video changed in database, refresh it
72 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 73 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
73 // Video does not exist anymore 74 // Video does not exist anymore
74 if (!videoDatabase) return undefined 75 if (!videoDatabase) return undefined
75 76
76 let isNewVideo = false 77 let videoPublished = false
77 78
78 // We transcoded the video file in another format, now we can publish it 79 // We transcoded the video file in another format, now we can publish it
79 if (videoDatabase.state !== VideoState.PUBLISHED) { 80 if (videoDatabase.state !== VideoState.PUBLISHED) {
80 isNewVideo = true 81 videoPublished = true
81 82
82 videoDatabase.state = VideoState.PUBLISHED 83 videoDatabase.state = VideoState.PUBLISHED
83 videoDatabase.publishedAt = new Date() 84 videoDatabase.publishedAt = new Date()
@@ -85,21 +86,26 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) {
85 } 86 }
86 87
87 // If the video was not published, we consider it is a new one for other instances 88 // If the video was not published, we consider it is a new one for other instances
88 await federateVideoIfNeeded(videoDatabase, isNewVideo, t) 89 await federateVideoIfNeeded(videoDatabase, videoPublished, t)
89 90
90 return undefined 91 return { videoDatabase, videoPublished }
91 }) 92 })
93
94 if (videoPublished) {
95 Notifier.Instance.notifyOnNewVideo(videoDatabase)
96 Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
97 }
92} 98}
93 99
94async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { 100async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: boolean) {
95 if (video === undefined) return undefined 101 if (videoArg === undefined) return undefined
96 102
97 // Outside the transaction (IO on disk) 103 // Outside the transaction (IO on disk)
98 const { videoFileResolution } = await video.getOriginalFileResolution() 104 const { videoFileResolution } = await videoArg.getOriginalFileResolution()
99 105
100 return sequelizeTypescript.transaction(async t => { 106 const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
101 // Maybe the video changed in database, refresh it 107 // Maybe the video changed in database, refresh it
102 const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 108 let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
103 // Video does not exist anymore 109 // Video does not exist anymore
104 if (!videoDatabase) return undefined 110 if (!videoDatabase) return undefined
105 111
@@ -110,8 +116,10 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
110 { resolutions: resolutionsEnabled } 116 { resolutions: resolutionsEnabled }
111 ) 117 )
112 118
119 let videoPublished = false
120
113 if (resolutionsEnabled.length !== 0) { 121 if (resolutionsEnabled.length !== 0) {
114 const tasks: Bluebird<any>[] = [] 122 const tasks: Bluebird<Bull.Job<any>>[] = []
115 123
116 for (const resolution of resolutionsEnabled) { 124 for (const resolution of resolutionsEnabled) {
117 const dataInput = { 125 const dataInput = {
@@ -127,15 +135,22 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
127 135
128 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) 136 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
129 } else { 137 } else {
138 videoPublished = true
139
130 // No transcoding to do, it's now published 140 // No transcoding to do, it's now published
131 video.state = VideoState.PUBLISHED 141 videoDatabase.state = VideoState.PUBLISHED
132 video = await video.save({ transaction: t }) 142 videoDatabase = await videoDatabase.save({ transaction: t })
133 143
134 logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) 144 logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
135 } 145 }
136 146
137 return federateVideoIfNeeded(video, isNewVideo, t) 147 await federateVideoIfNeeded(videoDatabase, isNewVideo, t)
148
149 return { videoDatabase, videoPublished }
138 }) 150 })
151
152 if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
153 if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
139} 154}
140 155
141// --------------------------------------------------------------------------- 156// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 4de901c0c..12004dcd7 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -7,14 +7,15 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro
7import { extname, join } from 'path' 7import { extname, join } from 'path'
8import { VideoFileModel } from '../../../models/video/video-file' 8import { VideoFileModel } from '../../../models/video/video-file'
9import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' 9import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers'
10import { doRequestAndSaveToFile, downloadImage } from '../../../helpers/requests' 10import { downloadImage } from '../../../helpers/requests'
11import { VideoState } from '../../../../shared' 11import { VideoState } from '../../../../shared'
12import { JobQueue } from '../index' 12import { JobQueue } from '../index'
13import { federateVideoIfNeeded } from '../../activitypub' 13import { federateVideoIfNeeded } from '../../activitypub'
14import { VideoModel } from '../../../models/video/video' 14import { VideoModel } from '../../../models/video/video'
15import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' 15import { downloadWebTorrentVideo } from '../../../helpers/webtorrent'
16import { getSecureTorrentName } from '../../../helpers/utils' 16import { getSecureTorrentName } from '../../../helpers/utils'
17import { remove, rename, stat } from 'fs-extra' 17import { remove, move, stat } from 'fs-extra'
18import { Notifier } from '../../notifier'
18 19
19type VideoImportYoutubeDLPayload = { 20type VideoImportYoutubeDLPayload = {
20 type: 'youtube-dl' 21 type: 'youtube-dl'
@@ -109,6 +110,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
109 let tempVideoPath: string 110 let tempVideoPath: string
110 let videoDestFile: string 111 let videoDestFile: string
111 let videoFile: VideoFileModel 112 let videoFile: VideoFileModel
113
112 try { 114 try {
113 // Download video from youtubeDL 115 // Download video from youtubeDL
114 tempVideoPath = await downloader() 116 tempVideoPath = await downloader()
@@ -138,14 +140,13 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
138 140
139 // Move file 141 // Move file
140 videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile)) 142 videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile))
141 await rename(tempVideoPath, videoDestFile) 143 await move(tempVideoPath, videoDestFile)
142 tempVideoPath = null // This path is not used anymore 144 tempVideoPath = null // This path is not used anymore
143 145
144 // Process thumbnail 146 // Process thumbnail
145 if (options.downloadThumbnail) { 147 if (options.downloadThumbnail) {
146 if (options.thumbnailUrl) { 148 if (options.thumbnailUrl) {
147 const destThumbnailPath = join(CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName()) 149 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE)
148 await downloadImage(options.thumbnailUrl, destThumbnailPath, THUMBNAILS_SIZE)
149 } else { 150 } else {
150 await videoImport.Video.createThumbnail(videoFile) 151 await videoImport.Video.createThumbnail(videoFile)
151 } 152 }
@@ -156,8 +157,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
156 // Process preview 157 // Process preview
157 if (options.downloadPreview) { 158 if (options.downloadPreview) {
158 if (options.thumbnailUrl) { 159 if (options.thumbnailUrl) {
159 const destPreviewPath = join(CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName()) 160 await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE)
160 await downloadImage(options.thumbnailUrl, destPreviewPath, PREVIEWS_SIZE)
161 } else { 161 } else {
162 await videoImport.Video.createPreview(videoFile) 162 await videoImport.Video.createPreview(videoFile)
163 } 163 }
@@ -180,7 +180,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
180 // Update video DB object 180 // Update video DB object
181 video.duration = duration 181 video.duration = duration
182 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED 182 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED
183 const videoUpdated = await video.save({ transaction: t }) 183 await video.save({ transaction: t })
184 184
185 // Now we can federate the video (reload from database, we need more attributes) 185 // Now we can federate the video (reload from database, we need more attributes)
186 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 186 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
@@ -192,10 +192,13 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
192 192
193 logger.info('Video %s imported.', video.uuid) 193 logger.info('Video %s imported.', video.uuid)
194 194
195 videoImportUpdated.Video = videoUpdated 195 videoImportUpdated.Video = videoForFederation
196 return videoImportUpdated 196 return videoImportUpdated
197 }) 197 })
198 198
199 Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video)
200 Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true)
201
199 // Create transcoding jobs? 202 // Create transcoding jobs?
200 if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { 203 if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) {
201 // Put uuid because we don't have id auto incremented for now 204 // Put uuid because we don't have id auto incremented for now
@@ -218,6 +221,8 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
218 videoImport.state = VideoImportState.FAILED 221 videoImport.state = VideoImportState.FAILED
219 await videoImport.save() 222 await videoImport.save()
220 223
224 Notifier.Instance.notifyOnFinishedVideoImport(videoImport, false)
225
221 throw err 226 throw err
222 } 227 }
223} 228}
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts
index 038ef43e2..fa1fd13b3 100644
--- a/server/lib/job-queue/handlers/video-views.ts
+++ b/server/lib/job-queue/handlers/video-views.ts
@@ -23,9 +23,7 @@ async function processVideosViews () {
23 for (const videoId of videoIds) { 23 for (const videoId of videoIds) {
24 try { 24 try {
25 const views = await Redis.Instance.getVideoViews(videoId, hour) 25 const views = await Redis.Instance.getVideoViews(videoId, hour)
26 if (isNaN(views)) { 26 if (views) {
27 logger.error('Cannot process videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, views)
28 } else {
29 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) 27 logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour)
30 28
31 try { 29 try {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 5862e178f..ba9cbe0d9 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -88,7 +88,6 @@ class JobQueue {
88 88
89 queue.on('error', err => { 89 queue.on('error', err => {
90 logger.error('Error in job queue %s.', handlerName, { err }) 90 logger.error('Error in job queue %s.', handlerName, { err })
91 process.exit(-1)
92 }) 91 })
93 92
94 this.queues[handlerName] = queue 93 this.queues[handlerName] = queue
@@ -166,10 +165,10 @@ class JobQueue {
166 return total 165 return total
167 } 166 }
168 167
169 removeOldJobs () { 168 async removeOldJobs () {
170 for (const key of Object.keys(this.queues)) { 169 for (const key of Object.keys(this.queues)) {
171 const queue = this.queues[key] 170 const queue = this.queues[key]
172 queue.clean(JOB_COMPLETED_LIFETIME, 'completed') 171 await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
173 } 172 }
174 } 173 }
175 174