await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
}
} catch (err) {
- logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTags })
-
- await moveToFailedMoveToObjectStorageState(video)
- await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove')
+ await onMoveToObjectStorageFailure(job, err)
}
return payload.videoUUID
}
+export async function onMoveToObjectStorageFailure (job: Job, err: any) {
+ const payload = job.data as MoveObjectStoragePayload
+
+ const video = await VideoModel.loadWithFiles(payload.videoUUID)
+ if (!video) return
+
+ logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTagsBase(video.uuid, video.url) })
+
+ await moveToFailedMoveToObjectStorageState(video)
+ await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove')
+}
+
// ---------------------------------------------------------------------------
async function moveWebTorrentFiles (video: MVideoWithAllFiles) {
import { processActorKeys } from './handlers/actor-keys'
import { processEmail } from './handlers/email'
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
-import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
+import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
'video-studio-edition': processVideoStudioEdition
}
+const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
+ 'move-to-object-storage': onMoveToObjectStorageFailure
+}
+
const jobTypes: JobType[] = [
'activitypub-follow',
'activitypub-http-broadcast',
: 'error'
logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err })
+
+ if (errorHandlers[job.name]) {
+ errorHandlers[job.name](job, err)
+ .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
+ }
})
queue.on('error', err => {