1 import * as Bluebird from 'bluebird'
2 import * as sequelize from 'sequelize'
3 import * as magnetUtil from 'magnet-uri'
4 import * as request from 'request'
7 ActivityPlaylistSegmentHashesObject,
8 ActivityPlaylistUrlObject,
10 ActivityVideoUrlObject,
12 } from '../../../shared/index'
13 import { VideoTorrentObject } from '../../../shared/models/activitypub/objects'
14 import { VideoPrivacy } from '../../../shared/models/videos'
15 import { sanitizeAndCheckVideoTorrentObject } from '../../helpers/custom-validators/activitypub/videos'
16 import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos'
17 import { resetSequelizeInstance, retryTransactionWrapper } from '../../helpers/database-utils'
18 import { logger } from '../../helpers/logger'
19 import { doRequest, downloadImage } from '../../helpers/requests'
23 P2P_MEDIA_LOADER_PEER_VERSION,
27 } from '../../initializers'
28 import { ActorModel } from '../../models/activitypub/actor'
29 import { TagModel } from '../../models/video/tag'
30 import { VideoModel } from '../../models/video/video'
31 import { VideoChannelModel } from '../../models/video/video-channel'
32 import { VideoFileModel } from '../../models/video/video-file'
33 import { getOrCreateActorAndServerAndModel } from './actor'
34 import { addVideoComments } from './video-comments'
35 import { crawlCollectionPage } from './crawl'
36 import { sendCreateVideo, sendUpdateVideo } from './send'
37 import { isArray } from '../../helpers/custom-validators/misc'
38 import { VideoCaptionModel } from '../../models/video/video-caption'
39 import { JobQueue } from '../job-queue'
40 import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher'
41 import { createRates } from './video-rates'
42 import { addVideoShares, shareVideoByServerAndChannel } from './share'
43 import { AccountModel } from '../../models/account/account'
44 import { fetchVideoByUrl, VideoFetchByUrlType } from '../../helpers/video'
45 import { checkUrlsSameHost, getAPId } from '../../helpers/activitypub'
46 import { Notifier } from '../notifier'
47 import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist'
48 import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
49 import { FilteredModelAttributes } from 'sequelize-typescript/lib/models/Model'
50 import { AccountVideoRateModel } from '../../models/account/account-video-rate'
51 import { VideoShareModel } from '../../models/video/video-share'
52 import { VideoCommentModel } from '../../models/video/video-comment'
53 import { CONFIG } from '../../initializers/config'
/**
 * Federate a video through ActivityPub when it is eligible.
 *
 * Nothing is sent unless the video is not private and its state is PUBLISHED.
 * When the captions are not already populated on the instance they are loaded
 * first (language attribute only), because they are needed to serialize the AP object.
 *
 * @param video video to federate
 * @param isNewVideo NOTE(review): presumably selects between the "create + share" sends and the "update" send below — confirm
 * @param transaction optional Sequelize transaction forwarded to the send helpers
 */
