kafkajs kafkajs

The solution for “kafkajs kafkajs” can be found below. The following code shows two ways to use kafkajs: a plain JavaScript (CommonJS) producer/consumer module, followed by a TypeScript wrapper class.

const { Kafka } = require('kafkajs')

// Shared kafkajs client used by the producer and consumer helpers below.
// Timeouts are in milliseconds; SSL is disabled because the broker is a
// plain-text local instance.
const kafka = new Kafka({
  clientId: 'node-app',
  brokers: ['localhost:9092'],
  requestTimeout: 3000,
  connectionTimeout: 6000,
  ssl: false
})

exports.producer = async (eventName, data) => {
const producer = kafka.producer()

// event kafka producer notification
await producer.on(‘producer.connect’, () => console.info(‘producer kafka connected’))
await producer.on(‘producer.disconnect’, () => console.error(‘producer kafka disconnect’))
await producer.on(‘producer.network.request_timeout’, () => console.error(‘producer kafka network timeout’))

await producer.connect()
await producer.send({
topic: eventName,
messages: data,
acks: true,
compression: 1
})

await producer.disconnect()
}

exports.consumer = async (eventName, callback) => {
const consumer = kafka.consumer({
groupId: ‘test-group’,
maxBytes: 1048576000, // 1GB
maxBytesPerPartition: 1048576000, // 1GB
sessionTimeout: 60000,
heartbeatInterval: 6000,
rebalanceTimeout: 30000
})

// event kafka consumer notification
await consumer.on(‘consumer.connect’, () => console.info(‘consumer kafka connected’))
await consumer.on(‘consumer.disconnect’, () => console.error(‘consumer kafka disconnect’))
await consumer.on(‘consumer.crash’, () => console.error(‘consumer kafka crash’))
await consumer.on(‘consumer.stop’, () => console.error(‘consumer kafka stop’))
await consumer.on(‘consumer.network.request_timeout’, () => console.error(‘consumer kafka network timeout’))

await consumer.connect()
await consumer.subscribe({ topic: eventName, fromBeginning: true })
await consumer.run({ autoCommit: true, eachMessage: callback })
}// check here include example usage
// https://pastebin.com/Wu5hG6WK

import {
  Consumer,
  ConsumerConfig,
  ConsumerSubscribeTopics,
  EachBatchHandler,
  EachMessagePayload,
  Kafka as KafkaJs,
  KafkaConfig,
  Producer,
  ProducerBatch,
  ProducerConfig,
  ProducerRecord,
  Transaction
} from 'kafkajs'

/**
 * Optional settings a caller may forward to `consumer.run()`.
 *
 * `Kafka.subscriber` spreads this object into its `run()` call and then sets
 * `eachMessage` itself, which is why no `eachMessage` field is declared here.
 *
 * NOTE(review): the field names appear to mirror kafkajs's own run options;
 * confirm their exact semantics against the kafkajs typings.
 */
interface ConsumerRunConfig {
autoCommit?: boolean
autoCommitInterval?: number | null
autoCommitThreshold?: number | null
eachBatchAutoResolve?: boolean
partitionsConsumedConcurrently?: number
eachBatch?: EachBatchHandler
}

/** Everything `Kafka.subscriber` needs to create, subscribe and start a consumer. */
interface SubscriberPayload {
  /** Passed to `consumer.subscribe()`. */
  subscribeConfig: ConsumerSubscribeTopics
  /** Passed to `kafka.consumer()`. */
  consumerConfig: ConsumerConfig
  /**
   * Extra options for `consumer.run()`. Optional: `Kafka.subscriber` already
   * falls back to an empty object when it is missing.
   */
  runConfig?: ConsumerRunConfig
}

/** Input for `Kafka.publisher`. */
interface PublisherPayload {
  /**
   * Selects the send call: `'single'` uses `producer.send()`, anything else
   * (`'multiple'`) uses `producer.sendBatch()`.
   */
  type: 'single' | 'multiple'
  /** A `ProducerRecord` when `type` is `'single'`, a `ProducerBatch` when it is `'multiple'`. */
  sendConfig: ProducerRecord | ProducerBatch
  /** Passed to `kafka.producer()`; an empty config is used when omitted. */
  producerConfig?: ProducerConfig
}

/** Input for `Kafka.publisherTransaction`; same shape as the non-transactional payload. */
interface PublisherTransactionPayload {
  /** `'single'` sends one record, `'multiple'` sends a batch. */
  type: 'single' | 'multiple'
  /** A `ProducerRecord` when `type` is `'single'`, a `ProducerBatch` when it is `'multiple'`. */
  sendConfig: ProducerRecord | ProducerBatch
  /**
   * Passed to `kafka.producer()`.
   * NOTE(review): kafkajs requires a `transactionalId` here for transactions to work; confirm callers supply one.
   */
  producerConfig?: ProducerConfig
}

/**
 * Convenience wrapper around a kafkajs client.
 *
 * Every publish call creates its own short-lived producer, so concurrent calls
 * never share (or prematurely disconnect) each other's connection. All public
 * methods are best-effort: failures are logged with `console.error` and the
 * returned promise still resolves.
 */
export class Kafka {
  private readonly config: KafkaConfig
  private readonly kafka: KafkaJs

