]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/friends.ts
d07433a5dd72b97445d3777f60f1bd232b9a350d
[github/Chocobozzz/PeerTube.git] / server / lib / friends.ts
1 import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2 import * as request from 'request'
3 import * as Sequelize from 'sequelize'
4
5 import { database as db } from '../initializers/database'
6 import {
7 API_VERSION,
8 CONFIG,
9 REQUESTS_IN_PARALLEL,
10 REQUEST_ENDPOINTS,
11 REQUEST_ENDPOINT_ACTIONS,
12 REMOTE_SCHEME
13 } from '../initializers'
14 import {
15 logger,
16 getMyPublicCert,
17 makeSecureRequest,
18 makeRetryRequest,
19 createEmptyCallback
20 } from '../helpers'
21 import {
22 RequestScheduler,
23 RequestSchedulerOptions,
24
25 RequestVideoQaduScheduler,
26 RequestVideoQaduSchedulerOptions,
27
28 RequestVideoEventScheduler,
29 RequestVideoEventSchedulerOptions
30 } from './request'
31 import {
32 PodInstance,
33 VideoInstance
34 } from '../models'
35 import {
36 RequestEndpoint,
37 RequestVideoEventType,
38 RequestVideoQaduType
39 } from '../../shared'
40
// Identifies one video "quick and dirty update" (qadu) to queue: which video and which kind of update
type QaduParam = {
  videoId: string
  type: RequestVideoQaduType
}

// Identifies one video event to queue: which video and which kind of event
type EventParam = {
  videoId: string
  type: RequestVideoEventType
}
43
// Actions declared for the videos endpoint
const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]

