aboutsummaryrefslogtreecommitdiffhomepage
path: root/lib
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2016-01-30 17:05:22 +0100
committerChocobozzz <florian.bigard@gmail.com>2016-01-30 17:05:22 +0100
commitcda021079ff455cc0fd0eb95a5395fa808ab63d1 (patch)
tree056716de7460462b74b861051a5e9da6e2633fce /lib
parent86435b9baedfe300a28ea4545511c1b50d4119f6 (diff)
downloadPeerTube-cda021079ff455cc0fd0eb95a5395fa808ab63d1.tar.gz
PeerTube-cda021079ff455cc0fd0eb95a5395fa808ab63d1.tar.zst
PeerTube-cda021079ff455cc0fd0eb95a5395fa808ab63d1.zip
New directory organization
Diffstat (limited to 'lib')
-rw-r--r--lib/poolRequests.js206
-rw-r--r--lib/webTorrentNode.js160
-rw-r--r--lib/webtorrent.js91
3 files changed, 457 insertions, 0 deletions
diff --git a/lib/poolRequests.js b/lib/poolRequests.js
new file mode 100644
index 000000000..9c7f3238b
--- /dev/null
+++ b/lib/poolRequests.js
@@ -0,0 +1,206 @@
1;(function () {
2 'use strict'
3
4 var async = require('async')
5
6 var constants = require('../initializers/constants')
7 var logger = require('../helpers/logger')
8 var database = require('../initializers/database')
9 var pluck = require('lodash-node/compat/collection/pluck')
10 var PoolRequestsDB = database.PoolRequestsDB
11 var PodsDB = database.PodsDB
12 var utils = require('../helpers/utils')
13 var VideosDB = database.VideosDB
14
15 var poolRequests = {}
16
17 // ----------- Private -----------
18 var timer = null
19
20 function removePoolRequestsFromDB (ids) {
21 PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) {
22 if (err) {
23 logger.error('Cannot remove requests from the pool requests database.', { error: err })
24 return
25 }
26
27 logger.info('Pool requests flushed.')
28 })
29 }
30
31 function makePoolRequests () {
32 logger.info('Making pool requests to friends.')
33
34 PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) {
35 if (err) throw err
36
37 if (pool_requests.length === 0) return
38
39 var requests = {
40 add: {
41 ids: [],
42 requests: []
43 },
44 remove: {
45 ids: [],
46 requests: []
47 }
48 }
49
50 async.each(pool_requests, function (pool_request, callback_each) {
51 if (pool_request.type === 'add') {
52 requests.add.requests.push(pool_request.request)
53 requests.add.ids.push(pool_request._id)
54 } else if (pool_request.type === 'remove') {
55 requests.remove.requests.push(pool_request.request)
56 requests.remove.ids.push(pool_request._id)
57 } else {
58 throw new Error('Unkown pool request type.')
59 }
60
61 callback_each()
62 }, function () {
63 // Send the add requests
64 if (requests.add.requests.length !== 0) {
65 makePoolRequest('add', requests.add.requests, function (err) {
66 if (err) logger.error('Errors when sent add pool requests.', { error: err })
67
68 removePoolRequestsFromDB(requests.add.ids)
69 })
70 }
71
72 // Send the remove requests
73 if (requests.remove.requests.length !== 0) {
74 makePoolRequest('remove', requests.remove.requests, function (err) {
75 if (err) logger.error('Errors when sent remove pool requests.', { error: err })
76
77 removePoolRequestsFromDB(requests.remove.ids)
78 })
79 }
80 })
81 })
82 }
83
84 function updatePodsScore (good_pods, bad_pods) {
85 logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
86
87 PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec()
88 PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) {
89 if (err) throw err
90 removeBadPods()
91 })
92 }
93
94 function removeBadPods () {
95 PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) {
96 if (err) throw err
97
98 if (pods.length === 0) return
99
100 var urls = pluck(pods, 'url')
101 var ids = pluck(pods, '_id')
102
103 VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) {
104 if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err })
105 var videos_removed = r.result.n
106 logger.info('Removed %d videos.', videos_removed)
107
108 PodsDB.remove({ _id: { $in: ids } }, function (err, r) {
109 if (err) logger.error('Cannot remove bad pods.', { error: err })
110
111 var pods_removed = r.result.n
112 logger.info('Removed %d pods.', pods_removed)
113 })
114 })
115 })
116 }
117
118 function makePoolRequest (type, requests, callback) {
119 if (!callback) callback = function () {}
120
121 PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
122 if (err) throw err
123
124 var params = {
125 encrypt: true,
126 sign: true,
127 method: 'POST',
128 path: null,
129 data: requests
130 }
131
132 if (type === 'add') {
133 params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
134 } else if (type === 'remove') {
135 params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
136 } else {
137 throw new Error('Unkown pool request type.')
138 }
139
140 var bad_pods = []
141 var good_pods = []
142
143 utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
144
145 function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
146 if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
147 bad_pods.push(pod._id)
148 logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
149 } else {
150 good_pods.push(pod._id)
151 }
152
153 return callback_each_pod_finished()
154 }
155
156 function callbackAllPodsFinished (err) {
157 if (err) return callback(err)
158
159 updatePodsScore(good_pods, bad_pods)
160 callback(null)
161 }
162 })
163 }
164
165 // ----------- Public -----------
166 poolRequests.activate = function () {
167 logger.info('Pool requests activated.')
168 timer = setInterval(makePoolRequests, constants.INTERVAL)
169 }
170
171 poolRequests.addToPoolRequests = function (id, type, request) {
172 logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
173
174 PoolRequestsDB.findOne({ id: id }, function (err, entity) {
175 if (err) logger.error(err)
176
177 if (entity) {
178 if (entity.type === type) {
179 logger.error(new Error('Cannot insert two same requests.'))
180 return
181 }
182
183 // Remove the request of the other type
184 PoolRequestsDB.remove({ id: id }, function (err) {
185 if (err) logger.error(err)
186 })
187 } else {
188 PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
189 if (err) logger.error(err)
190 })
191 }
192 })
193 }
194
195 poolRequests.deactivate = function () {
196 logger.info('Pool requests deactivated.')
197 clearInterval(timer)
198 }
199
200 poolRequests.forceSend = function () {
201 logger.info('Force pool requests sending.')
202 makePoolRequests()
203 }
204
205 module.exports = poolRequests
206})()
diff --git a/lib/webTorrentNode.js b/lib/webTorrentNode.js
new file mode 100644
index 000000000..8827c68c5
--- /dev/null
+++ b/lib/webTorrentNode.js
@@ -0,0 +1,160 @@
1;(function () {
2 'use strict'
3
4 var config = require('config')
5 var ipc = require('node-ipc')
6 var pathUtils = require('path')
7 var spawn = require('electron-spawn')
8
9 var logger = require('../helpers/logger')
10
11 var host = config.get('webserver.host')
12 var port = config.get('webserver.port')
13
14 var nodeKey = 'webtorrentnode' + port
15 var processKey = 'webtorrent' + port
16
17 ipc.config.silent = true
18 ipc.config.id = nodeKey
19
20 var webtorrentnode = {}
21
22 // Useful for beautiful tests
23 webtorrentnode.silent = false
24
25 // Useful to kill it
26 webtorrentnode.app = null
27
28 webtorrentnode.create = function (options, callback) {
29 if (typeof options === 'function') {
30 callback = options
31 options = {}
32 }
33
34 // Override options
35 if (options.host) host = options.host
36 if (options.port) {
37 port = options.port
38 nodeKey = 'webtorrentnode' + port
39 processKey = 'webtorrent' + port
40 ipc.config.id = nodeKey
41 }
42
43 ipc.serve(function () {
44 if (!webtorrentnode.silent) logger.info('IPC server ready.')
45
46 // Run a timeout of 30s after which we exit the process
47 var timeout_webtorrent_process = setTimeout(function () {
48 logger.error('Timeout : cannot run the webtorrent process. Please ensure you have electron-prebuilt npm package installed with xvfb-run.')
49 process.exit()
50 }, 30000)
51
52 ipc.server.on(processKey + '.ready', function () {
53 if (!webtorrentnode.silent) logger.info('Webtorrent process ready.')
54 clearTimeout(timeout_webtorrent_process)
55 callback()
56 })
57
58 ipc.server.on(processKey + '.exception', function (data) {
59 logger.error('Received exception error from webtorrent process.', { exception: data.exception })
60 process.exit()
61 })
62
63 var webtorrent_process = spawn(__dirname + '/webtorrent.js', host, port, { detached: true })
64 webtorrent_process.stderr.on('data', function (data) {
65 // logger.debug('Webtorrent process stderr: ', data.toString())
66 })
67
68 webtorrent_process.stdout.on('data', function (data) {
69 // logger.debug('Webtorrent process:', data.toString())
70 })
71
72 webtorrentnode.app = webtorrent_process
73 })
74
75 ipc.server.start()
76 }
77
78 webtorrentnode.seed = function (path, callback) {
79 var extension = pathUtils.extname(path)
80 var basename = pathUtils.basename(path, extension)
81 var data = {
82 _id: basename,
83 args: {
84 path: path
85 }
86 }
87
88 if (!webtorrentnode.silent) logger.debug('Node wants to seed %s.', data._id)
89
90 // Finish signal
91 var event_key = nodeKey + '.seedDone.' + data._id
92 ipc.server.on(event_key, function listener (received) {
93 if (!webtorrentnode.silent) logger.debug('Process seeded torrent %s.', received.magnetUri)
94
95 // This is a fake object, we just use the magnetUri in this project
96 var torrent = {
97 magnetURI: received.magnetUri
98 }
99
100 ipc.server.off(event_key)
101 callback(torrent)
102 })
103
104 ipc.server.broadcast(processKey + '.seed', data)
105 }
106
107 webtorrentnode.add = function (magnetUri, callback) {
108 var data = {
109 _id: magnetUri,
110 args: {
111 magnetUri: magnetUri
112 }
113 }
114
115 if (!webtorrentnode.silent) logger.debug('Node wants to add ' + data._id)
116
117 // Finish signal
118 var event_key = nodeKey + '.addDone.' + data._id
119 ipc.server.on(event_key, function (received) {
120 if (!webtorrentnode.silent) logger.debug('Process added torrent.')
121
122 // This is a fake object, we just use the magnetUri in this project
123 var torrent = {
124 files: received.files
125 }
126
127 ipc.server.off(event_key)
128 callback(torrent)
129 })
130
131 ipc.server.broadcast(processKey + '.add', data)
132 }
133
134 webtorrentnode.remove = function (magnetUri, callback) {
135 var data = {
136 _id: magnetUri,
137 args: {
138 magnetUri: magnetUri
139 }
140 }
141
142 if (!webtorrentnode.silent) logger.debug('Node wants to stop seeding %s.', data._id)
143
144 // Finish signal
145 var event_key = nodeKey + '.removeDone.' + data._id
146 ipc.server.on(event_key, function (received) {
147 if (!webtorrentnode.silent) logger.debug('Process removed torrent %s.', data._id)
148
149 var err = null
150 if (received.err) err = received.err
151
152 ipc.server.off(event_key)
153 callback(err)
154 })
155
156 ipc.server.broadcast(processKey + '.remove', data)
157 }
158
159 module.exports = webtorrentnode
160})()
diff --git a/lib/webtorrent.js b/lib/webtorrent.js
new file mode 100644
index 000000000..b72bc500d
--- /dev/null
+++ b/lib/webtorrent.js
@@ -0,0 +1,91 @@
1;(function () {
2 'use strict'
3
4 module.exports = function (args) {
5 var WebTorrent = require('webtorrent')
6 var ipc = require('node-ipc')
7
8 if (args.length !== 3) {
9 console.log('Wrong arguments number: ' + args.length + '/3')
10 process.exit(-1)
11 }
12
13 var host = args[1]
14 var port = args[2]
15 var nodeKey = 'webtorrentnode' + port
16 var processKey = 'webtorrent' + port
17
18 ipc.config.silent = true
19 ipc.config.id = processKey
20
21 if (host === 'client' && port === '1') global.WEBTORRENT_ANNOUNCE = []
22 else global.WEBTORRENT_ANNOUNCE = 'ws://' + host + ':' + port + '/tracker/socket'
23 var wt = new WebTorrent({ dht: false })
24
25 function seed (data) {
26 var args = data.args
27 var path = args.path
28 var _id = data._id
29
30 wt.seed(path, { announceList: '' }, function (torrent) {
31 var to_send = {
32 magnetUri: torrent.magnetURI
33 }
34
35 ipc.of[nodeKey].emit(nodeKey + '.seedDone.' + _id, to_send)
36 })
37 }
38
39 function add (data) {
40 var args = data.args
41 var magnetUri = args.magnetUri
42 var _id = data._id
43
44 wt.add(magnetUri, function (torrent) {
45 var to_send = {
46 files: []
47 }
48
49 torrent.files.forEach(function (file) {
50 to_send.files.push({ path: file.path })
51 })
52
53 ipc.of[nodeKey].emit(nodeKey + '.addDone.' + _id, to_send)
54 })
55 }
56
57 function remove (data) {
58 var args = data.args
59 var magnetUri = args.magnetUri
60 var _id = data._id
61
62 try {
63 wt.remove(magnetUri, callback)
64 } catch (err) {
65 console.log('Cannot remove the torrent from WebTorrent', { err: err })
66 return callback(null)
67 }
68
69 function callback () {
70 var to_send = {}
71 ipc.of[nodeKey].emit(nodeKey + '.removeDone.' + _id, to_send)
72 }
73 }
74
75 console.log('Configuration: ' + host + ':' + port)
76 console.log('Connecting to IPC...')
77
78 ipc.connectTo(nodeKey, function () {
79 ipc.of[nodeKey].on(processKey + '.seed', seed)
80 ipc.of[nodeKey].on(processKey + '.add', add)
81 ipc.of[nodeKey].on(processKey + '.remove', remove)
82
83 ipc.of[nodeKey].emit(processKey + '.ready')
84 console.log('Ready.')
85 })
86
87 process.on('uncaughtException', function (e) {
88 ipc.of[nodeKey].emit(processKey + '.exception', { exception: e })
89 })
90 }
91})()