1 import * as Bluebird from 'bluebird'
2 import * as sequelize from 'sequelize'
3 import * as magnetUtil from 'magnet-uri'
4 import * as request from 'request'
7 ActivityPlaylistSegmentHashesObject,
8 ActivityPlaylistUrlObject,
10 ActivityVideoUrlObject,
12 } from '../../../shared/index'
13 import { VideoTorrentObject } from '../../../shared/models/activitypub/objects'
14 import { VideoPrivacy } from '../../../shared/models/videos'
15 import { sanitizeAndCheckVideoTorrentObject } from '../../helpers/custom-validators/activitypub/videos'
16 import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos'
17 import { resetSequelizeInstance, retryTransactionWrapper } from '../../helpers/database-utils'
18 import { logger } from '../../helpers/logger'
19 import { doRequest, downloadImage } from '../../helpers/requests'
20 import { ACTIVITY_PUB, CONFIG, MIMETYPES, REMOTE_SCHEME, sequelizeTypescript, THUMBNAILS_SIZE } from '../../initializers'
21 import { ActorModel } from '../../models/activitypub/actor'
22 import { TagModel } from '../../models/video/tag'
23 import { VideoModel } from '../../models/video/video'
24 import { VideoChannelModel } from '../../models/video/video-channel'
25 import { VideoFileModel } from '../../models/video/video-file'
26 import { getOrCreateActorAndServerAndModel } from './actor'
27 import { addVideoComments } from './video-comments'
28 import { crawlCollectionPage } from './crawl'
29 import { sendCreateVideo, sendUpdateVideo } from './send'
30 import { isArray } from '../../helpers/custom-validators/misc'
31 import { VideoCaptionModel } from '../../models/video/video-caption'
32 import { JobQueue } from '../job-queue'
33 import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher'
34 import { createRates } from './video-rates'
35 import { addVideoShares, shareVideoByServerAndChannel } from './share'
36 import { AccountModel } from '../../models/account/account'
37 import { fetchVideoByUrl, VideoFetchByUrlType } from '../../helpers/video'
38 import { checkUrlsSameHost, getAPId } from '../../helpers/activitypub'
39 import { Notifier } from '../notifier'
40 import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist'
41 import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
42 import { FilteredModelAttributes } from 'sequelize-typescript/lib/models/Model'
// Sends the video to the fediverse when it is not private and its state is PUBLISHED.
// Before sending, the captions association is loaded (only the 'language' attribute is requested)
// when `video.VideoCaptions` is not already an array.
// Two paths are visible below: sendCreateVideo + shareVideoByServerAndChannel, or sendUpdateVideo.
// NOTE(review): this listing skips some original lines (see the embedded line numbers); the condition choosing
// between the two paths is not visible here — presumably `isNewVideo`, confirm against the full file.
44 async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) {
45 // If the video is not private and published, we federate it
46 if (video.privacy !== VideoPrivacy.PRIVATE && video.state === VideoState.PUBLISHED) {
47 // Fetch more attributes that we will need to serialize in AP object
48 if (isArray(video.VideoCaptions) === false) {
49 video.VideoCaptions = await video.$get('VideoCaptions', {
50 attributes: [ 'language' ],
52 }) as VideoCaptionModel[]
56 // Now we'll add the video's meta data to our followers
57 await sendCreateVideo(video, transaction)
58 await shareVideoByServerAndChannel(video, transaction)
// Other path: only an update is sent
60 await sendUpdateVideo(video, transaction)
// Requests `videoUrl` through doRequest and validates the returned body.
// Returns `{ response, videoObject }`; `videoObject` is undefined when the body does not pass
// sanitizeAndCheckVideoTorrentObject or when `body.id` and `videoUrl` are not on the same host.
// The raw response is always returned so callers can inspect it (refreshVideoIfNeeded reads `statusCode`).
// NOTE(review): the construction of `options` is not visible in this listing.
65 async function fetchRemoteVideo (videoUrl: string): Promise<{ response: request.RequestResponse, videoObject: VideoTorrentObject }> {
73 logger.info('Fetching remote video %s.', videoUrl)
75 const { response, body } = await doRequest(options)
// Reject bodies that are invalid or that claim an id hosted elsewhere than the URL we requested
77 if (sanitizeAndCheckVideoTorrentObject(body) === false || checkUrlsSameHost(body.id, videoUrl) !== true) {
78 logger.debug('Remote video JSON is not valid.', { body })
79 return { response, videoObject: undefined }
82 return { response, videoObject: body }
// Fetches the description of a video from the server of its channel's account actor.
// The URI is REMOTE_SCHEME.HTTP + '://' + host + the video's description API path.
// Returns `body.description`, or an empty string when it is falsy.
// NOTE(review): the other fields of the request `options` object are not visible in this listing.
85 async function fetchRemoteVideoDescription (video: VideoModel) {
86 const host = video.VideoChannel.Account.Actor.Server.host
87 const path = video.getDescriptionAPIPath()
89 uri: REMOTE_SCHEME.HTTP + '://' + host + path,
93 const { body } = await doRequest(options)
94 return body.description ? body.description : ''
// Starts a GET request for `path` on the server of the video's channel account actor and returns
// what `request.get` returns.
// NOTE(review): the callback body is not visible in this listing — presumably it forwards `err` to `reject`; confirm.
97 function fetchRemoteVideoStaticFile (video: VideoModel, path: string, reject: Function) {
98 const host = video.VideoChannel.Account.Actor.Server.host
100 // We need to provide a callback: if we do not, we could have an uncaught exception
101 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path, err => {
/**
 * Downloads the remote icon of a video and stores it as the video thumbnail.
 *
 * The image at `icon.url` is written into `CONFIG.STORAGE.THUMBNAILS_DIR` under the name given by
 * `video.getThumbnailName()`, with `THUMBNAILS_SIZE` passed to `downloadImage`.
 *
 * @param video video providing the thumbnail file name
 * @param icon  ActivityPub icon object providing the source URL
 * @returns the value returned by `downloadImage`
 */
function generateThumbnailFromUrl (video: VideoModel, icon: ActivityIconObject) {
  const fileName = video.getThumbnailName()
  const sourceUrl = icon.url

  return downloadImage(sourceUrl, CONFIG.STORAGE.THUMBNAILS_DIR, fileName, THUMBNAILS_SIZE)
}
/**
 * Resolves (creating it if needed) the channel actor that a remote video object is attributed to.
 *
 * The channel is the first `attributedTo` entry whose type is 'Group'. Its id must be on the same host
 * as the video object id.
 *
 * @param videoObject remote ActivityPub video object
 * @returns the value returned by `getOrCreateActorAndServerAndModel` for the channel id, fetched with 'all'
 * @throws Error when no 'Group' entry exists, or when the channel and the video are on different hosts
 */
function getOrCreateVideoChannelFromVideoObject (videoObject: VideoTorrentObject) {
  const groupActor = videoObject.attributedTo.find(attributed => attributed.type === 'Group')

  if (!groupActor) {
    throw new Error('Cannot find associated video channel to video ' + videoObject.url)
  }

  // A channel hosted elsewhere than the video is refused
  const onSameHost = checkUrlsSameHost(groupActor.id, videoObject.id) === true
  if (!onSameHost) {
    throw new Error(`Video channel url ${groupActor.id} does not have the same host than video object id ${videoObject.id}`)
  }

  return getOrCreateActorAndServerAndModel(groupActor.id, 'all')
}
129 refreshVideo?: boolean
/**
 * Imports the likes, dislikes, shares and comments collections of a remote video.
 *
 * For each collection, when the matching `syncParam` flag is strictly `true` the collection is crawled
 * right away; a crawl failure is logged and does not prevent the other collections from being processed.
 * Otherwise an 'activitypub-http-fetcher' job is queued so the collection is fetched later.
 *
 * @param video        local video the fetched items are attached to
 * @param fetchedVideo remote ActivityPub object providing the collection URLs
 * @param syncParam    per-collection flags: `true` crawls now, anything else defers to a job
 */
async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) {
  logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)

  // Collections that are not crawled now are fetched later by the job queue
  const jobPayloads: ActivitypubHttpFetcherPayload[] = []

  if (syncParam.likes === true) {
    await crawlCollectionPage<string>(fetchedVideo.likes, items => createRates(items, video, 'like'))
      .catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err }))
  } else {
    jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' })
  }

  if (syncParam.dislikes === true) {
    await crawlCollectionPage<string>(fetchedVideo.dislikes, items => createRates(items, video, 'dislike'))
      .catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err }))
  } else {
    jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' })
  }

  if (syncParam.shares === true) {
    await crawlCollectionPage<string>(fetchedVideo.shares, items => addVideoShares(items, video))
      .catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err }))
  } else {
    jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
  }

  if (syncParam.comments === true) {
    await crawlCollectionPage<string>(fetchedVideo.comments, items => addVideoComments(items, video))
      .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err }))
  } else {
    // Defer the comments collection itself: queuing the shares URL here would fetch shares twice
    // and never fetch the comments.
    // NOTE(review): assumes ActivitypubHttpFetcherPayload accepts the 'video-comments' type — confirm.
    jobPayloads.push({ uri: fetchedVideo.comments, videoId: video.id, type: 'video-comments' as 'video-comments' })
  }

  await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }))
}
// Returns the video identified by `options.videoObject` (an AP object with an id, or the id itself),
// creating it from the remote server when it is not in the database yet.
// - Found in the database: when `allowRefresh` is not false, the video is either refreshed inline
//   (syncParam.refreshVideo === true) or an 'activitypub-refresher' job is queued; `{ video, created: false }` is returned.
// - Not found: the remote object is fetched, its channel actor resolved, the video created through
//   retryTransactionWrapper, its external attributes synced, and `{ video, created: true }` is returned.
// Throws when fetchRemoteVideo yields no valid video object.
// NOTE(review): this listing skips some original lines (see the embedded line numbers), e.g. the end of the
// options type and part of `refreshOptions`.
167 async function getOrCreateVideoAndAccountAndChannel (options: {
168 videoObject: { id: string } | string,
169 syncParam?: SyncParam,
170 fetchType?: VideoFetchByUrlType,
171 allowRefresh?: boolean // true by default
// Defaults: sync everything inline, fetch all associations, allow refresh unless explicitly set to false
174 const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false }
175 const fetchType = options.fetchType || 'all'
176 const allowRefresh = options.allowRefresh !== false
179 const videoUrl = getAPId(options.videoObject)
181 let videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
182 if (videoFromDatabase) {
184 if (allowRefresh === true) {
185 const refreshOptions = {
186 video: videoFromDatabase,
187 fetchedType: fetchType,
// Either refresh now and use the refreshed instance, or delegate the refresh to a background job
191 if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions)
192 else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: videoFromDatabase.url } })
195 return { video: videoFromDatabase, created: false }
// The video is unknown locally: fetch it and create it
198 const { videoObject: fetchedVideo } = await fetchRemoteVideo(videoUrl)
199 if (!fetchedVideo) throw new Error('Cannot fetch remote video with url: ' + videoUrl)
201 const channelActor = await getOrCreateVideoChannelFromVideoObject(fetchedVideo)
202 const video = await retryTransactionWrapper(createVideo, fetchedVideo, channelActor, syncParam.thumbnail)
204 await syncVideoExternalAttributes(video, fetchedVideo, syncParam)
206 return { video, created: true }
// Updates a local copy of a remote video from its ActivityPub object.
// Inside one database transaction it: snapshots the current fields (toJSON), checks that `options.account`
// owns the video channel, overwrites the video attributes and saves, reconciles video files and streaming
// playlists (destroy the ones missing from the AP object, upsert the others), replaces the tags and the captions.
// Afterwards it notifies users when the video was private or unlisted before the update, and finally tries to
// regenerate the thumbnail (a failure there is only logged as a warning).
// When the update fails, the instance is reset with the snapshot taken at the start of the transaction.
// NOTE(review): this listing skips some original lines (see the embedded line numbers), including the
// try/catch delimiters and the end of the options type; the error is presumably rethrown after the debug log — confirm.
209 async function updateVideoFromAP (options: {
211 videoObject: VideoTorrentObject,
212 account: AccountModel,
213 channel: VideoChannelModel,
214 overrideTo?: string[]
216 logger.debug('Updating remote video "%s".', options.videoObject.uuid)
// Snapshot used to restore the instance if the transaction fails
218 let videoFieldsSave: any
// Privacy before the update, used after the transaction to decide whether to notify
219 const wasPrivateVideo = options.video.privacy === VideoPrivacy.PRIVATE
220 const wasUnlistedVideo = options.video.privacy === VideoPrivacy.UNLISTED
223 await sequelizeTypescript.transaction(async t => {
224 const sequelizeOptions = { transaction: t }
226 videoFieldsSave = options.video.toJSON()
228 // Check actor has the right to update the video
229 const videoChannel = options.video.VideoChannel
230 if (videoChannel.Account.id !== options.account.id) {
231 throw new Error('Account ' + options.account.Actor.url + ' does not own video channel ' + videoChannel.Actor.url)
// `overrideTo` takes precedence over the audience declared in the AP object
234 const to = options.overrideTo ? options.overrideTo : options.videoObject.to
235 const videoData = await videoActivityObjectToDBAttributes(options.channel, options.videoObject, to)
236 options.video.set('name', videoData.name)
237 options.video.set('uuid', videoData.uuid)
238 options.video.set('url', videoData.url)
239 options.video.set('category', videoData.category)
240 options.video.set('licence', videoData.licence)
241 options.video.set('language', videoData.language)
242 options.video.set('description', videoData.description)
243 options.video.set('support', videoData.support)
244 options.video.set('nsfw', videoData.nsfw)
245 options.video.set('commentsEnabled', videoData.commentsEnabled)
246 options.video.set('downloadEnabled', videoData.downloadEnabled)
247 options.video.set('waitTranscoding', videoData.waitTranscoding)
248 options.video.set('state', videoData.state)
249 options.video.set('duration', videoData.duration)
250 options.video.set('createdAt', videoData.createdAt)
251 options.video.set('publishedAt', videoData.publishedAt)
252 options.video.set('privacy', videoData.privacy)
253 options.video.set('channelId', videoData.channelId)
254 options.video.set('views', videoData.views)
256 await options.video.save(sequelizeOptions)
// Video files: built as models only to compare unique keys with the existing ones
259 const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject)
260 const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a))
262 // Remove video files that do not exist anymore
263 const destroyTasks = options.video.VideoFiles
264 .filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f)))
265 .map(f => f.destroy(sequelizeOptions))
266 await Promise.all(destroyTasks)
268 // Update or add the other ones
269 const upsertTasks = videoFileAttributes.map(a => {
270 return VideoFileModel.upsert<VideoFileModel>(a, { returning: true, transaction: t })
271 .then(([ file ]) => file)
274 options.video.VideoFiles = await Promise.all(upsertTasks)
// Streaming playlists: same reconciliation as for the video files
278 const streamingPlaylistAttributes = streamingPlaylistActivityUrlToDBAttributes(options.video, options.videoObject)
279 const newStreamingPlaylists = streamingPlaylistAttributes.map(a => new VideoStreamingPlaylistModel(a))
281 // Remove streaming playlists that do not exist anymore
282 const destroyTasks = options.video.VideoStreamingPlaylists
283 .filter(f => !newStreamingPlaylists.find(newPlaylist => newPlaylist.hasSameUniqueKeysThan(f)))
284 .map(f => f.destroy(sequelizeOptions))
285 await Promise.all(destroyTasks)
287 // Update or add the other ones
288 const upsertTasks = streamingPlaylistAttributes.map(a => {
289 return VideoStreamingPlaylistModel.upsert<VideoStreamingPlaylistModel>(a, { returning: true, transaction: t })
290 .then(([ streamingPlaylist ]) => streamingPlaylist)
293 options.video.VideoStreamingPlaylists = await Promise.all(upsertTasks)
// NOTE(review): every entry of `tag` is used here, whereas createVideo keeps only entries whose type is 'Hashtag'
298 const tags = options.videoObject.tag.map(tag => tag.name)
299 const tagInstances = await TagModel.findOrCreateTags(tags, t)
300 await options.video.$set('Tags', tagInstances, sequelizeOptions)
// Captions are fully replaced: delete all of them, then insert one per subtitle language
305 await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t)
307 const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => {
308 return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t)
310 options.video.VideoCaptions = await Promise.all(videoCaptionsPromises)
// The video was private or unlisted before this update: notify as for a new video
315 if (wasPrivateVideo || wasUnlistedVideo) {
316 Notifier.Instance.notifyOnNewVideo(options.video)
319 logger.info('Remote video with uuid %s updated', options.videoObject.uuid)
// Failure path: restore the in-memory instance from the snapshot
321 if (options.video !== undefined && videoFieldsSave !== undefined) {
322 resetSequelizeInstance(options.video, videoFieldsSave)
325 // This is just a debug because we will retry the insert
326 logger.debug('Cannot update the remote video.', { err })
// Thumbnail regeneration is best effort
331 await generateThumbnailFromUrl(options.video, options.videoObject.icon)
333 logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })
// Refreshes a remote video from its origin server when `options.video.isOutdated()` is true;
// otherwise returns `options.video` untouched.
// Refresh steps: reload the video with its account when it was not fetched with 'all', fetch the remote object,
// destroy the local video on a 404 response, mark it as refreshed when the body is invalid, otherwise update it
// through updateVideoFromAP (inside retryTransactionWrapper) and sync its external attributes.
// Any error is logged as a warning and the video is marked as refreshed so it is not retried in a loop.
// NOTE(review): this listing skips some original lines (see the embedded line numbers): the try/catch delimiters,
// the fields of `updateOptions` other than `channel`, and the return statements of each branch.
337 async function refreshVideoIfNeeded (options: {
339 fetchedType: VideoFetchByUrlType,
341 }): Promise<VideoModel> {
342 if (!options.video.isOutdated()) return options.video
344 // We need more attributes if the argument video was fetched without enough joins
345 const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
348 const { response, videoObject } = await fetchRemoteVideo(video.url)
349 if (response.statusCode === 404) {
350 logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
352 // Video does not exist anymore
353 await video.destroy()
// fetchRemoteVideo returns an undefined videoObject when the body failed validation
357 if (videoObject === undefined) {
358 logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
360 await video.setAsRefreshed()
364 const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
365 const account = await AccountModel.load(channelActor.VideoChannel.accountId)
367 const updateOptions = {
371 channel: channelActor.VideoChannel
373 await retryTransactionWrapper(updateVideoFromAP, updateOptions)
374 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
378 logger.warn('Cannot refresh video %s.', options.video.url, { err })
380 // Don't refresh in loop
381 await video.setAsRefreshed()
388 refreshVideoIfNeeded,
389 federateVideoIfNeeded,
391 getOrCreateVideoAndAccountAndChannel,
392 fetchRemoteVideoStaticFile,
393 fetchRemoteVideoDescription,
394 generateThumbnailFromUrl,
395 getOrCreateVideoChannelFromVideoObject
398 // ---------------------------------------------------------------------------
/**
 * Type guard telling whether an ActivityPub url object describes a video file.
 *
 * The media type (`mediaType`, falling back to `mimeType`) must be one of the keys of
 * `MIMETYPES.VIDEO.MIMETYPE_EXT` and must start with 'video/'.
 *
 * @param url ActivityPub url object to test
 * @returns true when `url` can be treated as an ActivityVideoUrlObject
 */
function isAPVideoUrlObject (url: ActivityUrlObject): url is ActivityVideoUrlObject {
  const supportedMimeTypes = Object.keys(MIMETYPES.VIDEO.MIMETYPE_EXT)
  const mediaType = url.mediaType || url.mimeType

  // An unknown (or missing) media type is rejected before the prefix is inspected
  if (supportedMimeTypes.indexOf(mediaType) === -1) return false

  return mediaType.startsWith('video/')
}
/**
 * Type guard telling whether an ActivityPub url object describes a streaming playlist.
 *
 * @param url ActivityPub url object to test
 * @returns true when the media type (`mediaType`, falling back to `mimeType`) is 'application/x-mpegURL'
 */
function isAPStreamingPlaylistUrlObject (url: ActivityUrlObject): url is ActivityPlaylistUrlObject {
  return (url.mediaType || url.mimeType) === 'application/x-mpegURL'
}
/**
 * Type guard telling whether a playlist tag is the link to the segments sha256 file.
 *
 * @param tag tag entry of an ActivityPub playlist url object
 * @returns true when the tag is named 'sha256', has type 'Link' and its media type
 *          (`mediaType`, falling back to `mimeType`) is 'application/json'
 */
function isAPPlaylistSegmentHashesUrlObject (tag: any): tag is ActivityPlaylistSegmentHashesObject {
  const mediaType = tag.mediaType || tag.mimeType

  if (tag.name !== 'sha256') return false
  if (tag.type !== 'Link') return false

  return mediaType === 'application/json'
}
// Creates the local copy of a remote video from its ActivityPub object.
// Inside one database transaction it saves the video, creates its files (at least one is required),
// its streaming playlists, its tags (only `tag` entries whose type is 'Hashtag') and its captions.
// The thumbnail is then downloaded; the download is awaited only when `waitThumbnail` is true, and a
// download failure is only logged as a warning.
// NOTE(review): this listing skips some original lines (see the embedded line numbers), including the step
// turning the filtered tags into names and the return statements.
419 async function createVideo (videoObject: VideoTorrentObject, channelActor: ActorModel, waitThumbnail = false) {
420 logger.debug('Adding remote video %s.', videoObject.id)
422 const videoCreated: VideoModel = await sequelizeTypescript.transaction(async t => {
423 const sequelizeOptions = { transaction: t }
425 const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to)
426 const video = VideoModel.build(videoData)
428 const videoCreated = await video.save(sequelizeOptions)
431 const videoFileAttributes = videoFileActivityUrlToDBAttributes(videoCreated, videoObject)
432 if (videoFileAttributes.length === 0) {
// NOTE(review): the message contains a literal '%s' (Error does not format it) and `videoObject.url` is
// concatenated although it is used as an array of url objects elsewhere in this file — `videoObject.id` may be intended
433 throw new Error('Cannot find valid files for video %s ' + videoObject.url)
436 const videoFilePromises = videoFileAttributes.map(f => VideoFileModel.create(f, { transaction: t }))
437 await Promise.all(videoFilePromises)
439 const videoStreamingPlaylists = streamingPlaylistActivityUrlToDBAttributes(videoCreated, videoObject)
440 const playlistPromises = videoStreamingPlaylists.map(p => VideoStreamingPlaylistModel.create(p, { transaction: t }))
441 await Promise.all(playlistPromises)
// Only hashtag entries become video tags
444 const tags = videoObject.tag
445 .filter(t => t.type === 'Hashtag')
447 const tagInstances = await TagModel.findOrCreateTags(tags, t)
448 await videoCreated.$set('Tags', tagInstances, sequelizeOptions)
// One caption entry per subtitle language
451 const videoCaptionsPromises = videoObject.subtitleLanguage.map(c => {
452 return VideoCaptionModel.insertOrReplaceLanguage(videoCreated.id, c.identifier, t)
454 await Promise.all(videoCaptionsPromises)
456 logger.info('Remote video with uuid %s inserted.', videoObject.uuid)
// Attach the channel to the created instance
458 videoCreated.VideoChannel = channelActor.VideoChannel
// The rejection is handled here so a failed download never rejects, awaited or not
462 const p = generateThumbnailFromUrl(videoCreated, videoObject.icon)
463 .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err }))
465 if (waitThumbnail === true) await p
// Converts a remote ActivityPub video object into the attributes used to build/update a local video.
// - privacy is PUBLIC when `to` contains ACTIVITY_PUB.PUBLIC, UNLISTED otherwise
// - language/category/licence are read from the `identifier` of the matching AP field when present
//   (category and licence are parsed as base-10 integers), and stay undefined otherwise
// - description and support fall back to null
// - createdAt and publishedAt are both built from `videoObject.published`
// NOTE(review): this listing skips some original lines (see the embedded line numbers), including the
// declaration of the `to` parameter and several fields of the returned object.
470 async function videoActivityObjectToDBAttributes (
471 videoChannel: VideoChannelModel,
472 videoObject: VideoTorrentObject,
475 const privacy = to.indexOf(ACTIVITY_PUB.PUBLIC) !== -1 ? VideoPrivacy.PUBLIC : VideoPrivacy.UNLISTED
// The regex has no global flag: only the first run of non-digit characters is removed.
// The parseInt(duration, 10) below stops at the next non-digit, so a trailing unit is ignored.
476 const duration = videoObject.duration.replace(/[^\d]+/, '')
478 let language: string | undefined
479 if (videoObject.language) {
480 language = videoObject.language.identifier
483 let category: number | undefined
484 if (videoObject.category) {
485 category = parseInt(videoObject.category.identifier, 10)
488 let licence: number | undefined
489 if (videoObject.licence) {
490 licence = parseInt(videoObject.licence.identifier, 10)
493 const description = videoObject.content || null
494 const support = videoObject.support || null
497 name: videoObject.name,
498 uuid: videoObject.uuid,
505 nsfw: videoObject.sensitive,
506 commentsEnabled: videoObject.commentsEnabled,
507 downloadEnabled: videoObject.downloadEnabled,
508 waitTranscoding: videoObject.waitTranscoding,
509 state: videoObject.state,
510 channelId: videoChannel.id,
511 duration: parseInt(duration, 10),
512 createdAt: new Date(videoObject.published),
513 publishedAt: new Date(videoObject.published),
514 // FIXME: updatedAt does not seem to be considered by Sequelize
515 updatedAt: new Date(videoObject.updated),
516 views: videoObject.views,
// Builds the video file attributes from the url entries of a remote video object.
// Every url entry accepted by isAPVideoUrlObject must have an associated magnet entry (media type
// 'application/x-bittorrent;x-scheme-handler/magnet' with the same height) whose decoded info hash is valid.
// Throws when there is no video file url, when a magnet entry is missing, or when a magnet URI cannot be parsed.
// NOTE(review): this listing skips some original lines (see the embedded line numbers), including part of the
// attribute object and the return statement.
524 function videoFileActivityUrlToDBAttributes (video: VideoModel, videoObject: VideoTorrentObject) {
525 const fileUrls = videoObject.url.filter(u => isAPVideoUrlObject(u)) as ActivityVideoUrlObject[]
527 if (fileUrls.length === 0) {
528 throw new Error('Cannot find video files for ' + video.url)
531 const attributes: FilteredModelAttributes<VideoFileModel>[] = []
532 for (const fileUrl of fileUrls) {
533 // Fetch associated magnet uri
534 const magnet = videoObject.url.find(u => {
535 const mediaType = u.mediaType || u.mimeType
// A file and its magnet are paired by height
536 return mediaType === 'application/x-bittorrent;x-scheme-handler/magnet' && (u as any).height === fileUrl.height
539 if (!magnet) throw new Error('Cannot find associated magnet uri for file ' + fileUrl.href)
541 const parsed = magnetUtil.decode(magnet.href)
542 if (!parsed || isVideoFileInfoHashValid(parsed.infoHash) === false) {
543 throw new Error('Cannot parse magnet URI ' + magnet.href)
546 const mediaType = fileUrl.mediaType || fileUrl.mimeType
// The file extension is looked up from the media type
548 extname: MIMETYPES.VIDEO.MIMETYPE_EXT[ mediaType ],
549 infoHash: parsed.infoHash,
550 resolution: fileUrl.height,
// -1 is stored when the AP object gives no (or a falsy) fps
553 fps: fileUrl.fps || -1
556 attributes.push(attribute)
// Builds the streaming playlist attributes from the url entries of a remote video object.
// Only url entries accepted by isAPStreamingPlaylistUrlObject are considered; an empty array is returned when
// there is none. For each playlist, the 'Infohash' tags and the segments sha256 link tag are read from its
// `tag` array; a warning is logged when either is missing.
// NOTE(review): this listing skips some original lines (see the embedded line numbers): what follows each
// warning (presumably the playlist is skipped — confirm), the rest of the tag chains, part of the attribute
// object and the return statement.
562 function streamingPlaylistActivityUrlToDBAttributes (video: VideoModel, videoObject: VideoTorrentObject) {
563 const playlistUrls = videoObject.url.filter(u => isAPStreamingPlaylistUrlObject(u)) as ActivityPlaylistUrlObject[]
564 if (playlistUrls.length === 0) return []
566 const attributes: FilteredModelAttributes<VideoStreamingPlaylistModel>[] = []
567 for (const playlistUrlObject of playlistUrls) {
568 const p2pMediaLoaderInfohashes = playlistUrlObject.tag
569 .filter(t => t.type === 'Infohash')
571 if (p2pMediaLoaderInfohashes.length === 0) {
572 logger.warn('No infohashes found in AP playlist object.', { playlistUrl: playlistUrlObject })
576 const segmentsSha256UrlObject = playlistUrlObject.tag
578 return isAPPlaylistSegmentHashesUrlObject(t)
579 }) as ActivityPlaylistSegmentHashesObject
580 if (!segmentsSha256UrlObject) {
581 logger.warn('No segment sha256 URL found in AP playlist object.', { playlistUrl: playlistUrlObject })
// Every playlist built here is stored with the HLS type
586 type: VideoStreamingPlaylistType.HLS,
587 playlistUrl: playlistUrlObject.href,
588 segmentsSha256Url: segmentsSha256UrlObject.href,
589 p2pMediaLoaderInfohashes,
593 attributes.push(attribute)