  /**
   * @param config kafkajs client configuration (brokers, clientId, ...).
   */
  constructor(config: KafkaConfig) {
    this.config = config
    this.kafka = new KafkaJs(this.config)
  }

  /**
   * Sends one record (`type: 'single'`) or a batch (`type: 'multiple'`).
   *
   * @param options What to send and how to configure the producer.
   * @returns Resolves when the producer has been disconnected. Errors are logged, not thrown.
   */
  async publisher(options: PublisherPayload): Promise<void> {
    let producer: Producer | undefined
    try {
      producer = this.kafka.producer(options.producerConfig ?? {})
      this.watchProducer(producer)
      await producer.connect()

      if (options.type === 'single') {
        await producer.send(options.sendConfig as ProducerRecord)
      } else {
        await producer.sendBatch(options.sendConfig as ProducerBatch)
      }
    } catch (e: unknown) {
      console.error(`publisher is not working: ${e}`)
    } finally {
      // Release the connection on success and on failure alike.
      await this.disconnectQuietly(producer, 'producer')
    }
  }

  /**
   * Sends one record or a batch inside a Kafka transaction.
   *
   * The producer is connected first, the transaction is opened on it, and the
   * messages are sent through the transaction object so that commit/abort
   * actually applies to them. On failure an active transaction is aborted.
   *
   * @param options What to send and how to configure the producer.
   *   NOTE(review): kafkajs requires `producerConfig.transactionalId` for transactions.
   * @returns Resolves when the producer has been disconnected. Errors are logged, not thrown.
   */
  async publisherTransaction(options: PublisherTransactionPayload): Promise<void> {
    let producer: Producer | undefined
    let transaction: Transaction | undefined
    try {
      producer = this.kafka.producer(options.producerConfig ?? {})
      this.watchProducer(producer)
      await producer.connect()
      transaction = await producer.transaction()

      if (options.type === 'single') {
        await transaction.send(options.sendConfig as ProducerRecord)
      } else {
        await transaction.sendBatch(options.sendConfig as ProducerBatch)
      }

      await transaction.commit()
    } catch (e: unknown) {
      if (transaction?.isActive()) {
        try {
          await transaction.abort()
        } catch (abortError: unknown) {
          console.error(`publisher transaction abort failed: ${abortError}`)
        }
      }
      console.error(`publisher transaction is not working: ${e}`)
    } finally {
      await this.disconnectQuietly(producer, 'producer')
    }
  }

  /**
   * Creates a consumer, subscribes it and starts delivering messages to `cb`.
   *
   * @param options Consumer, subscription and run configuration.
   * @param cb Handler invoked for every message (`eachMessage`).
   * @returns Resolves once the consumer is running. If start-up fails the
   *   error is logged and the consumer is disconnected.
   */
  async subscriber(options: SubscriberPayload, cb: (payload: EachMessagePayload) => Promise<void>): Promise<void> {
    let consumer: Consumer | undefined
    try {
      consumer = this.kafka.consumer(options.consumerConfig)
      this.watchConsumer(consumer)
      await consumer.connect()
      await consumer.subscribe(options.subscribeConfig)
      // `eachMessage` comes last so the caller's handler always wins over runConfig.
      await consumer.run({ ...options.runConfig, eachMessage: cb })
    } catch (e: unknown) {
      console.error(`subscriber is not working: ${e}`)
      // Do not leave a half-started consumer connected.
      await this.disconnectQuietly(consumer, 'consumer')
    }
  }

  /** Registers logging listeners for producer instrumentation events. */
  private watchProducer(producer: Producer): void {
    producer.on('producer.connect', (): void => console.info('producer kafka connected'))
    producer.on('producer.network.request_timeout', (): void => console.error('producer kafka network timeout'))
    // Log only: calling disconnect() from here would emit the event again and loop.
    producer.on('producer.disconnect', (): void => console.error('producer kafka disconnect'))
  }

  /** Registers logging listeners for consumer instrumentation events; a crash also disconnects the consumer. */
  private watchConsumer(consumer: Consumer): void {
    consumer.on('consumer.connect', (): void => console.info('consumer kafka connected'))
    consumer.on('consumer.network.request_timeout', (): void => console.error('consumer kafka network timeout'))
    consumer.on('consumer.crash', async (event): Promise<void> => {
      console.error('consumer kafka crash', event.payload.error)
      await this.disconnectQuietly(consumer, 'consumer')
    })
    // Log only: re-invoking disconnect()/stop() from their own events is redundant at best
    // and, for disconnect, re-emits the event indefinitely.
    consumer.on('consumer.disconnect', (): void => console.error('consumer kafka disconnect'))
    consumer.on('consumer.stop', (): void => console.error('consumer kafka stop'))
  }

  /** Disconnects a client if one exists, logging (never throwing) when that fails. */
  private async disconnectQuietly(client: Producer | Consumer | undefined, label: string): Promise<void> {
    if (client === undefined) return
    try {
      await client.disconnect()
    } catch (e: unknown) {
      console.error(`${label} disconnect failed: ${e}`)
    }
  }
}

Thank you for using DeclareCode; we hope you were able to resolve the issue.

More questions on [categories-list]

0
inline scripts encapsulated in