git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/friends.ts
Add this context to instance model functions
[github/Chocobozzz/PeerTube.git] / server / lib / friends.ts
1 import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2 import * as request from 'request'
3 import * as Sequelize from 'sequelize'
4
5 import { database as db } from '../initializers/database'
6 import {
7 API_VERSION,
8 CONFIG,
9 REQUESTS_IN_PARALLEL,
10 REQUEST_ENDPOINTS,
11 REQUEST_ENDPOINT_ACTIONS,
12 REMOTE_SCHEME
13 } from '../initializers'
14 import {
15 logger,
16 getMyPublicCert,
17 makeSecureRequest,
18 makeRetryRequest,
19 createEmptyCallback
20 } from '../helpers'
21 import {
22 RequestScheduler,
23 RequestSchedulerOptions,
24
25 RequestVideoQaduScheduler,
26 RequestVideoQaduSchedulerOptions,
27
28 RequestVideoEventScheduler,
29 RequestVideoEventSchedulerOptions
30 } from './request'
31 import { PodInstance, VideoInstance } from '../models'
32
// Video id plus the kind of quick-and-dirty update to queue for it
type QaduParam = {
  videoId: string
  type: string
}

// Video id plus the kind of event to queue for it
type EventParam = {
  videoId: string
  type: string
}

// Request actions registered for the videos endpoint
const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]

