amqplib

The solution for “amqplib” is shown below. The following TypeScript code wraps amqplib in a small class that publishes messages to, and reads messages from, RabbitMQ queues.

import ‘dotenv/config’
import amqplib, { Options, Connection, Channel, ConsumeMessage } from ‘amqplib’
import os from ‘os’
import EventEmitter from ‘events’

import { HydeLivingError } from ‘@helpers/helper.error’

/**
 * Small helper around amqplib for publishing to, and reading from, RabbitMQ queues.
 *
 * Queues are named `${prefix}_${key}` and are always declared with the same
 * arguments (see `assertQueueOptions`), so repeated declarations are idempotent.
 *
 * Error contract: the public methods never reject. On failure they resolve with a
 * `HydeLivingError` instance, so callers must check the result with `instanceof Error`.
 *
 * Connection settings are read from `RABBITMQ_PROTOCOL`, `RABBITMQ_HOST`,
 * `RABBITMQ_USERNAME` and `RABBITMQ_PASSWORD`. One connection is opened lazily and
 * shared by all calls on the same instance; each call uses its own short-lived channel.
 */
export class Rabbitmq {
  /** Highest usable message priority; also declared on every queue as its max priority. */
  private static readonly MAX_PRIORITY = 10

  /**
   * Arguments used for every queue declaration.
   * `messageTtl` and `expires` are durations in milliseconds (not points in time) and
   * must stay constant: re-declaring an existing queue with different arguments fails.
   */
  private readonly assertQueueOptions: Options.AssertQueue = {
    maxPriority: Rabbitmq.MAX_PRIORITY,
    autoDelete: true,
    durable: true,
    messageTtl: 3 * 60 * 1000, // 3 minutes
    expires: 60 * 60 * 1000 // 1 hour
  }

  /** In-flight or established shared connection; cleared when the connection is lost. */
  private connectionPromise?: Promise<Connection>

  /**
   * Returns the shared broker connection, opening it on first use.
   * Concurrent callers share the same pending promise, so only one connection is opened.
   *
   * @throws whatever amqplib reports when the broker cannot be reached or set up
   */
  private connection(): Promise<Connection> {
    if (this.connectionPromise !== undefined) return this.connectionPromise

    // Drop the cached promise only if it is still the one created here, so a late
    // 'close' from an old connection cannot discard a newer one.
    const forget = (): void => {
      if (this.connectionPromise === pending) this.connectionPromise = undefined
    }
    const pending: Promise<Connection> = this.openConnection(forget)
    this.connectionPromise = pending
    pending.catch(forget)
    return pending
  }

  /**
   * Opens a new connection and declares the topic exchange on it.
   *
   * @param onLost invoked when the connection errors or closes
   */
  private async openConnection(onLost: () => void): Promise<Connection> {
    const connectionOptions: Options.Connect = {
      protocol: process.env.RABBITMQ_PROTOCOL,
      // NOTE(review): RABBITMQ_HOST is passed as the *vhost*; no `hostname` is set, so
      // amqplib falls back to its default host. Confirm this mapping is intended.
      vhost: process.env.RABBITMQ_HOST,
      username: process.env.RABBITMQ_USERNAME,
      password: process.env.RABBITMQ_PASSWORD
    }

    const connect: Connection = await amqplib.connect(connectionOptions)
    // An 'error' event without a listener would crash the process.
    connect.on('error', onLost)
    connect.once('close', onLost)

    try {
      const channel = await this.openChannel(connect)
      try {
        // NOTE(review): 'exchage' looks like a typo for 'exchange'. It is kept because
        // renaming it would declare a different exchange on the broker.
        await channel.assertExchange('exchage', 'topic', { autoDelete: false, durable: true })
      } finally {
        await this.closeQuietly(channel)
      }
    } catch (e: unknown) {
      await this.closeQuietly(connect)
      throw e
    }

    return connect
  }

  /**
   * Creates a channel whose 'error' events cannot crash the process.
   * Channel-level failures still surface through the rejected promise of the
   * operation that caused them.
   */
  private async openChannel(broker: Connection): Promise<Channel> {
    const channel: Channel = await broker.createChannel()
    channel.on('error', () => undefined)
    return channel
  }

