Creando Pipeline de datos - Proceso de ETL (extracción - transformación y carga de datos al DataWarehouse):
Proceso de ETL: proceso de ingesta, transformación y carga de data al DataWarehouse.
Data Pipeles: es un canal de datos donde se usan diferentes tecnologias para procesar y transformar "datos crudos" desde su origen hacia su destino. Asimismo las fuentes pueden ser tanto internas como externas, en el primer caso puede ser: b/d transaccional de MongoDB o PostgreSQL, una app de negocios en la nube como Salesforce, Shopify o MailChimp. En el segundo caso es decir "externa", están: Nielsen o Qualtrics.
En tanto el destino de este canal de datos será el respositorio dónde una empresa almacena para usarla de forma conjunta: estos pueden ser los almacenes de datos "DATAWAREHOUSE" y los lagos de datos "Data Lakes". Por última a la hora de realizar la transformación de la data ésta puede ser a través de Spark, Trifacta, dbt, o si es manual usando Python para hacer los scripts, Airflow, etc. En definitiva, después de todos estos pasos es posible generar Visualizaciones de los resultados o analisis esperados, para esto podemos usar Tableau, Google Data Studio, Power BI, etc.
Durante el desarrollo de todo el proyecto se aplica el proceso ETC (extración, transformación y carga) de data al DataWarehouse. En este caso se utilizaron archivos CSV's, con dataset ubicados en datos abiertos del Ministerio de Transporte. En el proceso de transformación se normalizó y limpio los dataset: eliminando columnas que no iba a utilizar, eliminando datos nulos, cambio de nombre de las columnas, el tipo de dato, y también se unieron ambos datasets para que sea solo una tabla a consultar. Para cargar la data se usó SQL, HIVE y AIRFLOW. Este último automatiza el proceso mediante la creación de un Dag, con scripts.
Ingesta de datos: Durante esta etapa se obtiene datos desde diferentes fuentes que luego al ser transformada será información para la organización determinada. De manera que todo esto se agrega al proyecto en el que estás trabajando. Como se mencionó al principio estos datos pueden ser de otras B/D, archivos almacenados o apps. Asimismo puede provenir de manera interna cuando es desde la misma empresa, o externa.
En el siguiente caso de ejemplo obtenemos los datos que están almacenados en un Cloud Provider: Amazon S3. Allí están cargados los CSV. Asimismo otras opciones que podrían presentarse es cuando están almacenados en Cloud Storage de Google Cloud o Blog Storage de Azure.
Este proceso se lleva adelante desde una consola y después se almacena en un sistema de archivos HDFS- Hadoop (*REPOSITORIO DE DATA), tambien pude ser en un repositorio de algún Cloud Provider, Cloud Storage, Blog Storage.
Para realizar esto utilicé Hadoop almacenado en un Docker, desde allí coloqué los comandos wget -P + especificamos ruta donde queremos dejarlo + la url donde está alojado el dataset. Como puede verse en la siguiente imagen:
Se utiliza para almacenar datasets grandes con tipos de datos estructurados, semi-estructurados y no estructurados como imágenes, vídeo, datos de sensores, etc. Está optimizado para almacenar enormes cantidades de datos y mantener varias copias para garantizar una alta disponibilidad y la tolerancia a fallos. En definitiva es una tecnología fundamental para Big Data, o dicho de otra forma: Entonces:
- HDFS divide los ficheros de datos en bloques, generalmente de 128MB de tamaño, estos bloques son replicados y distribuidos en los nodos que componen el clúster.
En esta etapa realice las transformaciones de los dataset para posteriormente hacer las consultas con los filtros ya realizados:
- Lectura de los datasets
- union de los datasets
- creacion de vistas
- Utilice withColumn para cambiar nombres de algunas columnas
- Despues con lenguaje SQL, realicé el filtro de los datos, cambie el tipo de dato, y cambié nulos por '0'
- Inserción de la data a las tablas creadas en HIVE
Durante las transformaciones se filtraron los datos por vuelos domésticos, se eliminaron la columna inhab y fir. De esta manera se ingestaron los datos
- .option("delimiter",";") *utilicé esto para separar las columnas y ver mejor el contenido
- informe_2021.createOrReplaceTempView("info2021") *con esta sintaxis en spark se pueden crear vistas temporales del DataFrame/Dataset que me permiten ejercutar/consultar en SQL
- df_union = spark.sql("select * from info2021 UNION select * from info2022")
- df_union.createOrReplaceTempView("dfunion") *en estos dos pasos utilicé sintaxtis con SQL para generar un join con la otra tabla, luego cree la vista temporal
- .withColumnRenamed *Lo utilicé para cambiar el nombre de las columnas que necesitaba, si bien deben existir otros métodos por mi parte decidí hacerlo de esa manera
- .drop *con esta función elimine la columna que no iba a usar
- cast(coalesce(Pasajeros, 0) as integer)as pasajeros *en este caso pase de un tipo de dato string a integer, y utilicé 'coalesce' para que los valores nulos pasen a '0' y le coloqué el mismo nombre al campo pero con minúsculas
- spark.sql("insert into vuelos_argentina.aero_detall select * from aeropuertoss") * en este paso inserto la tabla que estaba transformando a la tabla que había creado previamente en HIVE. Como puede verse se coloca el nombre de la base de datos + nombre de la tabla, y la vista de la tabla que tiene el contenido de datos que voy a sumar.
En esta etapa se utilizo los script para automatizar todo el proceso de transformacion del dataset, a traves de un DAG, como puede verse en la imagen:
Resultados: En esta imagen podemos notar la automatización realizada, siguiendo el proceso por si surge algún problema podemos chequearlo en "Log":
Modelización de una base de datos Antes de comenzar a filtrar y/o seleccionar data para cargar y crear bases de datos es necesario tener en cuenta las buenas prácticas para modelizar las tablas. Es decir saber de antemano el nombre de las tablas, su estructura (tipo de dato) y las relaciones que van a tener. Entonces para eso lo más usado es el "Diagrama de Entidad - Relación" o "DER". Por ende su caracteristica destacada es que puede representar las tablas de manera gráfica y sus relaciones vinculadas.
Buenas prácticas:
- nombres en minuscula
- evitar tildes en los nombres de tablas/columnas
- reemplazar espacios por guión bajo: el símbolo “_”
Entonces, en esta etapa me pareció importante crear una base de datos exclusiva, donde solo irían los dataset relacionadas a los vuelos del Ministerio de Transporte. Para esto utilicé los comandos:
- CREATE SCHEMA vuelos_argentina;
- USE vuelos_argentina; Luego genere las tablas que se ven en la imagen, especificando nombre de las columnas, tipos de datos, y estructura:
Durante esta etapa realicé consultas usando "Dbeaver" sobre los dataset para crear los informes según lo solicitado:
- Cantidad de vuelos entre las fechas 01/12/2021 y 31/01/2022. Mostrar consulta y Resultado de la query
- Cantidad de pasajeros que viajaron en Aerolíneas Argentinas entre el 01/01/2021 y 30/06/2022. Mostrar consulta y Resultado de la query
- Mostrar fecha, hora, código aeropuerto salida, ciudad de salida, código de aeropuerto de arribo, ciudad de arribo, y cantidad de pasajeros de cada vuelo, entre el 01/01/2022 y el 30/06/2022 ordenados por fecha de manera descendiente. Mostrar consulta y Resultado de la query
- Cuales son las 10 aerolíneas que más pasajeros llevaron entre el 01/01/2021 y el 30/06/2022 exceptuando aquellas aerolíneas que no tengan nombre. Mostrar consulta y Visualización
- Cuales son las 10 aeronaves más utilizadas entre el 01/01/2021 y el 30/06/22 que despegaron desde la Ciudad autónoma de Buenos Aires o de Buenos Aires, exceptuando aquellas aeronaves que no cuentan con nombre. Mostrar consulta y Visualización
- Cantidad de pasajeros que viajaron en Aerolíneas Argentinas entre el 01/01/2021 y 30/06/2022: AEROLINEAS ARGENTINAS 7.262.042
- las 10 aerolíneas que más pasajeros llevaron entre el 01/01/2021 y el 30/06/2022 exceptuando aquellas aerolíneas que no tengan nombre:
a) Aerolineas Argentina: 7.262.042
b) JetSmart Airlines: 1.466.020
c) FB lineas aéras FLYBONDY: 1.436.491 ... *Etc
- 10 Aeronaves más utilizadas entre el 01/01/2021 y el 30/06/22 que despegaron desde la Ciudad autónoma de Buenos Aires o de Buenos Aires, exceptuando aquellas aeronaves que no cuentan con nombre:
a) CE-150-L
b) CE-152
c) CE-150-M ... *etc
Para agregar, estudiar la experiencia del usuario para analizar y mejorar esto. Serían datos relevantes tanto para el sector de marketing, servicio como experiencia de usuario. También podría generar alguna proyección sobre presupuestos
TABLEAU:
DEEPNOTE: