From 57b6503441a246ddec60629eeb5e2f677739772c Mon Sep 17 00:00:00 2001 From: Vignesh Shetty Date: Sat, 9 Nov 2024 20:42:18 +0530 Subject: [PATCH] Simple batch insert --- .../src/services/rabbitmq/producer.ts | 89 +++++++++++++------ .../src/services/rabbitmq/types.ts | 7 ++ .../src/services/redis/subscriber.ts | 13 ++- 3 files changed, 81 insertions(+), 28 deletions(-) diff --git a/kafka-clickhouse/src/services/rabbitmq/producer.ts b/kafka-clickhouse/src/services/rabbitmq/producer.ts index a79a15e..45815ff 100644 --- a/kafka-clickhouse/src/services/rabbitmq/producer.ts +++ b/kafka-clickhouse/src/services/rabbitmq/producer.ts @@ -1,6 +1,6 @@ import rabbitmqConnection from './connection.js'; import UserLocationService from '../location/location.js'; -import { ProduceMessage, UserLocation } from './types.js'; +import { ProduceMessage, UserLocation, SourceObjects } from './types.js'; const exchangeName = 'exchange'; // The exchange name @@ -15,16 +15,47 @@ const pub = rabbitmqConnection.createPublisher({ exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }], }); -// Publish a message for testing -// pub.send({ exchange: exchangeName, routingKey: routingKey }, { -// code: '123', -// browser: 'Chrome', -// os: 'Windows', -// device: 'Desktop', -// country: 'US', -// region: 'CA', -// city: 'Los Angeles', -// }) +// const test_data = [ +// { +// code: '123', +// browser: 'Chrome', +// os: 'Windows', +// device: 'Desktop', +// country: 'US', +// region: 'CA', +// city: 'Los Angeles', +// }, +// { +// code: '123', +// browser: 'Chrome', +// os: 'Windows', +// device: 'Desktop', +// country: 'US', +// region: 'CA', +// city: 'Los Angeles', +// }, +// { +// code: '123', +// browser: 'Chrome', +// os: 'Windows', +// device: 'Desktop', +// country: 'US', +// region: 'CA', +// city: 'Los Angeles', +// }, +// { +// code: '123', +// browser: 'Chrome', +// os: 'Windows', +// device: 'Desktop', +// country: 'US', +// region: 'CA', +// city: 'Los Angeles', +// } +// ] + +// // Publish a message for testing +// pub.send({ exchange: exchangeName, routingKey: routingKey }, test_data) // .then(() => console.log('Message published')) // .catch((error) => console.error('Error publishing message:', error)); @@ -32,25 +63,31 @@ const pub = rabbitmqConnection.createPublisher({ // TODO: Make it batch processing class Producer { - async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise { - const location: UserLocation = await UserLocationService.getUserLocation(ip); - const message: ProduceMessage = { - code, - browser, - os, - device, - country: location.country, - region: location.region, - city: location.city, - }; - console.log('Producing message:', message); + async produceLogic(produceObjects : SourceObjects[]): Promise { + const messages: ProduceMessage[] = []; + for (const obj of produceObjects) { + const { ip, browser, os, device, code } = obj; + const location: UserLocation = await UserLocationService.getUserLocation(ip); + const message: ProduceMessage = { + code, + browser, + os, + device, + country: location.country, + region: location.region, + city: location.city, + }; + messages.push(message); + } pub.send( - {exchange: exchangeName, routingKey: routingKey}, // metadata - message + {exchange: exchangeName, routingKey: routingKey}, + messages ).catch((error) => { console.error('Error producing message:', error); + messages.length = 0; }).finally(() => { - console.log('Message produced'); + console.log(`[Info] : Produced ${messages.length} messages at ${new Date().toISOString()}`); + messages.length = 0; }); } } diff --git a/kafka-clickhouse/src/services/rabbitmq/types.ts b/kafka-clickhouse/src/services/rabbitmq/types.ts index 43f53dd..24cfbbd 100644 --- a/kafka-clickhouse/src/services/rabbitmq/types.ts +++ b/kafka-clickhouse/src/services/rabbitmq/types.ts @@ -13,3 +13,10 @@ export interface ProduceMessage { region: string | null; city: string | null; } +export interface SourceObjects { + ip: string; + browser: string; + os: string; + device: string; + code: string; +} \ No newline at end of file diff --git a/kafka-clickhouse/src/services/redis/subscriber.ts b/kafka-clickhouse/src/services/redis/subscriber.ts index 1010d8d..daa1361 100644 --- a/kafka-clickhouse/src/services/redis/subscriber.ts +++ b/kafka-clickhouse/src/services/redis/subscriber.ts @@ -2,16 +2,25 @@ import { parentPort } from 'worker_threads'; import RedisQueue from './connection.js'; import KafkaProducer from '../rabbitmq/producer.js'; +import { SourceObjects } from '../rabbitmq/types.js'; let running = true; +const batch = 10; async function processQueue(): Promise { + const batchMessages : SourceObjects[] = []; while (running) { try { const message = await RedisQueue.dequeue(10); if (message) { - const { ip, browser, os, device, code } = message; - await KafkaProducer.produceLogic(ip, browser, os, device, code); + batchMessages.push(message); + if (batchMessages.length < batch) { + continue; + } + else { + await KafkaProducer.produceLogic(batchMessages); + batchMessages.length = 0; + } } } catch (error) { console.error('Error processing queue:', error);