diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-follow.ts | 9 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 3 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 6 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-refresher.ts | 26 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/email.ts | 3 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file.ts | 47 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 23 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-views.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 5 |
9 files changed, 82 insertions, 44 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 36d0f237b..b4d381062 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -8,6 +8,7 @@ import { getOrCreateActorAndServerAndModel } from '../../activitypub/actor' | |||
8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
9 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 9 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
10 | import { ActorModel } from '../../../models/activitypub/actor' | 10 | import { ActorModel } from '../../../models/activitypub/actor' |
11 | import { Notifier } from '../../notifier' | ||
11 | 12 | ||
12 | export type ActivitypubFollowPayload = { | 13 | export type ActivitypubFollowPayload = { |
13 | followerActorId: number | 14 | followerActorId: number |
@@ -42,7 +43,7 @@ export { | |||
42 | 43 | ||
43 | // --------------------------------------------------------------------------- | 44 | // --------------------------------------------------------------------------- |
44 | 45 | ||
45 | function follow (fromActor: ActorModel, targetActor: ActorModel) { | 46 | async function follow (fromActor: ActorModel, targetActor: ActorModel) { |
46 | if (fromActor.id === targetActor.id) { | 47 | if (fromActor.id === targetActor.id) { |
47 | throw new Error('Follower is the same than target actor.') | 48 | throw new Error('Follower is the same than target actor.') |
48 | } | 49 | } |
@@ -50,7 +51,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { | |||
50 | // Same server, direct accept | 51 | // Same server, direct accept |
51 | const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' | 52 | const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' |
52 | 53 | ||
53 | return sequelizeTypescript.transaction(async t => { | 54 | const actorFollow = await sequelizeTypescript.transaction(async t => { |
54 | const [ actorFollow ] = await ActorFollowModel.findOrCreate({ | 55 | const [ actorFollow ] = await ActorFollowModel.findOrCreate({ |
55 | where: { | 56 | where: { |
56 | actorId: fromActor.id, | 57 | actorId: fromActor.id, |
@@ -68,5 +69,9 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { | |||
68 | 69 | ||
69 | // Send a notification to remote server if our follow is not already accepted | 70 | // Send a notification to remote server if our follow is not already accepted |
70 | if (actorFollow.state !== 'accepted') await sendFollow(actorFollow) | 71 | if (actorFollow.state !== 'accepted') await sendFollow(actorFollow) |
72 | |||
73 | return actorFollow | ||
71 | }) | 74 | }) |
75 | |||
76 | if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewFollow(actorFollow) | ||
72 | } | 77 | } |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index abbd89b3b..9493945ff 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests' | |||
5 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 5 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 6 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
7 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' | 7 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' |
8 | import { ActorFollowScoreCache } from '../../cache' | ||
8 | 9 | ||
9 | export type ActivitypubHttpBroadcastPayload = { | 10 | export type ActivitypubHttpBroadcastPayload = { |
10 | uris: string[] | 11 | uris: string[] |
@@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { | |||
38 | .catch(() => badUrls.push(uri)) | 39 | .catch(() => badUrls.push(uri)) |
39 | }, { concurrency: BROADCAST_CONCURRENCY }) | 40 | }, { concurrency: BROADCAST_CONCURRENCY }) |
40 | 41 | ||
41 | return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) | 42 | return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) |
42 | } | 43 | } |
43 | 44 | ||
44 | // --------------------------------------------------------------------------- | 45 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index d36479032..3973dcdc8 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -1,9 +1,9 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
5 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
6 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers' | 5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers' |
6 | import { ActorFollowScoreCache } from '../../cache' | ||
7 | 7 | ||
8 | export type ActivitypubHttpUnicastPayload = { | 8 | export type ActivitypubHttpUnicastPayload = { |
9 | uri: string | 9 | uri: string |
@@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { | |||
31 | 31 | ||
32 | try { | 32 | try { |
33 | await doRequest(options) | 33 | await doRequest(options) |
34 | ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) | 34 | ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) |
35 | } catch (err) { | 35 | } catch (err) { |
36 | ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) | 36 | ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) |
37 | 37 | ||
38 | throw err | 38 | throw err |
39 | } | 39 | } |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 7752b3b40..454b975fe 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -1,29 +1,33 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { fetchVideoByUrl } from '../../../helpers/video' | 3 | import { fetchVideoByUrl } from '../../../helpers/video' |
4 | import { refreshVideoIfNeeded } from '../../activitypub' | 4 | import { refreshVideoIfNeeded, refreshActorIfNeeded } from '../../activitypub' |
5 | import { ActorModel } from '../../../models/activitypub/actor' | ||
5 | 6 | ||
6 | export type RefreshPayload = { | 7 | export type RefreshPayload = { |
7 | videoUrl: string | 8 | type: 'video' | 'actor' |
8 | type: 'video' | 9 | url: string |
9 | } | 10 | } |
10 | 11 | ||
11 | async function refreshAPObject (job: Bull.Job) { | 12 | async function refreshAPObject (job: Bull.Job) { |
12 | const payload = job.data as RefreshPayload | 13 | const payload = job.data as RefreshPayload |
13 | logger.info('Processing AP refresher in job %d.', job.id) | ||
14 | 14 | ||
15 | if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) | 15 | logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) |
16 | |||
17 | if (payload.type === 'video') return refreshVideo(payload.url) | ||
18 | if (payload.type === 'actor') return refreshActor(payload.url) | ||
16 | } | 19 | } |
17 | 20 | ||
18 | // --------------------------------------------------------------------------- | 21 | // --------------------------------------------------------------------------- |
19 | 22 | ||
20 | export { | 23 | export { |
24 | refreshActor, | ||
21 | refreshAPObject | 25 | refreshAPObject |
22 | } | 26 | } |
23 | 27 | ||
24 | // --------------------------------------------------------------------------- | 28 | // --------------------------------------------------------------------------- |
25 | 29 | ||
26 | async function refreshAPVideo (videoUrl: string) { | 30 | async function refreshVideo (videoUrl: string) { |
27 | const fetchType = 'all' as 'all' | 31 | const fetchType = 'all' as 'all' |
28 | const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } | 32 | const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } |
29 | 33 | ||
@@ -38,3 +42,13 @@ async function refreshAPVideo (videoUrl: string) { | |||
38 | await refreshVideoIfNeeded(refreshOptions) | 42 | await refreshVideoIfNeeded(refreshOptions) |
39 | } | 43 | } |
40 | } | 44 | } |
45 | |||
46 | async function refreshActor (actorUrl: string) { | ||
47 | const fetchType = 'all' as 'all' | ||
48 | const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl) | ||
49 | |||
50 | if (actor) { | ||
51 | await refreshActorIfNeeded(actor, fetchType) | ||
52 | } | ||
53 | |||
54 | } | ||
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 73d98ae54..220d0af32 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -6,13 +6,14 @@ export type EmailPayload = { | |||
6 | to: string[] | 6 | to: string[] |
7 | subject: string | 7 | subject: string |
8 | text: string | 8 | text: string |
9 | from?: string | ||
9 | } | 10 | } |
10 | 11 | ||
11 | async function processEmail (job: Bull.Job) { | 12 | async function processEmail (job: Bull.Job) { |
12 | const payload = job.data as EmailPayload | 13 | const payload = job.data as EmailPayload |
13 | logger.info('Processing email in job %d.', job.id) | 14 | logger.info('Processing email in job %d.', job.id) |
14 | 15 | ||
15 | return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text) | 16 | return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text, payload.from) |
16 | } | 17 | } |
17 | 18 | ||
18 | // --------------------------------------------------------------------------- | 19 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index adc0a2a15..593e43cc5 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -8,7 +8,8 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils' | |||
8 | import { sequelizeTypescript } from '../../../initializers' | 8 | import { sequelizeTypescript } from '../../../initializers' |
9 | import * as Bluebird from 'bluebird' | 9 | import * as Bluebird from 'bluebird' |
10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' | 10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' |
11 | import { importVideoFile, transcodeOriginalVideofile, optimizeVideofile } from '../../video-transcoding' | 11 | import { importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' |
12 | import { Notifier } from '../../notifier' | ||
12 | 13 | ||
13 | export type VideoFilePayload = { | 14 | export type VideoFilePayload = { |
14 | videoUUID: string | 15 | videoUUID: string |
@@ -67,17 +68,17 @@ async function processVideoFile (job: Bull.Job) { | |||
67 | async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { | 68 | async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { |
68 | if (video === undefined) return undefined | 69 | if (video === undefined) return undefined |
69 | 70 | ||
70 | return sequelizeTypescript.transaction(async t => { | 71 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
71 | // Maybe the video changed in database, refresh it | 72 | // Maybe the video changed in database, refresh it |
72 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | 73 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) |
73 | // Video does not exist anymore | 74 | // Video does not exist anymore |
74 | if (!videoDatabase) return undefined | 75 | if (!videoDatabase) return undefined |
75 | 76 | ||
76 | let isNewVideo = false | 77 | let videoPublished = false |
77 | 78 | ||
78 | // We transcoded the video file in another format, now we can publish it | 79 | // We transcoded the video file in another format, now we can publish it |
79 | if (videoDatabase.state !== VideoState.PUBLISHED) { | 80 | if (videoDatabase.state !== VideoState.PUBLISHED) { |
80 | isNewVideo = true | 81 | videoPublished = true |
81 | 82 | ||
82 | videoDatabase.state = VideoState.PUBLISHED | 83 | videoDatabase.state = VideoState.PUBLISHED |
83 | videoDatabase.publishedAt = new Date() | 84 | videoDatabase.publishedAt = new Date() |
@@ -85,21 +86,26 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { | |||
85 | } | 86 | } |
86 | 87 | ||
87 | // If the video was not published, we consider it is a new one for other instances | 88 | // If the video was not published, we consider it is a new one for other instances |
88 | await federateVideoIfNeeded(videoDatabase, isNewVideo, t) | 89 | await federateVideoIfNeeded(videoDatabase, videoPublished, t) |
89 | 90 | ||
90 | return undefined | 91 | return { videoDatabase, videoPublished } |
91 | }) | 92 | }) |
93 | |||
94 | if (videoPublished) { | ||
95 | Notifier.Instance.notifyOnNewVideo(videoDatabase) | ||
96 | Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) | ||
97 | } | ||
92 | } | 98 | } |
93 | 99 | ||
94 | async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { | 100 | async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: boolean) { |
95 | if (video === undefined) return undefined | 101 | if (videoArg === undefined) return undefined |
96 | 102 | ||
97 | // Outside the transaction (IO on disk) | 103 | // Outside the transaction (IO on disk) |
98 | const { videoFileResolution } = await video.getOriginalFileResolution() | 104 | const { videoFileResolution } = await videoArg.getOriginalFileResolution() |
99 | 105 | ||
100 | return sequelizeTypescript.transaction(async t => { | 106 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
101 | // Maybe the video changed in database, refresh it | 107 | // Maybe the video changed in database, refresh it |
102 | const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | 108 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) |
103 | // Video does not exist anymore | 109 | // Video does not exist anymore |
104 | if (!videoDatabase) return undefined | 110 | if (!videoDatabase) return undefined |
105 | 111 | ||
@@ -110,8 +116,10 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole | |||
110 | { resolutions: resolutionsEnabled } | 116 | { resolutions: resolutionsEnabled } |
111 | ) | 117 | ) |
112 | 118 | ||
119 | let videoPublished = false | ||
120 | |||
113 | if (resolutionsEnabled.length !== 0) { | 121 | if (resolutionsEnabled.length !== 0) { |
114 | const tasks: Bluebird<any>[] = [] | 122 | const tasks: Bluebird<Bull.Job<any>>[] = [] |
115 | 123 | ||
116 | for (const resolution of resolutionsEnabled) { | 124 | for (const resolution of resolutionsEnabled) { |
117 | const dataInput = { | 125 | const dataInput = { |
@@ -127,15 +135,22 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole | |||
127 | 135 | ||
128 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | 136 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) |
129 | } else { | 137 | } else { |
138 | videoPublished = true | ||
139 | |||
130 | // No transcoding to do, it's now published | 140 | // No transcoding to do, it's now published |
131 | video.state = VideoState.PUBLISHED | 141 | videoDatabase.state = VideoState.PUBLISHED |
132 | video = await video.save({ transaction: t }) | 142 | videoDatabase = await videoDatabase.save({ transaction: t }) |
133 | 143 | ||
134 | logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) | 144 | logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) |
135 | } | 145 | } |
136 | 146 | ||
137 | return federateVideoIfNeeded(video, isNewVideo, t) | 147 | await federateVideoIfNeeded(videoDatabase, isNewVideo, t) |
148 | |||
149 | return { videoDatabase, videoPublished } | ||
138 | }) | 150 | }) |
151 | |||
152 | if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) | ||
153 | if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) | ||
139 | } | 154 | } |
140 | 155 | ||
141 | // --------------------------------------------------------------------------- | 156 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4de901c0c..12004dcd7 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -7,14 +7,15 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro | |||
7 | import { extname, join } from 'path' | 7 | import { extname, join } from 'path' |
8 | import { VideoFileModel } from '../../../models/video/video-file' | 8 | import { VideoFileModel } from '../../../models/video/video-file' |
9 | import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' | 9 | import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' |
10 | import { doRequestAndSaveToFile, downloadImage } from '../../../helpers/requests' | 10 | import { downloadImage } from '../../../helpers/requests' |
11 | import { VideoState } from '../../../../shared' | 11 | import { VideoState } from '../../../../shared' |
12 | import { JobQueue } from '../index' | 12 | import { JobQueue } from '../index' |
13 | import { federateVideoIfNeeded } from '../../activitypub' | 13 | import { federateVideoIfNeeded } from '../../activitypub' |
14 | import { VideoModel } from '../../../models/video/video' | 14 | import { VideoModel } from '../../../models/video/video' |
15 | import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' | 15 | import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' |
16 | import { getSecureTorrentName } from '../../../helpers/utils' | 16 | import { getSecureTorrentName } from '../../../helpers/utils' |
17 | import { remove, rename, stat } from 'fs-extra' | 17 | import { remove, move, stat } from 'fs-extra' |
18 | import { Notifier } from '../../notifier' | ||
18 | 19 | ||
19 | type VideoImportYoutubeDLPayload = { | 20 | type VideoImportYoutubeDLPayload = { |
20 | type: 'youtube-dl' | 21 | type: 'youtube-dl' |
@@ -109,6 +110,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
109 | let tempVideoPath: string | 110 | let tempVideoPath: string |
110 | let videoDestFile: string | 111 | let videoDestFile: string |
111 | let videoFile: VideoFileModel | 112 | let videoFile: VideoFileModel |
113 | |||
112 | try { | 114 | try { |
113 | // Download video from youtubeDL | 115 | // Download video from youtubeDL |
114 | tempVideoPath = await downloader() | 116 | tempVideoPath = await downloader() |
@@ -138,14 +140,13 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
138 | 140 | ||
139 | // Move file | 141 | // Move file |
140 | videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile)) | 142 | videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile)) |
141 | await rename(tempVideoPath, videoDestFile) | 143 | await move(tempVideoPath, videoDestFile) |
142 | tempVideoPath = null // This path is not used anymore | 144 | tempVideoPath = null // This path is not used anymore |
143 | 145 | ||
144 | // Process thumbnail | 146 | // Process thumbnail |
145 | if (options.downloadThumbnail) { | 147 | if (options.downloadThumbnail) { |
146 | if (options.thumbnailUrl) { | 148 | if (options.thumbnailUrl) { |
147 | const destThumbnailPath = join(CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName()) | 149 | await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE) |
148 | await downloadImage(options.thumbnailUrl, destThumbnailPath, THUMBNAILS_SIZE) | ||
149 | } else { | 150 | } else { |
150 | await videoImport.Video.createThumbnail(videoFile) | 151 | await videoImport.Video.createThumbnail(videoFile) |
151 | } | 152 | } |
@@ -156,8 +157,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
156 | // Process preview | 157 | // Process preview |
157 | if (options.downloadPreview) { | 158 | if (options.downloadPreview) { |
158 | if (options.thumbnailUrl) { | 159 | if (options.thumbnailUrl) { |
159 | const destPreviewPath = join(CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName()) | 160 | await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE) |
160 | await downloadImage(options.thumbnailUrl, destPreviewPath, PREVIEWS_SIZE) | ||
161 | } else { | 161 | } else { |
162 | await videoImport.Video.createPreview(videoFile) | 162 | await videoImport.Video.createPreview(videoFile) |
163 | } | 163 | } |
@@ -180,7 +180,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
180 | // Update video DB object | 180 | // Update video DB object |
181 | video.duration = duration | 181 | video.duration = duration |
182 | video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED | 182 | video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED |
183 | const videoUpdated = await video.save({ transaction: t }) | 183 | await video.save({ transaction: t }) |
184 | 184 | ||
185 | // Now we can federate the video (reload from database, we need more attributes) | 185 | // Now we can federate the video (reload from database, we need more attributes) |
186 | const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | 186 | const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) |
@@ -192,10 +192,13 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
192 | 192 | ||
193 | logger.info('Video %s imported.', video.uuid) | 193 | logger.info('Video %s imported.', video.uuid) |
194 | 194 | ||
195 | videoImportUpdated.Video = videoUpdated | 195 | videoImportUpdated.Video = videoForFederation |
196 | return videoImportUpdated | 196 | return videoImportUpdated |
197 | }) | 197 | }) |
198 | 198 | ||
199 | Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video) | ||
200 | Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true) | ||
201 | |||
199 | // Create transcoding jobs? | 202 | // Create transcoding jobs? |
200 | if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { | 203 | if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { |
201 | // Put uuid because we don't have id auto incremented for now | 204 | // Put uuid because we don't have id auto incremented for now |
@@ -218,6 +221,8 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide | |||
218 | videoImport.state = VideoImportState.FAILED | 221 | videoImport.state = VideoImportState.FAILED |
219 | await videoImport.save() | 222 | await videoImport.save() |
220 | 223 | ||
224 | Notifier.Instance.notifyOnFinishedVideoImport(videoImport, false) | ||
225 | |||
221 | throw err | 226 | throw err |
222 | } | 227 | } |
223 | } | 228 | } |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts index 038ef43e2..fa1fd13b3 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -23,9 +23,7 @@ async function processVideosViews () { | |||
23 | for (const videoId of videoIds) { | 23 | for (const videoId of videoIds) { |
24 | try { | 24 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 25 | const views = await Redis.Instance.getVideoViews(videoId, hour) |
26 | if (isNaN(views)) { | 26 | if (views) { |
27 | logger.error('Cannot process videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, views) | ||
28 | } else { | ||
29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 27 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) |
30 | 28 | ||
31 | try { | 29 | try { |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 5862e178f..ba9cbe0d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -88,7 +88,6 @@ class JobQueue { | |||
88 | 88 | ||
89 | queue.on('error', err => { | 89 | queue.on('error', err => { |
90 | logger.error('Error in job queue %s.', handlerName, { err }) | 90 | logger.error('Error in job queue %s.', handlerName, { err }) |
91 | process.exit(-1) | ||
92 | }) | 91 | }) |
93 | 92 | ||
94 | this.queues[handlerName] = queue | 93 | this.queues[handlerName] = queue |
@@ -166,10 +165,10 @@ class JobQueue { | |||
166 | return total | 165 | return total |
167 | } | 166 | } |
168 | 167 | ||
169 | removeOldJobs () { | 168 | async removeOldJobs () { |
170 | for (const key of Object.keys(this.queues)) { | 169 | for (const key of Object.keys(this.queues)) { |
171 | const queue = this.queues[key] | 170 | const queue = this.queues[key] |
172 | queue.clean(JOB_COMPLETED_LIFETIME, 'completed') | 171 | await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') |
173 | } | 172 | } |
174 | } | 173 | } |
175 | 174 | ||