]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/friends.js
Merge branch 'postgresql'
[github/Chocobozzz/PeerTube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const friends = {
16 addVideoToFriends,
17 updateVideoToFriends,
18 reportAbuseVideoToFriend,
19 hasFriends,
20 getMyCertificate,
21 makeFriends,
22 quitFriends,
23 removeVideoToFriends,
24 sendOwnedVideosToPod
25 }
26
27 function addVideoToFriends (videoData, transaction, callback) {
28 const options = {
29 type: 'add',
30 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
31 data: videoData,
32 transaction
33 }
34 createRequest(options, callback)
35 }
36
37 function updateVideoToFriends (videoData, transaction, callback) {
38 const options = {
39 type: 'update',
40 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
41 data: videoData,
42 transaction
43 }
44 createRequest(options, callback)
45 }
46
47 function removeVideoToFriends (videoParams) {
48 const options = {
49 type: 'remove',
50 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
51 data: videoParams
52 }
53 createRequest(options)
54 }
55
56 function reportAbuseVideoToFriend (reportData, video) {
57 const options = {
58 type: 'report-abuse',
59 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
60 data: reportData,
61 toIds: [ video.Author.podId ]
62 }
63 createRequest(options)
64 }
65
66 function hasFriends (callback) {
67 db.Pod.countAll(function (err, count) {
68 if (err) return callback(err)
69
70 const hasFriends = (count !== 0)
71 callback(null, hasFriends)
72 })
73 }
74
75 function getMyCertificate (callback) {
76 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
77 }
78
79 function makeFriends (hosts, callback) {
80 const podsScore = {}
81
82 logger.info('Make friends!')
83 getMyCertificate(function (err, cert) {
84 if (err) {
85 logger.error('Cannot read public cert.')
86 return callback(err)
87 }
88
89 eachSeries(hosts, function (host, callbackEach) {
90 computeForeignPodsList(host, podsScore, callbackEach)
91 }, function (err) {
92 if (err) return callback(err)
93
94 logger.debug('Pods scores computed.', { podsScore: podsScore })
95 const podsList = computeWinningPods(hosts, podsScore)
96 logger.debug('Pods that we keep.', { podsToKeep: podsList })
97
98 makeRequestsToWinningPods(cert, podsList, callback)
99 })
100 })
101 }
102
103 function quitFriends (callback) {
104 // Stop pool requests
105 db.Request.deactivate()
106
107 waterfall([
108 function flushRequests (callbackAsync) {
109 db.Request.flush(callbackAsync)
110 },
111
112 function getPodsList (callbackAsync) {
113 return db.Pod.list(callbackAsync)
114 },
115
116 function announceIQuitMyFriends (pods, callbackAsync) {
117 const requestParams = {
118 method: 'POST',
119 path: '/api/' + constants.API_VERSION + '/pods/remove',
120 sign: true
121 }
122
123 // Announce we quit them
124 // We don't care if the request fails
125 // The other pod will exclude us automatically after a while
126 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
127 requestParams.toPod = pod
128 requests.makeSecureRequest(requestParams, callbackEach)
129 }, function (err) {
130 if (err) {
131 logger.error('Some errors while quitting friends.', { err: err })
132 // Don't stop the process
133 }
134
135 return callbackAsync(null, pods)
136 })
137 },
138
139 function removePodsFromDB (pods, callbackAsync) {
140 each(pods, function (pod, callbackEach) {
141 pod.destroy().asCallback(callbackEach)
142 }, callbackAsync)
143 }
144 ], function (err) {
145 // Don't forget to re activate the scheduler, even if there was an error
146 db.Request.activate()
147
148 if (err) return callback(err)
149
150 logger.info('Removed all remote videos.')
151 return callback(null)
152 })
153 }
154
155 function sendOwnedVideosToPod (podId) {
156 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
157 if (err) {
158 logger.error('Cannot get the list of videos we own.')
159 return
160 }
161
162 videosList.forEach(function (video) {
163 video.toAddRemoteJSON(function (err, remoteVideo) {
164 if (err) {
165 logger.error('Cannot convert video to remote.', { error: err })
166 // Don't break the process
167 return
168 }
169
170 const options = {
171 type: 'add',
172 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
173 data: remoteVideo,
174 toIds: [ podId ]
175 }
176 createRequest(options)
177 })
178 })
179 })
180 }
181
182 // ---------------------------------------------------------------------------
183
184 module.exports = friends
185
186 // ---------------------------------------------------------------------------
187
188 function computeForeignPodsList (host, podsScore, callback) {
189 getForeignPodsList(host, function (err, res) {
190 if (err) return callback(err)
191
192 const foreignPodsList = res.data
193
194 // Let's give 1 point to the pod we ask the friends list
195 foreignPodsList.push({ host })
196
197 foreignPodsList.forEach(function (foreignPod) {
198 const foreignPodHost = foreignPod.host
199
200 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
201 else podsScore[foreignPodHost] = 1
202 })
203
204 callback()
205 })
206 }
207
208 function computeWinningPods (hosts, podsScore) {
209 // Build the list of pods to add
210 // Only add a pod if it exists in more than a half base pods
211 const podsList = []
212 const baseScore = hosts.length / 2
213 Object.keys(podsScore).forEach(function (podHost) {
214 // If the pod is not me and with a good score we add it
215 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
216 podsList.push({ host: podHost })
217 }
218 })
219
220 return podsList
221 }
222
223 function getForeignPodsList (host, callback) {
224 const path = '/api/' + constants.API_VERSION + '/pods'
225
226 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
227 if (err) return callback(err)
228
229 try {
230 const json = JSON.parse(body)
231 return callback(null, json)
232 } catch (err) {
233 return callback(err)
234 }
235 })
236 }
237
238 function makeRequestsToWinningPods (cert, podsList, callback) {
239 // Stop pool requests
240 db.Request.deactivate()
241 // Flush pool requests
242 db.Request.forceSend()
243
244 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
245 const params = {
246 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
247 method: 'POST',
248 json: {
249 host: constants.CONFIG.WEBSERVER.HOST,
250 publicKey: cert
251 }
252 }
253
254 requests.makeRetryRequest(params, function (err, res, body) {
255 if (err) {
256 logger.error('Error with adding %s pod.', pod.host, { error: err })
257 // Don't break the process
258 return callbackEach()
259 }
260
261 if (res.statusCode === 200) {
262 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
263 podObj.save().asCallback(function (err, podCreated) {
264 if (err) {
265 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
266 return callbackEach()
267 }
268
269 // Add our videos to the request scheduler
270 sendOwnedVideosToPod(podCreated.id)
271
272 return callbackEach()
273 })
274 } else {
275 logger.error('Status not 200 for %s pod.', pod.host)
276 return callbackEach()
277 }
278 })
279 }, function endRequests () {
280 // Final callback, we've ended all the requests
281 // Now we made new friends, we can re activate the pool of requests
282 db.Request.activate()
283
284 logger.debug('makeRequestsToWinningPods finished.')
285 return callback()
286 })
287 }
288
289 // Wrapper that populate "toIds" argument with all our friends if it is not specified
290 // { type, endpoint, data, toIds, transaction }
291 function createRequest (options, callback) {
292 if (!callback) callback = function () {}
293 if (options.toIds) return _createRequest(options, callback)
294
295 // If the "toIds" pods is not specified, we send the request to all our friends
296 db.Pod.listAllIds(options.transaction, function (err, podIds) {
297 if (err) {
298 logger.error('Cannot get pod ids', { error: err })
299 return
300 }
301
302 const newOptions = Object.assign(options, { toIds: podIds })
303 return _createRequest(newOptions, callback)
304 })
305 }
306
307 // { type, endpoint, data, toIds, transaction }
308 function _createRequest (options, callback) {
309 const type = options.type
310 const endpoint = options.endpoint
311 const data = options.data
312 const toIds = options.toIds
313 const transaction = options.transaction
314
315 const pods = []
316
317 // If there are no destination pods abort
318 if (toIds.length === 0) return callback(null)
319
320 toIds.forEach(function (toPod) {
321 pods.push(db.Pod.build({ id: toPod }))
322 })
323
324 const createQuery = {
325 endpoint,
326 request: {
327 type: type,
328 data: data
329 }
330 }
331
332 const dbRequestOptions = {
333 transaction
334 }
335
336 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
337 if (err) return callback(err)
338
339 return request.setPods(pods, dbRequestOptions).asCallback(callback)
340 })
341 }
342
343 function isMe (host) {
344 return host === constants.CONFIG.WEBSERVER.HOST
345 }