]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/friends.js
First version with PostgreSQL
[github/Chocobozzz/PeerTube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const friends = {
16 addVideoToFriends,
17 hasFriends,
18 getMyCertificate,
19 makeFriends,
20 quitFriends,
21 removeVideoToFriends,
22 sendOwnedVideosToPod
23 }
24
25 function addVideoToFriends (video) {
26 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
27 }
28
29 function hasFriends (callback) {
30 db.Pod.countAll(function (err, count) {
31 if (err) return callback(err)
32
33 const hasFriends = (count !== 0)
34 callback(null, hasFriends)
35 })
36 }
37
38 function getMyCertificate (callback) {
39 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
40 }
41
42 function makeFriends (hosts, callback) {
43 const podsScore = {}
44
45 logger.info('Make friends!')
46 getMyCertificate(function (err, cert) {
47 if (err) {
48 logger.error('Cannot read public cert.')
49 return callback(err)
50 }
51
52 eachSeries(hosts, function (host, callbackEach) {
53 computeForeignPodsList(host, podsScore, callbackEach)
54 }, function (err) {
55 if (err) return callback(err)
56
57 logger.debug('Pods scores computed.', { podsScore: podsScore })
58 const podsList = computeWinningPods(hosts, podsScore)
59 logger.debug('Pods that we keep.', { podsToKeep: podsList })
60
61 makeRequestsToWinningPods(cert, podsList, callback)
62 })
63 })
64 }
65
66 function quitFriends (callback) {
67 // Stop pool requests
68 db.Request.deactivate()
69 // Flush pool requests
70 db.Request.flush()
71
72 waterfall([
73 function getPodsList (callbackAsync) {
74 return db.Pod.list(callbackAsync)
75 },
76
77 function announceIQuitMyFriends (pods, callbackAsync) {
78 const requestParams = {
79 method: 'POST',
80 path: '/api/' + constants.API_VERSION + '/pods/remove',
81 sign: true
82 }
83
84 // Announce we quit them
85 // We don't care if the request fails
86 // The other pod will exclude us automatically after a while
87 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
88 requestParams.toPod = pod
89 requests.makeSecureRequest(requestParams, callbackEach)
90 }, function (err) {
91 if (err) {
92 logger.error('Some errors while quitting friends.', { err: err })
93 // Don't stop the process
94 }
95
96 return callbackAsync(null, pods)
97 })
98 },
99
100 function removePodsFromDB (pods, callbackAsync) {
101 each(pods, function (pod, callbackEach) {
102 pod.destroy().asCallback(callbackEach)
103 }, callbackAsync)
104 }
105 ], function (err) {
106 // Don't forget to re activate the scheduler, even if there was an error
107 db.Request.activate()
108
109 if (err) return callback(err)
110
111 logger.info('Removed all remote videos.')
112 return callback(null)
113 })
114 }
115
116 function removeVideoToFriends (videoParams) {
117 createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
118 }
119
120 function sendOwnedVideosToPod (podId) {
121 db.Video.listOwnedAndPopulateAuthor(function (err, videosList) {
122 if (err) {
123 logger.error('Cannot get the list of videos we own.')
124 return
125 }
126
127 videosList.forEach(function (video) {
128 video.toRemoteJSON(function (err, remoteVideo) {
129 if (err) {
130 logger.error('Cannot convert video to remote.', { error: err })
131 // Don't break the process
132 return
133 }
134
135 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
136 })
137 })
138 })
139 }
140
141 // ---------------------------------------------------------------------------
142
143 module.exports = friends
144
145 // ---------------------------------------------------------------------------
146
147 function computeForeignPodsList (host, podsScore, callback) {
148 getForeignPodsList(host, function (err, foreignPodsList) {
149 if (err) return callback(err)
150
151 if (!foreignPodsList) foreignPodsList = []
152
153 // Let's give 1 point to the pod we ask the friends list
154 foreignPodsList.push({ host })
155
156 foreignPodsList.forEach(function (foreignPod) {
157 const foreignPodHost = foreignPod.host
158
159 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
160 else podsScore[foreignPodHost] = 1
161 })
162
163 callback()
164 })
165 }
166
167 function computeWinningPods (hosts, podsScore) {
168 // Build the list of pods to add
169 // Only add a pod if it exists in more than a half base pods
170 const podsList = []
171 const baseScore = hosts.length / 2
172 Object.keys(podsScore).forEach(function (podHost) {
173 // If the pod is not me and with a good score we add it
174 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
175 podsList.push({ host: podHost })
176 }
177 })
178
179 return podsList
180 }
181
182 function getForeignPodsList (host, callback) {
183 const path = '/api/' + constants.API_VERSION + '/pods'
184
185 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
186 if (err) return callback(err)
187
188 try {
189 const json = JSON.parse(body)
190 return callback(null, json)
191 } catch (err) {
192 return callback(err)
193 }
194 })
195 }
196
197 function makeRequestsToWinningPods (cert, podsList, callback) {
198 // Stop pool requests
199 db.Request.deactivate()
200 // Flush pool requests
201 db.Request.forceSend()
202
203 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
204 const params = {
205 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
206 method: 'POST',
207 json: {
208 host: constants.CONFIG.WEBSERVER.HOST,
209 publicKey: cert
210 }
211 }
212
213 requests.makeRetryRequest(params, function (err, res, body) {
214 if (err) {
215 logger.error('Error with adding %s pod.', pod.host, { error: err })
216 // Don't break the process
217 return callbackEach()
218 }
219
220 if (res.statusCode === 200) {
221 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
222 podObj.save().asCallback(function (err, podCreated) {
223 if (err) {
224 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
225 return callbackEach()
226 }
227
228 // Add our videos to the request scheduler
229 sendOwnedVideosToPod(podCreated._id)
230
231 return callbackEach()
232 })
233 } else {
234 logger.error('Status not 200 for %s pod.', pod.host)
235 return callbackEach()
236 }
237 })
238 }, function endRequests () {
239 // Final callback, we've ended all the requests
240 // Now we made new friends, we can re activate the pool of requests
241 db.Request.activate()
242
243 logger.debug('makeRequestsToWinningPods finished.')
244 return callback()
245 })
246 }
247
248 // Wrapper that populate "to" argument with all our friends if it is not specified
249 function createRequest (type, endpoint, data, to) {
250 if (to) return _createRequest(type, endpoint, data, to)
251
252 // If the "to" pods is not specified, we send the request to all our friends
253 db.Pod.listAllIds(function (err, podIds) {
254 if (err) {
255 logger.error('Cannot get pod ids', { error: err })
256 return
257 }
258
259 return _createRequest(type, endpoint, data, podIds)
260 })
261 }
262
263 function _createRequest (type, endpoint, data, to) {
264 const pods = []
265
266 // If there are no destination pods abort
267 if (to.length === 0) return
268
269 to.forEach(function (toPod) {
270 pods.push(db.Pod.build({ id: toPod }))
271 })
272
273 const createQuery = {
274 endpoint,
275 request: {
276 type: type,
277 data: data
278 }
279 }
280
281 // We run in transaction to keep coherency between Request and RequestToPod tables
282 db.sequelize.transaction(function (t) {
283 const dbRequestOptions = {
284 transaction: t
285 }
286
287 return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
288 return request.setPods(pods, dbRequestOptions)
289 })
290 }).asCallback(function (err) {
291 if (err) logger.error('Error in createRequest transaction.', { error: err })
292 })
293 }
294
295 function isMe (host) {
296 return host === constants.CONFIG.WEBSERVER.HOST
297 }