// One module-level scheduler instance per kind of request
const requestScheduler = new RequestScheduler()
const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
const requestVideoEventScheduler = new RequestVideoEventScheduler()
41
// Activate the three module-level schedulers, in declaration order
function activateSchedulers () {
  const schedulers = [ requestScheduler, requestVideoQaduScheduler, requestVideoEventScheduler ]

  schedulers.forEach(scheduler => scheduler.activate())
}
47
// Queue an ADD request on the videos endpoint carrying videoData.
// No "toIds" is given, so createRequest resolves the recipients itself.
function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
  createRequest({
    type: ENDPOINT_ACTIONS.ADD,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: videoData,
    transaction
  }, callback)
}
57
// Queue an UPDATE request on the videos endpoint carrying videoData.
// No "toIds" is given, so createRequest resolves the recipients itself.
function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
  createRequest({
    type: ENDPOINT_ACTIONS.UPDATE,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: videoData,
    transaction
  }, callback)
}
67
// Queue a REMOVE request on the videos endpoint, outside of any transaction.
// Fire and forget: no callback is passed to createRequest.
function removeVideoToFriends (videoParams: Object) {
  createRequest({
    type: ENDPOINT_ACTIONS.REMOVE,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: videoParams,
    transaction: null
  })
}
77
// Queue a REPORT_ABUSE request addressed only to the pod of the video author.
// Fire and forget: no callback is passed to createRequest.
function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) {
  const authorPodId = video.Author.podId

  createRequest({
    type: ENDPOINT_ACTIONS.REPORT_ABUSE,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: reportData,
    toIds: [ authorPodId ],
    transaction: null
  })
}
88
// Queue a single quick-and-dirty update request for a video.
// Both the transaction and the callback are optional.
function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
  const { videoId, type } = qaduParam
  const options = { videoId, type, transaction }

  return createVideoQaduRequest(options, callback)
}
97
// Queue several quick-and-dirty updates one after the other, all sharing the same transaction.
// finalCallback is handed to async "series" and so fires once the whole sequence is over.
function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
  // One deferred task per update, kept in input order
  const tasks = qadusParams.map(qaduParams => {
    return function (callback) {
      quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
    }
  })

  series(tasks, finalCallback)
}
111
// Queue a single event request for a video.
// Both the transaction and the callback are optional.
function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
  const { videoId, type } = eventParam
  const options = { videoId, type, transaction }

  createVideoEventRequest(options, callback)
}
120
// Queue several video events one after the other, all sharing the same transaction.
// finalCallback is handed to async "series" and so fires once the whole sequence is over.
function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
  // One deferred task per event, kept in input order
  const tasks = eventsParams.map(eventParams => {
    return function (callback) {
      addEventToRemoteVideo(eventParams, transaction, callback)
    }
  })

  series(tasks, finalCallback)
}
134
// Tell the caller whether the pod count reported by db.Pod.countAll is different from zero
function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) {
  db.Pod.countAll((err, count) => {
    if (err) return callback(err)

    return callback(null, count !== 0)
  })
}
143
// Befriend pods starting from a list of base hosts:
//  1. read our public certificate
//  2. ask each base host, one at a time, for its pods list and accumulate the scores
//  3. keep the winning pods and send them our friend request
function makeFriends (hosts: string[], callback: (err: Error) => void) {
  const podsScore = {}

  logger.info('Make friends!')

  getMyPublicCert(function (certErr, cert) {
    if (certErr) {
      logger.error('Cannot read public cert.')
      return callback(certErr)
    }

    // Accumulate the scores reported by one base host
    const scoreHost = function (host: string, next: (err: Error) => void) {
      computeForeignPodsList(host, podsScore, next)
    }

    // Runs once every base host was asked, or as soon as one of them failed
    const onScoresComputed = function (err: Error) {
      if (err) return callback(err)

      logger.debug('Pods scores computed.', { podsScore: podsScore })
      const winners = computeWinningPods(hosts, podsScore)
      logger.debug('Pods that we keep.', { podsToKeep: winners })

      makeRequestsToWinningPods(cert, winners, callback)
    }

    eachSeries(hosts, scoreHost, onScoresComputed)
  })
}
167
// Quit all our friends:
//  - pause the request scheduler
//  - flush the pending requests of the request scheduler and of the video QADU scheduler
//  - tell every known pod we are leaving (best effort)
//  - destroy every pod in the database
// The request scheduler is always re-activated before "callback" is called.
function quitFriends (callback: (err: Error) => void) {
  // Stop pool requests
  requestScheduler.deactivate()

  waterfall([
    function flushRequests (callbackAsync) {
      requestScheduler.flush(err => callbackAsync(err))
    },

    function flushVideoQaduRequests (callbackAsync) {
      requestVideoQaduScheduler.flush(err => callbackAsync(err))
    },

    function getPodsList (callbackAsync) {
      return db.Pod.list(callbackAsync)
    },

    function announceIQuitMyFriends (pods, callbackAsync) {
      // Announce we quit them
      // We don't care if the request fails
      // The other pod will exclude us automatically after a while
      eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
        // Build a fresh params object per pod: requests run in parallel and must not share mutable state
        const requestParams = {
          method: 'POST' as 'POST',
          path: '/api/' + API_VERSION + '/remote/pods/remove',
          sign: true,
          toPod: pod
        }

        makeSecureRequest(requestParams, function (err) {
          if (err) {
            logger.error('Some errors while quitting friends.', { err: err })
            // Don't stop the process
          }

          // Never forward the error: eachLimit would stop announcing to the remaining pods
          return callbackEach()
        })
      }, function () {
        return callbackAsync(null, pods)
      })
    },

    function removePodsFromDB (pods, callbackAsync) {
      each(pods, function (pod: any, callbackEach) {
        pod.destroy().asCallback(callbackEach)
      }, callbackAsync)
    }
  ], function (err: Error) {
    // Don't forget to re activate the scheduler, even if there was an error
    requestScheduler.activate()

    if (err) return callback(err)

    logger.info('Removed all remote videos.')
    return callback(null)
  })
}
224
// Queue an "add" request towards the pod "podId" for each video returned by
// db.Video.listOwnedAndPopulateAuthorAndTags.
// Errors are only logged: nothing is reported back to the caller.
function sendOwnedVideosToPod (podId: number) {
  // Convert one video to its remote form then queue it for the target pod
  const queueVideo = function (video) {
    video.toAddRemoteJSON(function (convertErr, remoteVideo) {
      if (convertErr) {
        logger.error('Cannot convert video to remote.', { error: convertErr })
        // Don't break the process
        return
      }

      createRequest({
        type: 'add',
        endpoint: REQUEST_ENDPOINTS.VIDEOS,
        data: remoteVideo,
        toIds: [ podId ],
        transaction: null
      })
    })
  }

  db.Video.listOwnedAndPopulateAuthorAndTags(function (listErr, videosList) {
    if (listErr) {
      logger.error('Cannot get the list of videos we own.')
      return
    }

    videosList.forEach(video => queueVideo(video))
  })
}
252
// Expose the module-level RequestScheduler instance
function getRequestScheduler (): RequestScheduler {
  return requestScheduler
}
256
// Expose the module-level RequestVideoQaduScheduler instance
function getRequestVideoQaduScheduler (): RequestVideoQaduScheduler {
  return requestVideoQaduScheduler
}
260
// Expose the module-level RequestVideoEventScheduler instance
function getRequestVideoEventScheduler (): RequestVideoEventScheduler {
  return requestVideoEventScheduler
}
264
265 // ---------------------------------------------------------------------------
266
267 export {
268 activateSchedulers,
269 addVideoToFriends,
270 updateVideoToFriends,
271 reportAbuseVideoToFriend,
272 quickAndDirtyUpdateVideoToFriends,
273 quickAndDirtyUpdatesVideoToFriends,
274 addEventToRemoteVideo,
275 addEventsToRemoteVideo,
276 hasFriends,
277 makeFriends,
278 quitFriends,
279 removeVideoToFriends,
280 sendOwnedVideosToPod,
281 getRequestScheduler,
282 getRequestVideoQaduScheduler,
283 getRequestVideoEventScheduler
284 }
285
286 // ---------------------------------------------------------------------------
287
// Ask "host" for its pods list and give one point in "podsScore" to every host it knows.
// The asked host itself also gets one point.
// The remote answer is untrusted:
//  - an answer without a "data" array is reported as an error
//  - entries without a string "host" are ignored
//  - a host listed several times in the same answer only scores once
function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) {
  getForeignPodsList(host, function (err, res) {
    if (err) return callback(err)

    if (!res || Array.isArray(res.data) === false) {
      return callback(new Error('Invalid pods list received from ' + host))
    }

    // Let's give 1 point to the pod we ask the friends list
    // (work on a copy so the received response is left untouched)
    const foreignPodsList = res.data.concat([ { host } ])

    // Hosts already counted for this answer (no prototype, so any host name is a safe key)
    const alreadyCounted: { [ host: string ]: boolean } = Object.create(null)

    foreignPodsList.forEach(function (foreignPod) {
      const foreignPodHost = foreignPod ? foreignPod.host : undefined

      if (typeof foreignPodHost !== 'string') return
      if (alreadyCounted[foreignPodHost] === true) return
      alreadyCounted[foreignPodHost] = true

      if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
      else podsScore[foreignPodHost] = 1
    })

    return callback(null)
  })
}
307
// Select the hosts we will send a friend request to.
// A host wins when it is not us and its score is strictly greater than half the number of base hosts.
function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
  const threshold = hosts.length / 2
  const winners = []

  for (const candidateHost of Object.keys(podsScore)) {
    const isForeign = isMe(candidateHost) === false

    if (isForeign && podsScore[candidateHost] > threshold) {
      winners.push({ host: candidateHost })
    }
  }

  return winners
}
323
// Fetch the pods list of "host" (GET /api/<API_VERSION>/pods) and pass the parsed JSON to "callback".
// "callback" is called exactly once, with an error when:
//  - the HTTP request failed
//  - the body is not valid JSON
//  - the parsed body has no "data" array
function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) {
  const path = '/api/' + API_VERSION + '/pods'

  request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
    if (err) return callback(err)

    let json
    try {
      json = JSON.parse(body)
    } catch (parseErr) {
      return callback(parseErr)
    }

    // Keep the callback outside of the try block: an exception thrown by the caller
    // must not be caught here, or the callback would be called a second time
    if (!json || Array.isArray(json.data) === false) {
      return callback(new Error('Invalid pods list received from ' + host))
    }

    return callback(null, json)
  })
}
338
// Send our friend request (host, admin email, public key) to every pod of "podsList".
// A pod that answers with a 200 status is saved in the database and gets our owned videos queued.
// A failure on one pod is only logged: "callback" is always called without error at the end.
// The request scheduler is paused during the whole operation and re-activated once done.
function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) {
  // Stop pool requests
  requestScheduler.deactivate()
  // Flush pool requests
  requestScheduler.forceSend()

  // Ask one pod to be our friend, always ending with done() without error
  const askFriendship = function (pod: PodInstance, done: (err?: Error) => void) {
    const params = {
      url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
      method: 'POST' as 'POST',
      json: {
        host: CONFIG.WEBSERVER.HOST,
        email: CONFIG.ADMIN.EMAIL,
        publicKey: cert
      }
    }

    makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) {
      if (err) {
        logger.error('Error with adding %s pod.', pod.host, { error: err })
        // Don't break the process
        return done()
      }

      if (res.statusCode !== 200) {
        logger.error('Status not 200 for %s pod.', pod.host)
        return done()
      }

      const newFriend = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
      newFriend.save().asCallback(function (saveErr, podCreated) {
        if (saveErr) {
          logger.error('Cannot add friend %s pod.', pod.host, { error: saveErr })
          return done()
        }

        // Add our videos to the request scheduler
        sendOwnedVideosToPod(podCreated.id)

        return done()
      })
    })
  }

  eachLimit(podsList, REQUESTS_IN_PARALLEL, askFriendship, function endRequests () {
    // Final callback, we've ended all the requests
    // Now we made new friends, we can re activate the pool of requests
    requestScheduler.activate()

    logger.debug('makeRequestsToWinningPods finished.')
    return callback(null)
  })
}
390
// Options accepted by createRequest; "toIds" is optional
type CreateRequestOptions = {
  type: string
  endpoint: string
  data: Object
  toIds?: number[]
  transaction: Sequelize.Transaction
}

