1 import * as Bluebird from 'bluebird'
2 import * as sequelize from 'sequelize'
3 import * as magnetUtil from 'magnet-uri'
4 import * as request from 'request'
7 ActivityPlaylistSegmentHashesObject,
8 ActivityPlaylistUrlObject,
10 ActivityVideoUrlObject,
12 } from '../../../shared/index'
13 import { VideoTorrentObject } from '../../../shared/models/activitypub/objects'
14 import { VideoPrivacy } from '../../../shared/models/videos'
15 import { sanitizeAndCheckVideoTorrentObject } from '../../helpers/custom-validators/activitypub/videos'
16 import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos'
17 import { resetSequelizeInstance, retryTransactionWrapper } from '../../helpers/database-utils'
18 import { logger } from '../../helpers/logger'
19 import { doRequest, downloadImage } from '../../helpers/requests'
20 import { ACTIVITY_PUB, CONFIG, MIMETYPES, REMOTE_SCHEME, sequelizeTypescript, THUMBNAILS_SIZE } from '../../initializers'
21 import { ActorModel } from '../../models/activitypub/actor'
22 import { TagModel } from '../../models/video/tag'
23 import { VideoModel } from '../../models/video/video'
24 import { VideoChannelModel } from '../../models/video/video-channel'
25 import { VideoFileModel } from '../../models/video/video-file'
26 import { getOrCreateActorAndServerAndModel } from './actor'
27 import { addVideoComments } from './video-comments'
28 import { crawlCollectionPage } from './crawl'
29 import { sendCreateVideo, sendUpdateVideo } from './send'
30 import { isArray } from '../../helpers/custom-validators/misc'
31 import { VideoCaptionModel } from '../../models/video/video-caption'
32 import { JobQueue } from '../job-queue'
33 import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher'
34 import { createRates } from './video-rates'
35 import { addVideoShares, shareVideoByServerAndChannel } from './share'
36 import { AccountModel } from '../../models/account/account'
37 import { fetchVideoByUrl, VideoFetchByUrlType } from '../../helpers/video'
38 import { checkUrlsSameHost, getAPId } from '../../helpers/activitypub'
39 import { Notifier } from '../notifier'
40 import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist'
41 import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
42 import { FilteredModelAttributes } from 'sequelize-typescript/lib/models/Model'
43 import { AccountVideoRateModel } from '../../models/account/account-video-rate'
44 import { VideoShareModel } from '../../models/video/video-share'
45 import { VideoCommentModel } from '../../models/video/video-comment'
/**
 * Federates a video when it is eligible for federation.
 *
 * A video is eligible when its privacy is not PRIVATE and its state is PUBLISHED.
 * Before sending, the captions are loaded (only their `language` attribute) if they
 * are not already present on the instance as an array.
 *
 * @param video video to federate
 * @param isNewVideo NOTE(review): presumably selects between the create + share path
 *   and the update path; the branching lines are not visible in this excerpt
 * @param transaction optional transaction forwarded to the send/share helpers
 */
