aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/initializers
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-07-10 17:02:20 +0200
committerChocobozzz <me@florianbigard.com>2018-07-11 14:00:17 +0200
commit94831479f5facff9469540a3d49dd347b88bdf5a (patch)
tree4e8990fc4fded913952c732b6466b15fc52ab06d /server/initializers
parent2cdf27bae6acfaa0b99bb07555edc57f48b8bc43 (diff)
downloadPeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.gz
PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.zst
PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.zip
Migrate to bull
Diffstat (limited to 'server/initializers')
-rw-r--r--server/initializers/constants.ts2
-rw-r--r--server/initializers/migrations/0230-kue-to-bull.ts63
2 files changed, 64 insertions, 1 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 24b7e2655..6173e1298 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -14,7 +14,7 @@ let config: IConfig = require('config')
14 14
15// --------------------------------------------------------------------------- 15// ---------------------------------------------------------------------------
16 16
17const LAST_MIGRATION_VERSION = 225 17const LAST_MIGRATION_VERSION = 230
18 18
19// --------------------------------------------------------------------------- 19// ---------------------------------------------------------------------------
20 20
diff --git a/server/initializers/migrations/0230-kue-to-bull.ts b/server/initializers/migrations/0230-kue-to-bull.ts
new file mode 100644
index 000000000..5fad87a61
--- /dev/null
+++ b/server/initializers/migrations/0230-kue-to-bull.ts
@@ -0,0 +1,63 @@
1import * as Sequelize from 'sequelize'
2import { createClient } from 'redis'
3import { CONFIG } from '../constants'
4import { JobQueue } from '../../lib/job-queue'
5import { initDatabaseModels } from '../database'
6
7async function up (utils: {
8 transaction: Sequelize.Transaction
9 queryInterface: Sequelize.QueryInterface
10 sequelize: Sequelize.Sequelize
11}): Promise<any> {
12 await initDatabaseModels(false)
13
14 return new Promise((res, rej) => {
15 const client = createClient({
16 host: CONFIG.REDIS.HOSTNAME,
17 port: CONFIG.REDIS.PORT,
18 db: CONFIG.REDIS.DB
19 })
20
21 const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST
22
23 client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => {
24 if (err) return rej(err)
25
26 const jobPromises = jobStrings
27 .map(s => s.split('|'))
28 .map(([ , jobId ]) => {
29 return new Promise((res, rej) => {
30 client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => {
31 if (err) return rej(err)
32
33 try {
34 const parsedData = JSON.parse(job.data)
35
36 return res({ type: job.type, payload: parsedData })
37 } catch (err) {
38 console.error('Cannot parse data %s.', job.data)
39 return res(null)
40 }
41 })
42 })
43 })
44
45 JobQueue.Instance.init()
46 .then(() => Promise.all(jobPromises))
47 .then((jobs: any) => {
48 const createJobPromises = jobs
49 .filter(job => job !== null)
50 .map(job => JobQueue.Instance.createJob(job))
51
52 return Promise.all(createJobPromises)
53 })
54 .then(() => res())
55 })
56 })
57}
58
59function down (options) {
60 throw new Error('Not implemented.')
61}
62
63export { up, down }