]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/friends.js
Server: add database field validations
[github/Chocobozzz/PeerTube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const friends = {
16 addVideoToFriends,
17 hasFriends,
18 getMyCertificate,
19 makeFriends,
20 quitFriends,
21 removeVideoToFriends,
22 sendOwnedVideosToPod
23 }
24
25 function addVideoToFriends (video) {
26 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
27 }
28
29 function hasFriends (callback) {
30 db.Pod.countAll(function (err, count) {
31 if (err) return callback(err)
32
33 const hasFriends = (count !== 0)
34 callback(null, hasFriends)
35 })
36 }
37
38 function getMyCertificate (callback) {
39 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
40 }
41
42 function makeFriends (hosts, callback) {
43 const podsScore = {}
44
45 logger.info('Make friends!')
46 getMyCertificate(function (err, cert) {
47 if (err) {
48 logger.error('Cannot read public cert.')
49 return callback(err)
50 }
51
52 eachSeries(hosts, function (host, callbackEach) {
53 computeForeignPodsList(host, podsScore, callbackEach)
54 }, function (err) {
55 if (err) return callback(err)
56
57 logger.debug('Pods scores computed.', { podsScore: podsScore })
58 const podsList = computeWinningPods(hosts, podsScore)
59 logger.debug('Pods that we keep.', { podsToKeep: podsList })
60
61 makeRequestsToWinningPods(cert, podsList, callback)
62 })
63 })
64 }
65
66 function quitFriends (callback) {
67 // Stop pool requests
68 db.Request.deactivate()
69
70 waterfall([
71 function flushRequests (callbackAsync) {
72 db.Request.flush(callbackAsync)
73 },
74
75 function getPodsList (callbackAsync) {
76 return db.Pod.list(callbackAsync)
77 },
78
79 function announceIQuitMyFriends (pods, callbackAsync) {
80 const requestParams = {
81 method: 'POST',
82 path: '/api/' + constants.API_VERSION + '/pods/remove',
83 sign: true
84 }
85
86 // Announce we quit them
87 // We don't care if the request fails
88 // The other pod will exclude us automatically after a while
89 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
90 requestParams.toPod = pod
91 requests.makeSecureRequest(requestParams, callbackEach)
92 }, function (err) {
93 if (err) {
94 logger.error('Some errors while quitting friends.', { err: err })
95 // Don't stop the process
96 }
97
98 return callbackAsync(null, pods)
99 })
100 },
101
102 function removePodsFromDB (pods, callbackAsync) {
103 each(pods, function (pod, callbackEach) {
104 pod.destroy().asCallback(callbackEach)
105 }, callbackAsync)
106 }
107 ], function (err) {
108 // Don't forget to re activate the scheduler, even if there was an error
109 db.Request.activate()
110
111 if (err) return callback(err)
112
113 logger.info('Removed all remote videos.')
114 return callback(null)
115 })
116 }
117
118 function removeVideoToFriends (videoParams) {
119 createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
120 }
121
122 function sendOwnedVideosToPod (podId) {
123 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
124 if (err) {
125 logger.error('Cannot get the list of videos we own.')
126 return
127 }
128
129 videosList.forEach(function (video) {
130 video.toRemoteJSON(function (err, remoteVideo) {
131 if (err) {
132 logger.error('Cannot convert video to remote.', { error: err })
133 // Don't break the process
134 return
135 }
136
137 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
138 })
139 })
140 })
141 }
142
143 // ---------------------------------------------------------------------------
144
145 module.exports = friends
146
147 // ---------------------------------------------------------------------------
148
149 function computeForeignPodsList (host, podsScore, callback) {
150 getForeignPodsList(host, function (err, foreignPodsList) {
151 if (err) return callback(err)
152
153 if (!foreignPodsList) foreignPodsList = []
154
155 // Let's give 1 point to the pod we ask the friends list
156 foreignPodsList.push({ host })
157
158 foreignPodsList.forEach(function (foreignPod) {
159 const foreignPodHost = foreignPod.host
160
161 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
162 else podsScore[foreignPodHost] = 1
163 })
164
165 callback()
166 })
167 }
168
169 function computeWinningPods (hosts, podsScore) {
170 // Build the list of pods to add
171 // Only add a pod if it exists in more than a half base pods
172 const podsList = []
173 const baseScore = hosts.length / 2
174 Object.keys(podsScore).forEach(function (podHost) {
175 // If the pod is not me and with a good score we add it
176 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
177 podsList.push({ host: podHost })
178 }
179 })
180
181 return podsList
182 }
183
184 function getForeignPodsList (host, callback) {
185 const path = '/api/' + constants.API_VERSION + '/pods'
186
187 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
188 if (err) return callback(err)
189
190 try {
191 const json = JSON.parse(body)
192 return callback(null, json)
193 } catch (err) {
194 return callback(err)
195 }
196 })
197 }
198
199 function makeRequestsToWinningPods (cert, podsList, callback) {
200 // Stop pool requests
201 db.Request.deactivate()
202 // Flush pool requests
203 db.Request.forceSend()
204
205 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
206 const params = {
207 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
208 method: 'POST',
209 json: {
210 host: constants.CONFIG.WEBSERVER.HOST,
211 publicKey: cert
212 }
213 }
214
215 requests.makeRetryRequest(params, function (err, res, body) {
216 if (err) {
217 logger.error('Error with adding %s pod.', pod.host, { error: err })
218 // Don't break the process
219 return callbackEach()
220 }
221
222 if (res.statusCode === 200) {
223 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
224 podObj.save().asCallback(function (err, podCreated) {
225 if (err) {
226 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
227 return callbackEach()
228 }
229
230 // Add our videos to the request scheduler
231 sendOwnedVideosToPod(podCreated.id)
232
233 return callbackEach()
234 })
235 } else {
236 logger.error('Status not 200 for %s pod.', pod.host)
237 return callbackEach()
238 }
239 })
240 }, function endRequests () {
241 // Final callback, we've ended all the requests
242 // Now we made new friends, we can re activate the pool of requests
243 db.Request.activate()
244
245 logger.debug('makeRequestsToWinningPods finished.')
246 return callback()
247 })
248 }
249
250 // Wrapper that populate "to" argument with all our friends if it is not specified
251 function createRequest (type, endpoint, data, to) {
252 if (to) return _createRequest(type, endpoint, data, to)
253
254 // If the "to" pods is not specified, we send the request to all our friends
255 db.Pod.listAllIds(function (err, podIds) {
256 if (err) {
257 logger.error('Cannot get pod ids', { error: err })
258 return
259 }
260
261 return _createRequest(type, endpoint, data, podIds)
262 })
263 }
264
265 function _createRequest (type, endpoint, data, to) {
266 const pods = []
267
268 // If there are no destination pods abort
269 if (to.length === 0) return
270
271 to.forEach(function (toPod) {
272 pods.push(db.Pod.build({ id: toPod }))
273 })
274
275 const createQuery = {
276 endpoint,
277 request: {
278 type: type,
279 data: data
280 }
281 }
282
283 // We run in transaction to keep coherency between Request and RequestToPod tables
284 db.sequelize.transaction(function (t) {
285 const dbRequestOptions = {
286 transaction: t
287 }
288
289 return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
290 return request.setPods(pods, dbRequestOptions)
291 })
292 }).asCallback(function (err) {
293 if (err) logger.error('Error in createRequest transaction.', { error: err })
294 })
295 }
296
297 function isMe (host) {
298 return host === constants.CONFIG.WEBSERVER.HOST
299 }