amqplib
The following TypeScript example wraps “amqplib” in a small RabbitMQ helper class with methods for publishing to and subscribing to a queue.
import 'dotenv/config'
import amqplib, { Options, Connection, Channel, ConsumeMessage } from 'amqplib'
import os from 'os'
import EventEmitter from 'events'
import { HydeLivingError } from '@helpers/helper.error'
/**
 * Small RabbitMQ helper built on amqplib.
 *
 * - `publisher` declares the queue `${prefix}_${key}` and sends one JSON-encoded message to it.
 * - `subscriber` resolves with the next message (as a string) from the queue `${prefix}_${key}`.
 *
 * Both public methods report failures by *returning* a `HydeLivingError` instead of throwing,
 * so callers are expected to check the result with `instanceof Error`.
 *
 * One broker connection is opened lazily and shared by every call; one consumer is kept per
 * queue. Consumers use a prefetch of 1 and acknowledge a message only when it is handed to a
 * `subscriber` call, so a message that arrives while nobody is waiting is held, not discarded.
 */
export class Rabbitmq {
  /** Highest priority level supported by the queues this class declares. */
  private static readonly MAX_PRIORITY = 10
  /** Per-message time-to-live, in milliseconds (3 minutes). */
  private static readonly MESSAGE_TTL_MS = 3 * 60 * 1000
  /** Idle time after which an unused queue is removed, in milliseconds (1 hour). */
  private static readonly QUEUE_EXPIRES_MS = 60 * 60 * 1000

  private connectionOptions?: Options.Connect
  // messageTtl and expires are durations in milliseconds, not absolute timestamps. They must
  // also be identical on every declaration of the same queue, so they are fixed here.
  private readonly assertQueueOptions: Options.AssertQueue = {
    maxPriority: Rabbitmq.MAX_PRIORITY,
    autoDelete: true,
    durable: true,
    messageTtl: Rabbitmq.MESSAGE_TTL_MS,
    expires: Rabbitmq.QUEUE_EXPIRES_MS
  }
  private emitter: InstanceType<typeof EventEmitter> = new EventEmitter({ captureRejections: true })
  /** Shared connection (or the attempt to open it); cleared when the connection goes away. */
  private broker?: Promise<Connection>
  /** One running consumer per queue name. */
  private readonly consumers = new Map<string, Promise<void>>()
  /** Per queue: a delivered-but-unclaimed message; calling the entry acks it and returns its text. */
  private readonly parked = new Map<string, () => string>()

  /** Name of the emitter event that carries deliveries for one queue. */
  private static eventFor(queue: string): string {
    return `data:${queue}`
  }

  /** Extracts a printable message from whatever was thrown. */
  private static describe(e: unknown): string {
    return e instanceof Error ? e.message : String(e)
  }

  /** No-op listener; keeps an 'error' event from becoming an uncaught exception. */
  private static ignore(): void {
    // Failures are reported through the rejected promise of the operation that caused them.
  }

  /** Closes a channel, ignoring the error raised when it is already closed. */
  private static async closeQuietly(channel: Channel): Promise<void> {
    try {
      await channel.close()
    } catch {
      // Already closed, nothing left to release.
    }
  }

  /**
   * Returns the shared broker connection, opening it on first use.
   *
   * @throws whatever `amqplib.connect` or the exchange declaration rejects with.
   */
  private connection(): Promise<Connection> {
    if (this.broker) return this.broker
    // Only forget the cached connection if it is still the one that went away.
    const forget = (): void => {
      if (this.broker === pending) this.broker = undefined
    }
    const pending: Promise<Connection> = this.openConnection(forget)
    this.broker = pending
    pending.catch(forget)
    return pending
  }

  /**
   * Opens a connection from the RABBITMQ_* environment variables and declares the exchange.
   *
   * @param onGone called when the connection errors or closes.
   */
  private async openConnection(onGone: () => void): Promise<Connection> {
    this.connectionOptions = {
      protocol: process.env.RABBITMQ_PROTOCOL,
      // RABBITMQ_HOST is the broker address; it was previously passed as `vhost`.
      // NOTE(review): confirm no deployment relied on it naming a virtual host.
      hostname: process.env.RABBITMQ_HOST,
      username: process.env.RABBITMQ_USERNAME,
      password: process.env.RABBITMQ_PASSWORD
    }
    const connect: Connection = await amqplib.connect(this.connectionOptions)
    connect.on('error', onGone)
    connect.on('close', onGone)
    try {
      const setup: Channel = await connect.createChannel()
      setup.on('error', Rabbitmq.ignore)
      try {
        // NOTE(review): 'exchage' looks like a typo for 'exchange'. It is kept unchanged because
        // renaming would declare a different exchange on the broker.
        await setup.assertExchange('exchage', 'topic', { autoDelete: false, durable: true })
      } finally {
        await Rabbitmq.closeQuietly(setup)
      }
    } catch (e: unknown) {
      // Do not leave a half-initialised connection open.
      await connect.close().catch(Rabbitmq.ignore)
      throw e
    }
    return connect
  }