55 async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) {
56 // If the video is not private and is published, we federate it
57 if (video.privacy !== VideoPrivacy.PRIVATE && video.state === VideoState.PUBLISHED) {
58 // Fetch more attributes that we will need to serialize in AP object
59 if (isArray(video.VideoCaptions) === false) {
60 video.VideoCaptions = await video.$get('VideoCaptions', {
61 attributes: [ 'language' ],
63 }) as VideoCaptionModel[]
67 // Now we'll add the video's meta data to our followers
68 await sendCreateVideo(video, transaction)
69 await shareVideoByServerAndChannel(video, transaction)
71 await sendUpdateVideo(video, transaction)
/**
 * Fetch the ActivityPub representation of a remote video.
 *
 * @param videoUrl URL of the remote video
 * @returns the HTTP response together with the video object; `videoObject` is
 * `undefined` when the body fails `sanitizeAndCheckVideoTorrentObject` or when
 * `checkUrlsSameHost(body.id, videoUrl)` is not `true`
 */
76 async function fetchRemoteVideo (videoUrl: string): Promise<{ response: request.RequestResponse, videoObject: VideoTorrentObject }> {
84 logger.info('Fetching remote video %s.', videoUrl)
86 const { response, body } = await doRequest(options)
// Reject bodies that are not a valid video object or whose id does not pass the same-host check against the requested URL
88 if (sanitizeAndCheckVideoTorrentObject(body) === false || checkUrlsSameHost(body.id, videoUrl) !== true) {
89 logger.debug('Remote video JSON is not valid.', { body })
// The response is still returned so the caller can inspect it (refreshVideoIfNeeded reads its status code)
90 return { response, videoObject: undefined }
93 return { response, videoObject: body }
/**
 * Fetch the description of a remote video from its origin server.
 *
 * Reads `video.VideoChannel.Account.Actor.Server.host`, so these associations
 * have to be populated on the given video.
 *
 * @param video remote video whose description is requested
 * @returns `body.description` of the response, or an empty string when it is falsy
 */
96 async function fetchRemoteVideoDescription (video: VideoModel) {
97 const host = video.VideoChannel.Account.Actor.Server.host
98 const path = video.getDescriptionAPIPath()
100 uri: REMOTE_SCHEME.HTTP + '://' + host + path,
104 const { body } = await doRequest(options)
105 return body.description ? body.description : ''
/**
 * Start a GET request for a static file of a remote video on its origin host.
 *
 * Reads `video.VideoChannel.Account.Actor.Server.host`, so these associations
 * have to be populated on the given video.
 *
 * @param video remote video owning the file
 * @param path path of the static file on the origin host
 * @param reject NOTE(review): presumably called with the request error inside the callback — confirm
 * @returns the value returned by `request.get`
 */
108 function fetchRemoteVideoStaticFile (video: VideoModel, path: string, reject: Function) {
109 const host = video.VideoChannel.Account.Actor.Server.host
111 // We need to provide a callback, otherwise we could have an uncaught exception
112 return request.get(REMOTE_SCHEME.HTTP + '://' + host + path, err => {
/**
 * Download the icon advertised for a video and use it as the video's thumbnail.
 *
 * Delegates to `downloadImage` with the icon URL, the configured thumbnails
 * directory, the thumbnail file name of the video and `THUMBNAILS_SIZE`.
 *
 * @param video video the thumbnail belongs to (provides the file name)
 * @param icon ActivityPub icon object whose `url` is downloaded
 * @returns whatever `downloadImage` returns
 */
function generateThumbnailFromUrl (video: VideoModel, icon: ActivityIconObject) {
  const fileName = video.getThumbnailName()
  const destinationDir = CONFIG.STORAGE.THUMBNAILS_DIR

  return downloadImage(icon.url, destinationDir, fileName, THUMBNAILS_SIZE)
}
/**
 * Resolve (creating it when necessary) the channel actor a remote video is attributed to.
 *
 * The channel is the first `attributedTo` entry of type 'Group'. Its id has to
 * pass the same-host check against the video id.
 *
 * @param videoObject ActivityPub video object
 * @returns the result of `getOrCreateActorAndServerAndModel` for the channel id
 * @throws Error when the video has no 'Group' attribution, or when the channel
 * id and the video id do not pass `checkUrlsSameHost`
 */
function getOrCreateVideoChannelFromVideoObject (videoObject: VideoTorrentObject) {
  const channel = videoObject.attributedTo.find(a => a.type === 'Group')

  // Identify the video by its AP id: `videoObject.url` is an array of URL objects
  // and would be rendered as "[object Object]" in the message
  if (!channel) throw new Error('Cannot find associated video channel to video ' + videoObject.id)

  if (checkUrlsSameHost(channel.id, videoObject.id) !== true) {
    throw new Error(`Video channel url ${channel.id} does not have the same host than video object id ${videoObject.id}`)
  }

  return getOrCreateActorAndServerAndModel(channel.id, 'all')
}
// When strictly true, getOrCreateVideoAndAccountAndChannel refreshes an outdated video inline; otherwise it queues an 'activitypub-refresher' job
140 refreshVideo?: boolean
/**
 * Synchronise the likes, dislikes, shares and comments collections of a remote video.
 *
 * For each collection, when the matching `syncParam` flag is strictly `true` the
 * collection is crawled right away (a crawl failure is logged and does not abort
 * the remaining collections). Otherwise a payload is recorded and, at the end, one
 * 'activitypub-http-fetcher' job is created per recorded payload.
 *
 * @param video local video the collections are attached to
 * @param fetchedVideo ActivityPub object providing the collection URLs
 * @param syncParam flags selecting which collections are crawled immediately
 */
async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) {
  logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)

  const deferredPayloads: ActivitypubHttpFetcherPayload[] = []

  // Run the crawl now when asked to, otherwise remember the payload for a job
  const crawlNowOrDefer = async (
    crawlNow: boolean,
    crawl: () => PromiseLike<unknown>,
    buildPayload: () => ActivitypubHttpFetcherPayload
  ) => {
    if (crawlNow) await crawl()
    else deferredPayloads.push(buildPayload())
  }

  // Likes
  await crawlNowOrDefer(
    syncParam.likes === true,
    () => crawlCollectionPage<string>(
      fetchedVideo.likes,
      items => createRates(items, video, 'like'),
      crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'like' as 'like', crawlStartDate)
    ).catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err })),
    () => ({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' })
  )

  // Dislikes
  await crawlNowOrDefer(
    syncParam.dislikes === true,
    () => crawlCollectionPage<string>(
      fetchedVideo.dislikes,
      items => createRates(items, video, 'dislike'),
      crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'dislike' as 'dislike', crawlStartDate)
    ).catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err })),
    () => ({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' })
  )

  // Shares
  await crawlNowOrDefer(
    syncParam.shares === true,
    () => crawlCollectionPage<string>(
      fetchedVideo.shares,
      items => addVideoShares(items, video),
      crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate)
    ).catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err })),
    () => ({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
  )

  // Comments
  await crawlNowOrDefer(
    syncParam.comments === true,
    () => crawlCollectionPage<string>(
      fetchedVideo.comments,
      items => addVideoComments(items, video),
      crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate)
    ).catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err })),
    () => ({ uri: fetchedVideo.comments, videoId: video.id, type: 'video-comments' as 'video-comments' })
  )

  await Bluebird.map(deferredPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }))
}
/**
 * Return the local video matching an ActivityPub video id/object, creating it
 * from the remote instance when it is not in the database yet.
 *
 * Known video: when it is outdated and refreshing is allowed, it is either
 * refreshed inline (`syncParam.refreshVideo === true`) or an
 * 'activitypub-refresher' job is created; the result is `{ video, created: false }`.
 *
 * Unknown video: the remote object is fetched, its channel actor is resolved,
 * the video is created through `retryTransactionWrapper(createVideo, ...)`, its
 * external attributes are synchronised and the result is `{ video, created: true }`.
 *
 * @throws Error when the remote video cannot be fetched or is not valid
 */
