From 94831479f5facff9469540a3d49dd347b88bdf5a Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 10 Jul 2018 17:02:20 +0200 Subject: Migrate to bull --- server/initializers/migrations/0230-kue-to-bull.ts | 63 ++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 server/initializers/migrations/0230-kue-to-bull.ts (limited to 'server/initializers/migrations') diff --git a/server/initializers/migrations/0230-kue-to-bull.ts b/server/initializers/migrations/0230-kue-to-bull.ts new file mode 100644 index 000000000..5fad87a61 --- /dev/null +++ b/server/initializers/migrations/0230-kue-to-bull.ts @@ -0,0 +1,63 @@ +import * as Sequelize from 'sequelize' +import { createClient } from 'redis' +import { CONFIG } from '../constants' +import { JobQueue } from '../../lib/job-queue' +import { initDatabaseModels } from '../database' + +async function up (utils: { + transaction: Sequelize.Transaction + queryInterface: Sequelize.QueryInterface + sequelize: Sequelize.Sequelize +}): Promise { + await initDatabaseModels(false) + + return new Promise((res, rej) => { + const client = createClient({ + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + db: CONFIG.REDIS.DB + }) + + const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST + + client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => { + if (err) return rej(err) + + const jobPromises = jobStrings + .map(s => s.split('|')) + .map(([ , jobId ]) => { + return new Promise((res, rej) => { + client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => { + if (err) return rej(err) + + try { + const parsedData = JSON.parse(job.data) + + return res({ type: job.type, payload: parsedData }) + } catch (err) { + console.error('Cannot parse data %s.', job.data) + return res(null) + } + }) + }) + }) + + JobQueue.Instance.init() + .then(() => Promise.all(jobPromises)) + .then((jobs: any) => { + const createJobPromises = jobs + .filter(job => job !== null) + .map(job => JobQueue.Instance.createJob(job)) + + return Promise.all(createJobPromises) + }) + .then(() => res()) + }) + }) +} + +function down (options) { + throw new Error('Not implemented.') +} + +export { up, down } -- cgit v1.2.3