aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/friends.js228
-rw-r--r--server/lib/poolRequests.js221
-rw-r--r--server/lib/videos.js50
-rw-r--r--server/lib/webtorrent.js157
-rw-r--r--server/lib/webtorrentProcess.js92
5 files changed, 748 insertions, 0 deletions
diff --git a/server/lib/friends.js b/server/lib/friends.js
new file mode 100644
index 000000000..006a64404
--- /dev/null
+++ b/server/lib/friends.js
@@ -0,0 +1,228 @@
1'use strict'
2
3var async = require('async')
4var config = require('config')
5var fs = require('fs')
6var request = require('request')
7
8var constants = require('../initializers/constants')
9var logger = require('../helpers/logger')
10var peertubeCrypto = require('../helpers/peertubeCrypto')
11var Pods = require('../models/pods')
12var poolRequests = require('../lib/poolRequests')
13var requests = require('../helpers/requests')
14var Videos = require('../models/videos')
15
16var http = config.get('webserver.https') ? 'https' : 'http'
17var host = config.get('webserver.host')
18var port = config.get('webserver.port')
19
20var pods = {
21 addVideoToFriends: addVideoToFriends,
22 hasFriends: hasFriends,
23 makeFriends: makeFriends,
24 quitFriends: quitFriends,
25 removeVideoToFriends: removeVideoToFriends
26}
27
28function addVideoToFriends (video) {
29 // To avoid duplicates
30 var id = video.name + video.magnetUri
31 // ensure namePath is null
32 video.namePath = null
33 poolRequests.addRequest(id, 'add', video)
34}
35
36function hasFriends (callback) {
37 Pods.count(function (err, count) {
38 if (err) return callback(err)
39
40 var has_friends = (count !== 0)
41 callback(null, has_friends)
42 })
43}
44
45function makeFriends (callback) {
46 var pods_score = {}
47
48 logger.info('Make friends!')
49 fs.readFile(peertubeCrypto.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) {
50 if (err) {
51 logger.error('Cannot read public cert.')
52 return callback(err)
53 }
54
55 var urls = config.get('network.friends')
56
57 async.each(urls, function (url, callback) {
58 computeForeignPodsList(url, pods_score, callback)
59 }, function (err) {
60 if (err) return callback(err)
61
62 logger.debug('Pods scores computed.', { pods_score: pods_score })
63 var pods_list = computeWinningPods(urls, pods_score)
64 logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list })
65
66 makeRequestsToWinningPods(cert, pods_list, callback)
67 })
68 })
69}
70
71function quitFriends (callback) {
72 // Stop pool requests
73 poolRequests.deactivate()
74 // Flush pool requests
75 poolRequests.forceSend()
76
77 Pods.list(function (err, pods) {
78 if (err) return callback(err)
79
80 var request = {
81 method: 'POST',
82 path: '/api/' + constants.API_VERSION + '/pods/remove',
83 sign: true,
84 encrypt: true,
85 data: {
86 url: 'me' // Fake data
87 }
88 }
89
90 // Announce we quit them
91 requests.makeMultipleRetryRequest(request, pods, function () {
92 Pods.removeAll(function (err) {
93 poolRequests.activate()
94
95 if (err) return callback(err)
96
97 logger.info('Broke friends, so sad :(')
98
99 Videos.removeAllRemotes(function (err) {
100 if (err) return callback(err)
101
102 logger.info('Removed all remote videos.')
103 callback(null)
104 })
105 })
106 })
107 })
108}
109
110function removeVideoToFriends (video) {
111 // To avoid duplicates
112 var id = video.name + video.magnetUri
113 poolRequests.addRequest(id, 'remove', video)
114}
115
116// ---------------------------------------------------------------------------
117
118module.exports = pods
119
120// ---------------------------------------------------------------------------
121
122function computeForeignPodsList (url, pods_score, callback) {
123 // Let's give 1 point to the pod we ask the friends list
124 pods_score[url] = 1
125
126 getForeignPodsList(url, function (err, foreign_pods_list) {
127 if (err) return callback(err)
128 if (foreign_pods_list.length === 0) return callback()
129
130 async.each(foreign_pods_list, function (foreign_pod, callback_each) {
131 var foreign_url = foreign_pod.url
132
133 if (pods_score[foreign_url]) pods_score[foreign_url]++
134 else pods_score[foreign_url] = 1
135
136 callback_each()
137 }, function () {
138 callback()
139 })
140 })
141}
142
143function computeWinningPods (urls, pods_score) {
144 // Build the list of pods to add
145 // Only add a pod if it exists in more than a half base pods
146 var pods_list = []
147 var base_score = urls.length / 2
148 Object.keys(pods_score).forEach(function (pod) {
149 if (pods_score[pod] > base_score) pods_list.push({ url: pod })
150 })
151
152 return pods_list
153}
154
155function getForeignPodsList (url, callback) {
156 var path = '/api/' + constants.API_VERSION + '/pods'
157
158 request.get(url + path, function (err, response, body) {
159 if (err) return callback(err)
160
161 callback(null, JSON.parse(body))
162 })
163}
164
165function makeRequestsToWinningPods (cert, pods_list, callback) {
166 // Stop pool requests
167 poolRequests.deactivate()
168 // Flush pool requests
169 poolRequests.forceSend()
170
171 // Get the list of our videos to send to our new friends
172 Videos.listOwned(function (err, videos_list) {
173 if (err) {
174 logger.error('Cannot get the list of videos we own.')
175 return callback(err)
176 }
177
178 var data = {
179 url: http + '://' + host + ':' + port,
180 publicKey: cert,
181 videos: videos_list
182 }
183
184 requests.makeMultipleRetryRequest(
185 { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data },
186
187 pods_list,
188
189 function eachRequest (err, response, body, url, pod, callback_each_request) {
190 // We add the pod if it responded correctly with its public certificate
191 if (!err && response.statusCode === 200) {
192 Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
193 if (err) {
194 logger.error('Error with adding %s pod.', pod.url, { error: err })
195 return callback_each_request()
196 }
197
198 Videos.addRemotes(body.videos, function (err) {
199 if (err) {
200 logger.error('Error with adding videos of pod.', pod.url, { error: err })
201 return callback_each_request()
202 }
203
204 logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
205 return callback_each_request()
206 })
207 })
208 } else {
209 logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
210 return callback_each_request()
211 }
212 },
213
214 function endRequests (err) {
215 // Now we made new friends, we can re activate the pool of requests
216 poolRequests.activate()
217
218 if (err) {
219 logger.error('There was some errors when we wanted to make friends.')
220 return callback(err)
221 }
222
223 logger.debug('makeRequestsToWinningPods finished.')
224 return callback(null)
225 }
226 )
227 })
228}
diff --git a/server/lib/poolRequests.js b/server/lib/poolRequests.js
new file mode 100644
index 000000000..f786c3c7a
--- /dev/null
+++ b/server/lib/poolRequests.js
@@ -0,0 +1,221 @@
1'use strict'
2
3var async = require('async')
4var pluck = require('lodash-node/compat/collection/pluck')
5
6var constants = require('../initializers/constants')
7var logger = require('../helpers/logger')
8var Pods = require('../models/pods')
9var PoolRequests = require('../models/poolRequests')
10var requests = require('../helpers/requests')
11var Videos = require('../models/videos')
12
13var timer = null
14
15var poolRequests = {
16 activate: activate,
17 addRequest: addRequest,
18 deactivate: deactivate,
19 forceSend: forceSend
20}
21
22function activate () {
23 logger.info('Pool requests activated.')
24 timer = setInterval(makePoolRequests, constants.INTERVAL)
25}
26
27function addRequest (id, type, request) {
28 logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
29
30 PoolRequests.findById(id, function (err, entity) {
31 if (err) {
32 logger.error('Cannot find one pool request.', { error: err })
33 return // Abort
34 }
35
36 if (entity) {
37 if (entity.type === type) {
38 logger.error('Cannot insert two same requests.')
39 return // Abort
40 }
41
42 // Remove the request of the other type
43 PoolRequests.removeRequestById(id, function (err) {
44 if (err) {
45 logger.error('Cannot remove a pool request.', { error: err })
46 return // Abort
47 }
48 })
49 } else {
50 PoolRequests.create(id, type, request, function (err) {
51 if (err) logger.error('Cannot create a pool request.', { error: err })
52 return // Abort
53 })
54 }
55 })
56}
57
58function deactivate () {
59 logger.info('Pool requests deactivated.')
60 clearInterval(timer)
61}
62
63function forceSend () {
64 logger.info('Force pool requests sending.')
65 makePoolRequests()
66}
67
68// ---------------------------------------------------------------------------
69
70module.exports = poolRequests
71
72// ---------------------------------------------------------------------------
73
74function makePoolRequest (type, requests_to_make, callback) {
75 if (!callback) callback = function () {}
76
77 Pods.list(function (err, pods) {
78 if (err) return callback(err)
79
80 var params = {
81 encrypt: true,
82 sign: true,
83 method: 'POST',
84 path: null,
85 data: requests_to_make
86 }
87
88 if (type === 'add') {
89 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
90 } else if (type === 'remove') {
91 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
92 } else {
93 return callback(new Error('Unkown pool request type.'))
94 }
95
96 var bad_pods = []
97 var good_pods = []
98
99 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
100
101 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
102 if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
103 bad_pods.push(pod._id)
104 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
105 } else {
106 good_pods.push(pod._id)
107 }
108
109 return callback_each_pod_finished()
110 }
111
112 function callbackAllPodsFinished (err) {
113 if (err) return callback(err)
114
115 updatePodsScore(good_pods, bad_pods)
116 callback(null)
117 }
118 })
119}
120
121function makePoolRequests () {
122 logger.info('Making pool requests to friends.')
123
124 PoolRequests.list(function (err, pool_requests) {
125 if (err) {
126 logger.error('Cannot get the list of pool requests.', { err: err })
127 return // Abort
128 }
129
130 if (pool_requests.length === 0) return
131
132 var requests_to_make = {
133 add: {
134 ids: [],
135 requests: []
136 },
137 remove: {
138 ids: [],
139 requests: []
140 }
141 }
142
143 async.each(pool_requests, function (pool_request, callback_each) {
144 if (pool_request.type === 'add') {
145 requests_to_make.add.requests.push(pool_request.request)
146 requests_to_make.add.ids.push(pool_request._id)
147 } else if (pool_request.type === 'remove') {
148 requests_to_make.remove.requests.push(pool_request.request)
149 requests_to_make.remove.ids.push(pool_request._id)
150 } else {
151 logger.error('Unkown pool request type.', { request_type: pool_request.type })
152 return // abort
153 }
154
155 callback_each()
156 }, function () {
157 // Send the add requests
158 if (requests_to_make.add.requests.length !== 0) {
159 makePoolRequest('add', requests_to_make.add.requests, function (err) {
160 if (err) logger.error('Errors when sent add pool requests.', { error: err })
161
162 PoolRequests.removeRequests(requests_to_make.add.ids)
163 })
164 }
165
166 // Send the remove requests
167 if (requests_to_make.remove.requests.length !== 0) {
168 makePoolRequest('remove', requests_to_make.remove.requests, function (err) {
169 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
170
171 PoolRequests.removeRequests(requests_to_make.remove.ids)
172 })
173 }
174 })
175 })
176}
177
178function removeBadPods () {
179 Pods.findBadPods(function (err, pods) {
180 if (err) {
181 logger.error('Cannot find bad pods.', { error: err })
182 return // abort
183 }
184
185 if (pods.length === 0) return
186
187 var urls = pluck(pods, 'url')
188 var ids = pluck(pods, '_id')
189
190 Videos.removeAllRemotesOf(urls, function (err, r) {
191 if (err) {
192 logger.error('Cannot remove videos from a pod that we removing.', { error: err })
193 } else {
194 var videos_removed = r.result.n
195 logger.info('Removed %d videos.', videos_removed)
196 }
197
198 Pods.removeAllByIds(ids, function (err, r) {
199 if (err) {
200 logger.error('Cannot remove bad pods.', { error: err })
201 } else {
202 var pods_removed = r.result.n
203 logger.info('Removed %d pods.', pods_removed)
204 }
205 })
206 })
207 })
208}
209
210function updatePodsScore (good_pods, bad_pods) {
211 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
212
213 Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
214 if (err) logger.error('Cannot increment scores of good pods.')
215 })
216
217 Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
218 if (err) logger.error('Cannot increment scores of bad pods.')
219 removeBadPods()
220 })
221}
diff --git a/server/lib/videos.js b/server/lib/videos.js
new file mode 100644
index 000000000..2d7d9500d
--- /dev/null
+++ b/server/lib/videos.js
@@ -0,0 +1,50 @@
1'use strict'
2
3var async = require('async')
4var config = require('config')
5var path = require('path')
6var webtorrent = require('../lib/webtorrent')
7
8var logger = require('../helpers/logger')
9var Videos = require('../models/videos')
10
11var uploadDir = path.join(__dirname, '..', config.get('storage.uploads'))
12
13var videos = {
14 seed: seed,
15 seedAllExisting: seedAllExisting
16}
17
18function seed (path, callback) {
19 logger.info('Seeding %s...', path)
20
21 webtorrent.seed(path, function (torrent) {
22 logger.info('%s seeded (%s).', path, torrent.magnetURI)
23
24 return callback(null, torrent)
25 })
26}
27
28function seedAllExisting (callback) {
29 Videos.listOwned(function (err, videos_list) {
30 if (err) {
31 logger.error('Cannot get list of the videos to seed.')
32 return callback(err)
33 }
34
35 async.each(videos_list, function (video, each_callback) {
36 seed(uploadDir + video.namePath, function (err) {
37 if (err) {
38 logger.error('Cannot seed this video.')
39 return callback(err)
40 }
41
42 each_callback(null)
43 })
44 }, callback)
45 })
46}
47
48// ---------------------------------------------------------------------------
49
50module.exports = videos
diff --git a/server/lib/webtorrent.js b/server/lib/webtorrent.js
new file mode 100644
index 000000000..cb641fead
--- /dev/null
+++ b/server/lib/webtorrent.js
@@ -0,0 +1,157 @@
1'use strict'
2
3var config = require('config')
4var ipc = require('node-ipc')
5var pathUtils = require('path')
6var spawn = require('electron-spawn')
7
8var logger = require('../helpers/logger')
9
10var host = config.get('webserver.host')
11var port = config.get('webserver.port')
12var nodeKey = 'webtorrentnode' + port
13var processKey = 'webtorrentprocess' + port
14ipc.config.silent = true
15ipc.config.id = nodeKey
16
17var webtorrent = {
18 add: add,
19 app: null, // Pid of the app
20 create: create,
21 remove: remove,
22 seed: seed,
23 silent: false // Useful for beautiful tests
24}
25
26function create (options, callback) {
27 if (typeof options === 'function') {
28 callback = options
29 options = {}
30 }
31
32 // Override options
33 if (options.host) host = options.host
34 if (options.port) {
35 port = options.port
36 nodeKey = 'webtorrentnode' + port
37 processKey = 'webtorrentprocess' + port
38 ipc.config.id = nodeKey
39 }
40
41 ipc.serve(function () {
42 if (!webtorrent.silent) logger.info('IPC server ready.')
43
44 // Run a timeout of 30s after which we exit the process
45 var timeout_webtorrent_process = setTimeout(function () {
46 throw new Error('Timeout : cannot run the webtorrent process. Please ensure you have electron-prebuilt npm package installed with xvfb-run.')
47 }, 30000)
48
49 ipc.server.on(processKey + '.ready', function () {
50 if (!webtorrent.silent) logger.info('Webtorrent process ready.')
51 clearTimeout(timeout_webtorrent_process)
52 callback()
53 })
54
55 ipc.server.on(processKey + '.exception', function (data) {
56 throw new Error('Received exception error from webtorrent process.' + data.exception)
57 })
58
59 var webtorrent_process = spawn(pathUtils.join(__dirname, 'webtorrentProcess.js'), host, port, { detached: true })
60 webtorrent_process.stderr.on('data', function (data) {
61 // logger.debug('Webtorrent process stderr: ', data.toString())
62 })
63
64 webtorrent_process.stdout.on('data', function (data) {
65 // logger.debug('Webtorrent process:', data.toString())
66 })
67
68 webtorrent.app = webtorrent_process
69 })
70
71 ipc.server.start()
72}
73
74function seed (path, callback) {
75 var extension = pathUtils.extname(path)
76 var basename = pathUtils.basename(path, extension)
77 var data = {
78 _id: basename,
79 args: {
80 path: path
81 }
82 }
83
84 if (!webtorrent.silent) logger.debug('Node wants to seed %s.', data._id)
85
86 // Finish signal
87 var event_key = nodeKey + '.seedDone.' + data._id
88 ipc.server.on(event_key, function listener (received) {
89 if (!webtorrent.silent) logger.debug('Process seeded torrent %s.', received.magnetUri)
90
91 // This is a fake object, we just use the magnetUri in this project
92 var torrent = {
93 magnetURI: received.magnetUri
94 }
95
96 ipc.server.off(event_key)
97 callback(torrent)
98 })
99
100 ipc.server.broadcast(processKey + '.seed', data)
101}
102
103function add (magnetUri, callback) {
104 var data = {
105 _id: magnetUri,
106 args: {
107 magnetUri: magnetUri
108 }
109 }
110
111 if (!webtorrent.silent) logger.debug('Node wants to add ' + data._id)
112
113 // Finish signal
114 var event_key = nodeKey + '.addDone.' + data._id
115 ipc.server.on(event_key, function (received) {
116 if (!webtorrent.silent) logger.debug('Process added torrent.')
117
118 // This is a fake object, we just use the magnetUri in this project
119 var torrent = {
120 files: received.files
121 }
122
123 ipc.server.off(event_key)
124 callback(torrent)
125 })
126
127 ipc.server.broadcast(processKey + '.add', data)
128}
129
130function remove (magnetUri, callback) {
131 var data = {
132 _id: magnetUri,
133 args: {
134 magnetUri: magnetUri
135 }
136 }
137
138 if (!webtorrent.silent) logger.debug('Node wants to stop seeding %s.', data._id)
139
140 // Finish signal
141 var event_key = nodeKey + '.removeDone.' + data._id
142 ipc.server.on(event_key, function (received) {
143 if (!webtorrent.silent) logger.debug('Process removed torrent %s.', data._id)
144
145 var err = null
146 if (received.err) err = received.err
147
148 ipc.server.off(event_key)
149 callback(err)
150 })
151
152 ipc.server.broadcast(processKey + '.remove', data)
153}
154
155// ---------------------------------------------------------------------------
156
157module.exports = webtorrent
diff --git a/server/lib/webtorrentProcess.js b/server/lib/webtorrentProcess.js
new file mode 100644
index 000000000..7da52523a
--- /dev/null
+++ b/server/lib/webtorrentProcess.js
@@ -0,0 +1,92 @@
1'use strict'
2
3var WebTorrent = require('webtorrent')
4var ipc = require('node-ipc')
5
6function webtorrent (args) {
7 if (args.length !== 3) {
8 throw new Error('Wrong arguments number: ' + args.length + '/3')
9 }
10
11 var host = args[1]
12 var port = args[2]
13 var nodeKey = 'webtorrentnode' + port
14 var processKey = 'webtorrentprocess' + port
15
16 ipc.config.silent = true
17 ipc.config.id = processKey
18
19 if (host === 'client' && port === '1') global.WEBTORRENT_ANNOUNCE = []
20 else global.WEBTORRENT_ANNOUNCE = 'ws://' + host + ':' + port + '/tracker/socket'
21 var wt = new WebTorrent({ dht: false })
22
23 function seed (data) {
24 var args = data.args
25 var path = args.path
26 var _id = data._id
27
28 wt.seed(path, { announceList: '' }, function (torrent) {
29 var to_send = {
30 magnetUri: torrent.magnetURI
31 }
32
33 ipc.of[nodeKey].emit(nodeKey + '.seedDone.' + _id, to_send)
34 })
35 }
36
37 function add (data) {
38 var args = data.args
39 var magnetUri = args.magnetUri
40 var _id = data._id
41
42 wt.add(magnetUri, function (torrent) {
43 var to_send = {
44 files: []
45 }
46
47 torrent.files.forEach(function (file) {
48 to_send.files.push({ path: file.path })
49 })
50
51 ipc.of[nodeKey].emit(nodeKey + '.addDone.' + _id, to_send)
52 })
53 }
54
55 function remove (data) {
56 var args = data.args
57 var magnetUri = args.magnetUri
58 var _id = data._id
59
60 try {
61 wt.remove(magnetUri, callback)
62 } catch (err) {
63 console.log('Cannot remove the torrent from WebTorrent.')
64 return callback(null)
65 }
66
67 function callback () {
68 var to_send = {}
69 ipc.of[nodeKey].emit(nodeKey + '.removeDone.' + _id, to_send)
70 }
71 }
72
73 console.log('Configuration: ' + host + ':' + port)
74 console.log('Connecting to IPC...')
75
76 ipc.connectTo(nodeKey, function () {
77 ipc.of[nodeKey].on(processKey + '.seed', seed)
78 ipc.of[nodeKey].on(processKey + '.add', add)
79 ipc.of[nodeKey].on(processKey + '.remove', remove)
80
81 ipc.of[nodeKey].emit(processKey + '.ready')
82 console.log('Ready.')
83 })
84
85 process.on('uncaughtException', function (e) {
86 ipc.of[nodeKey].emit(processKey + '.exception', { exception: e })
87 })
88}
89
90// ---------------------------------------------------------------------------
91
92module.exports = webtorrent