190 async function getOrCreateVideoAndAccountAndChannel (options: {
// Either an object carrying the AP id or the id itself (resolved with getAPId)
191 videoObject: { id: string } | string,
// Defaults to syncing likes, dislikes, shares, comments and thumbnail, without inline refresh
192 syncParam?: SyncParam,
// Defaults to 'all'
193 fetchType?: VideoFetchByUrlType,
194 allowRefresh?: boolean // true by default
197 const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false }
198 const fetchType = options.fetchType || 'all'
// Only an explicit `false` disables refreshing
199 const allowRefresh = options.allowRefresh !== false
202 const videoUrl = getAPId(options.videoObject)
204 let videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType)
205 if (videoFromDatabase) {
206 if (videoFromDatabase.isOutdated() && allowRefresh === true) {
207 const refreshOptions = {
208 video: videoFromDatabase,
209 fetchedType: fetchType,
// Refresh inline when requested, otherwise delegate the refresh to a job
213 if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions)
214 else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: videoFromDatabase.url } })
217 return { video: videoFromDatabase, created: false }
// The video is not in the database yet: build it from the remote object
220 const { videoObject: fetchedVideo } = await fetchRemoteVideo(videoUrl)
221 if (!fetchedVideo) throw new Error('Cannot fetch remote video with url: ' + videoUrl)
223 const channelActor = await getOrCreateVideoChannelFromVideoObject(fetchedVideo)
224 const video = await retryTransactionWrapper(createVideo, fetchedVideo, channelActor, syncParam.thumbnail)
226 await syncVideoExternalAttributes(video, fetchedVideo, syncParam)
228 return { video, created: true }
/**
 * Update a stored remote video from a freshly received ActivityPub video object.
 *
 * Inside one transaction it checks that `options.account` owns the channel of
 * the video, overwrites the video attributes, then replaces its files,
 * streaming playlists, tags and captions with the ones described by
 * `options.videoObject`. Videos that were private or unlisted before the update
 * are passed to `Notifier.Instance.notifyOnNewVideo`. The thumbnail is
 * regenerated from `options.videoObject.icon`; a failure there is logged as a warning.
 *
 * @throws Error when `options.account` does not own the video channel
 */