// Module-wide scheduler instances, one per kind of request.
// They are shared by every function below and exposed through the getters of this module.
const requestScheduler = new RequestScheduler()
const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
const requestVideoEventScheduler = new RequestVideoEventScheduler()
49
// Activate the three schedulers owned by this module, in this order:
// plain requests, video qadu requests, then video event requests
function activateSchedulers () {
  requestScheduler.activate()

  requestVideoQaduScheduler.activate()

  requestVideoEventScheduler.activate()
}
55
// Queue an "add" request on the videos endpoint
// No "toIds" is given, so createRequest resolves the destination pods itself
// The transaction is forwarded to createRequest, callback is called once the request is created
function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
  createRequest({
    type: ENDPOINT_ACTIONS.ADD,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: videoData,
    transaction
  }, callback)
}
65
// Queue an "update" request on the videos endpoint
// No "toIds" is given, so createRequest resolves the destination pods itself
// The transaction is forwarded to createRequest, callback is called once the request is created
function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
  createRequest({
    type: ENDPOINT_ACTIONS.UPDATE,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: videoData,
    transaction
  }, callback)
}
75
// Queue a "remove" request on the videos endpoint
// Fire and forget: no transaction is used and no callback is given to createRequest
function removeVideoToFriends (videoParams: Object) {
  createRequest({
    type: ENDPOINT_ACTIONS.REMOVE,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: videoParams,
    transaction: null
  })
}
85
// Queue a "report abuse" request on the videos endpoint
// Unlike the other video requests, it is only addressed to the pod of the video author
// Fire and forget: no transaction is used and no callback is given to createRequest
function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) {
  const authorPodId = video.Author.podId

  createRequest({
    type: ENDPOINT_ACTIONS.REPORT_ABUSE,
    endpoint: REQUEST_ENDPOINTS.VIDEOS,
    data: reportData,
    toIds: [ authorPodId ],
    transaction: null
  })
}
96
// Queue one video qadu request
// transaction and callback are optional: createVideoQaduRequest falls back to an empty callback
function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
  return createVideoQaduRequest({
    videoId: qaduParam.videoId,
    type: qaduParam.type,
    transaction
  }, callback)
}
105
// Queue several video qadu requests, one after the other, with the same transaction
// finalCallback is given to async "series", which runs the tasks in order
function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
  const tasks = []

  // One task per qadu, each task queues a single request
  for (const qaduParams of qadusParams) {
    tasks.push(callback => quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback))
  }

  series(tasks, finalCallback)
}
119
// Queue one video event request
// transaction and callback are optional: createVideoEventRequest falls back to an empty callback
function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
  createVideoEventRequest({
    videoId: eventParam.videoId,
    type: eventParam.type,
    transaction
  }, callback)
}
128
// Queue several video event requests, one after the other, with the same transaction
// finalCallback is given to async "series", which runs the tasks in order
function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
  const tasks = []

  // One task per event, each task queues a single request
  for (const eventParams of eventsParams) {
    tasks.push(callback => addEventToRemoteVideo(eventParams, transaction, callback))
  }

  series(tasks, finalCallback)
}
142
// Tell the caller whether we have at least one pod in database
// callback receives the count error if any, otherwise (null, boolean)
function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) {
  db.Pod.countAll(function (err, podsCount) {
    if (err) return callback(err)

    callback(null, podsCount !== 0)
  })
}
151
// Make friends with the pods reachable through the given hosts:
//  1. read our public certificate
//  2. ask each host, one at a time, for its pods list and give points to every pod seen
//  3. keep the pods selected by computeWinningPods and send them a friend request
// The first error of steps 1 and 2 is given to callback
function makeFriends (hosts: string[], callback: (err: Error) => void) {
  // host => number of points, filled by computeForeignPodsList
  const podsScore = {}

  logger.info('Make friends!')

  getMyPublicCert(function (certErr, cert) {
    if (certErr) {
      logger.error('Cannot read public cert.')
      return callback(certErr)
    }

    eachSeries(hosts, function (host, callbackEach) {
      computeForeignPodsList(host, podsScore, callbackEach)
    }, function (scoreErr: Error) {
      if (scoreErr) return callback(scoreErr)

      logger.debug('Pods scores computed.', { podsScore })

      const podsToKeep = computeWinningPods(hosts, podsScore)
      logger.debug('Pods that we keep.', { podsToKeep })

      makeRequestsToWinningPods(cert, podsToKeep, callback)
    })
  })
}
175
// Quit all our friends:
//  1. stop the request scheduler and flush the pending requests and video qadu requests
//  2. tell every pod of the database that we quit it (best effort)
//  3. remove the pods from the database
// The request scheduler is re activated at the end, even if a step failed
// callback receives the first error of the flush/list/remove steps, or null
function quitFriends (callback: (err: Error) => void) {
  // Stop pool requests
  requestScheduler.deactivate()

  waterfall([
    function flushRequests (callbackAsync) {
      requestScheduler.flush(err => callbackAsync(err))
    },

    function flushVideoQaduRequests (callbackAsync) {
      requestVideoQaduScheduler.flush(err => callbackAsync(err))
    },

    function getPodsList (callbackAsync) {
      return db.Pod.list(callbackAsync)
    },

    function announceIQuitMyFriends (pods, callbackAsync) {
      const path = '/api/' + API_VERSION + '/remote/pods/remove'

      // Announce we quit them
      // We don't care if the request fails
      // The other pod will exclude us automatically after a while
      eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
        // Requests run in parallel: build one params object per request instead of sharing a mutated one
        const requestParams = {
          method: 'POST' as 'POST',
          path,
          sign: true,
          toPod: pod
        }

        makeSecureRequest(requestParams, function (err) {
          if (err) {
            logger.error('Some errors while quitting friends.', { err: err })
            // Don't stop the process
          }

          // Never forward the error: async would stop scheduling the requests of the remaining pods
          return callbackEach()
        })
      }, function () {
        return callbackAsync(null, pods)
      })
    },

    function removePodsFromDB (pods, callbackAsync) {
      each(pods, function (pod: any, callbackEach) {
        pod.destroy().asCallback(callbackEach)
      }, callbackAsync)
    }
  ], function (err: Error) {
    // Don't forget to re activate the scheduler, even if there was an error
    requestScheduler.activate()

    if (err) return callback(err)

    logger.info('Removed all remote videos.')
    return callback(null)
  })
}
232
// Queue one "add" request per video we own, addressed to the given pod only
// Fire and forget: errors are logged and nothing is reported to the caller
function sendOwnedVideosToPod (podId: number) {
  db.Video.listOwnedAndPopulateAuthorAndTags(function (listErr, videosList) {
    if (listErr) {
      logger.error('Cannot get the list of videos we own.')
      return
    }

    for (const video of videosList) {
      video.toAddRemoteJSON(function (convertErr, remoteVideo) {
        if (convertErr) {
          logger.error('Cannot convert video to remote.', { error: convertErr })
          // Don't break the process: the other videos are still sent
          return
        }

        // NOTE(review): the literal 'add' presumably equals ENDPOINT_ACTIONS.ADD used by addVideoToFriends - confirm
        createRequest({
          type: 'add',
          endpoint: REQUEST_ENDPOINTS.VIDEOS,
          data: remoteVideo,
          toIds: [ podId ],
          transaction: null
        })
      })
    }
  })
}
260
// Give access to the request scheduler instance of this module
function getRequestScheduler () {
  return requestScheduler
}
264
// Give access to the video qadu request scheduler instance of this module
function getRequestVideoQaduScheduler () {
  return requestVideoQaduScheduler
}
268
// Give access to the video event request scheduler instance of this module
function getRequestVideoEventScheduler () {
  return requestVideoEventScheduler
}
272
273 // ---------------------------------------------------------------------------
274
275 export {
276 activateSchedulers,
277 addVideoToFriends,
278 updateVideoToFriends,
279 reportAbuseVideoToFriend,
280 quickAndDirtyUpdateVideoToFriends,
281 quickAndDirtyUpdatesVideoToFriends,
282 addEventToRemoteVideo,
283 addEventsToRemoteVideo,
284 hasFriends,
285 makeFriends,
286 quitFriends,
287 removeVideoToFriends,
288 sendOwnedVideosToPod,
289 getRequestScheduler,
290 getRequestVideoQaduScheduler,
291 getRequestVideoEventScheduler
292 }
293
294 // ---------------------------------------------------------------------------
295
// Ask "host" for its pods list and update podsScore in place:
// every pod of the list gets 1 point, and so does "host" itself
// callback receives the request/parsing error, an error if the response has no "data" array, or null
function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) {
  getForeignPodsList(host, function (err, res) {
    if (err) return callback(err)

    // The response comes from a remote server: check its shape instead of throwing in this callback
    if (!res || Array.isArray(res.data) === false) {
      return callback(new Error('Invalid pods list received from ' + host))
    }

    // Let's give 1 point to the pod we ask the friends list
    // Work on a copy so the received response is not modified
    const foreignPodsList = res.data.concat([ { host } ])

    foreignPodsList.forEach(function (foreignPod) {
      const foreignPodHost = foreignPod ? foreignPod.host : undefined

      // Ignore malformed entries, they would be scored under a bogus key
      if (typeof foreignPodHost !== 'string') return

      // Own property check: a host named like an Object.prototype member (i.e. "constructor") must start at 1 too
      if (Object.prototype.hasOwnProperty.call(podsScore, foreignPodHost)) podsScore[foreignPodHost]++
      else podsScore[foreignPodHost] = 1
    })

    return callback(null)
  })
}
315
// Build the list of pods to add from the scores
// A pod is kept if it is not ourselves and if its score is strictly greater than half the number of base hosts
// Returns an array of { host } objects
function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
  const baseScore = hosts.length / 2
  const podsList = []

  for (const podHost of Object.keys(podsScore)) {
    // Never add ourselves
    if (isMe(podHost)) continue
    // Score too low
    if (!(podsScore[podHost] > baseScore)) continue

    podsList.push({ host: podHost })
  }

  return podsList
}
331
// Fetch the pods list of a remote host (GET /api/<version>/pods) and parse the body as JSON
// callback receives the request error or the JSON parsing error, otherwise (null, parsed body)
function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) {
  const path = '/api/' + API_VERSION + '/pods'

  request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
    if (err) return callback(err)

    let json
    try {
      json = JSON.parse(body)
    } catch (parseErr) {
      return callback(parseErr)
    }

    // Called outside of the try block: an exception thrown by the callback itself
    // must not be caught here, otherwise the callback would be called a second time
    return callback(null, json)
  })
}
346
// Send a friend request to every pod of podsList (REQUESTS_IN_PARALLEL at a time)
// We POST our host, admin email and public certificate to /api/<version>/pods/ of the pod
// If the pod answers with a 200 status, it is saved in database with the cert/email of the
// response body, then our own videos are queued for it
// A failure with one pod is only logged: callback is always called with null
function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) {
  // Stop pool requests
  requestScheduler.deactivate()
  // Flush pool requests
  requestScheduler.forceSend()

  // What we send about ourselves is the same for every pod
  const myPodInformation = {
    host: CONFIG.WEBSERVER.HOST,
    email: CONFIG.ADMIN.EMAIL,
    publicKey: cert
  }

  eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) {
    const params = {
      url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
      method: 'POST' as 'POST',
      json: myPodInformation
    }

    makeRetryRequest(params, function (requestErr, res, body: { cert: string, email: string }) {
      if (requestErr) {
        logger.error('Error with adding %s pod.', pod.host, { error: requestErr })
        // Don't break the process
        return callbackEach()
      }

      if (res.statusCode !== 200) {
        logger.error('Status not 200 for %s pod.', pod.host)
        return callbackEach()
      }

      const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
      podObj.save().asCallback(function (saveErr, podCreated) {
        if (saveErr) {
          logger.error('Cannot add friend %s pod.', pod.host, { error: saveErr })
          return callbackEach()
        }

        // Add our videos to the request scheduler
        sendOwnedVideosToPod(podCreated.id)

        return callbackEach()
      })
    })
  }, function endRequests () {
    // Final callback, we've ended all the requests
    // Now we made new friends, we can re activate the pool of requests
    requestScheduler.activate()

    logger.debug('makeRequestsToWinningPods finished.')
    return callback(null)
  })
}
398
// Wrapper that populates the "toIds" option with all our friends if it is not specified
type CreateRequestOptions = {
  type: string
  endpoint: RequestEndpoint
  data: Object
  toIds?: number[]
  transaction: Sequelize.Transaction
}
// Create a request in the request scheduler
// callback is optional; when given, it is always called: with the error of the pods lookup
// if it fails, otherwise by the request scheduler
function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) {
  // Same default as the other create*Request wrappers of this file
  if (!callback) callback = createEmptyCallback()

  if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)

  // If the "toIds" pods is not specified, we send the request to all our friends
  db.Pod.listAllIds(options.transaction, function (err, podIds) {
    if (err) {
      logger.error('Cannot get pod ids', { error: err })
      // Report the failure: a caller waiting for the callback would hang forever otherwise
      return callback(err)
    }

    // Copy the options so the object of the caller is left untouched
    const newOptions = Object.assign({}, options, { toIds: podIds })
    return requestScheduler.createRequest(newOptions, callback)
  })
}
423
// Create a request in the video qadu scheduler
// An empty callback is used when the caller did not give one
function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) {
  requestVideoQaduScheduler.createRequest(options, callback || createEmptyCallback())
}
429
// Create a request in the video event scheduler
// An empty callback is used when the caller did not give one
function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) {
  requestVideoEventScheduler.createRequest(options, callback || createEmptyCallback())
}
435
// Is the given host strictly equal to the host of our own webserver configuration?
function isMe (host: string) {
  const myHost = CONFIG.WEBSERVER.HOST

  return myHost === host
}