  /**
   * Declares the queue `${prefix}_${key}` and sends `data` to it as a persistent JSON message.
   *
   * @param prefix first part of the queue name.
   * @param key second part of the queue name.
   * @param data payload; serialised with `JSON.stringify`.
   * @returns `true` on success, or a `HydeLivingError` (returned, not thrown) on failure.
   */
  // `any` is kept in the signature so existing callers continue to type-check.
  async publisher(prefix: string, key: string, data: Record<string, any> | Record<string, any>[]): Promise<any> {
    let channel: Channel | undefined
    try {
      const broker: Connection = await this.connection()
      channel = await broker.createChannel()
      channel.on('error', Rabbitmq.ignore)

      const queue = `${prefix}_${key}`
      const publishOptions: Options.Publish = {
        persistent: true,
        // Never ask for more than the queue's declared maximum priority.
        priority: Math.min(os.cpus().length, Rabbitmq.MAX_PRIORITY),
        timestamp: Date.now()
      }

      await channel.assertQueue(queue, this.assertQueueOptions)
      // A `false` return only signals write-buffer backpressure; the message is still queued
      // for sending, so it is not treated as a failure.
      channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), publishOptions)
      // Closing is awaited so the publish is written out before success is reported.
      await channel.close()
      channel = undefined
      return true
    } catch (e: unknown) {
      return new HydeLivingError(Rabbitmq.describe(e))
    } finally {
      if (channel) await Rabbitmq.closeQuietly(channel)
    }
  }

  /**
   * Ensures a consumer is running on the queue `${prefix}_${key}`.
   * Calls for a queue that already has a consumer reuse it.
   *
   * @throws if the connection, channel or `consume` call fails (e.g. the queue does not exist).
   */
  private consumer(prefix: string, key: string): Promise<void> {
    const queue = `${prefix}_${key}`
    const running = this.consumers.get(queue)
    if (running) return running
    const release = (): void => {
      if (this.consumers.get(queue) === started) this.consumers.delete(queue)
    }
    const started: Promise<void> = this.startConsumer(queue, release)
    this.consumers.set(queue, started)
    started.catch(release)
    return started
  }

  /**
   * Opens a channel and starts consuming `queue`.
   *
   * @param queue full queue name.
   * @param release called when the consumer's channel closes.
   */
  private async startConsumer(queue: string, release: () => void): Promise<void> {
    const event = Rabbitmq.eventFor(queue)
    const broker: Connection = await this.connection()
    const channel: Channel = await broker.createChannel()
    channel.on('error', Rabbitmq.ignore)
    channel.once('close', () => {
      release()
      this.parked.delete(queue)
      // Wake any waiting subscriber with an error value instead of leaving it pending forever.
      this.emitter.emit(event, new HydeLivingError('rabbitmq is not connected'))
    })
    try {
      // At most one unacknowledged message, so at most one message is ever parked.
      await channel.prefetch(1)
      await channel.consume(queue, (msg: ConsumeMessage | null): void => {
        if (msg === null) {
          // amqplib passes null when the broker cancels the consumer.
          void Rabbitmq.closeQuietly(channel)
          return
        }
        const delivery: ConsumeMessage = msg
        const take = (): string => {
          channel.ack(delivery)
          return delivery.content.toString()
        }
        if (this.emitter.listenerCount(event) > 0) this.emitter.emit(event, take())
        else this.parked.set(queue, take)
      })
    } catch (e: unknown) {
      await Rabbitmq.closeQuietly(channel)
      throw e
    }
  }

  /**
   * Resolves with the next message from the queue `${prefix}_${key}`.
   *
   * @param prefix first part of the queue name.
   * @param key second part of the queue name.
   * @returns the message body as a string (the JSON text written by `publisher`), or a
   *          `HydeLivingError` (returned, not thrown) on failure.
   */
  // `any` is kept in the signature so existing callers continue to type-check.
  async subscriber(prefix: string, key: string): Promise<any> {
    try {
      const queue = `${prefix}_${key}`
      await this.consumer(prefix, key)
      // No await between this check and registering the listener, so no delivery can slip past.
      const takeParked = this.parked.get(queue)
      if (takeParked) {
        this.parked.delete(queue)
        return takeParked()
      }
      return await new Promise<string | HydeLivingError>((resolve) => {
        // `once` removes the listener after the first delivery.
        this.emitter.once(Rabbitmq.eventFor(queue), resolve)
      })
    } catch (e: unknown) {
      return new HydeLivingError(Rabbitmq.describe(e))
    }
  }
}
More questions on related topics:
- tss from gene granges
- ixl answers
- get coin prices node-binance
- how to setup netflix workflow worker
- spritesheets in pyqt
- cahokia mounds pictures
- python 2 decimal places how to get decimal part of a double in python set number of decimals python
- how to find number of tweets per day using python
- haskell get specific elements of a String
- vb net code snippets for storing password
- error TS2307: Cannot find module ‘@ngx-meta/core’.
- inline scripts encapsulated in tags