aboutsummaryrefslogtreecommitdiffhomepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/friends.js356
-rw-r--r--lib/poolRequests.js358
-rw-r--r--lib/videos.js102
-rw-r--r--lib/webtorrent.js254
-rw-r--r--lib/webtorrentProcess.js144
5 files changed, 602 insertions, 612 deletions
diff --git a/lib/friends.js b/lib/friends.js
index c05ccedb1..8cc1a3151 100644
--- a/lib/friends.js
+++ b/lib/friends.js
@@ -1,224 +1,222 @@
1;(function () { 1'use strict'
2 'use strict' 2
3 3var async = require('async')
4 var async = require('async') 4var config = require('config')
5 var config = require('config') 5var fs = require('fs')
6 var fs = require('fs') 6var request = require('request')
7 var request = require('request') 7
8 8var constants = require('../initializers/constants')
9 var constants = require('../initializers/constants') 9var logger = require('../helpers/logger')
10 var logger = require('../helpers/logger') 10var peertubeCrypto = require('../helpers/peertubeCrypto')
11 var peertubeCrypto = require('../helpers/peertubeCrypto') 11var Pods = require('../models/pods')
12 var Pods = require('../models/pods') 12var poolRequests = require('../lib/poolRequests')
13 var poolRequests = require('../lib/poolRequests') 13var requests = require('../helpers/requests')
14 var requests = require('../helpers/requests') 14var Videos = require('../models/videos')
15 var Videos = require('../models/videos') 15
16 16var http = config.get('webserver.https') ? 'https' : 'http'
17 var http = config.get('webserver.https') ? 'https' : 'http' 17var host = config.get('webserver.host')
18 var host = config.get('webserver.host') 18var port = config.get('webserver.port')
19 var port = config.get('webserver.port') 19
20 20var pods = {
21 var pods = { 21 addVideoToFriends: addVideoToFriends,
22 addVideoToFriends: addVideoToFriends, 22 hasFriends: hasFriends,
23 hasFriends: hasFriends, 23 makeFriends: makeFriends,
24 makeFriends: makeFriends, 24 quitFriends: quitFriends,
25 quitFriends: quitFriends, 25 removeVideoToFriends: removeVideoToFriends
26 removeVideoToFriends: removeVideoToFriends 26}
27 } 27
28function addVideoToFriends (video) {
29 // To avoid duplicates
30 var id = video.name + video.magnetUri
31 // ensure namePath is null
32 video.namePath = null
33 poolRequests.addRequest(id, 'add', video)
34}
35
36function hasFriends (callback) {
37 Pods.count(function (err, count) {
38 if (err) return callback(err)
39
40 var has_friends = (count !== 0)
41 callback(null, has_friends)
42 })
43}
44
45function makeFriends (callback) {
46 var pods_score = {}
47
48 logger.info('Make friends!')
49 fs.readFile(peertubeCrypto.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) {
50 if (err) {
51 logger.error('Cannot read public cert.')
52 return callback(err)
53 }
28 54
29 function addVideoToFriends (video) { 55 var urls = config.get('network.friends')
30 // To avoid duplicates
31 var id = video.name + video.magnetUri
32 // ensure namePath is null
33 video.namePath = null
34 poolRequests.addRequest(id, 'add', video)
35 }
36 56
37 function hasFriends (callback) { 57 async.each(urls, computeForeignPodsList, function (err) {
38 Pods.count(function (err, count) {
39 if (err) return callback(err) 58 if (err) return callback(err)
40 59
41 var has_friends = (count !== 0) 60 logger.debug('Pods scores computed.', { pods_score: pods_score })
42 callback(null, has_friends) 61 var pods_list = computeWinningPods(urls, pods_score)
62 logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list })
63
64 makeRequestsToWinningPods(cert, pods_list)
43 }) 65 })
44 } 66 })
45 67
46 function makeFriends (callback) { 68 // -----------------------------------------------------------------------
47 var pods_score = {}
48 69
49 logger.info('Make friends!') 70 function computeForeignPodsList (url, callback) {
50 fs.readFile(peertubeCrypto.getCertDir() + 'peertube.pub', 'utf8', function (err, cert) { 71 // Let's give 1 point to the pod we ask the friends list
51 if (err) { 72 pods_score[url] = 1
52 logger.error('Cannot read public cert.')
53 return callback(err)
54 }
55 73
56 var urls = config.get('network.friends') 74 getForeignPodsList(url, function (err, foreign_pods_list) {
75 if (err) return callback(err)
76 if (foreign_pods_list.length === 0) return callback()
57 77
58 async.each(urls, computeForeignPodsList, function (err) { 78 async.each(foreign_pods_list, function (foreign_pod, callback_each) {
59 if (err) return callback(err) 79 var foreign_url = foreign_pod.url
60 80
61 logger.debug('Pods scores computed.', { pods_score: pods_score }) 81 if (pods_score[foreign_url]) pods_score[foreign_url]++
62 var pods_list = computeWinningPods(urls, pods_score) 82 else pods_score[foreign_url] = 1
63 logger.debug('Pods that we keep computed.', { pods_to_keep: pods_list })
64 83
65 makeRequestsToWinningPods(cert, pods_list) 84 callback_each()
85 }, function () {
86 callback()
66 }) 87 })
67 }) 88 })
89 }
68 90
69 // ----------------------------------------------------------------------- 91 function computeWinningPods (urls, pods_score) {
70 92 // Build the list of pods to add
71 function computeForeignPodsList (url, callback) { 93 // Only add a pod if it exists in more than a half base pods
72 // Let's give 1 point to the pod we ask the friends list 94 var pods_list = []
73 pods_score[url] = 1 95 var base_score = urls.length / 2
74 96 Object.keys(pods_score).forEach(function (pod) {
75 getForeignPodsList(url, function (err, foreign_pods_list) { 97 if (pods_score[pod] > base_score) pods_list.push({ url: pod })
76 if (err) return callback(err) 98 })
77 if (foreign_pods_list.length === 0) return callback()
78
79 async.each(foreign_pods_list, function (foreign_pod, callback_each) {
80 var foreign_url = foreign_pod.url
81
82 if (pods_score[foreign_url]) pods_score[foreign_url]++
83 else pods_score[foreign_url] = 1
84
85 callback_each()
86 }, function () {
87 callback()
88 })
89 })
90 }
91 99
92 function computeWinningPods (urls, pods_score) { 100 return pods_list
93 // Build the list of pods to add 101 }
94 // Only add a pod if it exists in more than a half base pods
95 var pods_list = []
96 var base_score = urls.length / 2
97 Object.keys(pods_score).forEach(function (pod) {
98 if (pods_score[pod] > base_score) pods_list.push({ url: pod })
99 })
100 102
101 return pods_list 103 function makeRequestsToWinningPods (cert, pods_list) {
102 } 104 // Stop pool requests
105 poolRequests.deactivate()
106 // Flush pool requests
107 poolRequests.forceSend()
103 108
104 function makeRequestsToWinningPods (cert, pods_list) { 109 // Get the list of our videos to send to our new friends
105 // Stop pool requests 110 Videos.listOwned(function (err, videos_list) {
106 poolRequests.deactivate() 111 if (err) {
107 // Flush pool requests 112 logger.error('Cannot get the list of videos we own.')
108 poolRequests.forceSend() 113 return callback(err)
109 114 }
110 // Get the list of our videos to send to our new friends
111 Videos.listOwned(function (err, videos_list) {
112 if (err) {
113 logger.error('Cannot get the list of videos we own.')
114 return callback(err)
115 }
116 115
117 var data = { 116 var data = {
118 url: http + '://' + host + ':' + port, 117 url: http + '://' + host + ':' + port,
119 publicKey: cert, 118 publicKey: cert,
120 videos: videos_list 119 videos: videos_list
121 } 120 }
122 121
123 requests.makeMultipleRetryRequest( 122 requests.makeMultipleRetryRequest(
124 { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data }, 123 { method: 'POST', path: '/api/' + constants.API_VERSION + '/pods/', data: data },
125 124
126 pods_list, 125 pods_list,
127 126
128 function eachRequest (err, response, body, url, pod, callback_each_request) { 127 function eachRequest (err, response, body, url, pod, callback_each_request) {
129 // We add the pod if it responded correctly with its public certificate 128 // We add the pod if it responded correctly with its public certificate
130 if (!err && response.statusCode === 200) { 129 if (!err && response.statusCode === 200) {
131 Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) { 130 Pods.add({ url: pod.url, publicKey: body.cert, score: constants.FRIEND_BASE_SCORE }, function (err) {
132 if (err) logger.error('Error with adding %s pod.', pod.url, { error: err }) 131 if (err) logger.error('Error with adding %s pod.', pod.url, { error: err })
133 132
134 Videos.addRemotes(body.videos, function (err) { 133 Videos.addRemotes(body.videos, function (err) {
135 if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err }) 134 if (err) logger.error('Error with adding videos of pod.', pod.url, { error: err })
136 135
137 logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos }) 136 logger.debug('Adding remote videos from %s.', pod.url, { videos: body.videos })
138 return callback_each_request() 137 return callback_each_request()
139 })
140 }) 138 })
141 } else { 139 })
142 logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') }) 140 } else {
143 return callback_each_request() 141 logger.error('Error with adding %s pod.', pod.url, { error: err || new Error('Status not 200') })
144 } 142 return callback_each_request()
145 },
146
147 function endRequests (err) {
148 // Now we made new friends, we can re activate the pool of requests
149 poolRequests.activate()
150
151 if (err) {
152 logger.error('There was some errors when we wanted to make friends.')
153 return callback(err)
154 }
155
156 logger.debug('makeRequestsToWinningPods finished.')
157 return callback(null)
158 } 143 }
159 ) 144 },
160 })
161 }
162 }
163 145
164 function quitFriends (callback) { 146 function endRequests (err) {
165 // Stop pool requests 147 // Now we made new friends, we can re activate the pool of requests
166 poolRequests.deactivate() 148 poolRequests.activate()
167 // Flush pool requests
168 poolRequests.forceSend()
169 149
170 Pods.list(function (err, pods) { 150 if (err) {
171 if (err) return callback(err) 151 logger.error('There was some errors when we wanted to make friends.')
152 return callback(err)
153 }
172 154
173 var request = { 155 logger.debug('makeRequestsToWinningPods finished.')
174 method: 'POST', 156 return callback(null)
175 path: '/api/' + constants.API_VERSION + '/pods/remove',
176 sign: true,
177 encrypt: true,
178 data: {
179 url: 'me' // Fake data
180 } 157 }
158 )
159 })
160 }
161}
162
163function quitFriends (callback) {
164 // Stop pool requests
165 poolRequests.deactivate()
166 // Flush pool requests
167 poolRequests.forceSend()
168
169 Pods.list(function (err, pods) {
170 if (err) return callback(err)
171
172 var request = {
173 method: 'POST',
174 path: '/api/' + constants.API_VERSION + '/pods/remove',
175 sign: true,
176 encrypt: true,
177 data: {
178 url: 'me' // Fake data
181 } 179 }
180 }
182 181
183 // Announce we quit them 182 // Announce we quit them
184 requests.makeMultipleRetryRequest(request, pods, function () { 183 requests.makeMultipleRetryRequest(request, pods, function () {
185 Pods.removeAll(function (err) { 184 Pods.removeAll(function (err) {
186 poolRequests.activate() 185 poolRequests.activate()
187 186
188 if (err) return callback(err) 187 if (err) return callback(err)
189 188
190 logger.info('Broke friends, so sad :(') 189 logger.info('Broke friends, so sad :(')
191 190
192 Videos.removeAllRemotes(function (err) { 191 Videos.removeAllRemotes(function (err) {
193 if (err) return callback(err) 192 if (err) return callback(err)
194 193
195 logger.info('Removed all remote videos.') 194 logger.info('Removed all remote videos.')
196 callback(null) 195 callback(null)
197 })
198 }) 196 })
199 }) 197 })
200 }) 198 })
201 } 199 })
200}
202 201
203 function removeVideoToFriends (video) { 202function removeVideoToFriends (video) {
204 // To avoid duplicates 203 // To avoid duplicates
205 var id = video.name + video.magnetUri 204 var id = video.name + video.magnetUri
206 poolRequests.addRequest(id, 'remove', video) 205 poolRequests.addRequest(id, 'remove', video)
207 } 206}
208 207
209 // --------------------------------------------------------------------------- 208// ---------------------------------------------------------------------------
210 209
211 module.exports = pods 210module.exports = pods
212 211
213 // --------------------------------------------------------------------------- 212// ---------------------------------------------------------------------------
214 213
215 function getForeignPodsList (url, callback) { 214function getForeignPodsList (url, callback) {
216 var path = '/api/' + constants.API_VERSION + '/pods' 215 var path = '/api/' + constants.API_VERSION + '/pods'
217 216
218 request.get(url + path, function (err, response, body) { 217 request.get(url + path, function (err, response, body) {
219 if (err) return callback(err) 218 if (err) return callback(err)
220 219
221 callback(null, JSON.parse(body)) 220 callback(null, JSON.parse(body))
222 }) 221 })
223 } 222}
224})()
diff --git a/lib/poolRequests.js b/lib/poolRequests.js
index 9ea41f383..f786c3c7a 100644
--- a/lib/poolRequests.js
+++ b/lib/poolRequests.js
@@ -1,223 +1,221 @@
1;(function () { 1'use strict'
2 'use strict' 2
3 3var async = require('async')
4 var async = require('async') 4var pluck = require('lodash-node/compat/collection/pluck')
5 var pluck = require('lodash-node/compat/collection/pluck') 5
6 6var constants = require('../initializers/constants')
7 var constants = require('../initializers/constants') 7var logger = require('../helpers/logger')
8 var logger = require('../helpers/logger') 8var Pods = require('../models/pods')
9 var Pods = require('../models/pods') 9var PoolRequests = require('../models/poolRequests')
10 var PoolRequests = require('../models/poolRequests') 10var requests = require('../helpers/requests')
11 var requests = require('../helpers/requests') 11var Videos = require('../models/videos')
12 var Videos = require('../models/videos') 12
13 13var timer = null
14 var timer = null 14
15 15var poolRequests = {
16 var poolRequests = { 16 activate: activate,
17 activate: activate, 17 addRequest: addRequest,
18 addRequest: addRequest, 18 deactivate: deactivate,
19 deactivate: deactivate, 19 forceSend: forceSend
20 forceSend: forceSend 20}
21 } 21
22 22function activate () {
23 function activate () { 23 logger.info('Pool requests activated.')
24 logger.info('Pool requests activated.') 24 timer = setInterval(makePoolRequests, constants.INTERVAL)
25 timer = setInterval(makePoolRequests, constants.INTERVAL) 25}
26 } 26
27 27function addRequest (id, type, request) {
28 function addRequest (id, type, request) { 28 logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
29 logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) 29
30 30 PoolRequests.findById(id, function (err, entity) {
31 PoolRequests.findById(id, function (err, entity) { 31 if (err) {
32 if (err) { 32 logger.error('Cannot find one pool request.', { error: err })
33 logger.error('Cannot find one pool request.', { error: err }) 33 return // Abort
34 }
35
36 if (entity) {
37 if (entity.type === type) {
38 logger.error('Cannot insert two same requests.')
34 return // Abort 39 return // Abort
35 } 40 }
36 41
37 if (entity) { 42 // Remove the request of the other type
38 if (entity.type === type) { 43 PoolRequests.removeRequestById(id, function (err) {
39 logger.error('Cannot insert two same requests.') 44 if (err) {
45 logger.error('Cannot remove a pool request.', { error: err })
40 return // Abort 46 return // Abort
41 } 47 }
48 })
49 } else {
50 PoolRequests.create(id, type, request, function (err) {
51 if (err) logger.error('Cannot create a pool request.', { error: err })
52 return // Abort
53 })
54 }
55 })
56}
42 57
43 // Remove the request of the other type 58function deactivate () {
44 PoolRequests.removeRequestById(id, function (err) { 59 logger.info('Pool requests deactivated.')
45 if (err) { 60 clearInterval(timer)
46 logger.error('Cannot remove a pool request.', { error: err }) 61}
47 return // Abort
48 }
49 })
50 } else {
51 PoolRequests.create(id, type, request, function (err) {
52 if (err) logger.error('Cannot create a pool request.', { error: err })
53 return // Abort
54 })
55 }
56 })
57 }
58 62
59 function deactivate () { 63function forceSend () {
60 logger.info('Pool requests deactivated.') 64 logger.info('Force pool requests sending.')
61 clearInterval(timer) 65 makePoolRequests()
62 } 66}
63 67
64 function forceSend () { 68// ---------------------------------------------------------------------------
65 logger.info('Force pool requests sending.')
66 makePoolRequests()
67 }
68 69
69 // --------------------------------------------------------------------------- 70module.exports = poolRequests
70 71
71 module.exports = poolRequests 72// ---------------------------------------------------------------------------
72 73
73 // --------------------------------------------------------------------------- 74function makePoolRequest (type, requests_to_make, callback) {
75 if (!callback) callback = function () {}
74 76
75 function makePoolRequest (type, requests_to_make, callback) { 77 Pods.list(function (err, pods) {
76 if (!callback) callback = function () {} 78 if (err) return callback(err)
77 79
78 Pods.list(function (err, pods) { 80 var params = {
79 if (err) return callback(err) 81 encrypt: true,
82 sign: true,
83 method: 'POST',
84 path: null,
85 data: requests_to_make
86 }
80 87
81 var params = { 88 if (type === 'add') {
82 encrypt: true, 89 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
83 sign: true, 90 } else if (type === 'remove') {
84 method: 'POST', 91 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
85 path: null, 92 } else {
86 data: requests_to_make 93 return callback(new Error('Unkown pool request type.'))
87 } 94 }
95
96 var bad_pods = []
97 var good_pods = []
88 98
89 if (type === 'add') { 99 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
90 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' 100
91 } else if (type === 'remove') { 101 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
92 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' 102 if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
103 bad_pods.push(pod._id)
104 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
93 } else { 105 } else {
94 return callback(new Error('Unkown pool request type.')) 106 good_pods.push(pod._id)
95 } 107 }
96 108
97 var bad_pods = [] 109 return callback_each_pod_finished()
98 var good_pods = [] 110 }
99
100 requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
101 111
102 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { 112 function callbackAllPodsFinished (err) {
103 if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { 113 if (err) return callback(err)
104 bad_pods.push(pod._id)
105 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
106 } else {
107 good_pods.push(pod._id)
108 }
109 114
110 return callback_each_pod_finished() 115 updatePodsScore(good_pods, bad_pods)
116 callback(null)
117 }
118 })
119}
120
121function makePoolRequests () {
122 logger.info('Making pool requests to friends.')
123
124 PoolRequests.list(function (err, pool_requests) {
125 if (err) {
126 logger.error('Cannot get the list of pool requests.', { err: err })
127 return // Abort
128 }
129
130 if (pool_requests.length === 0) return
131
132 var requests_to_make = {
133 add: {
134 ids: [],
135 requests: []
136 },
137 remove: {
138 ids: [],
139 requests: []
111 } 140 }
112 141 }
113 function callbackAllPodsFinished (err) { 142
114 if (err) return callback(err) 143 async.each(pool_requests, function (pool_request, callback_each) {
115 144 if (pool_request.type === 'add') {
116 updatePodsScore(good_pods, bad_pods) 145 requests_to_make.add.requests.push(pool_request.request)
117 callback(null) 146 requests_to_make.add.ids.push(pool_request._id)
147 } else if (pool_request.type === 'remove') {
148 requests_to_make.remove.requests.push(pool_request.request)
149 requests_to_make.remove.ids.push(pool_request._id)
150 } else {
151 logger.error('Unkown pool request type.', { request_type: pool_request.type })
152 return // abort
118 } 153 }
119 })
120 }
121 154
122 function makePoolRequests () { 155 callback_each()
123 logger.info('Making pool requests to friends.') 156 }, function () {
157 // Send the add requests
158 if (requests_to_make.add.requests.length !== 0) {
159 makePoolRequest('add', requests_to_make.add.requests, function (err) {
160 if (err) logger.error('Errors when sent add pool requests.', { error: err })
124 161
125 PoolRequests.list(function (err, pool_requests) { 162 PoolRequests.removeRequests(requests_to_make.add.ids)
126 if (err) { 163 })
127 logger.error('Cannot get the list of pool requests.', { err: err })
128 return // Abort
129 } 164 }
130 165
131 if (pool_requests.length === 0) return 166 // Send the remove requests
167 if (requests_to_make.remove.requests.length !== 0) {
168 makePoolRequest('remove', requests_to_make.remove.requests, function (err) {
169 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
132 170
133 var requests_to_make = { 171 PoolRequests.removeRequests(requests_to_make.remove.ids)
134 add: { 172 })
135 ids: [],
136 requests: []
137 },
138 remove: {
139 ids: [],
140 requests: []
141 }
142 } 173 }
174 })
175 })
176}
143 177
144 async.each(pool_requests, function (pool_request, callback_each) { 178function removeBadPods () {
145 if (pool_request.type === 'add') { 179 Pods.findBadPods(function (err, pods) {
146 requests_to_make.add.requests.push(pool_request.request) 180 if (err) {
147 requests_to_make.add.ids.push(pool_request._id) 181 logger.error('Cannot find bad pods.', { error: err })
148 } else if (pool_request.type === 'remove') { 182 return // abort
149 requests_to_make.remove.requests.push(pool_request.request) 183 }
150 requests_to_make.remove.ids.push(pool_request._id)
151 } else {
152 logger.error('Unkown pool request type.', { request_type: pool_request.type })
153 return // abort
154 }
155
156 callback_each()
157 }, function () {
158 // Send the add requests
159 if (requests_to_make.add.requests.length !== 0) {
160 makePoolRequest('add', requests_to_make.add.requests, function (err) {
161 if (err) logger.error('Errors when sent add pool requests.', { error: err })
162
163 PoolRequests.removeRequests(requests_to_make.add.ids)
164 })
165 }
166 184
167 // Send the remove requests 185 if (pods.length === 0) return
168 if (requests_to_make.remove.requests.length !== 0) {
169 makePoolRequest('remove', requests_to_make.remove.requests, function (err) {
170 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
171 186
172 PoolRequests.removeRequests(requests_to_make.remove.ids) 187 var urls = pluck(pods, 'url')
173 }) 188 var ids = pluck(pods, '_id')
174 }
175 })
176 })
177 }
178 189
179 function removeBadPods () { 190 Videos.removeAllRemotesOf(urls, function (err, r) {
180 Pods.findBadPods(function (err, pods) {
181 if (err) { 191 if (err) {
182 logger.error('Cannot find bad pods.', { error: err }) 192 logger.error('Cannot remove videos from a pod that we removing.', { error: err })
183 return // abort 193 } else {
194 var videos_removed = r.result.n
195 logger.info('Removed %d videos.', videos_removed)
184 } 196 }
185 197
186 if (pods.length === 0) return 198 Pods.removeAllByIds(ids, function (err, r) {
187
188 var urls = pluck(pods, 'url')
189 var ids = pluck(pods, '_id')
190
191 Videos.removeAllRemotesOf(urls, function (err, r) {
192 if (err) { 199 if (err) {
193 logger.error('Cannot remove videos from a pod that we removing.', { error: err }) 200 logger.error('Cannot remove bad pods.', { error: err })
194 } else { 201 } else {
195 var videos_removed = r.result.n 202 var pods_removed = r.result.n
196 logger.info('Removed %d videos.', videos_removed) 203 logger.info('Removed %d pods.', pods_removed)
197 } 204 }
198
199 Pods.removeAllByIds(ids, function (err, r) {
200 if (err) {
201 logger.error('Cannot remove bad pods.', { error: err })
202 } else {
203 var pods_removed = r.result.n
204 logger.info('Removed %d pods.', pods_removed)
205 }
206 })
207 }) 205 })
208 }) 206 })
209 } 207 })
208}
210 209
211 function updatePodsScore (good_pods, bad_pods) { 210function updatePodsScore (good_pods, bad_pods) {
212 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) 211 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
213 212
214 Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) { 213 Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) {
215 if (err) logger.error('Cannot increment scores of good pods.') 214 if (err) logger.error('Cannot increment scores of good pods.')
216 }) 215 })
217 216
218 Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { 217 Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) {
219 if (err) logger.error('Cannot increment scores of bad pods.') 218 if (err) logger.error('Cannot increment scores of bad pods.')
220 removeBadPods() 219 removeBadPods()
221 }) 220 })
222 } 221}
223})()
diff --git a/lib/videos.js b/lib/videos.js
index 0e8143351..2d7d9500d 100644
--- a/lib/videos.js
+++ b/lib/videos.js
@@ -1,52 +1,50 @@
1;(function () { 1'use strict'
2 'use strict' 2
3 3var async = require('async')
4 var async = require('async') 4var config = require('config')
5 var config = require('config') 5var path = require('path')
6 var path = require('path') 6var webtorrent = require('../lib/webtorrent')
7 var webtorrent = require('../lib/webtorrent') 7
8 8var logger = require('../helpers/logger')
9 var logger = require('../helpers/logger') 9var Videos = require('../models/videos')
10 var Videos = require('../models/videos') 10
11 11var uploadDir = path.join(__dirname, '..', config.get('storage.uploads'))
12 var uploadDir = path.join(__dirname, '..', config.get('storage.uploads')) 12
13 13var videos = {
14 var videos = { 14 seed: seed,
15 seed: seed, 15 seedAllExisting: seedAllExisting
16 seedAllExisting: seedAllExisting 16}
17 } 17
18 18function seed (path, callback) {
19 function seed (path, callback) { 19 logger.info('Seeding %s...', path)
20 logger.info('Seeding %s...', path) 20
21 21 webtorrent.seed(path, function (torrent) {
22 webtorrent.seed(path, function (torrent) { 22 logger.info('%s seeded (%s).', path, torrent.magnetURI)
23 logger.info('%s seeded (%s).', path, torrent.magnetURI) 23
24 24 return callback(null, torrent)
25 return callback(null, torrent) 25 })
26 }) 26}
27 } 27
28 28function seedAllExisting (callback) {
29 function seedAllExisting (callback) { 29 Videos.listOwned(function (err, videos_list) {
30 Videos.listOwned(function (err, videos_list) { 30 if (err) {
31 if (err) { 31 logger.error('Cannot get list of the videos to seed.')
32 logger.error('Cannot get list of the videos to seed.') 32 return callback(err)
33 return callback(err) 33 }
34 } 34
35 35 async.each(videos_list, function (video, each_callback) {
36 async.each(videos_list, function (video, each_callback) { 36 seed(uploadDir + video.namePath, function (err) {
37 seed(uploadDir + video.namePath, function (err) { 37 if (err) {
38 if (err) { 38 logger.error('Cannot seed this video.')
39 logger.error('Cannot seed this video.') 39 return callback(err)
40 return callback(err) 40 }
41 } 41
42 42 each_callback(null)
43 each_callback(null) 43 })
44 }) 44 }, callback)
45 }, callback) 45 })
46 }) 46}
47 } 47
48 48// ---------------------------------------------------------------------------
49 // --------------------------------------------------------------------------- 49
50 50module.exports = videos
51 module.exports = videos
52})()
diff --git a/lib/webtorrent.js b/lib/webtorrent.js
index 083e5b77a..5f10322a5 100644
--- a/lib/webtorrent.js
+++ b/lib/webtorrent.js
@@ -1,161 +1,159 @@
1;(function () { 1'use strict'
2 'use strict' 2
3 3var config = require('config')
4 var config = require('config') 4var ipc = require('node-ipc')
5 var ipc = require('node-ipc') 5var pathUtils = require('path')
6 var pathUtils = require('path') 6var spawn = require('electron-spawn')
7 var spawn = require('electron-spawn') 7
8 8var logger = require('../helpers/logger')
9 var logger = require('../helpers/logger') 9
10 10var host = config.get('webserver.host')
11 var host = config.get('webserver.host') 11var port = config.get('webserver.port')
12 var port = config.get('webserver.port') 12var nodeKey = 'webtorrentnode' + port
13 var nodeKey = 'webtorrentnode' + port 13var processKey = 'webtorrentprocess' + port
14 var processKey = 'webtorrentprocess' + port 14ipc.config.silent = true
15 ipc.config.silent = true 15ipc.config.id = nodeKey
16 ipc.config.id = nodeKey 16
17 17var webtorrent = {
18 var webtorrent = { 18 add: add,
19 add: add, 19 app: null, // Pid of the app
20 app: null, // Pid of the app 20 create: create,
21 create: create, 21 remove: remove,
22 remove: remove, 22 seed: seed,
23 seed: seed, 23 silent: false // Useful for beautiful tests
24 silent: false // Useful for beautiful tests 24}
25
26function create (options, callback) {
27 if (typeof options === 'function') {
28 callback = options
29 options = {}
25 } 30 }
26 31
27 function create (options, callback) { 32 // Override options
28 if (typeof options === 'function') { 33 if (options.host) host = options.host
29 callback = options 34 if (options.port) {
30 options = {} 35 port = options.port
31 } 36 nodeKey = 'webtorrentnode' + port
32 37 processKey = 'webtorrentprocess' + port
33 // Override options 38 ipc.config.id = nodeKey
34 if (options.host) host = options.host 39 }
35 if (options.port) {
36 port = options.port
37 nodeKey = 'webtorrentnode' + port
38 processKey = 'webtorrentprocess' + port
39 ipc.config.id = nodeKey
40 }
41
42 ipc.serve(function () {
43 if (!webtorrent.silent) logger.info('IPC server ready.')
44 40
45 // Run a timeout of 30s after which we exit the process 41 ipc.serve(function () {
46 var timeout_webtorrent_process = setTimeout(function () { 42 if (!webtorrent.silent) logger.info('IPC server ready.')
47 logger.error('Timeout : cannot run the webtorrent process. Please ensure you have electron-prebuilt npm package installed with xvfb-run.')
48 process.exit()
49 }, 30000)
50 43
51 ipc.server.on(processKey + '.ready', function () { 44 // Run a timeout of 30s after which we exit the process
52 if (!webtorrent.silent) logger.info('Webtorrent process ready.') 45 var timeout_webtorrent_process = setTimeout(function () {
53 clearTimeout(timeout_webtorrent_process) 46 logger.error('Timeout : cannot run the webtorrent process. Please ensure you have electron-prebuilt npm package installed with xvfb-run.')
54 callback() 47 process.exit()
55 }) 48 }, 30000)
56 49
57 ipc.server.on(processKey + '.exception', function (data) { 50 ipc.server.on(processKey + '.ready', function () {
58 logger.error('Received exception error from webtorrent process.', { exception: data.exception }) 51 if (!webtorrent.silent) logger.info('Webtorrent process ready.')
59 process.exit() 52 clearTimeout(timeout_webtorrent_process)
60 }) 53 callback()
54 })
61 55
62 var webtorrent_process = spawn(pathUtils.join(__dirname, 'webtorrentProcess.js'), host, port, { detached: true }) 56 ipc.server.on(processKey + '.exception', function (data) {
63 webtorrent_process.stderr.on('data', function (data) { 57 logger.error('Received exception error from webtorrent process.', { exception: data.exception })
64 // logger.debug('Webtorrent process stderr: ', data.toString()) 58 process.exit()
65 }) 59 })
66 60
67 webtorrent_process.stdout.on('data', function (data) { 61 var webtorrent_process = spawn(pathUtils.join(__dirname, 'webtorrentProcess.js'), host, port, { detached: true })
68 // logger.debug('Webtorrent process:', data.toString()) 62 webtorrent_process.stderr.on('data', function (data) {
69 }) 63 // logger.debug('Webtorrent process stderr: ', data.toString())
64 })
70 65
71 webtorrent.app = webtorrent_process 66 webtorrent_process.stdout.on('data', function (data) {
67 // logger.debug('Webtorrent process:', data.toString())
72 }) 68 })
73 69
74 ipc.server.start() 70 webtorrent.app = webtorrent_process
75 } 71 })
76 72
77 function seed (path, callback) { 73 ipc.server.start()
78 var extension = pathUtils.extname(path) 74}
79 var basename = pathUtils.basename(path, extension) 75
80 var data = { 76function seed (path, callback) {
81 _id: basename, 77 var extension = pathUtils.extname(path)
82 args: { 78 var basename = pathUtils.basename(path, extension)
83 path: path 79 var data = {
84 } 80 _id: basename,
81 args: {
82 path: path
85 } 83 }
84 }
86 85
87 if (!webtorrent.silent) logger.debug('Node wants to seed %s.', data._id) 86 if (!webtorrent.silent) logger.debug('Node wants to seed %s.', data._id)
88 87
89 // Finish signal 88 // Finish signal
90 var event_key = nodeKey + '.seedDone.' + data._id 89 var event_key = nodeKey + '.seedDone.' + data._id
91 ipc.server.on(event_key, function listener (received) { 90 ipc.server.on(event_key, function listener (received) {
92 if (!webtorrent.silent) logger.debug('Process seeded torrent %s.', received.magnetUri) 91 if (!webtorrent.silent) logger.debug('Process seeded torrent %s.', received.magnetUri)
93 92
94 // This is a fake object, we just use the magnetUri in this project 93 // This is a fake object, we just use the magnetUri in this project
95 var torrent = { 94 var torrent = {
96 magnetURI: received.magnetUri 95 magnetURI: received.magnetUri
97 } 96 }
98 97
99 ipc.server.off(event_key) 98 ipc.server.off(event_key)
100 callback(torrent) 99 callback(torrent)
101 }) 100 })
102 101
103 ipc.server.broadcast(processKey + '.seed', data) 102 ipc.server.broadcast(processKey + '.seed', data)
104 } 103}
105 104
106 function add (magnetUri, callback) { 105function add (magnetUri, callback) {
107 var data = { 106 var data = {
108 _id: magnetUri, 107 _id: magnetUri,
109 args: { 108 args: {
110 magnetUri: magnetUri 109 magnetUri: magnetUri
111 }
112 } 110 }
111 }
113 112
114 if (!webtorrent.silent) logger.debug('Node wants to add ' + data._id) 113 if (!webtorrent.silent) logger.debug('Node wants to add ' + data._id)
115 114
116 // Finish signal 115 // Finish signal
117 var event_key = nodeKey + '.addDone.' + data._id 116 var event_key = nodeKey + '.addDone.' + data._id
118 ipc.server.on(event_key, function (received) { 117 ipc.server.on(event_key, function (received) {
119 if (!webtorrent.silent) logger.debug('Process added torrent.') 118 if (!webtorrent.silent) logger.debug('Process added torrent.')
120 119
121 // This is a fake object, we just use the magnetUri in this project 120 // This is a fake object, we just use the magnetUri in this project
122 var torrent = { 121 var torrent = {
123 files: received.files 122 files: received.files
124 } 123 }
125 124
126 ipc.server.off(event_key) 125 ipc.server.off(event_key)
127 callback(torrent) 126 callback(torrent)
128 }) 127 })
129 128
130 ipc.server.broadcast(processKey + '.add', data) 129 ipc.server.broadcast(processKey + '.add', data)
131 } 130}
132 131
133 function remove (magnetUri, callback) { 132function remove (magnetUri, callback) {
134 var data = { 133 var data = {
135 _id: magnetUri, 134 _id: magnetUri,
136 args: { 135 args: {
137 magnetUri: magnetUri 136 magnetUri: magnetUri
138 }
139 } 137 }
138 }
140 139
141 if (!webtorrent.silent) logger.debug('Node wants to stop seeding %s.', data._id) 140 if (!webtorrent.silent) logger.debug('Node wants to stop seeding %s.', data._id)
142 141
143 // Finish signal 142 // Finish signal
144 var event_key = nodeKey + '.removeDone.' + data._id 143 var event_key = nodeKey + '.removeDone.' + data._id
145 ipc.server.on(event_key, function (received) { 144 ipc.server.on(event_key, function (received) {
146 if (!webtorrent.silent) logger.debug('Process removed torrent %s.', data._id) 145 if (!webtorrent.silent) logger.debug('Process removed torrent %s.', data._id)
147 146
148 var err = null 147 var err = null
149 if (received.err) err = received.err 148 if (received.err) err = received.err
150 149
151 ipc.server.off(event_key) 150 ipc.server.off(event_key)
152 callback(err) 151 callback(err)
153 }) 152 })
154 153
155 ipc.server.broadcast(processKey + '.remove', data) 154 ipc.server.broadcast(processKey + '.remove', data)
156 } 155}
157 156
158 // --------------------------------------------------------------------------- 157// ---------------------------------------------------------------------------
159 158
160 module.exports = webtorrent 159module.exports = webtorrent
161})()
diff --git a/lib/webtorrentProcess.js b/lib/webtorrentProcess.js
index 7dc655f10..96ebf9d02 100644
--- a/lib/webtorrentProcess.js
+++ b/lib/webtorrentProcess.js
@@ -1,95 +1,93 @@
1;(function () { 1'use strict'
2 'use strict'
3 2
4 function webtorrent (args) { 3function webtorrent (args) {
5 var WebTorrent = require('webtorrent') 4 var WebTorrent = require('webtorrent')
6 var ipc = require('node-ipc') 5 var ipc = require('node-ipc')
7 6
8 if (args.length !== 3) { 7 if (args.length !== 3) {
9 console.log('Wrong arguments number: ' + args.length + '/3') 8 console.log('Wrong arguments number: ' + args.length + '/3')
10 process.exit(-1) 9 process.exit(-1)
11 } 10 }
12
13 var host = args[1]
14 var port = args[2]
15 var nodeKey = 'webtorrentnode' + port
16 var processKey = 'webtorrentprocess' + port
17 11
18 ipc.config.silent = true 12 var host = args[1]
19 ipc.config.id = processKey 13 var port = args[2]
14 var nodeKey = 'webtorrentnode' + port
15 var processKey = 'webtorrentprocess' + port
20 16
21 if (host === 'client' && port === '1') global.WEBTORRENT_ANNOUNCE = [] 17 ipc.config.silent = true
22 else global.WEBTORRENT_ANNOUNCE = 'ws://' + host + ':' + port + '/tracker/socket' 18 ipc.config.id = processKey
23 var wt = new WebTorrent({ dht: false })
24 19
25 function seed (data) { 20 if (host === 'client' && port === '1') global.WEBTORRENT_ANNOUNCE = []
26 var args = data.args 21 else global.WEBTORRENT_ANNOUNCE = 'ws://' + host + ':' + port + '/tracker/socket'
27 var path = args.path 22 var wt = new WebTorrent({ dht: false })
28 var _id = data._id
29 23
30 wt.seed(path, { announceList: '' }, function (torrent) { 24 function seed (data) {
31 var to_send = { 25 var args = data.args
32 magnetUri: torrent.magnetURI 26 var path = args.path
33 } 27 var _id = data._id
34 28
35 ipc.of[nodeKey].emit(nodeKey + '.seedDone.' + _id, to_send) 29 wt.seed(path, { announceList: '' }, function (torrent) {
36 }) 30 var to_send = {
37 } 31 magnetUri: torrent.magnetURI
32 }
38 33
39 function add (data) { 34 ipc.of[nodeKey].emit(nodeKey + '.seedDone.' + _id, to_send)
40 var args = data.args 35 })
41 var magnetUri = args.magnetUri 36 }
42 var _id = data._id
43 37
44 wt.add(magnetUri, function (torrent) { 38 function add (data) {
45 var to_send = { 39 var args = data.args
46 files: [] 40 var magnetUri = args.magnetUri
47 } 41 var _id = data._id
48 42
49 torrent.files.forEach(function (file) { 43 wt.add(magnetUri, function (torrent) {
50 to_send.files.push({ path: file.path }) 44 var to_send = {
51 }) 45 files: []
46 }
52 47
53 ipc.of[nodeKey].emit(nodeKey + '.addDone.' + _id, to_send) 48 torrent.files.forEach(function (file) {
49 to_send.files.push({ path: file.path })
54 }) 50 })
55 }
56 51
57 function remove (data) { 52 ipc.of[nodeKey].emit(nodeKey + '.addDone.' + _id, to_send)
58 var args = data.args 53 })
59 var magnetUri = args.magnetUri 54 }
60 var _id = data._id
61 55
62 try { 56 function remove (data) {
63 wt.remove(magnetUri, callback) 57 var args = data.args
64 } catch (err) { 58 var magnetUri = args.magnetUri
65 console.log('Cannot remove the torrent from WebTorrent.') 59 var _id = data._id
66 return callback(null)
67 }
68 60
69 function callback () { 61 try {
70 var to_send = {} 62 wt.remove(magnetUri, callback)
71 ipc.of[nodeKey].emit(nodeKey + '.removeDone.' + _id, to_send) 63 } catch (err) {
72 } 64 console.log('Cannot remove the torrent from WebTorrent.')
65 return callback(null)
66 }
67
68 function callback () {
69 var to_send = {}
70 ipc.of[nodeKey].emit(nodeKey + '.removeDone.' + _id, to_send)
73 } 71 }
72 }
74 73
75 console.log('Configuration: ' + host + ':' + port) 74 console.log('Configuration: ' + host + ':' + port)
76 console.log('Connecting to IPC...') 75 console.log('Connecting to IPC...')
77 76
78 ipc.connectTo(nodeKey, function () { 77 ipc.connectTo(nodeKey, function () {
79 ipc.of[nodeKey].on(processKey + '.seed', seed) 78 ipc.of[nodeKey].on(processKey + '.seed', seed)
80 ipc.of[nodeKey].on(processKey + '.add', add) 79 ipc.of[nodeKey].on(processKey + '.add', add)
81 ipc.of[nodeKey].on(processKey + '.remove', remove) 80 ipc.of[nodeKey].on(processKey + '.remove', remove)
82 81
83 ipc.of[nodeKey].emit(processKey + '.ready') 82 ipc.of[nodeKey].emit(processKey + '.ready')
84 console.log('Ready.') 83 console.log('Ready.')
85 }) 84 })
86 85
87 process.on('uncaughtException', function (e) { 86 process.on('uncaughtException', function (e) {
88 ipc.of[nodeKey].emit(processKey + '.exception', { exception: e }) 87 ipc.of[nodeKey].emit(processKey + '.exception', { exception: e })
89 }) 88 })
90 } 89}
91 90
92 // --------------------------------------------------------------------------- 91// ---------------------------------------------------------------------------
93 92
94 module.exports = webtorrent 93module.exports = webtorrent
95})()