47 async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) {
48 // Only videos that are not private and that are published get federated
49 if (video.privacy !== VideoPrivacy.PRIVATE && video.state === VideoState.PUBLISHED) {
50 // Fetch more attributes that we will need to serialize the AP object
51 if (isArray(video.VideoCaptions) === false) {
52 video.VideoCaptions = await video.$get('VideoCaptions', {
53 attributes: [ 'language' ],
55 }) as VideoCaptionModel[]
59 // Now we'll send the video's metadata to our followers
60 await sendCreateVideo(video, transaction)
61 await shareVideoByServerAndChannel(video, transaction)
63 await sendUpdateVideo(video, transaction)
/**
 * Fetches the remote representation of a video and validates it.
 *
 * The body is accepted only when sanitizeAndCheckVideoTorrentObject does not return false
 * and checkUrlsSameHost(body.id, videoUrl) returns true. Otherwise `videoObject` is
 * undefined. The HTTP response is returned in both cases so callers can inspect it.
 *
 * @param videoUrl URL of the remote video
 * @returns the HTTP response and the validated video object (undefined when invalid)
 */
68 async function fetchRemoteVideo (videoUrl: string): Promise<{ response: request.RequestResponse, videoObject: VideoTorrentObject }> {
76 logger.info('Fetching remote video %s.', videoUrl)
// NOTE(review): `options` is built on lines that are not visible in this excerpt
78 const { response, body } = await doRequest(options)
// Refuse bodies that fail validation or whose id does not pass the same-host check against the requested URL
80 if (sanitizeAndCheckVideoTorrentObject(body) === false || checkUrlsSameHost(body.id, videoUrl) !== true) {
81 logger.debug('Remote video JSON is not valid.', { body })
82 return { response, videoObject: undefined }
85 return { response, videoObject: body }
/**
 * Fetches the description of a remote video from the server that hosts it.
 *
 * The host is read from video.VideoChannel.Account.Actor.Server, so these associations
 * must already be loaded on `video`.
 *
 * @param video remote video whose description is requested
 * @returns the `description` field of the response body, or an empty string when it is falsy
 */
88 async function fetchRemoteVideoDescription (video: VideoModel) {
89 const host = video.VideoChannel.Account.Actor.Server.host
90 const path = video.getDescriptionAPIPath()
// NOTE(review): this is one entry of the request options object; its other entries are not visible in this excerpt
92 uri: REMOTE_SCHEME.HTTP + '://' + host + path,
96 const { body } = await doRequest(options)
97 return body.description ? body.description : ''
/**
 * Starts a GET request for a static file of a remote video and returns the request object.
 *
 * @param video remote video; its channel/account/actor/server associations must be loaded
 * @param path path of the static file on the remote host
 * @param reject NOTE(review): presumably called with the request error inside the callback;
 *   the callback body is not visible in this excerpt
 */
100 function fetchRemoteVideoStaticFile (video: VideoModel, path: string, reject: Function) {
101 const host = video.VideoChannel.Account.Actor.Server.host
103 // We need to provide a callback, otherwise we could have an uncaught exception
104 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path, err => {
/**
 * Downloads the image at `icon.url` to use as the thumbnail of `video`.
 *
 * The file is stored in CONFIG.STORAGE.THUMBNAILS_DIR under the name returned by
 * video.getThumbnailName(), and THUMBNAILS_SIZE is passed to downloadImage.
 *
 * @param video video the thumbnail belongs to
 * @param icon ActivityPub icon object providing the image URL
 * @returns whatever downloadImage returns
 */
109 function generateThumbnailFromUrl (video: VideoModel, icon: ActivityIconObject) {
110 const thumbnailName = video.getThumbnailName()
112 return downloadImage(icon.url, CONFIG.STORAGE.THUMBNAILS_DIR, thumbnailName, THUMBNAILS_SIZE)
/**
 * Resolves the video channel actor a remote video object is attributed to.
 *
 * The channel is the first `attributedTo` entry whose type is 'Group'.
 *
 * @param videoObject remote video object
 * @returns the result of getOrCreateActorAndServerAndModel for the channel id
 * @throws Error when no 'Group' entry exists, or when the channel id and the video id
 *   do not pass the same-host check
 */
115 function getOrCreateVideoChannelFromVideoObject (videoObject: VideoTorrentObject) {
116 const channel = videoObject.attributedTo.find(a => a.type === 'Group')
// NOTE(review): `videoObject.url` is used as an array elsewhere in this file, so this message may not print a single URL; confirm
117 if (!channel) throw new Error('Cannot find associated video channel to video ' + videoObject.url)
// The channel must be on the same host as the video it claims to own
119 if (checkUrlsSameHost(channel.id, videoObject.id) !== true) {
120 throw new Error(`Video channel url ${channel.id} does not have the same host than video object id ${videoObject.id}`)
123 return getOrCreateActorAndServerAndModel(channel.id, 'all')
// NOTE(review): presumably the last field of the SyncParam type, whose header is not visible in this excerpt.
// When true, getOrCreateVideoAndAccountAndChannel refreshes an outdated video inline; otherwise it queues a refresher job.
132 refreshVideo?: boolean
/**
 * Synchronises likes, dislikes, shares and comments of a video from its remote collections.
 *
 * For each kind, when the matching `syncParam` flag is true the collection is crawled
 * right away with a handler (stores the items) and a cleaner (removes entries older than
 * the crawl start date). Crawl errors are logged and do not abort the other kinds.
 * A job payload is also prepared per kind, and all collected payloads are finally queued
 * as 'activitypub-http-fetcher' jobs.
 *
 * NOTE(review): each payload push presumably sits in an `else` branch (flag not true);
 * those lines are not visible in this excerpt.
 *
 * @param video local model of the video
 * @param fetchedVideo remote video object providing the collection URLs
 * @param syncParam flags selecting which collections are crawled immediately
 */
134 async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) {
135 logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)
137 const jobPayloads: ActivitypubHttpFetcherPayload[] = []
// Likes
139 if (syncParam.likes === true) {
140 const handler = items => createRates(items, video, 'like')
141 const cleaner = crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'like' as 'like', crawlStartDate)
143 await crawlCollectionPage<string>(fetchedVideo.likes, handler, cleaner)
144 .catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err }))
146 jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' })
// Dislikes
149 if (syncParam.dislikes === true) {
150 const handler = items => createRates(items, video, 'dislike')
151 const cleaner = crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'dislike' as 'dislike', crawlStartDate)
153 await crawlCollectionPage<string>(fetchedVideo.dislikes, handler, cleaner)
154 .catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err }))
156 jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' })
// Shares
159 if (syncParam.shares === true) {
160 const handler = items => addVideoShares(items, video)
161 const cleaner = crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate)
163 await crawlCollectionPage<string>(fetchedVideo.shares, handler, cleaner)
164 .catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err }))
166 jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
// Comments
169 if (syncParam.comments === true) {
170 const handler = items => addVideoComments(items, video)
171 const cleaner = crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate)
173 await crawlCollectionPage<string>(fetchedVideo.comments, handler, cleaner)
174 .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err }))
176 jobPayloads.push({ uri: fetchedVideo.comments, videoId: video.id, type: 'video-comments' as 'video-comments' })
// Queue one fetcher job per collected payload
179 await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }))
/**
 * Returns the local model of a video identified by its ActivityPub id, creating it from
 * the remote server when it is not yet in the database.
 *
 * Known video: returned with `created: false`. If it is outdated and refreshing is allowed,
 * it is refreshed inline when `syncParam.refreshVideo` is true, otherwise an
 * 'activitypub-refresher' job is queued.
 *
 * Unknown video: the remote object is fetched, its channel actor is resolved, the video is
 * created through retryTransactionWrapper, its external attributes are synchronised, and
 * it is returned with `created: true`.
 *
 * @throws Error when the remote video cannot be fetched or is invalid
 */
