김민재 W1 #17

Open · wants to merge 4 commits into `main`
12 changes: 12 additions & 0 deletions .gitignore
@@ -158,3 +158,15 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

.DS_Store
.venv
.ipynb_checkpoints
__pycache__
*.db
*.json
*.txt
*.html
*.csv
*.json
*.sql
542 changes: 542 additions & 0 deletions missions/W1/M1.ipynb

Large diffs are not rendered by default.

2,156 changes: 2,156 additions & 0 deletions missions/W1/M2.ipynb

Large diffs are not rendered by default.

273 changes: 273 additions & 0 deletions missions/W1/M3/README.md
@@ -0,0 +1,273 @@
# GDP ETL Project

The goal of this project is to implement an ETL process for GDP data.

## Business Requirements
The main purpose of this project is to collect GDP data from the IMF website and transform it into a format that can be used for analysis.

Common analysis use cases are:
1. Filter countries by a GDP threshold
2. Find the top N countries by GDP
3. Group countries by region


### Contents
---
- [GDP ETL Project](#gdp-etl-project)
- [Business Requirements](#business-requirements)
- [Contents](#contents)
- [ETL Process Overview](#etl-process-overview)
- [1. Extract](#1-extract)
- [2. Transform](#2-transform)
- [3. Load](#3-load)
- [Implementation](#implementation)
- [ETL Process](#etl-process)
- [Modules](#modules)
- [**`importer.py`**](#importerpy)
- [**`transformer.py`**](#transformerpy)
- [**`exporter.py`**](#exporterpy)
- [**`logger.py`**](#loggerpy)
- [**`query_helper.py`**](#query_helperpy)
- [Utils](#utils)
- [**`create_country_region_table.py`**](#create_country_region_tablepy)
- [**`create_large_data_csv.py`**](#create_large_data_csvpy)
- [Performance Experiment](#performance-experiment)
- [Read CSV](#read-csv)
- [Pandas DataFrame](#pandas-dataframe)
- [Parallel/Distributed Processing](#paralleldistributed-processing)
- [Steps](#steps)

## ETL Process Overview

### 1. Extract
- Move data from the external system into the workspace
- This process is abstracted in `importer.py`, along with the parsing process.

### 2. Transform
- Parse the raw data into a structured format
- After parsing, the data should follow this format:
```json
[
  {
    "Country": "United States",
    "GDP": "30,337,162",
    "Region": "North America"
  },
  // ...
]
```
- This process is abstracted in `importer.py`, along with the extraction process.
- Transform the GDP value (a sketch follows the example below):
  1. Convert the GDP value from a string to a float
  2. Convert the GDP value to billions of USD
- Sort the data by GDP
- After transformation, the data should follow the format:
```json
[
  {
    "Country": "United States",
    "GDP": 30337.16,
    "Region": "North America"
  },
  // ...
]
```
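
A minimal sketch of this transform, assuming the parsed data is held in a pandas DataFrame with `Country`, `GDP`, and `Region` columns; the actual `transform_gdp` in `transformer.py` may differ in detail:

```python
import pandas as pd


def transform_gdp(df: pd.DataFrame) -> pd.DataFrame:
    """Convert GDP strings in million USD (e.g. "30,337,162") to billions, then sort."""
    df = df.copy()
    # Strip thousands separators, cast to float, scale to billions, round to 2 decimals.
    df["GDP"] = (df["GDP"].str.replace(",", "").astype(float) / 1000).round(2)
    # Largest economies first, matching the example output above.
    return df.sort_values("GDP", ascending=False).reset_index(drop=True)
```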

### 3. Load
- Export the data to a JSON file or a SQLite database.
- To optimize query performance, store the data sorted by GDP.

---

## Implementation

### ETL Process
1. **`etl_project_gdp.py`**: ETL process, Wikipedia -> JSON
2. **`etl_project_gdp_with_sql.py`**: ETL process, Wikipedia -> SQLite
3. **`etl_project_gdp_from_csv.py`**: ETL process, CSV -> SQLite
4. **`etl_project_gdp_parallel.py`**: ETL process with a parallel/distributed design

### Modules

#### **`importer.py`**
Extracts data from a data source and parses it into a structured format.

Supported Data Sources:
- Wikipedia
- CSV File

Importer Class Hierarchy:
The interface is separated from the implementation to support multiple data sources.
- `ImporterInterface`
- `WebImporterInterface`
- `WikiWebImporter`
- `FileImporterInterface`
- `CsvFileImporter`

`ImporterInterface` defines the interface for all importers.
`WebImporterInterface` and `FileImporterInterface` define how to extract data from their data source.
`WikiWebImporter` and `CsvFileImporter` define how to parse the extracted data.
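
A minimal sketch of the top of this hierarchy, assuming a template-method split between extraction and parsing; `import_data` is the entry point used by the ETL scripts below, while the `_extract`/`_parse` hook names are illustrative:

```python
from abc import ABC, abstractmethod

import pandas as pd


class ImporterInterface(ABC):
    """Shared entry point: extract raw data from the source, then parse it."""

    def import_data(self) -> pd.DataFrame:
        raw = self._extract()    # e.g. HTTP response body or file contents
        return self._parse(raw)  # e.g. HTML table rows or CSV records

    @abstractmethod
    def _extract(self):
        """Fetch raw data (specialized by WebImporterInterface / FileImporterInterface)."""

    @abstractmethod
    def _parse(self, raw) -> pd.DataFrame:
        """Parse raw data into Country/GDP/Region rows (specialized by concrete importers)."""
```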

---

#### **`transformer.py`**
Functions for transforming data.

---

#### **`exporter.py`**
Exports the data to a JSON file or a SQLite database.

Supported Export Targets:
- JSON file
- SQLite database (`.db` file)

Exporter Class Hierarchy:
- `ExporterInterface`
- `JsonFileExporter`
- `SqliteExporter`
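
A minimal sketch of this hierarchy, assuming constructors shaped like the calls in the ETL scripts below (`JsonFileExporter(OUTPUT_FILE_PATH)`, `SqliteExporter(DB_PATH, table_name=TABLE_NAME)`); the actual `exporter.py` may differ:

```python
import sqlite3
from abc import ABC, abstractmethod

import pandas as pd


class ExporterInterface(ABC):
    @abstractmethod
    def export_data(self, df: pd.DataFrame) -> None: ...


class JsonFileExporter(ExporterInterface):
    def __init__(self, path):
        self.path = path

    def export_data(self, df: pd.DataFrame) -> None:
        # Records orientation matches the list-of-objects format shown above.
        df.to_json(self.path, orient="records", force_ascii=False)


class SqliteExporter(ExporterInterface):
    def __init__(self, db_path, table_name):
        self.db_path = db_path
        self.table_name = table_name

    def export_data(self, df: pd.DataFrame) -> None:
        with sqlite3.connect(self.db_path) as conn:
            # The DataFrame is already sorted by GDP, so rows are stored in GDP order.
            df.to_sql(self.table_name, conn, if_exists="replace", index=False)
```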

---

#### **`logger.py`**
Writes log messages to a log file.

Supported Log Levels:
- info
- error
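
The `LogExecutionTime` helper used by the ETL scripts below can be implemented as a small context manager. A minimal sketch, using the standard `logging` module as a stand-in for the file logger (the real `logger.py` writes to `LOG_FILE_PATH` and may differ):

```python
import logging
import time

logger = logging.getLogger("etl_project")


class LogExecutionTime:
    """Context manager that logs how long a named ETL step took."""

    def __init__(self, step_name: str):
        self.step_name = step_name

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        elapsed = time.perf_counter() - self.start
        logger.info("%s finished in %.2fs", self.step_name, elapsed)
        return False  # never suppress exceptions raised inside the step
```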

---

#### **`query_helper.py`**
Helper functions for querying and printing the transformed data, both from a DataFrame and from the SQLite database.
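
A hedged sketch of what the two DataFrame-based helpers used in `etl_project_gdp.py` might look like; the actual implementations (and the SQL variants) in `query_helper.py` may differ:

```python
import pandas as pd


def print_gdp_over_100_countries_df(df: pd.DataFrame) -> None:
    """Print countries whose GDP is at least 100B USD (threshold assumed from the name)."""
    print(df[df["GDP"] >= 100])


def print_top5_avg_gdp_by_region_df(df: pd.DataFrame) -> None:
    """Print, per region, the average GDP of its top-5 countries."""
    top5_avg = (
        df.sort_values("GDP", ascending=False)
        .groupby("Region")
        .head(5)                      # keep the top 5 rows of each region
        .groupby("Region")["GDP"]
        .mean()
    )
    print(top5_avg)
```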

---

### Utils

#### **`create_country_region_table.py`**
Extracts country and region data from Wikipedia and saves it to a JSON file.

Format: `{country: region}`
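
A hypothetical example of how such a mapping can be applied during the transform step (the JSON file name below is assumed):

```python
import json

import pandas as pd

# Load the {country: region} mapping produced by create_country_region_table.py.
with open("country_region.json", encoding="utf-8") as f:
    country_to_region = json.load(f)

df = pd.DataFrame({"Country": ["United States"], "GDP": ["30,337,162"]})
df["Region"] = df["Country"].map(country_to_region)
```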

#### **`create_large_data_csv.py`**
Generates a large CSV file for testing.

---
## Performance Experiment

Test data (generated by `create_large_data_csv.py`):
- 10M rows (260 MB)
- 100M rows (2.6 GB)

Environment:
- 32 GB RAM
- 10-core CPU

---

### Read CSV

```python
# 10M: 4.51s
# 100M: 48.77s
df = pd.read_csv("large_data.csv")
```

If the file is too large to fit in memory, the `chunksize` parameter can be used to read it in chunks.

```python
chunks = pd.read_csv(
    "large_data.csv",
    dtype=schema,
    header=None,
    names=schema.keys(),
    chunksize=CHUNKSIZE,
)
df = pd.concat(chunks)
```

**Expected Result:**

More chunks (i.e. a smaller chunksize) should be slower, because of:
- the overhead of creating a new DataFrame for each chunk
- the overhead of concatenating all chunks


**Actual Result:**

|                | chunksize 10K | chunksize 100K | chunksize 1M | no chunking |
| -------------- | ------------- | -------------- | ------------ | ----------- |
| data size 10M  | 4.82s         | 3.92s          | 4.3s         | 4.51s       |
| data size 100M | 46.85s        | 40.25s         | 44.38s       | 48.77s      |

For both data sizes, a chunksize of 100K is the fastest.

**Why?**

One unverified hypothesis is pandas' internal buffer reallocation: parsing the whole file in one pass may trigger more (and larger) buffer growth than parsing moderately sized chunks, while very small chunks pay more per-chunk overhead.
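
Note that `pd.concat(chunks)` still materializes the full DataFrame in memory, so chunking as used above only bounds the parser's working set. For data that truly does not fit in memory, each chunk can be transformed and loaded independently; a sketch, reusing the SQLite target described earlier (file, table, and column names assumed):

```python
import sqlite3

import pandas as pd

with sqlite3.connect("World_Economies.db") as conn:
    for chunk in pd.read_csv(
        "large_data.csv",
        header=None,
        names=["Country", "GDP", "Region"],
        chunksize=100_000,
    ):
        # Transform and append one chunk at a time; the full dataset never sits in memory.
        chunk["GDP"] = (chunk["GDP"].str.replace(",", "").astype(float) / 1000).round(2)
        chunk.to_sql("Countries_by_GDP", conn, if_exists="append", index=False)
```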

---

### Pandas DataFrame

```python
# 10M: 4.70s
# 100M: 50.19s
df["GDP"] = df["GDP"].apply(lambda x: x.replace(",", ""))
df["GDP"] = df["GDP"].apply(lambda x: round(float(x) / 1000, 2))
```

```python
# 10M: 4.14s
# 100M: 42.46s
df["GDP"] = df["GDP"].apply(lambda x: round(float(x.replace(",", "")) / 1000, 2))
```

```python
# 10M: 3.98s
# 100M: 39.55s
df["GDP"] = (
    pd.to_numeric(df["GDP"].str.replace(",", ""), errors="coerce")
    .div(1000)
    .round(2)
)
```

```python
# 10M: 3.19s
# 100M: 32.64s
df["GDP"] = (df["GDP"].replace(",", "", regex=True).astype(float) / 1000).round(2)
```

```python
# 10M: 1.66s
# 100M: 17.05s
df["GDP"] = (df["GDP"].str.replace(",", "").astype(float) / 1000).round(2)
```
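
Most of the gap comes from how much work happens in Python per row: the `apply` variants invoke a Python lambda for every value, while the `str.replace` + `astype(float)` variant keeps the whole conversion in pandas' vectorized string and casting routines. The remaining difference between the last two is most likely the cost of regex-based replacement (`replace(..., regex=True)`) versus `str.replace`, which in recent pandas versions defaults to a plain substring replacement.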

---

## Parallel/Distributed Processing

Main idea:
- Split the file and process each piece in parallel
- Store the data separately by region

See details in `etl_project_gdp_parallel.py`; a simplified sketch of the map/reduce flow follows the step list below.

### Steps

1. Split one big file into smaller files
   e.g. data.csv -> data_0.csv, data_1.csv
2. Preprocess each file
   e.g. data_0.csv -> data_0_preprocessed.csv
3. Map each file to regions
   e.g. data_0_preprocessed.csv -> data_0_asia.csv, data_0_europe.csv
4. Reduce by region
   e.g. data_0_asia.csv, data_1_asia.csv -> data_asia.csv
5. Sort by GDP
   e.g. data_asia.csv -> data_asia_sorted.csv
6. Load into SQLite
   e.g. data_asia_sorted.csv -> data_asia_sorted.db
7. Query by region
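
A simplified, hedged sketch of the map/reduce flow above, using `multiprocessing` and hypothetical helper names; the real pipeline in `etl_project_gdp_parallel.py` may be organized differently:

```python
from multiprocessing import Pool
from pathlib import Path

import pandas as pd


def map_chunk(chunk_path: Path) -> dict:
    """Preprocess one split file and partition its rows by region."""
    df = pd.read_csv(chunk_path)
    df["GDP"] = (df["GDP"].str.replace(",", "").astype(float) / 1000).round(2)
    return {region: part for region, part in df.groupby("Region")}


def reduce_by_region(mapped: list) -> dict:
    """Concatenate the per-chunk partitions of each region and sort them by GDP."""
    regions = {}
    for partition in mapped:
        for region, part in partition.items():
            regions.setdefault(region, []).append(part)
    return {
        region: pd.concat(parts).sort_values("GDP", ascending=False)
        for region, parts in regions.items()
    }


if __name__ == "__main__":
    chunk_paths = sorted(Path("data").glob("data_*.csv"))  # output of the split step
    with Pool() as pool:
        mapped = pool.map(map_chunk, chunk_paths)          # steps 2-3 in parallel
    by_region = reduce_by_region(mapped)                   # steps 4-5
```
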
12 changes: 12 additions & 0 deletions missions/W1/M3/config.py
@@ -0,0 +1,12 @@
from pathlib import Path

HOME_DIR = Path(__file__).resolve().parent
LOG_FILE_PATH = HOME_DIR / "log/etl_project_log.txt"
# RAW_DATA_FILE_PATH = HOME_DIR / "data/Countries_by_GDP.json"
RAW_DATA_FILE_PATH = HOME_DIR / "data/Countries_by_GDP.html"
OUTPUT_FILE_PATH = HOME_DIR / "data/Countries_by_GDP_Transformed.json"
DB_NAME = "World_Economies"
TABLE_NAME = "Countries_by_GDP"
DB_PATH = HOME_DIR / f"data/{DB_NAME}.db"
CSV_FILE_NAME = "large_data"
CSV_INPUT_FILE_PATH = HOME_DIR / f"data/{CSV_FILE_NAME}.csv"
32 changes: 32 additions & 0 deletions missions/W1/M3/etl_project_gdp.py
@@ -0,0 +1,32 @@
from config import LOG_FILE_PATH, RAW_DATA_FILE_PATH, OUTPUT_FILE_PATH
from modules.logger import logger, init_logger
from modules.importer import WikiWebImporter
from modules.transformer import transform_gdp
from modules.exporter import JsonFileExporter
from modules.query_helper import (
    print_gdp_over_100_countries_df,
    print_top5_avg_gdp_by_region_df,
)


def main():
    init_logger(LOG_FILE_PATH)
    logger.print_separator()
    logger.info("Starting the ETL process")

    importer = WikiWebImporter(raw_data_file_path=RAW_DATA_FILE_PATH)
    df = importer.import_data()

    df = transform_gdp(df)

    exporter = JsonFileExporter(OUTPUT_FILE_PATH)
    exporter.export_data(df)

    logger.info("ETL process completed successfully")

    print_gdp_over_100_countries_df(df)
    print_top5_avg_gdp_by_region_df(df)


if __name__ == "__main__":
    main()
31 changes: 31 additions & 0 deletions missions/W1/M3/etl_project_gdp_from_csv.py
@@ -0,0 +1,31 @@
from config import LOG_FILE_PATH, DB_PATH, TABLE_NAME, CSV_INPUT_FILE_PATH
from modules.logger import logger, init_logger, LogExecutionTime
from modules.importer import CsvFileImporter
from modules.transformer import transform_gdp
from modules.exporter import SqliteExporter
from modules.query_helper import print_top5_avg_gdp_by_region_sql


def main():
    init_logger(LOG_FILE_PATH)
    logger.print_separator()
    logger.info("Starting the ETL process")

    with LogExecutionTime("Extract"):
        importer = CsvFileImporter(CSV_INPUT_FILE_PATH)
        df = importer.import_data()

    with LogExecutionTime("Transform"):
        df = transform_gdp(df)

    with LogExecutionTime("Load"):
        exporter = SqliteExporter(DB_PATH, table_name=TABLE_NAME)
        exporter.export_data(df)

    logger.info("ETL process completed successfully")

    print_top5_avg_gdp_by_region_sql(DB_PATH, TABLE_NAME)


if __name__ == "__main__":
    main()