231 async function updateVideoFromAP (options: {
// ActivityPub object the video is updated from
233 videoObject: VideoTorrentObject,
// Account claiming the update; must own the current channel of the video
234 account: AccountModel,
// Channel used to build the new attributes (its id becomes channelId)
235 channel: VideoChannelModel,
// When set, replaces `videoObject.to` for the privacy computation
236 overrideTo?: string[]
238 logger.debug('Updating remote video "%s".', options.videoObject.uuid)
// Snapshot of the instance, used by resetSequelizeInstance below
240 let videoFieldsSave: any
241 const wasPrivateVideo = options.video.privacy === VideoPrivacy.PRIVATE
242 const wasUnlistedVideo = options.video.privacy === VideoPrivacy.UNLISTED
245 await sequelizeTypescript.transaction(async t => {
246 const sequelizeOptions = { transaction: t }
248 videoFieldsSave = options.video.toJSON()
250 // Check actor has the right to update the video
251 const videoChannel = options.video.VideoChannel
252 if (videoChannel.Account.id !== options.account.id) {
253 throw new Error('Account ' + options.account.Actor.url + ' does not own video channel ' + videoChannel.Actor.url)
256 const to = options.overrideTo ? options.overrideTo : options.videoObject.to
257 const videoData = await videoActivityObjectToDBAttributes(options.channel, options.videoObject, to)
258 options.video.set('name', videoData.name)
259 options.video.set('uuid', videoData.uuid)
260 options.video.set('url', videoData.url)
261 options.video.set('category', videoData.category)
262 options.video.set('licence', videoData.licence)
263 options.video.set('language', videoData.language)
264 options.video.set('description', videoData.description)
265 options.video.set('support', videoData.support)
266 options.video.set('nsfw', videoData.nsfw)
267 options.video.set('commentsEnabled', videoData.commentsEnabled)
268 options.video.set('downloadEnabled', videoData.downloadEnabled)
269 options.video.set('waitTranscoding', videoData.waitTranscoding)
270 options.video.set('state', videoData.state)
271 options.video.set('duration', videoData.duration)
272 options.video.set('createdAt', videoData.createdAt)
273 options.video.set('publishedAt', videoData.publishedAt)
274 options.video.set('originallyPublishedAt', videoData.originallyPublishedAt)
275 options.video.set('privacy', videoData.privacy)
276 options.video.set('channelId', videoData.channelId)
277 options.video.set('views', videoData.views)
279 await options.video.save(sequelizeOptions)
// Video files: drop the ones missing from the AP object, then upsert the others
282 const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject)
283 const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a))
285 // Remove video files that do not exist anymore
286 const destroyTasks = options.video.VideoFiles
287 .filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f)))
288 .map(f => f.destroy(sequelizeOptions))
289 await Promise.all(destroyTasks)
291 // Update or add other one
292 const upsertTasks = videoFileAttributes.map(a => {
293 return VideoFileModel.upsert<VideoFileModel>(a, { returning: true, transaction: t })
294 .then(([ file ]) => file)
297 options.video.VideoFiles = await Promise.all(upsertTasks)
// Streaming playlists: same strategy, built from the files upserted above
301 const streamingPlaylistAttributes = streamingPlaylistActivityUrlToDBAttributes(
304 options.video.VideoFiles
306 const newStreamingPlaylists = streamingPlaylistAttributes.map(a => new VideoStreamingPlaylistModel(a))
308 // Remove streaming playlists that do not exist anymore
309 const destroyTasks = options.video.VideoStreamingPlaylists
310 .filter(f => !newStreamingPlaylists.find(newPlaylist => newPlaylist.hasSameUniqueKeysThan(f)))
311 .map(f => f.destroy(sequelizeOptions))
312 await Promise.all(destroyTasks)
314 // Update or add other one
315 const upsertTasks = streamingPlaylistAttributes.map(a => {
316 return VideoStreamingPlaylistModel.upsert<VideoStreamingPlaylistModel>(a, { returning: true, transaction: t })
317 .then(([ streamingPlaylist ]) => streamingPlaylist)
320 options.video.VideoStreamingPlaylists = await Promise.all(upsertTasks)
// Tags
// NOTE(review): createVideo filters `tag` on type === 'Hashtag' before using it; no such filter is visible here — confirm this is intended
325 const tags = options.videoObject.tag.map(tag => tag.name)
326 const tagInstances = await TagModel.findOrCreateTags(tags, t)
327 await options.video.$set('Tags', tagInstances, sequelizeOptions)
// Captions: delete every caption of the video, then insert the advertised languages
332 await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t)
334 const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => {
335 return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t)
337 options.video.VideoCaptions = await Promise.all(videoCaptionsPromises)
// Videos that were private or unlisted before this update are handed to the new-video notifier
342 if (wasPrivateVideo || wasUnlistedVideo) {
343 Notifier.Instance.notifyOnNewVideo(options.video)
346 logger.info('Remote video with uuid %s updated', options.videoObject.uuid)
// NOTE(review): this looks like the error path — the instance is restored from the snapshot taken at the start of the transaction
348 if (options.video !== undefined && videoFieldsSave !== undefined) {
349 resetSequelizeInstance(options.video, videoFieldsSave)
352 // This is just a debug because we will retry the insert
353 logger.debug('Cannot update the remote video.', { err })
// Thumbnail generation failures are only logged
358 await generateThumbnailFromUrl(options.video, options.videoObject.icon)
360 logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })
/**
 * Refresh a remote video from its origin instance when it is outdated.
 *
 * - Not outdated: `options.video` is returned untouched.
 * - Origin answers 404: the local video is destroyed.
 * - Origin body is invalid: a warning is logged and the video is marked as refreshed.
 * - Otherwise the video is updated through `retryTransactionWrapper(updateVideoFromAP, ...)`
 *   and its external attributes are synchronised with `options.syncParam`.
 * - On failure a warning is logged and the video is marked as refreshed so it
 *   is not refreshed in a loop.
 *
 * NOTE(review): the values returned by the branches after the first one are not
 * stated here — confirm against the full function body.
 */
364 async function refreshVideoIfNeeded (options: {
// Fetch type `options.video` was loaded with; anything but 'all' triggers a reload with more associations
366 fetchedType: VideoFetchByUrlType,
368 }): Promise<VideoModel> {
369 if (!options.video.isOutdated()) return options.video
371 // We need more attributes if the argument video was fetched with not enough joins
372 const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url)
375 const { response, videoObject } = await fetchRemoteVideo(video.url)
376 if (response.statusCode === 404) {
377 logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url)
379 // Video does not exist anymore
380 await video.destroy()
// fetchRemoteVideo returns an undefined videoObject when the body is not a valid video
384 if (videoObject === undefined) {
385 logger.warn('Cannot refresh remote video %s: invalid body.', video.url)
387 await video.setAsRefreshed()
391 const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
392 const account = await AccountModel.load(channelActor.VideoChannel.accountId)
394 const updateOptions = {
398 channel: channelActor.VideoChannel
400 await retryTransactionWrapper(updateVideoFromAP, updateOptions)
401 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
405 logger.warn('Cannot refresh video %s.', options.video.url, { err })
407 // Don't refresh in loop
408 await video.setAsRefreshed()
415 refreshVideoIfNeeded,
416 federateVideoIfNeeded,
418 getOrCreateVideoAndAccountAndChannel,
419 fetchRemoteVideoStaticFile,
420 fetchRemoteVideoDescription,
421 generateThumbnailFromUrl,
422 getOrCreateVideoChannelFromVideoObject
425 // ---------------------------------------------------------------------------
/**
 * Type guard telling whether an ActivityPub URL object describes a video file.
 *
 * The media type (`mediaType`, falling back to `mimeType`) has to be one of the
 * keys of `MIMETYPES.VIDEO.MIMETYPE_EXT` and start with 'video/'.
 *
 * @param url URL object taken from an ActivityPub video object
 */
function isAPVideoUrlObject (url: ActivityUrlObject): url is ActivityVideoUrlObject {
  const supportedMimeTypes = Object.keys(MIMETYPES.VIDEO.MIMETYPE_EXT)
  const declaredType = url.mediaType || url.mimeType

  // Unknown (or missing) media types are rejected before the prefix check
  if (supportedMimeTypes.indexOf(declaredType) === -1) return false

  return declaredType.startsWith('video/')
}
/**
 * Type guard telling whether an ActivityPub URL object is a streaming playlist,
 * i.e. its media type (`mediaType`, falling back to `mimeType`) is exactly
 * 'application/x-mpegURL'.
 *
 * @param url URL object taken from an ActivityPub video object
 */
function isAPStreamingPlaylistUrlObject (url: ActivityUrlObject): url is ActivityPlaylistUrlObject {
  return (url.mediaType || url.mimeType) === 'application/x-mpegURL'
}
/**
 * Type guard telling whether a playlist tag is the segment hashes link:
 * name 'sha256', type 'Link' and media type (`mediaType`, falling back to
 * `mimeType`) 'application/json'.
 *
 * @param tag tag entry of an ActivityPub playlist URL object
 */
function isAPPlaylistSegmentHashesUrlObject (tag: any): tag is ActivityPlaylistSegmentHashesObject {
  if (tag.name !== 'sha256') return false
  if (tag.type !== 'Link') return false

  return (tag.mediaType || tag.mimeType) === 'application/json'
}
/**
 * Insert a remote video described by an ActivityPub object.
 *
 * In a single transaction the video row, its files, its streaming playlists,
 * its tags (entries of type 'Hashtag') and its captions are created. The
 * thumbnail is then generated from `videoObject.icon`; a failure there is only
 * logged as a warning.
 *
 * @param videoObject ActivityPub video object to insert
 * @param channelActor actor whose `VideoChannel` owns the video
 * @param waitThumbnail when strictly true, the thumbnail generation is awaited (default: false)
 * @throws Error when no valid file can be built from the video object
 */
446 async function createVideo (videoObject: VideoTorrentObject, channelActor: ActorModel, waitThumbnail = false) {
447 logger.debug('Adding remote video %s.', videoObject.id)
449 const videoCreated: VideoModel = await sequelizeTypescript.transaction(async t => {
450 const sequelizeOptions = { transaction: t }
452 const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to)
453 const video = VideoModel.build(videoData)
455 const videoCreated = await video.save(sequelizeOptions)
// Video files
458 const videoFileAttributes = videoFileActivityUrlToDBAttributes(videoCreated, videoObject)
459 if (videoFileAttributes.length === 0) {
// NOTE(review): the '%s' placeholder is never substituted (plain concatenation) and `videoObject.url` is used as an array elsewhere in this file, so the message does not identify the video; `videoObject.id` looks like the intended value
460 throw new Error('Cannot find valid files for video %s ' + videoObject.url)
463 const videoFilePromises = videoFileAttributes.map(f => VideoFileModel.create(f, { transaction: t }))
464 const videoFiles = await Promise.all(videoFilePromises)
// Streaming playlists, built from the files created just above
466 const videoStreamingPlaylists = streamingPlaylistActivityUrlToDBAttributes(videoCreated, videoObject, videoFiles)
467 const playlistPromises = videoStreamingPlaylists.map(p => VideoStreamingPlaylistModel.create(p, { transaction: t }))
468 await Promise.all(playlistPromises)
// Tags: only 'Hashtag' entries are kept
471 const tags = videoObject.tag
472 .filter(t => t.type === 'Hashtag')
474 const tagInstances = await TagModel.findOrCreateTags(tags, t)
475 await videoCreated.$set('Tags', tagInstances, sequelizeOptions)
// Captions
478 const videoCaptionsPromises = videoObject.subtitleLanguage.map(c => {
479 return VideoCaptionModel.insertOrReplaceLanguage(videoCreated.id, c.identifier, t)
481 await Promise.all(videoCaptionsPromises)
483 logger.info('Remote video with uuid %s inserted.', videoObject.uuid)
485 videoCreated.VideoChannel = channelActor.VideoChannel
// Thumbnail generation is started here; its errors are turned into a warning
489 const p = generateThumbnailFromUrl(videoCreated, videoObject.icon)
490 .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err }))
492 if (waitThumbnail === true) await p
/**
 * Convert an ActivityPub video object into the attributes stored for a video.
 *
 * - privacy is PUBLIC when `to` contains `ACTIVITY_PUB.PUBLIC`, UNLISTED otherwise
 * - language, category and licence stay undefined when the object does not carry them
 * - description and support fall back to null when falsy
 * - createdAt and publishedAt are both built from `videoObject.published`
 *
 * @param videoChannel channel owning the video (its id becomes `channelId`)
 * @param videoObject ActivityPub video object
 */
497 async function videoActivityObjectToDBAttributes (
498 videoChannel: VideoChannelModel,
499 videoObject: VideoTorrentObject,
502 const privacy = to.indexOf(ACTIVITY_PUB.PUBLIC) !== -1 ? VideoPrivacy.PUBLIC : VideoPrivacy.UNLISTED
// The regex has no `g` flag: only the first run of non-digits is removed, parseInt below ignores what follows the digits
// (presumably the duration looks like 'PT120S' — confirm)
503 const duration = videoObject.duration.replace(/[^\d]+/, '')
505 let language: string | undefined
506 if (videoObject.language) {
507 language = videoObject.language.identifier
510 let category: number | undefined
511 if (videoObject.category) {
512 category = parseInt(videoObject.category.identifier, 10)
515 let licence: number | undefined
516 if (videoObject.licence) {
517 licence = parseInt(videoObject.licence.identifier, 10)
520 const description = videoObject.content || null
521 const support = videoObject.support || null
524 name: videoObject.name,
525 uuid: videoObject.uuid,
532 nsfw: videoObject.sensitive,
533 commentsEnabled: videoObject.commentsEnabled,
534 downloadEnabled: videoObject.downloadEnabled,
535 waitTranscoding: videoObject.waitTranscoding,
536 state: videoObject.state,
537 channelId: videoChannel.id,
538 duration: parseInt(duration, 10),
539 createdAt: new Date(videoObject.published),
540 publishedAt: new Date(videoObject.published),
541 originallyPublishedAt: videoObject.originallyPublishedAt ? new Date(videoObject.originallyPublishedAt) : null,
542 // FIXME: updatedAt does not seem to be considered by Sequelize
543 updatedAt: new Date(videoObject.updated),
544 views: videoObject.views,
/**
 * Build the video file attributes described by the URL entries of an
 * ActivityPub video object.
 *
 * Every URL entry accepted by `isAPVideoUrlObject` is paired with the magnet
 * entry ('application/x-bittorrent;x-scheme-handler/magnet') of the same
 * height; the magnet URI is decoded to obtain the info hash.
 *
 * @param video video the files belong to
 * @param videoObject ActivityPub video object
 * @throws Error when there is no video file URL, when a file has no matching
 * magnet entry, or when the magnet URI cannot be parsed into a valid info hash
 */
552 function videoFileActivityUrlToDBAttributes (video: VideoModel, videoObject: VideoTorrentObject) {
553 const fileUrls = videoObject.url.filter(u => isAPVideoUrlObject(u)) as ActivityVideoUrlObject[]
555 if (fileUrls.length === 0) {
556 throw new Error('Cannot find video files for ' + video.url)
559 const attributes: FilteredModelAttributes<VideoFileModel>[] = []
560 for (const fileUrl of fileUrls) {
561 // Fetch associated magnet uri
562 const magnet = videoObject.url.find(u => {
563 const mediaType = u.mediaType || u.mimeType
// A magnet entry is matched to its file through the height
564 return mediaType === 'application/x-bittorrent;x-scheme-handler/magnet' && (u as any).height === fileUrl.height
567 if (!magnet) throw new Error('Cannot find associated magnet uri for file ' + fileUrl.href)
569 const parsed = magnetUtil.decode(magnet.href)
570 if (!parsed || isVideoFileInfoHashValid(parsed.infoHash) === false) {
571 throw new Error('Cannot parse magnet URI ' + magnet.href)
574 const mediaType = fileUrl.mediaType || fileUrl.mimeType
// The extension comes from the media type table
576 extname: MIMETYPES.VIDEO.MIMETYPE_EXT[ mediaType ],
577 infoHash: parsed.infoHash,
578 resolution: fileUrl.height,
// -1 is stored when the object does not advertise a (truthy) fps
581 fps: fileUrl.fps || -1
584 attributes.push(attribute)
/**
 * Build the streaming playlist attributes (type HLS) described by the URL
 * entries of an ActivityPub video object.
 *
 * Only URL entries accepted by `isAPStreamingPlaylistUrlObject` are considered;
 * an empty array is returned when there is none.
 *
 * @param video video the playlists belong to
 * @param videoObject ActivityPub video object
 * @param videoFiles files of the video, used to build the P2P media loader info hashes
 */
590 function streamingPlaylistActivityUrlToDBAttributes (video: VideoModel, videoObject: VideoTorrentObject, videoFiles: VideoFileModel[]) {
591 const playlistUrls = videoObject.url.filter(u => isAPStreamingPlaylistUrlObject(u)) as ActivityPlaylistUrlObject[]
592 if (playlistUrls.length === 0) return []
594 const attributes: FilteredModelAttributes<VideoStreamingPlaylistModel>[] = []
595 for (const playlistUrlObject of playlistUrls) {
// Look among the playlist tags for the segment sha256 hashes link
596 const segmentsSha256UrlObject = playlistUrlObject.tag
598 return isAPPlaylistSegmentHashesUrlObject(t)
599 }) as ActivityPlaylistSegmentHashesObject
600 if (!segmentsSha256UrlObject) {
// NOTE(review): `segmentsSha256UrlObject.href` is dereferenced below, so such a playlist is presumably skipped after this warning — confirm
601 logger.warn('No segment sha256 URL found in AP playlist object.', { playlistUrl: playlistUrlObject })
606 type: VideoStreamingPlaylistType.HLS,
607 playlistUrl: playlistUrlObject.href,
608 segmentsSha256Url: segmentsSha256UrlObject.href,
609 p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrlObject.href, videoFiles),
610 p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION,
614 attributes.push(attribute)