diff --git a/kafka-clickhouse/package-lock.json b/kafka-clickhouse/package-lock.json
index e96b97c..501d4bd 100644
--- a/kafka-clickhouse/package-lock.json
+++ b/kafka-clickhouse/package-lock.json
@@ -11,8 +11,8 @@
         "@clickhouse/client": "^1.1.0",
         "express": "^4.19.2",
         "ioredis": "^5.4.1",
-        "kafkajs": "^2.2.4",
         "node-fetch": "^3.3.2",
+        "rabbitmq-client": "^5.0.0",
         "tslib": "~2.6"
       },
       "devDependencies": {
@@ -4499,14 +4499,6 @@
         "node": ">=6"
       }
     },
-    "node_modules/kafkajs": {
-      "version": "2.2.4",
-      "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
-      "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
-      "engines": {
-        "node": ">=14.0.0"
-      }
-    },
     "node_modules/keyv": {
       "version": "4.5.4",
       "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz",
@@ -5339,6 +5331,14 @@
       }
     ]
   },
+    "node_modules/rabbitmq-client": {
+      "version": "5.0.0",
+      "resolved": "https://registry.npmjs.org/rabbitmq-client/-/rabbitmq-client-5.0.0.tgz",
+      "integrity": "sha512-IvPafeLO89qEv9QPbMb8nqkXOKRYHvo0sXntK8cHLY/fkkS8DJRtwCPtQerMZZ9n1jLhifJtioyK7vp5wIp6Gg==",
+      "engines": {
+        "node": ">=16"
+      }
+    },
     "node_modules/range-parser": {
       "version": "1.2.1",
       "resolved": "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz",
diff --git a/kafka-clickhouse/package.json b/kafka-clickhouse/package.json
index eb7fadb..b017c85 100644
--- a/kafka-clickhouse/package.json
+++ b/kafka-clickhouse/package.json
@@ -25,6 +25,7 @@
   "scripts": {
     "dev": "nodemon --env-file=.env build/src/main.js",
     "kafka:worker": "nodemon --env-file=.env build/src/services/redis/worker.js",
+    "kafka:worker:test": "nodemon --env-file=.env build/src/services/redis/subscriber.js",
     "start": "node build/src/main.js",
     "clean": "rimraf coverage build tmp",
     "prebuild": "npm run lint",
@@ -41,8 +42,8 @@
     "@clickhouse/client": "^1.1.0",
     "express": "^4.19.2",
     "ioredis": "^5.4.1",
-    "kafkajs": "^2.2.4",
     "node-fetch": "^3.3.2",
+    "rabbitmq-client": "^5.0.0",
     "tslib": "~2.6"
   },
   "volta": {
diff --git a/kafka-clickhouse/src/controller/healthcheck.ts b/kafka-clickhouse/src/controller/healthcheck.ts
index 6ac53a5..235810b 100644
--- a/kafka-clickhouse/src/controller/healthcheck.ts
+++ b/kafka-clickhouse/src/controller/healthcheck.ts
@@ -36,4 +36,6 @@ class HealthCheckController {
   }
 }
 
-export default new HealthCheckController();
+const instance = new HealthCheckController();
+
+export default instance;
diff --git a/kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql b/kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql
new file mode 100644
index 0000000..09ea56d
--- /dev/null
+++ b/kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql
@@ -0,0 +1,63 @@
+-- Create a new database named 'eurl_data'
+CREATE DATABASE IF NOT EXISTS eurl_data;
+
+-- Switch to the 'eurl_data' database to ensure all subsequent operations are performed within this database
+USE eurl_data;
+
+-- ===========================
+-- TABLE: 'click_analytics_rq'
+-- ===========================
+-- Create a table named 'click_analytics_rq' to consume data from a RabbitMQ queue
+-- The table schema includes columns for code, browser, os, device, country, region, and city
+-- LowCardinality is used for columns with a limited number of unique values to optimize storage
+-- The table uses the RabbitMQ engine, with settings for the broker address, routing key, exchange name, and message format
+
+CREATE TABLE IF NOT EXISTS eurl_data.click_analytics_rq (
+    `code` String, -- A unique code representing the click analytics data
+    `browser` LowCardinality(String), -- The browser type (e.g., Chrome, Firefox), with LowCardinality to optimize storage
+    `os` LowCardinality(String), -- The operating system (e.g., Windows, Linux), optimized with LowCardinality
+    `device` LowCardinality(String), -- The type of device (e.g., Desktop, Mobile), optimized with LowCardinality
+    `country` LowCardinality(String), -- The country of the user, optimized with LowCardinality
+    `region` String, -- The region or state where the user is located
+    `city` String -- The city where the user is located
+) ENGINE = RabbitMQ -- Use RabbitMQ as the table engine for real-time data ingestion
+SETTINGS
+    rabbitmq_host_port = '172.17.0.5:5672', -- RabbitMQ broker address
+    rabbitmq_routing_key_list = 'eurl_click_analytics', -- RabbitMQ routing key for message filtering
+    rabbitmq_exchange_name = 'exchange', -- The RabbitMQ exchange name where messages are published
+    rabbitmq_format = 'JSONEachRow'; -- Format for incoming messages from RabbitMQ (JSON, one message per row)
+
+-- =======================
+-- TABLE: 'click_analytics'
+-- =======================
+-- Create a table named 'click_analytics' to store processed click analytics data
+-- The table schema adds a timestamp for the event on top of the columns above
+-- The MergeTree engine is used for high-performance OLAP queries
+-- Data is partitioned by month and ordered by multiple columns for efficient querying
+
+CREATE TABLE IF NOT EXISTS eurl_data.click_analytics
+(
+    `code` String, -- A unique code representing the click analytics data
+    `browser` LowCardinality(String), -- Browser type (Chrome, Firefox), optimized with LowCardinality
+    `os` LowCardinality(String), -- Operating system (Windows, macOS), optimized with LowCardinality
+    `device` LowCardinality(String), -- Type of device (Desktop, Mobile), optimized with LowCardinality
+    `country` LowCardinality(String), -- The country where the user is located, optimized with LowCardinality
+    `region` String, -- The region or state of the user
+    `city` String, -- The city of the user
+    `timestamp` Date DEFAULT toDate(now()) -- A timestamp for the event, defaulting to the current date
+)
+ENGINE = MergeTree -- Use MergeTree for efficient OLAP queries
+PARTITION BY toYYYYMM(timestamp) -- Partition data by month (YYYYMM) based on the timestamp
+ORDER BY (code, timestamp, browser, os, device, country) -- Order data by multiple columns to speed up queries
+SETTINGS index_granularity = 8192; -- Set the granularity of the primary key index for efficient data access
+
+-- =======================================
+-- MATERIALIZED VIEW: 'click_analytics_mv'
+-- =======================================
+-- Create a materialized view to automatically transfer data from the RabbitMQ table to the analytics table
+-- The materialized view watches 'click_analytics_rq' and inserts rows into 'click_analytics' as they arrive
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS eurl_data.click_analytics_mv
+TO eurl_data.click_analytics -- Target table where processed data will be stored
+AS
+SELECT * FROM eurl_data.click_analytics_rq; -- Every row consumed from RabbitMQ is inserted into the analytics table
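+
+-- Hypothetical smoke test (editor's sketch, not part of the original fixture):
+-- publish one JSON message to the 'exchange' exchange, e.g.
+--   {"code":"abc123","browser":"Chrome","os":"Linux","device":"Desktop","country":"US","region":"CA","city":"San Jose"}
+-- and the row should surface in the analytics table shortly afterwards:
+--   SELECT code, browser, country, timestamp
+--   FROM eurl_data.click_analytics
+--   ORDER BY timestamp DESC
+--   LIMIT 10;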
diff --git a/kafka-clickhouse/src/services/kafka/connection.ts b/kafka-clickhouse/src/services/kafka/connection.ts
deleted file mode 100644
index 7868d41..0000000
--- a/kafka-clickhouse/src/services/kafka/connection.ts
+++ /dev/null
@@ -1,93 +0,0 @@
-import { Kafka, Producer, Consumer, Admin } from 'kafkajs';
-
-class KafkaClient {
-  private static instance: KafkaClient;
-  private kafka: Kafka;
-  private producer: Producer | null = null;
-  private consumer: Consumer | null = null;
-
-  private constructor() {
-    this.kafka = new Kafka({
-      clientId: 'my-app',
-      brokers: [process.env.KAFKA_BROKER as string],
-      connectionTimeout: 10000,
-      ssl: true,
-      sasl: {
-        mechanism: 'plain',
-        username: process.env.KAFKA_USER as string,
-        password: process.env.KAFKA_PASSWORD as string,
-      },
-    });
-  }
-
-  public static getInstance(): KafkaClient {
-    if (!KafkaClient.instance) {
-      KafkaClient.instance = new KafkaClient();
-    }
-    return KafkaClient.instance;
-  }
-
-  public admin(): Admin {
-    return this.kafka.admin();
-  }
-
-  public async connectProducer(): Promise<Producer> {
-    if (!this.producer) {
-      this.producer = this.kafka.producer();
-      try {
-        await this.producer.connect();
-        console.log('Kafka Producer connected successfully.');
-      } catch (error) {
-        console.error('Error connecting Kafka Producer:', error);
-        throw error;
-      }
-    }
-    return this.producer;
-  }
-
-  public async connectConsumer(
-    groupId: string,
-    topics: string[],
-  ): Promise<Consumer> {
-    if (!this.consumer) {
-      this.consumer = this.kafka.consumer({ groupId });
-      try {
-        await this.consumer.connect();
-        await Promise.all(
-          topics.map((topic) =>
-            this.consumer!.subscribe({ topic, fromBeginning: true }),
-          ),
-        );
-        console.log(
-          'Kafka Consumer connected successfully and subscribed to topics:',
-          topics,
-        );
-      } catch (error) {
-        console.error('Error connecting Kafka Consumer:', error);
-        throw error;
-      }
-    }
-    return this.consumer;
-  }
-
-  public async disconnect(): Promise<void> {
-    try {
-      if (this.producer) {
-        await this.producer.disconnect();
-        this.producer = null;
-        console.log('Kafka Producer disconnected successfully.');
-      }
-      if (this.consumer) {
-        await this.consumer.disconnect();
-        this.consumer = null;
-        console.log('Kafka Consumer disconnected successfully.');
-      }
-    } catch (error) {
-      console.error('Error disconnecting Kafka Client:', error);
-    }
-  }
-}
-
-const kafkaClient = KafkaClient.getInstance();
-await kafkaClient.connectProducer();
-export default kafkaClient;
diff --git a/kafka-clickhouse/src/services/kafka/producer.ts b/kafka-clickhouse/src/services/kafka/producer.ts
deleted file mode 100644
index d47c136..0000000
--- a/kafka-clickhouse/src/services/kafka/producer.ts
+++ /dev/null
@@ -1,52 +0,0 @@
-import { CompressionTypes, Producer } from 'kafkajs';
-
-import KafkaClient from './connection.js';
-import UserLocationService from '../location/location.js';
-import { ProduceMessage, UserLocation } from './types.js';
-
-class KafkaProducer {
-  private producer: Producer | null = null;
-
-  private async getProducer(): Promise<Producer> {
-    if (!this.producer) {
-      this.producer = await KafkaClient.connectProducer();
-    }
-    return this.producer;
-  }
-
-  public async produceLogic(
-    ip: string,
-    browser: string,
-    os: string,
-    device: string,
-    code: string,
-  ): Promise<void> {
-    try {
-      const userLocation: UserLocation =
-        await UserLocationService.getUserLocation(ip);
-      const message: ProduceMessage = {
-        code,
-        browser,
-        os,
-        device,
-        ...userLocation,
-      };
-      const produceableMessage = JSON.stringify(message);
-      const producer = await this.getProducer();
-      await producer.send({
-        topic: 'click_analytics',
-        compression: CompressionTypes.GZIP,
-        messages: [
-          {
-            value: produceableMessage,
-          },
-        ],
-      });
-      console.log(`Message sent: ${produceableMessage}`);
-    } catch (error) {
-      console.error(`Unable to send message: ${JSON.stringify(error)}`, error);
-    }
-  }
-}
-
-export default new KafkaProducer();
diff --git a/kafka-clickhouse/src/services/location/location.ts b/kafka-clickhouse/src/services/location/location.ts
index 57d285b..4576759 100644
--- a/kafka-clickhouse/src/services/location/location.ts
+++ b/kafka-clickhouse/src/services/location/location.ts
@@ -30,4 +30,6 @@ class UserLocationService {
   }
 }
 
-export default new UserLocationService();
+const instance = new UserLocationService();
+
+export default instance;
diff --git a/kafka-clickhouse/src/services/rabbitmq/connection.ts b/kafka-clickhouse/src/services/rabbitmq/connection.ts
new file mode 100644
index 0000000..f0e6485
--- /dev/null
+++ b/kafka-clickhouse/src/services/rabbitmq/connection.ts
@@ -0,0 +1,14 @@
+import { Connection } from 'rabbitmq-client';
+
+// Reuse a single AMQP connection across the app; rabbitmq-client reconnects automatically.
+const rabbitmqConnection = new Connection(process.env.RABBITMQ_URI);
+
+rabbitmqConnection.on('error', (error) => {
+  console.error('RabbitMQ connection error:', error);
+});
+
+rabbitmqConnection.on('connection', () => {
+  console.log('RabbitMQ connection successfully (re)established');
+});
+
+export default rabbitmqConnection;
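+
+// Hypothetical shutdown hook (editor's sketch; nothing in this repo wires it up yet):
+// Connection#close() tears the socket down gracefully when the process is asked to stop.
+// process.on('SIGTERM', () => {
+//   rabbitmqConnection.close().catch((err) => {
+//     console.error('Error closing RabbitMQ connection:', err);
+//   });
+// });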
diff --git a/kafka-clickhouse/src/services/rabbitmq/producer.ts b/kafka-clickhouse/src/services/rabbitmq/producer.ts
new file mode 100644
index 0000000..a79a15e
--- /dev/null
+++ b/kafka-clickhouse/src/services/rabbitmq/producer.ts
@@ -0,0 +1,60 @@
+import rabbitmqConnection from './connection.js';
+import UserLocationService from '../location/location.js';
+import { ProduceMessage, UserLocation } from './types.js';
+
+const exchangeName = 'exchange'; // The exchange name
+const routingKey = 'eurl_click_analytics'; // The routing key
+
+const pub = rabbitmqConnection.createPublisher({
+  // Enable publish confirmations, similar to consumer acknowledgements
+  confirm: true,
+  // Enable retries
+  maxAttempts: 2,
+  // Ensure the exchange exists before we use it
+  exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }],
+});
+
+// Publish a message for testing:
+// pub.send({ exchange: exchangeName, routingKey }, {
+//   code: '123',
+//   browser: 'Chrome',
+//   os: 'Windows',
+//   device: 'Desktop',
+//   country: 'US',
+//   region: 'CA',
+//   city: 'Los Angeles',
+// })
+//   .then(() => console.log('Message published'))
+//   .catch((error) => console.error('Error publishing message:', error));
+
+// TODO: Batch messages instead of publishing them one at a time
+
+class Producer {
+  async produceLogic(
+    ip: string,
+    browser: string,
+    os: string,
+    device: string,
+    code: string,
+  ): Promise<void> {
+    const location: UserLocation = await UserLocationService.getUserLocation(ip);
+    const message: ProduceMessage = {
+      code,
+      browser,
+      os,
+      device,
+      country: location.country,
+      region: location.region,
+      city: location.city,
+    };
+    console.log('Producing message:', message);
+    try {
+      // Resolves once the broker confirms the publish (confirm: true above)
+      await pub.send({ exchange: exchangeName, routingKey }, message);
+      console.log('Message produced');
+    } catch (error) {
+      console.error('Error producing message:', error);
+    }
+  }
+}
+
+const instance = new Producer();
+
+export default instance;
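+
+// Hypothetical usage (editor's sketch; argument values are illustrative):
+//   await instance.produceLogic('203.0.113.7', 'Chrome', 'Linux', 'Desktop', 'abc123');
+// The object body is serialized as JSON, so each publish yields one
+// JSONEachRow-compatible message for the ClickHouse 'click_analytics_rq' table.
+// The Redis subscriber below imports this module as its producer.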
diff --git a/kafka-clickhouse/src/services/kafka/types.ts b/kafka-clickhouse/src/services/rabbitmq/types.ts
similarity index 100%
rename from kafka-clickhouse/src/services/kafka/types.ts
rename to kafka-clickhouse/src/services/rabbitmq/types.ts
diff --git a/kafka-clickhouse/src/services/redis/subscriber.ts b/kafka-clickhouse/src/services/redis/subscriber.ts
index 77eaca1..1010d8d 100644
--- a/kafka-clickhouse/src/services/redis/subscriber.ts
+++ b/kafka-clickhouse/src/services/redis/subscriber.ts
@@ -1,7 +1,7 @@
 // worker.js
 import { parentPort } from 'worker_threads';
 import RedisQueue from './connection.js';
-import KafkaProducer from '../kafka/producer.js';
+import KafkaProducer from '../rabbitmq/producer.js';
 
 let running = true;