diff options
author | Chocobozzz <me@florianbigard.com> | 2018-07-10 17:02:20 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-07-11 14:00:17 +0200 |
commit | 94831479f5facff9469540a3d49dd347b88bdf5a (patch) | |
tree | 4e8990fc4fded913952c732b6466b15fc52ab06d /server/initializers/migrations/0230-kue-to-bull.ts | |
parent | 2cdf27bae6acfaa0b99bb07555edc57f48b8bc43 (diff) | |
download | PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.gz PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.zst PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.zip |
Migrate to bull
Diffstat (limited to 'server/initializers/migrations/0230-kue-to-bull.ts')
-rw-r--r-- | server/initializers/migrations/0230-kue-to-bull.ts | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/server/initializers/migrations/0230-kue-to-bull.ts b/server/initializers/migrations/0230-kue-to-bull.ts new file mode 100644 index 000000000..5fad87a61 --- /dev/null +++ b/server/initializers/migrations/0230-kue-to-bull.ts | |||
@@ -0,0 +1,63 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | import { createClient } from 'redis' | ||
3 | import { CONFIG } from '../constants' | ||
4 | import { JobQueue } from '../../lib/job-queue' | ||
5 | import { initDatabaseModels } from '../database' | ||
6 | |||
7 | async function up (utils: { | ||
8 | transaction: Sequelize.Transaction | ||
9 | queryInterface: Sequelize.QueryInterface | ||
10 | sequelize: Sequelize.Sequelize | ||
11 | }): Promise<any> { | ||
12 | await initDatabaseModels(false) | ||
13 | |||
14 | return new Promise((res, rej) => { | ||
15 | const client = createClient({ | ||
16 | host: CONFIG.REDIS.HOSTNAME, | ||
17 | port: CONFIG.REDIS.PORT, | ||
18 | db: CONFIG.REDIS.DB | ||
19 | }) | ||
20 | |||
21 | const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST | ||
22 | |||
23 | client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => { | ||
24 | if (err) return rej(err) | ||
25 | |||
26 | const jobPromises = jobStrings | ||
27 | .map(s => s.split('|')) | ||
28 | .map(([ , jobId ]) => { | ||
29 | return new Promise((res, rej) => { | ||
30 | client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => { | ||
31 | if (err) return rej(err) | ||
32 | |||
33 | try { | ||
34 | const parsedData = JSON.parse(job.data) | ||
35 | |||
36 | return res({ type: job.type, payload: parsedData }) | ||
37 | } catch (err) { | ||
38 | console.error('Cannot parse data %s.', job.data) | ||
39 | return res(null) | ||
40 | } | ||
41 | }) | ||
42 | }) | ||
43 | }) | ||
44 | |||
45 | JobQueue.Instance.init() | ||
46 | .then(() => Promise.all(jobPromises)) | ||
47 | .then((jobs: any) => { | ||
48 | const createJobPromises = jobs | ||
49 | .filter(job => job !== null) | ||
50 | .map(job => JobQueue.Instance.createJob(job)) | ||
51 | |||
52 | return Promise.all(createJobPromises) | ||
53 | }) | ||
54 | .then(() => res()) | ||
55 | }) | ||
56 | }) | ||
57 | } | ||
58 | |||
59 | function down (options) { | ||
60 | throw new Error('Not implemented.') | ||
61 | } | ||
62 | |||
63 | export { up, down } | ||