-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCloudFunction.py
144 lines (117 loc) · 5.8 KB
/
CloudFunction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
import os
import zipfile
import pandas as pd
import aiohttp
from aiohttp import ClientConnectorError, TCPConnector
from google.cloud import storage
from google.oauth2 import service_account
from datetime import datetime
async def obtener_url_descarga(request):
try:
package_id = request.args.get('package_id', default="5e8bb1f8-f0a5-4719-a877-38543545505b", type=str)
url_api = f"https://datos.gob.cl/api/action/package_show?id={package_id}"
response = requests.get(url_api)
response.raise_for_status()
data = response.json()
if data["success"]:
return data["result"]["resources"][0]["url"]
else:
raise ValueError(f"Error en la solicitud API: {data}")
except requests.exceptions.RequestException as e:
return str(e)
async def descargar_y_descomprimir(request):
try:
url_descarga = await obtener_url_descarga(request)
destino = "/tmp/datos_historicos.zip"
response = requests.get(url_descarga)
response.raise_for_status()
with open(destino, 'wb') as archivo:
archivo.write(response.content)
fecha_str = url_descarga.split('/')[-1].split('.')[0][-8:]
with zipfile.ZipFile(destino, 'r') as zip_ref:
csv_folder = f"/tmp/Archivos_CSV/{fecha_str}"
diarios_folder = os.path.join(csv_folder, 'Diarios')
os.makedirs(diarios_folder, exist_ok=True)
originales_folder = f"/tmp/Historicos_originales/{fecha_str}"
os.makedirs(originales_folder, exist_ok=True)
for archivo in zip_ref.namelist():
ruta_archivo = os.path.join(os.path.dirname(destino), archivo)
if archivo.endswith('.txt'):
destino_txt = os.path.join(originales_folder, f'{os.path.splitext(archivo)[0]}_{fecha_str}.txt')
with open(destino_txt, 'wb') as f:
f.write(zip_ref.read(archivo))
elif archivo.endswith('.csv'):
destino_csv = os.path.join(csv_folder, f'{os.path.splitext(archivo)[0]}_{fecha_str}.csv')
with open(destino_csv, 'wb') as f:
f.write(zip_ref.read(archivo))
convertir_a_csv(originales_folder, csv_folder, fecha_str)
convertir_a_csv(diarios_folder, csv_folder, fecha_str)
return f'Descarga y descompresión exitosas para fecha: {fecha_str}'
except requests.exceptions.RequestException as e:
return f'Error en la descarga y descompresión: {e}'
except zipfile.BadZipFile:
return 'Error: El archivo descargado no es un archivo ZIP válido.'
def convertir_a_csv(directorio_originales, directorio_csv, fecha_str):
archivos_txt = [archivo for archivo in os.listdir(directorio_originales) if archivo.endswith('.txt')]
for archivo in archivos_txt:
ruta_archivo_txt = os.path.join(directorio_originales, archivo)
dataframe = pd.read_csv(ruta_archivo_txt, delimiter='\t')
ruta_csv = os.path.join(directorio_csv, f'{os.path.splitext(archivo)[0]}.csv')
dataframe.to_csv(ruta_csv, index=False)
async def obtener_codigos_servicio(request):
try:
url = "https://www.red.cl/restservice_v2/rest/getservicios/all"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientError as e:
return None
async def descargar_json_y_convertir_a_csv(request):
try:
codigos_servicio = await obtener_codigos_servicio(request)
if codigos_servicio:
códigos_fallidos = set(codigos_servicio)
while códigos_fallidos:
codigo = códigos_fallidos.pop()
url = f"https://www.red.cl/restservice_v2/rest/conocerecorrido?codsint={codigo}"
connector = TCPConnector(limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(url) as response:
response.raise_for_status()
dataframe = pd.json_normalize(await response.json())
diarios_folder = f"/tmp/Archivos_CSV/Diarios"
os.makedirs(diarios_folder, exist_ok=True)
ruta_csv = os.path.join(diarios_folder, f'datos_diarios_{codigo}.csv')
dataframe.to_csv(ruta_csv, index=False)
return 'Proceso de descarga y conversión de datos diarios completado'
except (aiohttp.ClientError, ClientConnectorError) as e:
return f'Error en la descarga de datos diarios: {e}'
def subir_datos_a_bucket(local_folder, bucket):
archivos_nuevos = 0
archivos_actualizados = 0
def upload_file(local_file, blob_name):
nonlocal archivos_nuevos, archivos_actualizados
blob = bucket.blob(blob_name)
if blob.exists():
archivos_actualizados += 1
accion = "actualizado"
else:
archivos_nuevos += 1
accion = "agregado"
blob.upload_from_filename(local_file)
print(f"Archivo {local_file} {accion} en {blob_name}.")
for root, dirs, files in os.walk(local_folder):
for filename in files:
local_file_path = os.path.join(root, filename)
upload_file(local_file_path, filename)
print(f"Total de archivos nuevos agregados: {archivos_nuevos}")
print(f"Total de archivos actualizados: {archivos_actualizados}")
def cloud_function_handler(request):
try:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(descargar_y_descomprimir(request))
return result
except Exception as e:
return f'Error en la ejecución principal: {e}'