bullmq

The solution for “bullmq” can be found here. The following code will assist you in solving the problem.

import 'dotenv/config'
import { Worker, Queue, WorkerOptions, QueueOptions, Job, QueueEvents, QueueEventsOptions } from 'bullmq'
import { EventEmitter } from 'events'
import IORedis from 'ioredis'
import os from 'os'
import { assert } from 'is-any-type'

import { HydeLivingError } from '@helpers/helper.error'

/** Project-local alias for the `Job` type imported from `bullmq`. */
export type BullJob = Job
/**
 * Small wrapper around BullMQ: `publisher` adds a payload to a named queue and
 * `subscriber` resolves with the payload of the next job processed for a queue.
 *
 * The same ioredis instance is handed to every `Queue`, `Worker` and
 * `QueueEvents` object created by this class.
 */
export class BullQueue {
  /** Carries job payloads from the worker processor to pending `subscriber` promises. */
  private readonly emitter: EventEmitter = new EventEmitter({ captureRejections: true })
  /** Redis connection passed as `connection` to every BullMQ object built here. */
  private readonly redisConnection: InstanceType<typeof IORedis>
  /** One worker per queue key, so repeated `subscriber` calls reuse it. */
  private readonly workers = new Map<string, Worker>()
  /** One `QueueEvents` logger per queue key, for the same reason. */
  private readonly listeners = new Map<string, QueueEvents>()

  /**
   * Opens the Redis connection from `REDIS_HOST`, `REDIS_PORT` and
   * `REDIS_PASSWORD`.
   *
   * @param db - Redis logical database index to select.
   */
  constructor(db: number) {
    const port = Number.parseInt(process.env.REDIS_PORT ?? '', 10)

    this.redisConnection = new IORedis({
      host: process.env.REDIS_HOST,
      // An unset or non-numeric REDIS_PORT parses to NaN; leave the option
      // unset in that case instead of handing NaN to the client.
      port: Number.isNaN(port) ? undefined : port,
      password: process.env.REDIS_PASSWORD ?? '',
      maxRetriesPerRequest: null,
      db
    })
  }

  /** Name of the internal emitter event that carries payloads for `key`. */
  private dataEvent(key: string): string {
    return `data:${key}`
  }

  /**
   * Adds `value` as a job on the queue named `key`.
   *
   * A new `Queue` object is created on every call and returned to the caller.
   *
   * @param key - Queue name.
   * @param value - Job payload; must be an object or an array.
   * @returns The `Queue` instance the job was added to.
   * @throws HydeLivingError when `value` is neither an object nor an array, or
   *   when adding the job fails (the underlying message is preserved).
   */
  async publisher(key: string, value: Record<string, any> | Record<string, any>[]): Promise<Queue> {
    try {
      if (!assert.isObject(value) && !assert.isArray(value)) {
        throw new HydeLivingError('value must be an array or object')
      }

      const queueOptions: QueueOptions = {
        connection: this.redisConnection,
        defaultJobOptions: {
          removeOnComplete: true,
          sizeLimit: 5242880,
          timeout: 1000 * 60,
          attempts: 5,
          backoff: {
            type: 'exponential',
            delay: 1000 * 3
          },
          priority: os.cpus().length
        }
      }

      const queue = new Queue(key, queueOptions)
      await queue.add(`hydeliving:${key}:${Date.now()}`, value)

      return queue
    } catch (error: unknown) {
      // Anything thrown above, including the validation error, is re-wrapped
      // so callers always receive a HydeLivingError.
      const detail = (error as { message?: unknown } | null | undefined)?.message
      const message =
        typeof detail === 'string' && detail.length > 0 ? detail : 'Publisher crash add value to queue failed'
      throw new HydeLivingError(message)
    }
  }

  /**
   * Returns the `QueueEvents` instance for `key`, creating it on first use and
   * attaching console loggers for the job lifecycle events.
   *
   * @param key - Queue name.
   */
  private notification(key: string): QueueEvents {
    const cached = this.listeners.get(key)
    if (cached) return cached

    const eventOptions: QueueEventsOptions = {
      connection: this.redisConnection
    }
    const events = new QueueEvents(key, eventOptions)

    events.on('waiting', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is waiting`))
    events.on('progress', (args: { jobId: string }) => console.info(`jobs ${args.jobId} in progress`))
    events.on('completed', (args: { jobId: string }) => console.log(`jobs ${args.jobId} is completed`))
    events.on('removed', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is removed`))
    events.on('failed', (args: { jobId: string }) => console.error(`jobs ${args.jobId} is failed`))
    // Log the error itself as well, so the cause is not lost.
    events.on('error', (err: globalThis.Error) => console.error(`jobs ${events.name} is error`, err))

    this.listeners.set(key, events)
    return events
  }

  /**
   * Returns the `Worker` for `key`, creating it on first use.
   *
   * The processor forwards each job's payload to the internal emitter, under
   * an event name specific to `key`, and returns the job.
   *
   * @param key - Queue name.
   */
  private worker(key: string): Worker {
    const cached = this.workers.get(key)
    if (cached) return cached

    const workerOptions: WorkerOptions = {
      connection: this.redisConnection,
      concurrency: os.cpus().length,
      skipDelayCheck: true,
      runRetryDelay: 1000 * 3,
      settings: {
        backoffStrategies: {
          custom(attemptsMade: number) {
            return Math.abs(attemptsMade * 1000)
          }
        }
      }
    }

    const worker = new Worker(
      key,
      async (job: Job): Promise<Job> => {
        if (await job.isCompleted()) await job.remove()
        if (await job.isFailed()) await job.retry('failed')
        // The payload crosses the emitter as a JSON string; `subscriber` parses it back.
        if (job.data) this.emitter.emit(this.dataEvent(key), JSON.stringify(job.data))
        return job
      },
      workerOptions
    )

    // Kept from the original: the worker is resumed whenever it reports
    // 'active', 'paused' or 'progress'.
    const resume = async (): Promise<void> => {
      worker.resume()
    }
    worker.on('active', resume)
    worker.on('paused', resume)
    worker.on('progress', resume)

    this.workers.set(key, worker)
    return worker
  }

  /**
   * Ensures a worker and an event logger exist for `key`, then waits for the
   * next payload that worker processes.
   *
   * The returned promise settles once; call again to wait for another payload.
   * The return type stays loose because the original annotation lost its type
   * arguments; the value is the JSON round-trip of the job's `data`.
   *
   * @param key - Queue name.
   * @returns The parsed payload of the next processed job for `key`.
   */
  async subscriber(key: string): Promise<any> {
    // Both calls are synchronous and idempotent per key.
    this.worker(key)
    this.notification(key)

    return new Promise((resolve) => {
      // `once` removes the listener after it fires, so repeated calls do not
      // pile listeners up on the shared emitter.
      this.emitter.once(this.dataEvent(key), (data: string) => resolve(JSON.parse(data)))
    })
  }
}

Thank you for using DeclareCode; we hope you were able to resolve the issue.

More questions on [categories-list]

0
inline scripts encapsulated in