+ createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
+ return this.flowProducer.add({
+ ...this.buildJobFlowOption(parent),
+
+ children: children.map(c => this.buildJobFlowOption(c))
+ })
+ }
+
+ private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
+ return {
+ name: 'job',
+ data: job.payload,
+ queueName: job.type,
+ opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
+ }
+ }
+
+ private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
+ return {
+ backoff: { delay: 60 * 1000, type: 'exponential' },
+ attempts: JOB_ATTEMPTS[type],
+ priority: options.priority,
+ delay: options.delay,
+
+ ...this.buildJobRemovalOptions(type)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+
+ async listForApi (options: {
+ state?: JobState
+ start: number
+ count: number
+ asc?: boolean
+ jobType: JobType
+ }): Promise<Job[]> {
+ const { state, start, count, asc, jobType } = options
+
+ const states = this.buildStateFilter(state)
+ const filteredJobTypes = this.buildTypeFilter(jobType)
+
+ let results: Job[] = []
+
+ for (const jobType of filteredJobTypes) {
+ const queue: Queue = this.queues[jobType]