]>
Commit | Line | Data |
---|---|---|
94831479 C |
1 | import * as Sequelize from 'sequelize' |
2 | import { createClient } from 'redis' | |
3 | import { CONFIG } from '../constants' | |
4 | import { JobQueue } from '../../lib/job-queue' | |
5 | import { initDatabaseModels } from '../database' | |
6 | ||
/**
 * Re-creates, in the current JobQueue, every job still listed under the
 * `q-<webserver host>:jobs:inactive` Redis key.
 *
 * For each listed entry, the job id is the second `|`-separated field; the job
 * hash is read from `q-<webserver host>:job:<id>`, its `data` field is parsed
 * as JSON and the result is handed to `JobQueue.Instance.createJob` as
 * `{ type, payload }`. Entries whose hash is missing or whose `data` is not
 * valid JSON are logged and skipped.
 *
 * NOTE(review): the key layout looks like a legacy queue format — confirm.
 *
 * @param utils Migration helpers supplied by the migration runner (not used here).
 * @returns A promise resolved once every readable job has been re-created.
 * @throws Rejects when Redis, the JobQueue initialisation or a job creation fails.
 */
async function up (utils: {
  transaction: Sequelize.Transaction
  queryInterface: Sequelize.QueryInterface
  sequelize: Sequelize.Sequelize
}): Promise<any> {
  await initDatabaseModels(false)

  const client = createClient({
    host: CONFIG.REDIS.HOSTNAME,
    port: CONFIG.REDIS.PORT,
    db: CONFIG.REDIS.DB
  })

  const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST

  try {
    const jobStrings = await new Promise<string[]>((resolve, reject) => {
      client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, result) => {
        if (err) return reject(err)

        return resolve(result)
      })
    })

    // Kept loosely typed on purpose: the payload shape is whatever was stored in Redis
    const jobPromises = jobStrings
      .map(s => s.split('|'))
      .map(([ , jobId ]) => {
        return new Promise<any>((resolve, reject) => {
          client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => {
            if (err) return reject(err)

            // The hash may be missing; skip it instead of crashing on job.data
            if (!job) {
              console.error('Cannot find job %s.', jobId)
              return resolve(null)
            }

            try {
              const parsedData = JSON.parse(job.data)

              return resolve({ type: job.type, payload: parsedData })
            } catch (parseErr) {
              console.error('Cannot parse data %s.', job.data)
              return resolve(null)
            }
          })
        })
      })

    // Awaited together so a failure on either side is observed immediately
    // rather than surfacing as an unhandled rejection
    const [ , jobs ] = await Promise.all([
      JobQueue.Instance.init(),
      Promise.all(jobPromises)
    ])

    await Promise.all(
      jobs
        .filter(job => job !== null)
        .map(job => JobQueue.Instance.createJob(job))
    )
  } finally {
    // Always release the connection, even when the migration fails
    client.quit()
  }
}
58 | ||
/**
 * Reverting this migration is not supported.
 *
 * @param options Ignored.
 * @throws Always throws an `Error` with the message "Not implemented.".
 */
function down (options: unknown): never {
  throw new Error('Not implemented.')
}
62 | ||
63 | export { up, down } |