// Wrapper that populates the "toIds" argument with all our friends if it is not specified.
// "callback" is optional and always called exactly once, including when the pod ids cannot be listed.
function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) {
  if (!callback) callback = function () { /* empty */ }

  if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)

  // If the "toIds" pods is not specified, we send the request to all our friends
  db.Pod.listAllIds(options.transaction, function (err, podIds) {
    if (err) {
      logger.error('Cannot get pod ids', { error: err })
      // Report the failure, otherwise the caller would wait forever
      return callback(err)
    }

    // Copy the options instead of mutating the object owned by the caller
    const newOptions = Object.assign({}, options, { toIds: podIds })
    return requestScheduler.createRequest(newOptions, callback)
  })
}
415
// Forward the options to the video QADU scheduler.
// When no callback is given, the one built by createEmptyCallback is used instead.
function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) {
  const done = callback || createEmptyCallback()

  requestVideoQaduScheduler.createRequest(options, done)
}
421
// Forward the options to the video event scheduler.
// When no callback is given, the one built by createEmptyCallback is used instead.
function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) {
  const done = callback || createEmptyCallback()

  requestVideoEventScheduler.createRequest(options, done)
}
427
// True when "host" is strictly equal to our own configured webserver host
function isMe (host: string): boolean {
  const myHost = CONFIG.WEBSERVER.HOST

  return host === myHost
}