182 async function getOrCreateVideoAndAccountAndChannel (options: {
183 videoObject: { id: string } | string,
184 syncParam?: SyncParam,
185 fetchType?: VideoFetchByUrlType,
186 allowRefresh?: boolean // true by default
// Defaults: sync everything, fetch all associations, allow refresh unless explicitly set to false
189 const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false }
190 const fetchType = options.fetchType || 'all'
191 const allowRefresh = options.allowRefresh !== false
194 const videoUrl = getAPId(options.videoObject)
196 let videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
197 if (videoFromDatabase) {
198 if (videoFromDatabase.isOutdated() && allowRefresh === true) {
199 const refreshOptions = {
200 video: videoFromDatabase,
201 fetchedType: fetchType,
// Refresh inline when the caller asked for it, otherwise defer the refresh to a background job
205 if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions)
206 else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: videoFromDatabase.url } })
209 return { video: videoFromDatabase, created: false }
// The video is not in the database yet: fetch it and create it
212 const { videoObject: fetchedVideo } = await fetchRemoteVideo(videoUrl)
213 if (!fetchedVideo) throw new Error('Cannot fetch remote video with url: ' + videoUrl)
215 const channelActor = await getOrCreateVideoChannelFromVideoObject(fetchedVideo)
216 const video = await retryTransactionWrapper(createVideo, fetchedVideo, channelActor, syncParam.thumbnail)
218 await syncVideoExternalAttributes(video, fetchedVideo, syncParam)
220 return { video, created: true }
/**
 * Updates a stored remote video from its ActivityPub object.
 *
 * Inside one transaction it: checks that `options.account` owns the video's channel,
 * copies the attributes computed by videoActivityObjectToDBAttributes onto the model and
 * saves it, then replaces the video files, streaming playlists, tags and captions.
 * Afterwards a new-video notification may be sent and the thumbnail is regenerated.
 *
 * NOTE(review): the `video` option, the try/catch lines and the rethrow (if any) are not
 * visible in this excerpt; the comments below only describe the visible statements.
 *
 * @throws Error when the account does not own the video channel
 */
