You work for a multinational company that sells various goods across the globe. Currently, its sales data is spread across many different data sources, making it difficult for members of the team to access and analyse. In an effort to become more data-driven, the organisation would like to make its sales data accessible from one centralised location. Your first goal is to produce a system that stores the current company data in a database, so that it can be accessed from one centralised location and acts as a single source of truth for sales data. You will then query the database to get up-to-date metrics for the business.
The different data sources that need to be extracted from and collected together are:
- two tables of an SQL database hosted on AWS RDS
- one table stored as a .pdf file hosted on AWS S3
- one table stored as a .csv file hosted on AWS S3
- one table stored as a .json file hosted on AWS S3
- a series of JSON objects available via an API
By completing this project, I have built a pipeline for extracting the data from the various sources, transforming (cleaning) the data, and loading the data into a new PostgreSQL database hosted locally. Once the data was extracted and loaded, further transformations were applied to the data and database to complete the database schema. Finally, several SQL queries were written to enable users of the database to query the data and extract meaningful insights from it.
In order to run this project, the following modules need to be installed:
pandas
sqlalchemy
requests
tabula-py
python-dotenv
PyYAML
If you are using Anaconda and virtual environments (recommended), the Conda environment can be recreated by running the following command, ensuring that env.yml is present in the project:
conda env create -f env.yml -n $ENVIRONMENT_NAME
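Alternatively, the same modules can be installed with pip (the package names below are as published on PyPI):
pip install pandas sqlalchemy requests tabula-py python-dotenv PyYAML
Note that the postgresql+psycopg2 connection string used later also requires psycopg2 (or psycopg2-binary) to be installed.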
It's worth noting that the pipeline won't run as it is without the AWS credentials or API key, but the DatabaseConnector and DataExtractor classes will work on other sources of data with amendments to API endpoints.
SQLAlchemy was used to connect to both the AWS and local SQL databases. In database_utils.py:
from sqlalchemy import create_engine, inspect
From the SQLAlchemy documentation:
The start of any SQLAlchemy application is an object called the Engine. This object acts as a central source of connections to a particular database, providing both a factory as well as a holding space called a connection pool for these database connections. The engine is typically a global object created just once for a particular database server, and is configured using a URL string which will describe how it should connect to the database host or backend.
For example:
# Construct connection string
connection_string = f"postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_database}"
# Create new sqlalchemy database engine
engine = create_engine(connection_string)
The inspect() function is used to get information about a connected database:
inspector = inspect(engine)
table_name_list = inspector.get_table_names()
The credentials for the databases are stored locally in YAML files. In order to access the credentials and pass them to create_engine() above, PyYAML was used to read the YAML files and load the contents into a dictionary:
import yaml
# Use context manager to open file
with open(self.filename, 'r') as file:
    # load contents into dictionary
    contents_dictionary = yaml.safe_load(file)
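For illustration, a credentials file read in this way might look something like the example below. The key names here are hypothetical; they simply need to match whatever the rest of the code expects:
db_host: my-database-host.eu-west-1.rds.amazonaws.com
db_port: 5432
db_username: my_username
db_password: my_password
db_database: my_database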
pandas is a fast, powerful, flexible and easy-to-use open-source data analysis and manipulation tool, built on top of the Python programming language. This project makes use of the pandas DataFrame, a two-dimensional data structure (essentially a table) whose built-in methods make it easy to search, manipulate and visualise large sets of data.
# convention is to give pandas the alias pd
import pandas as pd
This project utilises many of pandas' built-in methods. A few common examples are listed below:
# read SQL table from database connection established using SQLAlchemy
dataframe = pd.read_sql_table(table_name, engine)
# load DataFrame as new table into an existing database connection, don't load index, replace if already exists
dataframe.to_sql(new_table, engine, index=False, if_exists='replace')
# concatenate two DataFrames into one
new_dataframe = pd.concat([dataframe1, dataframe2], ignore_index=True)
# read contents of csv into DataFrame
dataframe = pd.read_csv('file.csv')
# read contents of json into DataFrame
dataframe = pd.read_json('file.json')
# drop a column from a DataFrame, inplace=True means action is performed on existing DataFrame
dataframe.drop('column_name', axis=1, inplace=True)
# drop any rows that contain null values
dataframe.dropna(inplace=True)
# convert a DataFrame column to datetime type
dataframe['column_name'] = pd.to_datetime(dataframe['column_name'])
# apply a function (can be a lambda function) to a DataFrame or DataFrame column
dataframe['column_name'] = dataframe['column_name'].apply(function)
# reset the index
dataframe.reset_index(inplace=True)
# cast a column to a different data type
dataframe['column_name'] = dataframe['column_name'].astype(str)
Tabula is a simple tool for reading tables from pdf files and converting them to a pandas DataFrame or CSV/TSV/JSON file.
import tabula
# read_pdf returns a list of DataFrames (one per detected table), so concatenate them into one
dataframe = pd.concat(tabula.read_pdf(link, pages='all'))
# depending on the table format, you may need to reset the index of the pandas DataFrame
dataframe.reset_index(inplace=True)
In order to connect to API endpoints, Requests was used to make HTTPS GET requests.
import requests
# make HTTPS GET request using URL of API endpoint and any necessary headers, e.g. x-api-key
response = requests.get(api_url, headers=header_dictionary)
# convert JSON response to pandas DataFrame
new_dataframe = pd.DataFrame(response.json(), index=[0])
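Because this source is a series of JSON objects rather than a single table, the individual responses can be collected and then combined into one DataFrame. A minimal sketch, assuming a hypothetical endpoint pattern with one record per numbered URL (the URL, header and record count below are placeholders):
import requests
import pandas as pd
# hypothetical endpoint pattern, header and record count - the real API will differ
base_url = "https://example.com/api/record/{}"
header_dictionary = {'x-api-key': 'mysecretapikey'}
number_of_records = 10
record_dataframes = []
for record_number in range(number_of_records):
    response = requests.get(base_url.format(record_number), headers=header_dictionary)
    # each response is a single JSON object, so give it a one-row index
    record_dataframes.append(pd.DataFrame(response.json(), index=[0]))
# combine the one-row DataFrames into a single DataFrame
dataframe = pd.concat(record_dataframes, ignore_index=True)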
When hosting code on GitHub or any other public repository, it's a good idea to keep any API keys or database credentials separate from the hosted code. This can be done by using a .env file that is added to the .gitignore. python-dotenv is then used to access any environment variables stored in the .env file.
In the .env file:
API_HEADER=mysecretapikey
In the Python script:
import os
from dotenv import load_dotenv
# load the contents of the .env file into the environment
load_dotenv()
api_header = {'x-api-key': os.getenv("API_HEADER")}
A local Postgres database was set up to receive the cleaned data from the different sources. Postgres was installed globally using Homebrew:
brew install postgresql@14
The local database cluster was initialised using the following command in the terminal:
initdb -D db -U postgres -W
where db is the directory containing the database files and postgres is the database username. The -W flag indicates that the database will be password protected; the user is prompted to enter the password upon running this command.
The database can be started using:
postgres -D db
pgAdmin is used to connect to the local database. With pgAdmin installed and running, follow these steps to connect:
- On the main application page, click on 'Add New Server'
- On the 'General' tab of the dialogue that appears, enter a name for the new server connection
- On the 'Connection' tab, enter 'localhost' for the 'Host name/address', and enter the username and password specified when creating the database.
- Click 'Save' to save the server and connect to the database.
The project consists of three main classes, each with separate functions:
- DatabaseConnector - in database_utils.py - contains all methods necessary for connecting and uploading to SQL databases
- DataExtractor - in data_extraction.py - contains all methods necessary for retrieving data from various sources
- DataCleaning - in data_cleaning.py - contains all methods necessary for cleaning individual pandas DataFrames
Running main.py will create the necessary instances of the three classes listed above, and sequentially extract, clean and load the data into the local database.
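As an illustration of how these pieces fit together, a simplified version of that flow might look like the sketch below. The constructor arguments, method names and table names are hypothetical; the real classes define their own methods for each data source:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
# create one instance of each class
db_connector = DatabaseConnector()
extractor = DataExtractor()
cleaner = DataCleaning()
# hypothetical method and table names, shown for illustration only
raw_users = extractor.read_rds_table(db_connector, 'legacy_users')  # extract
clean_users = cleaner.clean_user_data(raw_users)                    # transform (clean)
db_connector.upload_to_db(clean_users, 'dim_users')                 # load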
The project also contains two files with a series of SQL queries, database_schema.sql and business_queries.sql. The first file contains queries that alter tables, such as changing data types and adding primary and foreign keys. The second file contains queries for extracting insights from the data, such as finding out how certain types of store are performing in a particular country or which months produce the highest volume of sales.
For the future direction of this project, I'd like to learn more about SQLAlchemy in order to integrate running the SQL queries into the main pipeline script. It would also be a good idea to perform more error checking/handling on the database/API connection processes.
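A minimal sketch of what that integration could look like, assuming SQLAlchemy 2.x and that the statements in the query file are separated by semicolons (the file name and the splitting logic are illustrative only):
from sqlalchemy import create_engine, text
# connection_string built from the local database credentials, as shown earlier
engine = create_engine(connection_string)
with engine.connect() as connection:
    with open('database_schema.sql', 'r') as file:
        # naive split on semicolons; adequate for simple ALTER TABLE statements
        statements = [s.strip() for s in file.read().split(';') if s.strip()]
    for statement in statements:
        connection.execute(text(statement))
    # persist the changes
    connection.commit()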