Skip to content

Commit

Permalink
Merge branch 'feature/add_filter_data' into 'develop'
Browse files Browse the repository at this point in the history
Feature/add filter data

See merge request gmv-bda/upm/inesdata-mov/data-generation!32
  • Loading branch information
María Limones Andrade committed Oct 10, 2024
2 parents 2c021a1 + 6c4354c commit 7466c18
Show file tree
Hide file tree
Showing 21 changed files with 2,750 additions and 62 deletions.
3 changes: 2 additions & 1 deletion config_dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ sources:


storage:
default: minio
# default: minio
default: local
config:
minio:
access_key : trmlia
Expand Down
51 changes: 28 additions & 23 deletions inesdata_mov_datasets/sources/create/aemet.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ def download_aemet(
aws_access_key_id (str): minio user
aws_secret_access_key (str): minio password
"""
loop = asyncio.new_event_loop()
loop.run_until_complete(
download_objs(
bucket, prefix, output_path, endpoint_url, aws_access_key_id, aws_secret_access_key
try:
loop = asyncio.new_event_loop()
loop.run_until_complete(
download_objs(
bucket, prefix, output_path, endpoint_url, aws_access_key_id, aws_secret_access_key
)
)
)
except Exception as e:
logger.error(e)


def generate_df_from_file(content: dict, date: str) -> pd.DataFrame:
Expand Down Expand Up @@ -99,26 +102,28 @@ def generate_day_df(storage_path: str, date: str):
content = json.load(f)
df = generate_df_from_file(content, date)
dfs.append(df)

if len(dfs) > 0:
final_df = pd.concat(dfs)
# fix hour col
final_df["periodo"] = (
final_df["periodo"]
.str.pad(width=4, side="right", fillchar="0")
.replace(r"(\d{2})(\d+)", r"\1:\2", regex=True)
)
# add datetime col
final_df["datetime"] = pd.to_datetime(date + " " + final_df["periodo"])
final_df.drop(columns="periodo", inplace=True)
# sort values
final_df = final_df.sort_values(by="datetime")
# export final df
processed_storage_dir = Path(storage_path) / Path("processed") / "aemet" / date
date_formatted = date.replace("/", "")
Path(processed_storage_dir).mkdir(parents=True, exist_ok=True)
final_df.to_csv(processed_storage_dir / f"aemet_{date_formatted}.csv", index=None)
logger.info(f"Created AEMET df of shape {final_df.shape}")
if 'periodo' in final_df.columns:
# fix hour col
final_df["periodo"] = (
final_df["periodo"]
.str.pad(width=4, side="right", fillchar="0")
.replace(r"(\d{2})(\d+)", r"\1:\2", regex=True)
)
# add datetime col
final_df["datetime"] = pd.to_datetime(date + " " + final_df["periodo"])
final_df.drop(columns="periodo", inplace=True)
# sort values
final_df = final_df.sort_values(by="datetime")
# export final df
processed_storage_dir = Path(storage_path) / Path("processed") / "aemet" / date
date_formatted = date.replace("/", "")
Path(processed_storage_dir).mkdir(parents=True, exist_ok=True)
final_df.to_csv(processed_storage_dir / f"aemet_{date_formatted}.csv", index=None)
logger.info(f"Created AEMET df of shape {final_df.shape}")
else:
logger.debug("There is no data to create")
else:
logger.debug("There is no data to create")

Expand Down
5 changes: 3 additions & 2 deletions inesdata_mov_datasets/sources/create/informo.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def generate_day_df(storage_path: str, date: str):
filename = raw_storage_dir / file
with open(filename, "r") as f:
content = json.load(f)
df = generate_df_from_file(content["pms"])
dfs.append(df)
if "pms" in content:
df = generate_df_from_file(content["pms"])
dfs.append(df)

if len(dfs) > 0:
final_df = pd.concat(dfs)
Expand Down
7 changes: 6 additions & 1 deletion inesdata_mov_datasets/sources/extract/emt.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async def get_calendar(
)
async with session.get(calendar_url, headers=headers) as response:
try:
response.raise_for_status()
return await response.json()
except Exception as e:
logger.error("Error in calendar call to the server")
Expand Down Expand Up @@ -73,6 +74,7 @@ async def get_line_detail(
)
async with session.get(line_detail_url, headers=headers) as response:
try:
response.raise_for_status()
return await response.json()
except Exception as e:
logger.error(f"Error in line_detail call line {line_id} to the server")
Expand Down Expand Up @@ -100,6 +102,7 @@ async def get_eta(session: aiohttp, stop_id: str, headers: json) -> json:
eta_url = f"https://openapi.emtmadrid.es/v2/transport/busemtmad/stops/{stop_id}/arrives/"
async with session.post(eta_url, headers=headers, json=body) as response:
try:
response.raise_for_status()
return await response.json()
except Exception as e:
logger.error(f"Error in ETA call stop {stop_id} to the server")
Expand Down Expand Up @@ -153,7 +156,7 @@ async def login_emt(config: Settings, object_login_name: str, local_path: Path =
login_dict_upload,
)

if config.storage.default == "local":
if config.storage.default == "local" and local_path:
os.makedirs(local_path, exist_ok=True)
with open(os.path.join(local_path, object_login_name), "w") as file:
file.write(login_json_str)
Expand Down Expand Up @@ -243,6 +246,7 @@ async def token_control(config: Settings, date_slash: str, date_day: str) -> str
#Catching an error that ocurrs when the server doesnt provide the token expiration
try:
expiration_date_unix = data["data"][0]["tokenDteExpiration"]["$date"]
print(expiration_date_unix)
except:
logger.error(f"Error saving time expiration from login. Solving the problem retrying the call.")
token = await login_emt(config, object_login_name, local_path=dir_path)
Expand All @@ -252,6 +256,7 @@ async def token_control(config: Settings, date_slash: str, date_day: str) -> str
expiration_date_unix / 1000
) # miliseconds to seconds
now = datetime.datetime.now()
print(now)
# Compare the time expiration of the token withthe actual date
if now >= expiration_date: # reset token
token = await login_emt(config, object_login_name, local_path=dir_path)
Expand Down
37 changes: 37 additions & 0 deletions inesdata_mov_datasets/sources/filter_data/aemet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Gather raw data from aemet."""

import traceback

import requests
from loguru import logger

from inesdata_mov_datasets.settings import Settings


async def get_filter_aemet(config: Settings):
"""Request aemet API to get data from Madrid weather.
Args:
config (Settings): Object with the config file.
"""
try:
url_madrid = (
"https://opendata.aemet.es/opendata/api/prediccion/especifica/municipio/horaria/28079"
)

headers = {
"api_key": config.sources.aemet.credentials.api_key,
"Content-Type": "application/json",
"Accept": "application/json",
}

r = requests.get(url_madrid, headers=headers)
r_json = requests.get(r.json()["datos"]).json()

logger.info("Extracted AEMET")

return r_json

except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
104 changes: 104 additions & 0 deletions inesdata_mov_datasets/sources/filter_data/emt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Gather raw data from EMT."""

import asyncio
import datetime
import json
import traceback

import aiohttp
import pytz
from loguru import logger

from inesdata_mov_datasets.settings import Settings
from inesdata_mov_datasets.sources.extract.emt import (
get_calendar,
token_control,
)


async def get_eta(session: aiohttp, stop_id: str, line_id: str, headers: json) -> json:
"""Make the API call to ETA endpoint.
Args:
session (aiohttp): Call session to make faster the calls to the same API.
stop_id (str): Id of the bus stop.
line_id (str): Id of the bus line.
headers (json): Headers of the http call.
Returns:
json: Response of the petition in json format.
"""
body = {
"cultureInfo": "ES",
"Text_StopRequired_YN": "N",
"Text_EstimationsRequired_YN": "Y",
"Text_IncidencesRequired_YN": "N",
}
eta_url = (
f"https://openapi.emtmadrid.es/v2/transport/busemtmad/stops/{stop_id}/arrives/{line_id}"
)

async with session.post(eta_url, headers=headers, json=body) as response:
try:
return await response.json()
except Exception as e:
logger.error(f"Error in ETA call stop {stop_id} to the server")
logger.error(e)
return {"code": -1}


async def get_filter_emt(config: Settings, stop_id: str, line_id: str):
"""Get all the data from EMT endpoints.
Args:
config (Settings): Object with the config file.
stop_id (str): The stop id.
line_id (str): The line id.
"""
try:
# Get the timezone from Madrid and formated the dates for the object_name of the files
europe_timezone = pytz.timezone("Europe/Madrid")
current_datetime = datetime.datetime.now(europe_timezone).replace(second=0)
formatted_date_day = current_datetime.strftime(
"%Y%m%d"
) # formatted date year|month|day all together
formatted_date_slash = current_datetime.strftime(
"%Y/%m/%d"
) # formatted date year/month/day for storage in Minio

access_token = await token_control(
config, formatted_date_slash, formatted_date_day
) # Obtain token from EMT

# Headers for requests to the EMT API
headers = {
"accessToken": access_token,
"Content-Type": "application/json",
"Accept": "application/json",
}

async with aiohttp.ClientSession() as session:
# List to store tasks asynchronously
calendar_tasks = []
eta_tasks = []

# ETA
eta_task = asyncio.ensure_future(get_eta(session, stop_id, line_id, headers))

eta_tasks.append(eta_task)

# Calendar
calendar_task = asyncio.ensure_future(
get_calendar(session, formatted_date_day, formatted_date_day, headers)
)
calendar_tasks.append(calendar_task)

eta_responses = await asyncio.gather(*eta_tasks)
calendar_responses = await asyncio.gather(*calendar_tasks)

logger.info("Extracted EMT")
return eta_responses[0], calendar_responses[0]

except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
33 changes: 33 additions & 0 deletions inesdata_mov_datasets/sources/filter_data/informo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Gather raw data from Informo."""

import traceback

import requests
import xmltodict
from loguru import logger

from inesdata_mov_datasets.settings import Settings


async def get_filter_informo(config: Settings):
"""Request informo API to get data from Madrid traffic.
Args:
config (Settings): Object with the config file.
"""
try:
url_informo = "https://informo.madrid.es/informo/tmadrid/pm.xml"

r = requests.get(url_informo)

# Parse XML
xml_dict = xmltodict.parse(r.content)
# await save_informo(config, xml_dict)

logger.info("Extracted INFORMO")

return xml_dict["pms"]

except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
2 changes: 1 addition & 1 deletion inesdata_mov_datasets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def check_local_file_exists(path_dir: Path, object_name: str) -> bool:
"""
# Create a Path object for the file
file_path = Path(path_dir) / object_name

print(file_path)
# Check if the file exists
if file_path.exists():
return True
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pandas==2.2.0
polars==0.20.22
pyarrow==15.0.0
pydantic-settings==2.1.0
pydantic==2.6.1
pytz==2024.1
PyYAML==6.0.1
requests == 2.31.0
Expand Down
22 changes: 14 additions & 8 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile requirements/requirements.in
#
--trusted-host pypi.org
--trusted-host files.pythonhosted.org
--trusted-host pypi.python.org

aioboto3==12.3.0
# via -r requirements/requirements.in
aiobotocore[boto3]==2.11.2
# via
# aioboto3
# aiobotocore
# via aioboto3
aiofiles==23.2.1
# via -r requirements/requirements.in
aiohttp==3.9.3
Expand All @@ -22,6 +24,8 @@ aiosignal==1.3.1
# via aiohttp
annotated-types==0.6.0
# via pydantic
async-timeout==4.0.3
# via aiohttp
attrs==23.2.0
# via aiohttp
boto3==1.34.34
Expand Down Expand Up @@ -67,10 +71,14 @@ numpy==1.26.4
# pyarrow
pandas==2.2.0
# via -r requirements/requirements.in
polars==0.20.22
# via -r requirements/requirements.in
pyarrow==15.0.0
# via -r requirements/requirements.in
pydantic==2.6.1
# via pydantic-settings
# via
# -r requirements/requirements.in
# pydantic-settings
pydantic-core==2.16.2
# via pydantic
pydantic-settings==2.1.0
Expand Down Expand Up @@ -100,9 +108,7 @@ shellingham==1.5.4
six==1.16.0
# via python-dateutil
typer[all]==0.9.0
# via
# -r requirements/requirements.in
# typer
# via -r requirements/requirements.in
typing-extensions==4.9.0
# via
# pydantic
Expand Down
Loading

0 comments on commit 7466c18

Please sign in to comment.