223 async function updateVideoFromAP (options: {
225 videoObject: VideoTorrentObject,
226 account: AccountModel,
227 channel: VideoChannelModel,
// Overrides the recipients of the video object when computing the privacy
228 overrideTo?: string[]
230 logger.debug('Updating remote video "%s".', options.videoObject.uuid)
// Snapshot of the model fields, used to restore the instance if the update fails
232 let videoFieldsSave: any
233 const wasPrivateVideo = options.video.privacy === VideoPrivacy.PRIVATE
234 const wasUnlistedVideo = options.video.privacy === VideoPrivacy.UNLISTED
237 await sequelizeTypescript.transaction(async t => {
238 const sequelizeOptions = { transaction: t }
240 videoFieldsSave = options.video.toJSON()
242 // Check the actor has the right to update the video
243 const videoChannel = options.video.VideoChannel
244 if (videoChannel.Account.id !== options.account.id) {
245 throw new Error('Account ' + options.account.Actor.url + ' does not own video channel ' + videoChannel.Actor.url)
248 const to = options.overrideTo ? options.overrideTo : options.videoObject.to
249 const videoData = await videoActivityObjectToDBAttributes(options.channel, options.videoObject, to)
250 options.video.set('name', videoData.name)
251 options.video.set('uuid', videoData.uuid)
252 options.video.set('url', videoData.url)
253 options.video.set('category', videoData.category)
254 options.video.set('licence', videoData.licence)
255 options.video.set('language', videoData.language)
256 options.video.set('description', videoData.description)
257 options.video.set('support', videoData.support)
258 options.video.set('nsfw', videoData.nsfw)
259 options.video.set('commentsEnabled', videoData.commentsEnabled)
260 options.video.set('downloadEnabled', videoData.downloadEnabled)
261 options.video.set('waitTranscoding', videoData.waitTranscoding)
262 options.video.set('state', videoData.state)
263 options.video.set('duration', videoData.duration)
264 options.video.set('createdAt', videoData.createdAt)
265 options.video.set('publishedAt', videoData.publishedAt)
266 options.video.set('originallyPublishedAt', videoData.originallyPublishedAt)
267 options.video.set('privacy', videoData.privacy)
268 options.video.set('channelId', videoData.channelId)
269 options.video.set('views', videoData.views)
271 await options.video.save(sequelizeOptions)
// Video files
274 const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject)
275 const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a))
277 // Remove video files that do not exist anymore
278 const destroyTasks = options.video.VideoFiles
279 .filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f)))
280 .map(f => f.destroy(sequelizeOptions))
281 await Promise.all(destroyTasks)
283 // Update or add the other ones
284 const upsertTasks = videoFileAttributes.map(a => {
285 return VideoFileModel.upsert<VideoFileModel>(a, { returning: true, transaction: t })
286 .then(([ file ]) => file)
289 options.video.VideoFiles = await Promise.all(upsertTasks)
// Streaming playlists
293 const streamingPlaylistAttributes = streamingPlaylistActivityUrlToDBAttributes(options.video, options.videoObject)
294 const newStreamingPlaylists = streamingPlaylistAttributes.map(a => new VideoStreamingPlaylistModel(a))
296 // Remove streaming playlists that do not exist anymore
297 const destroyTasks = options.video.VideoStreamingPlaylists
298 .filter(f => !newStreamingPlaylists.find(newPlaylist => newPlaylist.hasSameUniqueKeysThan(f)))
299 .map(f => f.destroy(sequelizeOptions))
300 await Promise.all(destroyTasks)
302 // Update or add the other ones
303 const upsertTasks = streamingPlaylistAttributes.map(a => {
304 return VideoStreamingPlaylistModel.upsert<VideoStreamingPlaylistModel>(a, { returning: true, transaction: t })
305 .then(([ streamingPlaylist ]) => streamingPlaylist)
308 options.video.VideoStreamingPlaylists = await Promise.all(upsertTasks)
// Tags
// NOTE(review): unlike createVideo, the tags are not filtered on type 'Hashtag' here; confirm whether this is intended
313 const tags = options.videoObject.tag.map(tag => tag.name)
314 const tagInstances = await TagModel.findOrCreateTags(tags, t)
315 await options.video.$set('Tags', tagInstances, sequelizeOptions)
// Captions: drop all existing ones, then insert one per subtitle language
320 await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t)
322 const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => {
323 return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t)
325 options.video.VideoCaptions = await Promise.all(videoCaptionsPromises)
// Notify about the video when it was previously private or unlisted
// NOTE(review): an enclosing condition may exist on lines not visible in this excerpt
330 if (wasPrivateVideo || wasUnlistedVideo) {
331 Notifier.Instance.notifyOnNewVideo(options.video)
334 logger.info('Remote video with uuid %s updated', options.videoObject.uuid)
// Error path: restore the in-memory instance to the snapshot taken before the update
336 if (options.video !== undefined && videoFieldsSave !== undefined) {
337 resetSequelizeInstance(options.video, videoFieldsSave)
340 // This is just a debug because we will retry the insert
341 logger.debug('Cannot update the remote video.', { err })
// Thumbnail regeneration; a failure is only logged as a warning
346 await generateThumbnailFromUrl(options.video, options.videoObject.icon)
348 logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })
/**
 * Refreshes a remote video from its origin server when it is outdated.
 *
 * Visible behaviour:
 * - a video that is not outdated is returned unchanged;
 * - a 404 from the origin server deletes the local video;
 * - an invalid body logs a warning and marks the video as refreshed;
 * - otherwise the video is updated through updateVideoFromAP (wrapped in
 *   retryTransactionWrapper) and its external attributes are synchronised;
 * - on error a warning is logged and the video is marked as refreshed, so it is not
 *   retried in a loop.
 *
 * NOTE(review): the `video`/`syncParam` options, the try/catch lines and the return
 * statements of the later branches are not visible in this excerpt.
 */
352 async function refreshVideoIfNeeded (options: {
354 fetchedType: VideoFetchByUrlType,
356 }): Promise<VideoModel> {
357 if (!options.video.isOutdated()) return options.video
359 // We need more attributes if the argument video was fetched without enough joins
360 const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
363 const { response, videoObject } = await fetchRemoteVideo(video.url)
364 if (response.statusCode === 404) {
365 logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
367 // Video does not exist anymore
368 await video.destroy()
372 if (videoObject === undefined) {
373 logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
// Mark as refreshed so the invalid remote object is not fetched again immediately
375 await video.setAsRefreshed()
379 const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
380 const account = await AccountModel.load(channelActor.VideoChannel.accountId)
382 const updateOptions = {
386 channel: channelActor.VideoChannel
388 await retryTransactionWrapper(updateVideoFromAP, updateOptions)
389 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
393 logger.warn('Cannot refresh video %s.', options.video.url, { err })
395 // Don't refresh in a loop
396 await video.setAsRefreshed()
403 refreshVideoIfNeeded,
404 federateVideoIfNeeded,
406 getOrCreateVideoAndAccountAndChannel,
407 fetchRemoteVideoStaticFile,
408 fetchRemoteVideoDescription,
409 generateThumbnailFromUrl,
410 getOrCreateVideoChannelFromVideoObject
413 // ---------------------------------------------------------------------------
/**
 * Type guard: tells whether an ActivityPub URL object describes a video file.
 *
 * The media type is read from `mediaType`, falling back to `mimeType`. It must be one of
 * the keys of MIMETYPES.VIDEO.MIMETYPE_EXT and start with 'video/'.
 */
415 function isAPVideoUrlObject (url: ActivityUrlObject): url is ActivityVideoUrlObject {
416 const mimeTypes = Object.keys(MIMETYPES.VIDEO.MIMETYPE_EXT)
418 const urlMediaType = url.mediaType || url.mimeType
419 return mimeTypes.indexOf(urlMediaType) !== -1 && urlMediaType.startsWith('video/')
/**
 * Type guard: tells whether an ActivityPub URL object describes a streaming playlist,
 * i.e. its media type (`mediaType`, falling back to `mimeType`) is exactly
 * 'application/x-mpegURL'.
 */
422 function isAPStreamingPlaylistUrlObject (url: ActivityUrlObject): url is ActivityPlaylistUrlObject {
423 const urlMediaType = url.mediaType || url.mimeType
425 return urlMediaType === 'application/x-mpegURL'
/**
 * Type guard: tells whether a playlist tag is the link to the segment hashes file.
 *
 * The tag must be named 'sha256', have the type 'Link', and have the media type
 * (`mediaType`, falling back to `mimeType`) 'application/json'.
 */
428 function isAPPlaylistSegmentHashesUrlObject (tag: any): tag is ActivityPlaylistSegmentHashesObject {
429 const urlMediaType = tag.mediaType || tag.mimeType
431 return tag.name === 'sha256' && tag.type === 'Link' && urlMediaType === 'application/json'
/**
 * Creates the local model of a remote video from its ActivityPub object.
 *
 * Inside one transaction it saves the video, then creates its files, streaming playlists,
 * tags (only tags of type 'Hashtag') and captions. The thumbnail is downloaded after the
 * transaction.
 *
 * @param videoObject remote video object
 * @param channelActor actor whose VideoChannel owns the video
 * @param waitThumbnail when true, the thumbnail download is awaited before continuing
 * @throws Error when the video object contains no valid file
 */
434 async function createVideo (videoObject: VideoTorrentObject, channelActor: ActorModel, waitThumbnail = false) {
435 logger.debug('Adding remote video %s.', videoObject.id)
437 const videoCreated: VideoModel = await sequelizeTypescript.transaction(async t => {
438 const sequelizeOptions = { transaction: t }
440 const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to)
441 const video = VideoModel.build(videoData)
443 const videoCreated = await video.save(sequelizeOptions)
// Video files
446 const videoFileAttributes = videoFileActivityUrlToDBAttributes(videoCreated, videoObject)
447 if (videoFileAttributes.length === 0) {
// NOTE(review): the message keeps a literal '%s' because it is built by concatenation, and `videoObject.url` is used as an array elsewhere; confirm the intended message
448 throw new Error('Cannot find valid files for video %s ' + videoObject.url)
451 const videoFilePromises = videoFileAttributes.map(f => VideoFileModel.create(f, { transaction: t }))
452 await Promise.all(videoFilePromises)
// Streaming playlists
454 const videoStreamingPlaylists = streamingPlaylistActivityUrlToDBAttributes(videoCreated, videoObject)
455 const playlistPromises = videoStreamingPlaylists.map(p => VideoStreamingPlaylistModel.create(p, { transaction: t }))
456 await Promise.all(playlistPromises)
// Tags: only entries of type 'Hashtag' are kept
459 const tags = videoObject.tag
460 .filter(t => t.type === 'Hashtag')
462 const tagInstances = await TagModel.findOrCreateTags(tags, t)
463 await videoCreated.$set('Tags', tagInstances, sequelizeOptions)
// Captions: one per subtitle language
466 const videoCaptionsPromises = videoObject.subtitleLanguage.map(c => {
467 return VideoCaptionModel.insertOrReplaceLanguage(videoCreated.id, c.identifier, t)
469 await Promise.all(videoCaptionsPromises)
471 logger.info('Remote video with uuid %s inserted.', videoObject.uuid)
473 videoCreated.VideoChannel = channelActor.VideoChannel
// Thumbnail failures are logged as warnings instead of rejecting; the download is awaited only on request
477 const p = generateThumbnailFromUrl(videoCreated, videoObject.icon)
478 .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err }))
480 if (waitThumbnail === true) await p
/**
 * Converts a remote video object into the attributes used to build or update a video model.
 *
 * - privacy: PUBLIC when `to` contains ACTIVITY_PUB.PUBLIC, UNLISTED otherwise
 * - language / category / licence: read from the `identifier` of the matching object when
 *   present (category and licence are parsed as base-10 integers), undefined otherwise
 * - description / support: null when the source value is falsy
 * - createdAt and publishedAt are both taken from `videoObject.published`
 *
 * NOTE(review): the `to` parameter and some returned attributes are declared on lines not
 * visible in this excerpt.
 */
485 async function videoActivityObjectToDBAttributes (
486 videoChannel: VideoChannelModel,
487 videoObject: VideoTorrentObject,
490 const privacy = to.indexOf(ACTIVITY_PUB.PUBLIC) !== -1 ? VideoPrivacy.PUBLIC : VideoPrivacy.UNLISTED
// The regex is not global: only the first run of non-digit characters is removed; parseInt below stops at the next non-digit
491 const duration = videoObject.duration.replace(/[^\d]+/, '')
493 let language: string | undefined
494 if (videoObject.language) {
495 language = videoObject.language.identifier
498 let category: number | undefined
499 if (videoObject.category) {
500 category = parseInt(videoObject.category.identifier, 10)
503 let licence: number | undefined
504 if (videoObject.licence) {
505 licence = parseInt(videoObject.licence.identifier, 10)
508 const description = videoObject.content || null
509 const support = videoObject.support || null
512 name: videoObject.name,
513 uuid: videoObject.uuid,
520 nsfw: videoObject.sensitive,
521 commentsEnabled: videoObject.commentsEnabled,
522 downloadEnabled: videoObject.downloadEnabled,
523 waitTranscoding: videoObject.waitTranscoding,
524 state: videoObject.state,
525 channelId: videoChannel.id,
526 duration: parseInt(duration, 10),
527 createdAt: new Date(videoObject.published),
528 publishedAt: new Date(videoObject.published),
529 originallyPublishedAt: videoObject.originallyPublishedAt ? new Date(videoObject.originallyPublishedAt) : null,
530 // FIXME: updatedAt does not seem to be considered by Sequelize
531 updatedAt: new Date(videoObject.updated),
532 views: videoObject.views,
/**
 * Builds the video file attributes described by the `url` entries of a remote video object.
 *
 * Each entry accepted by isAPVideoUrlObject is paired with the magnet entry that has the
 * same `height`; the info hash is taken from the decoded magnet URI.
 *
 * @param video video the files belong to
 * @param videoObject remote video object
 * @throws Error when there is no video file entry, when a file has no matching magnet
 *   entry, or when the magnet URI cannot be parsed or carries an invalid info hash
 */
540 function videoFileActivityUrlToDBAttributes (video: VideoModel, videoObject: VideoTorrentObject) {
541 const fileUrls = videoObject.url.filter(u => isAPVideoUrlObject(u)) as ActivityVideoUrlObject[]
543 if (fileUrls.length === 0) {
544 throw new Error('Cannot find video files for ' + video.url)
547 const attributes: FilteredModelAttributes<VideoFileModel>[] = []
548 for (const fileUrl of fileUrls) {
549 // Fetch the associated magnet URI (same height as the file)
550 const magnet = videoObject.url.find(u => {
551 const mediaType = u.mediaType || u.mimeType
552 return mediaType === 'application/x-bittorrent;x-scheme-handler/magnet' && (u as any).height === fileUrl.height
555 if (!magnet) throw new Error('Cannot find associated magnet uri for file ' + fileUrl.href)
557 const parsed = magnetUtil.decode(magnet.href)
558 if (!parsed || isVideoFileInfoHashValid(parsed.infoHash) === false) {
559 throw new Error('Cannot parse magnet URI ' + magnet.href)
562 const mediaType = fileUrl.mediaType || fileUrl.mimeType
// The file extension is looked up from the media type
564 extname: MIMETYPES.VIDEO.MIMETYPE_EXT[ mediaType ],
565 infoHash: parsed.infoHash,
566 resolution: fileUrl.height,
// -1 is stored when the remote object gives no (or a falsy) fps
569 fps: fileUrl.fps || -1
572 attributes.push(attribute)
/**
 * Builds the streaming playlist attributes described by the `url` entries of a remote
 * video object.
 *
 * Only entries accepted by isAPStreamingPlaylistUrlObject are considered; an empty array
 * is returned when there are none. For each playlist, the 'Infohash' tags and the segment
 * hashes link are extracted, and a warning is logged when either one is missing.
 *
 * NOTE(review): what happens after each warning (presumably skipping the playlist) is on
 * lines not visible in this excerpt.
 *
 * @param video video the playlists belong to
 * @param videoObject remote video object
 */
578 function streamingPlaylistActivityUrlToDBAttributes (video: VideoModel, videoObject: VideoTorrentObject) {
579 const playlistUrls = videoObject.url.filter(u => isAPStreamingPlaylistUrlObject(u)) as ActivityPlaylistUrlObject[]
580 if (playlistUrls.length === 0) return []
582 const attributes: FilteredModelAttributes<VideoStreamingPlaylistModel>[] = []
583 for (const playlistUrlObject of playlistUrls) {
584 const p2pMediaLoaderInfohashes = playlistUrlObject.tag
585 .filter(t => t.type === 'Infohash')
587 if (p2pMediaLoaderInfohashes.length === 0) {
588 logger.warn('No infohashes found in AP playlist object.', { playlistUrl: playlistUrlObject })
592 const segmentsSha256UrlObject = playlistUrlObject.tag
594 return isAPPlaylistSegmentHashesUrlObject(t)
595 }) as ActivityPlaylistSegmentHashesObject
596 if (!segmentsSha256UrlObject) {
597 logger.warn('No segment sha256 URL found in AP playlist object.', { playlistUrl: playlistUrlObject })
// Playlists built here always get the HLS type
602 type: VideoStreamingPlaylistType.HLS,
603 playlistUrl: playlistUrlObject.href,
604 segmentsSha256Url: segmentsSha256UrlObject.href,
605 p2pMediaLoaderInfohashes,
609 attributes.push(attribute)