-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
23 additions
and
239 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,254 +1,38 @@ | ||
# Création d'un Pipeline ETL avec Node.js, Python et Cassandra | ||
# Demo ETL | ||
|
||
Ce tutoriel explique comment j'ai créé un pipeline ETL (Extract, Transform, Load) pour analyser les articles de HackerNews. | ||
Exemple simple d'un pipeline ETL complet. | ||
|
||
## 1. Extraction des données (Extract) | ||
## Structure | ||
|
||
J'ai utilisé Playwright pour scraper HackerNews car il gère bien le JavaScript moderne : | ||
|
||
```javascript | ||
// scraper.js | ||
const { chromium } = require('playwright'); | ||
const fs = require('fs').promises; | ||
|
||
async function scrape() { | ||
const browser = await chromium.launch(); | ||
const page = await browser.newPage(); | ||
|
||
await page.goto('https://news.ycombinator.com'); | ||
|
||
const articles = await page.evaluate(() => { | ||
const items = document.querySelectorAll('.athing'); | ||
return Array.from(items).map(item => ({ | ||
id: item.getAttribute('id'), | ||
title: item.querySelector('.titleline a').innerText, | ||
url: item.querySelector('.titleline a').href | ||
})); | ||
}); | ||
|
||
await fs.writeFile('raw_data.json', JSON.stringify(articles, null, 2)); | ||
} | ||
``` | ||
|
||
## 2. Transformation des données (Transform) | ||
|
||
Pour la transformation, j'utilise pandas qui est parfait pour manipuler et analyser les données : | ||
|
||
```python | ||
# transform.py | ||
import pandas as pd | ||
from urllib.parse import urlparse | ||
|
||
def transform(): | ||
# Lecture et conversion en DataFrame | ||
df = pd.DataFrame(raw_data) | ||
|
||
# Enrichissement | ||
df['title'] = df['title'].str.lower() | ||
df['domain'] = df['url'].apply(lambda x: urlparse(x).netloc) | ||
df['word_count'] = df['title'].str.split().str.len() | ||
|
||
# Catégorisation intelligente | ||
domain_categories = { | ||
'github.com': 'development', | ||
'stackoverflow.com': 'programming', | ||
'aws.amazon.com': 'cloud', | ||
'medium.com': 'blog', | ||
'youtube.com': 'video' | ||
} | ||
df['category'] = df['domain'].map(domain_categories).fillna('other') | ||
|
||
# Détection tech avec mots-clés | ||
tech_keywords = [ | ||
'python', 'javascript', 'react', 'node', 'aws', | ||
'cloud', 'api', 'docker', 'kubernetes' | ||
] | ||
df['is_tech'] = df['title'].str.contains('|'.join(tech_keywords)) | ||
``` | ||
|
||
## 3. Stockage des données (Load) | ||
|
||
Cassandra est une base de données NoSQL orientée colonnes, parfaite pour notre ETL : | ||
|
||
### Structure Cassandra | ||
``` | ||
Keyspace (≈ Database) | ||
└── Table (≈ Column Family) | ||
└── Row | ||
└── Column | ||
├── Name | ||
├── Value | ||
└── Timestamp | ||
``` | ||
|
||
### Pourquoi Cassandra ? | ||
- Optimisé pour l'écriture (parfait pour ETL) | ||
- Scalable horizontalement (scale out) | ||
- Pas de JOINS (on dénormalise à l'écriture) | ||
- Idéal pour les données temporelles | ||
|
||
### Scaling Horizontal vs Vertical | ||
``` | ||
Scaling Vertical (Scale Up) : | ||
┌────────────────┐ | ||
│ Grosse Machine │ <- Ajouter RAM/CPU à une seule machine | ||
└────────────────┘ | ||
Scaling Horizontal (Scale Out) : | ||
┌────────┐ ┌────────┐ ┌────────┐ | ||
│Machine1│ │Machine2│ │Machine3│ <- Ajouter plus de machines | ||
└────────┘ └────────┘ └────────┘ | ||
``` | ||
|
||
- **Vertical** : Augmenter la puissance d'une machine (plus de RAM, meilleur CPU) | ||
- **Horizontal** : Ajouter plus de machines au cluster | ||
|
||
Cassandra est conçu pour le scaling horizontal : | ||
- Les données sont automatiquement distribuées | ||
- Pas de point unique de défaillance | ||
- Parfait pour gérer de gros volumes de données | ||
|
||
### Notre modèle de données | ||
```sql | ||
-- Keyspace pour notre application | ||
CREATE KEYSPACE etl_data | ||
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; | ||
|
||
-- Table principale | ||
CREATE TABLE articles ( | ||
id text PRIMARY KEY, -- Clé de partitionnement | ||
title text, -- Colonne standard | ||
url text, | ||
domain text, | ||
word_count int, | ||
category text, | ||
is_tech boolean, | ||
processed_at timestamp | ||
); | ||
``` | ||
|
||
```javascript | ||
// server.js | ||
const express = require('express'); | ||
const { Client } = require('cassandra-driver'); | ||
|
||
const client = new Client({ | ||
contactPoints: ['localhost'], | ||
localDataCenter: 'datacenter1' | ||
}); | ||
|
||
// Création du schéma | ||
await client.execute(` | ||
CREATE TABLE IF NOT EXISTS articles ( | ||
id text PRIMARY KEY, | ||
title text, | ||
url text, | ||
domain text, | ||
word_count int, | ||
category text, | ||
is_tech boolean | ||
) | ||
`); | ||
|
||
// API pour récupérer les données | ||
app.get('/articles', async (req, res) => { | ||
const result = await client.execute('SELECT * FROM articles'); | ||
res.json(result.rows); | ||
}); | ||
etl-demo/ | ||
├── data/ | ||
│ ├── raw/ # Données brutes (JSON) | ||
│ └── processed/ # Données nettoyées (CSV) | ||
├── 1-extract.js # Scraping avec Playwright | ||
├── 2-transform.py # Nettoyage avec Pandas | ||
└── 3-load.py # Chargement dans Cassandra | ||
``` | ||
|
||
## 4. Visualisation avec React | ||
|
||
Le frontend utilise React avec des hooks pour une UI réactive : | ||
|
||
```jsx | ||
// front.jsx | ||
function App() { | ||
const [articles, setArticles] = useState([]) | ||
const [stats, setStats] = useState({ | ||
totalArticles: 0, | ||
avgWordCount: 0, | ||
techArticles: 0, | ||
categoryBreakdown: {} | ||
}) | ||
|
||
useEffect(() => { | ||
fetch('/articles') | ||
.then(res => res.json()) | ||
.then(data => { | ||
setArticles(data) | ||
// Calcul des stats | ||
setStats({ | ||
totalArticles: data.length, | ||
avgWordCount: data.reduce((acc, curr) => acc + curr.word_count, 0) / data.length, | ||
techArticles: data.filter(a => a.is_tech).length, | ||
categoryBreakdown: data.reduce((acc, curr) => { | ||
acc[curr.category] = (acc[curr.category] || 0) + 1 | ||
return acc | ||
}, {}) | ||
}) | ||
}) | ||
}, []) | ||
} | ||
``` | ||
|
||
## 5. Automatisation | ||
|
||
Le pipeline s'exécute toutes les minutes : | ||
## Utilisation | ||
|
||
```javascript | ||
// server.js | ||
const cron = require('node-cron'); | ||
|
||
cron.schedule('* * * * *', async () => { | ||
await scrape(); // Extract | ||
await execAsync('python transform.py'); // Transform | ||
await loadData(); // Load | ||
}); | ||
1. Extraction : | ||
```bash | ||
node 1-extract.js | ||
``` | ||
|
||
## Points clés appris | ||
|
||
1. **Architecture ETL** : La séparation en trois étapes rend le code plus maintenable | ||
2. **Pandas > JSON** : Pandas simplifie énormément les transformations de données | ||
3. **React moderne** : Les hooks rendent le code plus lisible | ||
4. **NoSQL** : Cassandra est excellent pour les écritures fréquentes | ||
|
||
## Installation et lancement | ||
|
||
2. Transformation : | ||
```bash | ||
# Installation des dépendances | ||
npm install | ||
pip install -r requirements.txt | ||
|
||
# Démarrage de Cassandra (avec brew) | ||
brew services start cassandra | ||
|
||
# Dans un premier terminal (backend) | ||
node server.js | ||
|
||
# Dans un second terminal (frontend) | ||
npm run front | ||
python 2-transform.py | ||
``` | ||
|
||
## Structure des fichiers | ||
|
||
``` | ||
. | ||
├── scraper.js # Extraction (Playwright) | ||
├── transform.py # Transformation (Pandas) | ||
├── server.js # API + Cassandra | ||
├── front.jsx # Interface React | ||
├── styles.css # Styles | ||
└── package.json # Dépendances | ||
3. Chargement : | ||
```bash | ||
python 3-load.py | ||
``` | ||
|
||
## Prochaines améliorations possibles | ||
|
||
1. Ajouter Kafka pour du vrai temps réel | ||
2. Utiliser TypeScript | ||
3. Ajouter des tests | ||
4. Dockeriser l'application | ||
5. Ajouter plus de visualisations (graphiques, etc.) | ||
## Notes | ||
|
||
Le dashboard est accessible sur http://localhost:5173 | ||
- Le scraping est configuré pour la Fnac | ||
- Les données sont sauvegardées à chaque étape | ||
- Utilise des batch pour Cassandra |
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.