  /** Closes a channel or connection, ignoring failures (e.g. it is already closed). */
  private async closeQuietly(resource: { close(): PromiseLike<void> }): Promise<void> {
    try {
      await resource.close()
    } catch {
      // Nothing useful can be done if closing fails during cleanup.
    }
  }

  /** Converts any thrown value into the error type handed back to callers. */
  private toError(e: unknown): HydeLivingError {
    return new HydeLivingError(e instanceof Error ? e.message : String(e))
  }

  /**
   * Publishes `data` as JSON to the queue `${prefix}_${key}`, declaring the queue first.
   *
   * @param prefix first part of the queue name
   * @param key second part of the queue name
   * @param data JSON-serialisable payload
   * @returns `true` once the message has been handed to the broker, or a
   *          `HydeLivingError` describing the failure (the promise never rejects)
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- types kept loose for existing callers
  async publisher(prefix: string, key: string, data: Record<string, any> | Record<string, any>[]): Promise<any> {
    try {
      const queue = `${prefix}_${key}`
      const broker = await this.connection()
      const channel = await this.openChannel(broker)

      try {
        await channel.assertQueue(queue, this.assertQueueOptions)

        const optionsPublish: Options.Publish = {
          persistent: true,
          // Never ask for more than the queue's declared maximum priority.
          priority: Math.min(os.cpus().length, Rabbitmq.MAX_PRIORITY),
          timestamp: Date.now()
        }

        // `false` is back-pressure, not failure: the message is buffered and the
        // channel emits 'drain' once more can be written.
        const flushed = channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), optionsPublish)
        if (!flushed) await EventEmitter.once(channel, 'drain')
      } catch (e: unknown) {
        await this.closeQuietly(channel)
        throw e
      }

      // Closing is ordered after the publish on this channel, and it releases the channel.
      await channel.close()
      return true
    } catch (e: unknown) {
      return this.toError(e)
    }
  }

  /**
   * Waits for the next message on the queue `${prefix}_${key}` and returns its body.
   *
   * Uses a dedicated channel with a prefetch of one, acknowledges exactly one message,
   * then closes the channel so no further messages are taken off the queue unseen.
   *
   * @throws when the channel fails, closes, or the broker cancels the consumer
   */
  private async consumer(prefix: string, key: string): Promise<string> {
    const queue = `${prefix}_${key}`
    const broker = await this.connection()
    const channel: Channel = await broker.createChannel()

    try {
      return await new Promise<string>((resolve, reject) => {
        let delivered = false

        channel.on('error', reject)
        channel.once('close', () => reject(new HydeLivingError('rabbitmq is not connected')))

        const start = async (): Promise<void> => {
          // Declared with the publisher's arguments so subscribing first also works.
          await channel.assertQueue(queue, this.assertQueueOptions)
          await channel.prefetch(1)
          await channel.consume(queue, (msg: ConsumeMessage | null): void => {
            if (msg === null) {
              // amqplib passes null when the broker cancels the consumer.
              reject(new HydeLivingError('rabbitmq consumer was cancelled'))
              return
            }
            // Anything after the first message stays unacknowledged, so closing the
            // channel requeues it instead of dropping it.
            if (delivered) return
            delivered = true
            channel.ack(msg)
            resolve(msg.content.toString())
          })
        }

        start().catch(reject)
      })
    } finally {
      await this.closeQuietly(channel)
    }
  }

  /**
   * Resolves with the next message published to the queue `${prefix}_${key}`.
   *
   * Each call is scoped to its own queue and receives at most one message.
   * Note that queues are declared `autoDelete`, so the broker removes the queue once
   * its last consumer is gone.
   *
   * @param prefix first part of the queue name
   * @param key second part of the queue name
   * @returns the raw message body as a string (the JSON text written by `publisher`),
   *          or a `HydeLivingError` describing the failure (the promise never rejects)
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- return type kept loose for existing callers
  async subscriber(prefix: string, key: string): Promise<any> {
    try {
      return await this.consumer(prefix, key)
    } catch (e: unknown) {
      return this.toError(e)
    }
  }
}

Thank you for using DeclareCode. We hope you were able to resolve the issue.

More questions on [categories-list]

0
inline scripts encapsulated in