From f9a971c671d5a8b88f420a86656a788575105598 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 4 Dec 2018 09:34:29 +0100 Subject: Update dependencies --- server/lib/job-queue/handlers/video-file.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index adc0a2a15..ddbf6d1c2 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -1,5 +1,5 @@ import * as Bull from 'bull' -import { VideoResolution, VideoState } from '../../../../shared' +import { VideoResolution, VideoState, Job } from '../../../../shared' import { logger } from '../../../helpers/logger' import { VideoModel } from '../../../models/video/video' import { JobQueue } from '../job-queue' @@ -111,7 +111,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole ) if (resolutionsEnabled.length !== 0) { - const tasks: Bluebird[] = [] + const tasks: Bluebird>[] = [] for (const resolution of resolutionsEnabled) { const dataInput = { -- cgit v1.2.3 From 745778256ced65415b04a9817fc49db70d4b6681 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 4 Dec 2018 15:12:54 +0100 Subject: Fix thumbnail processing --- server/lib/job-queue/handlers/activitypub-refresher.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 7752b3b40..671b0f487 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts @@ -10,7 +10,8 @@ export type RefreshPayload = { async function refreshAPObject (job: Bull.Job) { const payload = job.data as RefreshPayload - logger.info('Processing AP refresher in job %d.', job.id) + + logger.info('Processing AP refresher in job %d for video %s.', job.id, payload.videoUrl) if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) } -- cgit v1.2.3 From 6040f87d143a5fa01db79867ece8197c3ce7be47 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 4 Dec 2018 16:02:49 +0100 Subject: Add tmp and redundancy directories --- server/lib/job-queue/handlers/video-import.ts | 9 ++++----- server/lib/job-queue/handlers/video-views.ts | 4 +--- 2 files changed, 5 insertions(+), 8 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4de901c0c..51a0b5faf 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -7,7 +7,7 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro import { extname, join } from 'path' import { VideoFileModel } from '../../../models/video/video-file' import { CONFIG, PREVIEWS_SIZE, sequelizeTypescript, THUMBNAILS_SIZE, VIDEO_IMPORT_TIMEOUT } from '../../../initializers' -import { doRequestAndSaveToFile, downloadImage } from '../../../helpers/requests' +import { downloadImage } from '../../../helpers/requests' import { VideoState } from '../../../../shared' import { JobQueue } from '../index' import { federateVideoIfNeeded } from '../../activitypub' @@ -109,6 +109,7 @@ async function processFile (downloader: () => Promise, videoImport: Vide let tempVideoPath: string let videoDestFile: string let videoFile: VideoFileModel + try { // Download video from youtubeDL tempVideoPath = await downloader() @@ -144,8 +145,7 @@ async function processFile (downloader: () => Promise, videoImport: Vide // Process thumbnail if (options.downloadThumbnail) { if (options.thumbnailUrl) { - const destThumbnailPath = join(CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName()) - await downloadImage(options.thumbnailUrl, destThumbnailPath, THUMBNAILS_SIZE) + await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.THUMBNAILS_DIR, videoImport.Video.getThumbnailName(), THUMBNAILS_SIZE) } else { await videoImport.Video.createThumbnail(videoFile) } @@ -156,8 +156,7 @@ async function processFile (downloader: () => Promise, videoImport: Vide // Process preview if (options.downloadPreview) { if (options.thumbnailUrl) { - const destPreviewPath = join(CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName()) - await downloadImage(options.thumbnailUrl, destPreviewPath, PREVIEWS_SIZE) + await downloadImage(options.thumbnailUrl, CONFIG.STORAGE.PREVIEWS_DIR, videoImport.Video.getPreviewName(), PREVIEWS_SIZE) } else { await videoImport.Video.createPreview(videoFile) } diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts index 038ef43e2..fa1fd13b3 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views.ts @@ -23,9 +23,7 @@ async function processVideosViews () { for (const videoId of videoIds) { try { const views = await Redis.Instance.getVideoViews(videoId, hour) - if (isNaN(views)) { - logger.error('Cannot process videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, views) - } else { + if (views) { logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) try { -- cgit v1.2.3 From 9f8ca79284f93693c734dd4b9a27b471017fc441 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 10 Dec 2018 10:54:49 +0100 Subject: Don't quit on queue error --- server/lib/job-queue/job-queue.ts | 1 - 1 file changed, 1 deletion(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 5862e178f..e34be7dcd 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -88,7 +88,6 @@ class JobQueue { queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) - process.exit(-1) }) this.queues[handlerName] = queue -- cgit v1.2.3 From f481c4f9f31e897a08e818f388fecdee07f57142 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 11 Dec 2018 15:12:38 +0100 Subject: Use move instead rename To avoid EXDEV errors --- server/lib/job-queue/handlers/video-import.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 51a0b5faf..63aacff98 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -14,7 +14,7 @@ import { federateVideoIfNeeded } from '../../activitypub' import { VideoModel } from '../../../models/video/video' import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' import { getSecureTorrentName } from '../../../helpers/utils' -import { remove, rename, stat } from 'fs-extra' +import { remove, move, stat } from 'fs-extra' type VideoImportYoutubeDLPayload = { type: 'youtube-dl' @@ -139,7 +139,7 @@ async function processFile (downloader: () => Promise, videoImport: Vide // Move file videoDestFile = join(CONFIG.STORAGE.VIDEOS_DIR, videoImport.Video.getVideoFilename(videoFile)) - await rename(tempVideoPath, videoDestFile) + await move(tempVideoPath, videoDestFile) tempVideoPath = null // This path is not used anymore // Process thumbnail -- cgit v1.2.3 From 56b13bd193b076d32925f0ad14b755b250b803a8 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 19 Dec 2018 11:24:34 +0100 Subject: Fix federation of some videos If we don't transcode additional resolutions, and user decided to wait transcoding before publishing the video --- server/lib/job-queue/handlers/video-file.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index ddbf6d1c2..3dca2937f 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -91,15 +91,15 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { }) } -async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { - if (video === undefined) return undefined +async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: boolean) { + if (videoArg === undefined) return undefined // Outside the transaction (IO on disk) - const { videoFileResolution } = await video.getOriginalFileResolution() + const { videoFileResolution } = await videoArg.getOriginalFileResolution() return sequelizeTypescript.transaction(async t => { // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) + let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) // Video does not exist anymore if (!videoDatabase) return undefined @@ -128,13 +128,13 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) } else { // No transcoding to do, it's now published - video.state = VideoState.PUBLISHED - video = await video.save({ transaction: t }) + videoDatabase.state = VideoState.PUBLISHED + videoDatabase = await videoDatabase.save({ transaction: t }) - logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) + logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) } - return federateVideoIfNeeded(video, isNewVideo, t) + return federateVideoIfNeeded(videoDatabase, isNewVideo, t) }) } -- cgit v1.2.3 From 2f5c6b2fc6e60502c2a8df4dc9029c1d87ebe30b Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 20 Dec 2018 14:31:11 +0100 Subject: Optimize actor follow scores modifications --- server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 3 ++- server/lib/job-queue/handlers/activitypub-http-unicast.ts | 6 +++--- server/lib/job-queue/job-queue.ts | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index abbd89b3b..9493945ff 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpBroadcastPayload = { uris: string[] @@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { .catch(() => badUrls.push(uri)) }, { concurrency: BROADCAST_CONCURRENCY }) - return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) + return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index d36479032..3973dcdc8 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,9 +1,9 @@ import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpUnicastPayload = { uri: string @@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { try { await doRequest(options) - ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) } catch (err) { - ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) throw err } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e34be7dcd..ba9cbe0d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -165,10 +165,10 @@ class JobQueue { return total } - removeOldJobs () { + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue = this.queues[key] - queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') } } -- cgit v1.2.3 From cef534ed53e4518fe0acf581bfe880788d42fc36 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 26 Dec 2018 10:36:24 +0100 Subject: Add user notification base code --- server/lib/job-queue/handlers/video-file.ts | 5 ++++- server/lib/job-queue/handlers/video-import.ts | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index 3dca2937f..959cc04fa 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -9,6 +9,7 @@ import { sequelizeTypescript } from '../../../initializers' import * as Bluebird from 'bluebird' import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' import { importVideoFile, transcodeOriginalVideofile, optimizeVideofile } from '../../video-transcoding' +import { Notifier } from '../../notifier' export type VideoFilePayload = { videoUUID: string @@ -86,6 +87,7 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { // If the video was not published, we consider it is a new one for other instances await federateVideoIfNeeded(videoDatabase, isNewVideo, t) + if (isNewVideo) Notifier.Instance.notifyOnNewVideo(video) return undefined }) @@ -134,7 +136,8 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) } - return federateVideoIfNeeded(videoDatabase, isNewVideo, t) + await federateVideoIfNeeded(videoDatabase, isNewVideo, t) + if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) }) } diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 63aacff98..82edb8d5c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -15,6 +15,7 @@ import { VideoModel } from '../../../models/video/video' import { downloadWebTorrentVideo } from '../../../helpers/webtorrent' import { getSecureTorrentName } from '../../../helpers/utils' import { remove, move, stat } from 'fs-extra' +import { Notifier } from '../../notifier' type VideoImportYoutubeDLPayload = { type: 'youtube-dl' @@ -184,6 +185,7 @@ async function processFile (downloader: () => Promise, videoImport: Vide // Now we can federate the video (reload from database, we need more attributes) const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) await federateVideoIfNeeded(videoForFederation, true, t) + Notifier.Instance.notifyOnNewVideo(videoForFederation) // Update video import object videoImport.state = VideoImportState.SUCCESS -- cgit v1.2.3 From e8d246d5267ea8b6b3114d4bcf4f34fe5f3a5241 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 28 Dec 2018 13:47:17 +0100 Subject: Add notification settings migration --- server/lib/job-queue/handlers/video-file.ts | 18 +++++++++++------- server/lib/job-queue/handlers/video-import.ts | 7 ++++--- 2 files changed, 15 insertions(+), 10 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index 959cc04fa..480d324dc 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -1,5 +1,5 @@ import * as Bull from 'bull' -import { VideoResolution, VideoState, Job } from '../../../../shared' +import { VideoResolution, VideoState } from '../../../../shared' import { logger } from '../../../helpers/logger' import { VideoModel } from '../../../models/video/video' import { JobQueue } from '../job-queue' @@ -8,7 +8,7 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils' import { sequelizeTypescript } from '../../../initializers' import * as Bluebird from 'bluebird' import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' -import { importVideoFile, transcodeOriginalVideofile, optimizeVideofile } from '../../video-transcoding' +import { importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' import { Notifier } from '../../notifier' export type VideoFilePayload = { @@ -68,7 +68,7 @@ async function processVideoFile (job: Bull.Job) { async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { if (video === undefined) return undefined - return sequelizeTypescript.transaction(async t => { + const { videoDatabase, isNewVideo } = await sequelizeTypescript.transaction(async t => { // Maybe the video changed in database, refresh it let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) // Video does not exist anymore @@ -87,10 +87,11 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { // If the video was not published, we consider it is a new one for other instances await federateVideoIfNeeded(videoDatabase, isNewVideo, t) - if (isNewVideo) Notifier.Instance.notifyOnNewVideo(video) - return undefined + return { videoDatabase, isNewVideo } }) + + if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) } async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: boolean) { @@ -99,7 +100,7 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo // Outside the transaction (IO on disk) const { videoFileResolution } = await videoArg.getOriginalFileResolution() - return sequelizeTypescript.transaction(async t => { + const videoDatabase = await sequelizeTypescript.transaction(async t => { // Maybe the video changed in database, refresh it let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) // Video does not exist anymore @@ -137,8 +138,11 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo } await federateVideoIfNeeded(videoDatabase, isNewVideo, t) - if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) + + return videoDatabase }) + + if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 82edb8d5c..29cd1198c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -180,12 +180,11 @@ async function processFile (downloader: () => Promise, videoImport: Vide // Update video DB object video.duration = duration video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED - const videoUpdated = await video.save({ transaction: t }) + await video.save({ transaction: t }) // Now we can federate the video (reload from database, we need more attributes) const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) await federateVideoIfNeeded(videoForFederation, true, t) - Notifier.Instance.notifyOnNewVideo(videoForFederation) // Update video import object videoImport.state = VideoImportState.SUCCESS @@ -193,10 +192,12 @@ async function processFile (downloader: () => Promise, videoImport: Vide logger.info('Video %s imported.', video.uuid) - videoImportUpdated.Video = videoUpdated + videoImportUpdated.Video = videoForFederation return videoImportUpdated }) + Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video) + // Create transcoding jobs? if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { // Put uuid because we don't have id auto incremented for now -- cgit v1.2.3 From dc13348070d808d0ba3feb56a435b835c2e7e791 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 2 Jan 2019 16:37:43 +0100 Subject: Add import finished and video published notifs --- server/lib/job-queue/handlers/video-file.ts | 24 ++++++++++++++++-------- server/lib/job-queue/handlers/video-import.ts | 3 +++ 2 files changed, 19 insertions(+), 8 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index 480d324dc..593e43cc5 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -68,17 +68,17 @@ async function processVideoFile (job: Bull.Job) { async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { if (video === undefined) return undefined - const { videoDatabase, isNewVideo } = await sequelizeTypescript.transaction(async t => { + const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { // Maybe the video changed in database, refresh it let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) // Video does not exist anymore if (!videoDatabase) return undefined - let isNewVideo = false + let videoPublished = false // We transcoded the video file in another format, now we can publish it if (videoDatabase.state !== VideoState.PUBLISHED) { - isNewVideo = true + videoPublished = true videoDatabase.state = VideoState.PUBLISHED videoDatabase.publishedAt = new Date() @@ -86,12 +86,15 @@ async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { } // If the video was not published, we consider it is a new one for other instances - await federateVideoIfNeeded(videoDatabase, isNewVideo, t) + await federateVideoIfNeeded(videoDatabase, videoPublished, t) - return { videoDatabase, isNewVideo } + return { videoDatabase, videoPublished } }) - if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) + if (videoPublished) { + Notifier.Instance.notifyOnNewVideo(videoDatabase) + Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) + } } async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: boolean) { @@ -100,7 +103,7 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo // Outside the transaction (IO on disk) const { videoFileResolution } = await videoArg.getOriginalFileResolution() - const videoDatabase = await sequelizeTypescript.transaction(async t => { + const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { // Maybe the video changed in database, refresh it let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) // Video does not exist anymore @@ -113,6 +116,8 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo { resolutions: resolutionsEnabled } ) + let videoPublished = false + if (resolutionsEnabled.length !== 0) { const tasks: Bluebird>[] = [] @@ -130,6 +135,8 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) } else { + videoPublished = true + // No transcoding to do, it's now published videoDatabase.state = VideoState.PUBLISHED videoDatabase = await videoDatabase.save({ transaction: t }) @@ -139,10 +146,11 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, isNewVideo: bo await federateVideoIfNeeded(videoDatabase, isNewVideo, t) - return videoDatabase + return { videoDatabase, videoPublished } }) if (isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) + if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 29cd1198c..12004dcd7 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -197,6 +197,7 @@ async function processFile (downloader: () => Promise, videoImport: Vide }) Notifier.Instance.notifyOnNewVideo(videoImportUpdated.Video) + Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true) // Create transcoding jobs? if (videoImportUpdated.Video.state === VideoState.TO_TRANSCODE) { @@ -220,6 +221,8 @@ async function processFile (downloader: () => Promise, videoImport: Vide videoImport.state = VideoImportState.FAILED await videoImport.save() + Notifier.Instance.notifyOnFinishedVideoImport(videoImport, false) + throw err } } -- cgit v1.2.3 From f7cc67b455a12ccae9b0ea16876d166720364357 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 4 Jan 2019 08:56:20 +0100 Subject: Add new follow, mention and user registered notifs --- server/lib/job-queue/handlers/activitypub-follow.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 36d0f237b..b4d381062 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts @@ -8,6 +8,7 @@ import { getOrCreateActorAndServerAndModel } from '../../activitypub/actor' import { retryTransactionWrapper } from '../../../helpers/database-utils' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { ActorModel } from '../../../models/activitypub/actor' +import { Notifier } from '../../notifier' export type ActivitypubFollowPayload = { followerActorId: number @@ -42,7 +43,7 @@ export { // --------------------------------------------------------------------------- -function follow (fromActor: ActorModel, targetActor: ActorModel) { +async function follow (fromActor: ActorModel, targetActor: ActorModel) { if (fromActor.id === targetActor.id) { throw new Error('Follower is the same than target actor.') } @@ -50,7 +51,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { // Same server, direct accept const state = !fromActor.serverId && !targetActor.serverId ? 'accepted' : 'pending' - return sequelizeTypescript.transaction(async t => { + const actorFollow = await sequelizeTypescript.transaction(async t => { const [ actorFollow ] = await ActorFollowModel.findOrCreate({ where: { actorId: fromActor.id, @@ -68,5 +69,9 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) { // Send a notification to remote server if our follow is not already accepted if (actorFollow.state !== 'accepted') await sendFollow(actorFollow) + + return actorFollow }) + + if (actorFollow.state === 'accepted') Notifier.Instance.notifyOfNewFollow(actorFollow) } -- cgit v1.2.3 From a4101923e699e49ceb9ff36e971c75417fafc9f0 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 9 Jan 2019 15:14:29 +0100 Subject: Implement contact form on server side --- server/lib/job-queue/handlers/email.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 73d98ae54..220d0af32 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts @@ -6,13 +6,14 @@ export type EmailPayload = { to: string[] subject: string text: string + from?: string } async function processEmail (job: Bull.Job) { const payload = job.data as EmailPayload logger.info('Processing email in job %d.', job.id) - return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text) + return Emailer.Instance.sendMail(payload.to, payload.subject, payload.text, payload.from) } // --------------------------------------------------------------------------- -- cgit v1.2.3 From 744d0eca195bce7dafeb4a958d0eb3c0046be32d Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 14 Jan 2019 11:30:15 +0100 Subject: Refresh remote actors on GET enpoints --- .../job-queue/handlers/activitypub-refresher.ts | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 671b0f487..454b975fe 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts @@ -1,30 +1,33 @@ import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { fetchVideoByUrl } from '../../../helpers/video' -import { refreshVideoIfNeeded } from '../../activitypub' +import { refreshVideoIfNeeded, refreshActorIfNeeded } from '../../activitypub' +import { ActorModel } from '../../../models/activitypub/actor' export type RefreshPayload = { - videoUrl: string - type: 'video' + type: 'video' | 'actor' + url: string } async function refreshAPObject (job: Bull.Job) { const payload = job.data as RefreshPayload - logger.info('Processing AP refresher in job %d for video %s.', job.id, payload.videoUrl) + logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) - if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) + if (payload.type === 'video') return refreshVideo(payload.url) + if (payload.type === 'actor') return refreshActor(payload.url) } // --------------------------------------------------------------------------- export { + refreshActor, refreshAPObject } // --------------------------------------------------------------------------- -async function refreshAPVideo (videoUrl: string) { +async function refreshVideo (videoUrl: string) { const fetchType = 'all' as 'all' const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } @@ -39,3 +42,13 @@ async function refreshAPVideo (videoUrl: string) { await refreshVideoIfNeeded(refreshOptions) } } + +async function refreshActor (actorUrl: string) { + const fetchType = 'all' as 'all' + const actor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorUrl) + + if (actor) { + await refreshActorIfNeeded(actor, fetchType) + } + +} -- cgit v1.2.3