1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
import * as Sequelize from 'sequelize'
import { createClient } from 'redis'
import { CONFIG } from '../constants'
import { JobQueue } from '../../lib/job-queue'
import { initDatabaseModels } from '../database'
/**
 * Copies the jobs referenced by the Redis `q-<webserver host>:jobs:inactive` key into `JobQueue`.
 *
 * Steps:
 *  1. initialise the database models,
 *  2. read the job references from Redis,
 *  3. load each `q-<webserver host>:job:<id>` hash and JSON-parse its `data` field,
 *  4. call `JobQueue.Instance.createJob({ type, payload })` for every job that could be read.
 *
 * Jobs whose hash is missing or whose `data` is not valid JSON are logged and skipped.
 * NOTE(review): the `q-` prefix presumably belongs to the previous job queue implementation — confirm.
 *
 * @param utils Helpers handed over by the migration runner; this function does not read them.
 * @returns A promise resolved (with no value) once every readable job has been re-created.
 * @throws Rejects when Redis, `JobQueue.Instance.init()` or `JobQueue.Instance.createJob()` fails.
 */
async function up (utils: {
  transaction: Sequelize.Transaction
  queryInterface: Sequelize.QueryInterface
  sequelize: Sequelize.Sequelize
}): Promise<any> {
  await initDatabaseModels(false)

  const client = createClient({
    host: CONFIG.REDIS.HOSTNAME,
    port: CONFIG.REDIS.PORT,
    db: CONFIG.REDIS.DB
  })
  const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST

  try {
    // Wrap the callback API so that a failure rejects instead of being lost
    const jobStrings = await new Promise<string[]>((res, rej) => {
      client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, reply) => {
        if (err) return rej(err)

        return res(reply)
      })
    })

    const jobPromises = jobStrings
      .map(s => s.split('|'))
      // Only the second `|`-separated field is used: it is the job id
      .map(([ , jobId ]) => {
        return new Promise<any>((res, rej) => {
          client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => {
            if (err) return rej(err)

            // A missing hash yields an empty reply: skip it instead of crashing in the callback
            if (!job) {
              console.error('Cannot find job %s.', jobId)
              return res(null)
            }

            try {
              const parsedData = JSON.parse(job.data)

              return res({ type: job.type, payload: parsedData })
            } catch (err) {
              console.error('Cannot parse data %s.', job.data)
              return res(null)
            }
          })
        })
      })

    // The hash lookups were already issued above; wait for them together with the queue initialisation
    const [ jobs ] = await Promise.all([
      Promise.all(jobPromises),
      JobQueue.Instance.init()
    ])

    await Promise.all(
      jobs
        .filter(job => job !== null)
        .map(job => JobQueue.Instance.createJob(job))
    )
  } finally {
    // This connection is only used by the migration: always release it
    client.quit()
  }
}
/**
 * Rollback counterpart of `up`. Rolling back is not supported, so every call throws.
 *
 * @param options Unused.
 * @throws Error with the message 'Not implemented.' on every call.
 */
function down (options) {
  const notImplemented = new Error('Not implemented.')

  throw notImplemented
}
// Expose both functions defined in this file
export { up, down }
|