An ETL pipeline that extracts data from S3, stages it in Redshift, and transforms it into a set of dimensional tables.
A music streaming startup, Sparkify, has grown their user base and song database and wants to move their processes and data onto the cloud. Their data resides in S3, in a directory of JSON logs of user activity on the app, as well as a directory with JSON metadata on the songs in their app.
They'd like a data engineer to build an ETL pipeline that extracts their data from S3, stages it in Redshift, and transforms it into a set of dimensional tables for their analytics team to continue finding insights into what songs their users are listening to.
The purpose of this project is to implement a data warehouse and build an ETL pipeline for a database hosted on Redshift. The project uses AWS S3 and AWS Redshift.
- Load data from S3 to staging tables on Redshift
- Execute SQL statements that create the analytics tables from these staging tables
The two datasets reside in S3:
Song data: `s3://udacity-dend/song_data`
Log data: `s3://udacity-dend/log_data`
Log data JSON path: `s3://udacity-dend/log_json_path.json`
The first dataset is a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID.
song_data/A/B/C/TRABCEI128F424C983.json
song_data/A/A/B/TRAABJL12903CDCF1A.json
And below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like.
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
The second dataset consists of log files in JSON format generated by this event simulator based on the songs in the dataset above. These simulate app activity logs from an imaginary music streaming app based on configuration settings. The log files in the dataset you'll be working with are partitioned by year and month.
log_data/2018/11/2018-11-12-events.json
log_data/2018/11/2018-11-13-events.json
And below is an example of what the data in a log file, 2018-11-12-events.json, looks like.
First, install Postgres and Anaconda on your computer. Then create a Python environment with the dependencies customized to this project, create the cluster, and launch the ETL.
Steps one-by-one
- Create an environment to work in Anaconda:
  - run `conda create --quiet --yes --name <Your env name> psycopg2 boto3 configparser numpy pandas tabulate botocore`
  - then `conda install --yes -c conda-forge ipython-sql`
  - to open it: `conda activate <Your env name>`
  - to close it: `conda deactivate`
  - to remove the environment: `conda remove --name <Your env name> --all`
  - and to check: `conda info --envs`
- Go to the folder containing `sparkify.py`.
- Update KEY and SECRET and your IP (or the default 0.0.0.0/0) in `dwh.cfg`.
- Create the Redshift cluster and all tables: run `python sparkify.py`
- Load data into the staging tables and insert data into the fact and dimension tables: run `python etl.py`
- Test, explore the data, and run queries with `sparkify_analytic.ipynb`, opened in Jupyter Notebook.
- If you want to remove and re-create the tables, run `python create_tables.py`
- Delete the Redshift cluster and IAM role: run `python delete_cluster.py`
- Explore the data in the S3 bucket `udacity-dend` (a boto3 sketch follows this list).
- song_data contains one record per file (385,253 in total); log_data contains only a few files (31 in total).
- Column names in song_data: `song_id, num_songs, title, artist_name, artist_latitude, year, duration, artist_id, artist_longitude, artist_location`
- Column names in log_data: `artist, auth, firstName, gender, itemInSession, lastName, length, level, location, method, page, registration, sessionId, song, status, ts, userAgent, userId`
- "Access by programming"
- "Attach existing policies directly"
- "AmazonRedshiftFullAccess"
- keep the "Access key Id" and " Secret access key"
[AWS]
KEY=<Your Access key Id>
SECRET=<Your Secret access key>
[IAM_ROLE]
DWH_IAM_ROLE_NAME=dwhRole
...
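A minimal sketch of how the scripts can read these values with `configparser` (which is part of the conda environment above); only the sections shown in the snippet are assumed to exist:

```python
# Minimal sketch: read dwh.cfg with configparser.
# Only the [AWS] and [IAM_ROLE] sections shown above are assumed.
import configparser

config = configparser.ConfigParser()
config.read("dwh.cfg")

KEY = config.get("AWS", "KEY")
SECRET = config.get("AWS", "SECRET")
DWH_IAM_ROLE_NAME = config.get("IAM_ROLE", "DWH_IAM_ROLE_NAME")
```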
- Create IAM Role
- Create Redshift Cluster
- Open ports
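A minimal boto3 sketch of these three steps (presumably what `sparkify.py` does), reusing KEY and SECRET from the configparser sketch above. The role name, cluster settings, and password are illustrative placeholders, and attaching the S3 read-only policy to the role is an assumption (it is what the COPY commands later on need):

```python
# Minimal sketch: create the IAM role, the Redshift cluster, and the ingress
# rule with boto3. All names, sizes, and passwords are illustrative.
import json
import boto3

iam = boto3.client("iam", region_name="us-west-2",
                   aws_access_key_id=KEY, aws_secret_access_key=SECRET)
redshift = boto3.client("redshift", region_name="us-west-2",
                        aws_access_key_id=KEY, aws_secret_access_key=SECRET)

# 1. IAM role that Redshift assumes to read from S3
iam.create_role(
    RoleName="dwhRole",
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow",
                       "Principal": {"Service": "redshift.amazonaws.com"},
                       "Action": "sts:AssumeRole"}],
    }),
)
iam.attach_role_policy(RoleName="dwhRole",
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
role_arn = iam.get_role(RoleName="dwhRole")["Role"]["Arn"]

# 2. Redshift cluster
redshift.create_cluster(
    ClusterType="multi-node", NodeType="dc2.large", NumberOfNodes=4,
    ClusterIdentifier="dwhCluster", DBName="dwh",
    MasterUsername="dwhuser", MasterUserPassword="<Your password>",
    IamRoles=[role_arn],
)

# 3. Open the Redshift port (5439) on the cluster's VPC security group
ec2 = boto3.resource("ec2", region_name="us-west-2",
                     aws_access_key_id=KEY, aws_secret_access_key=SECRET)
sg = ec2.SecurityGroup("<Your cluster security group id>")
sg.authorize_ingress(CidrIp="0.0.0.0/0", IpProtocol="tcp",
                     FromPort=5439, ToPort=5439)
```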
- Drop the tables if they exist.
- Create two staging tables to load the data, because Amazon Redshift doesn't support upsert (update or insert); the staging tables are then joined to fill the other tables:
  - `staging_events` with the log_data files
  - `staging_songs` with the song_data files
- Create the dimension tables:
  - `dimUser`, `dimArtist`, `dimSong` with their PRIMARY KEY as SORTKEY
  - `dimTime` with its PRIMARY KEY as SORTKEY and as DISTKEY
Example:
time_table_create = ("""CREATE TABLE IF NOT EXISTS dimTime (
    start_time timestamp NOT NULL PRIMARY KEY DISTKEY SORTKEY,
    hour int,
    day int,
    week int,
    month int,
    year int,
    weekday varchar
)""")
- Create the fact table `factSongplay`, with `artist_id` as SORTKEY and `start_time` as DISTKEY.
I chose `artist_id` as SORTKEY for the factSongplay table because I need it more than once in my analytical queries, and `start_time` as DISTKEY for a better distribution across the slices.
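A sketch of what the corresponding CREATE TABLE statement could look like, using the `songplay` columns listed further below; the column types and the IDENTITY surrogate key are assumptions, but the DISTKEY/SORTKEY placement follows the description above:

```python
# Sketch only: types and the IDENTITY surrogate key are assumptions.
songplay_table_create = ("""CREATE TABLE IF NOT EXISTS factSongplay (
    songplay_id int IDENTITY(0,1) PRIMARY KEY,
    start_time  timestamp NOT NULL DISTKEY REFERENCES dimTime(start_time),
    user_id     int NOT NULL REFERENCES dimUser(user_id),
    level       varchar,
    song_id     varchar REFERENCES dimSong(song_id),
    artist_id   varchar SORTKEY REFERENCES dimArtist(artist_id),
    session_id  int,
    location    varchar,
    user_agent  varchar
)""")
```

The table below summarizes the resulting distribution keys, sort keys, sizes, and row counts for all tables.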
schema | table | tableid | distkey | skew | sortkey | #sks | mbytes | pct_of_total | enc | rows | unsorted_rows | pct_unsorted |
---|---|---|---|---|---|---|---|---|---|---|---|---|
public | factsongplay | 100526 | start_time | 1.000000 | artist_id | 1 | 192 | 0.020000 | y | 333 | 0.000000 | 0.000000 |
public | staging_events | 100524 | None | 1.000000 | None | 0 | 168 | 0.020000 | y | 8056 | nan | nan |
public | dimtime | 100543 | start_time | 1.000000 | start_time | 1 | 160 | 0.020000 | y | 8023 | 0.000000 | 0.000000 |
public | dimartist | 100539 | None | 1.000000 | artist_id | 1 | 128 | 0.010000 | y | 10025 | 0.000000 | 0.000000 |
public | dimuser | 100531 | None | 1.000000 | user_id | 1 | 128 | 0.010000 | y | 105 | 0.000000 | 0.000000 |
public | dimsong | 100535 | None | 1.000000 | song_id | 1 | 128 | 0.010000 | y | 14896 | 0.000000 | 0.000000 |
public | staging_songs | 100385 | None | 1.000000 | None | 0 | 104 | 0.010000 | y | 74480 | nan | nan |
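The listing above appears to come from a detailed admin query; a rough approximation can be obtained from Redshift's SVV_TABLE_INFO system view, for example:

```python
# Rough approximation of the listing above using the SVV_TABLE_INFO system view
# (distribution style, first sort key, size in MB, row count, unsorted rows).
table_info_query = """
SELECT "schema", "table", diststyle, sortkey1, size AS mbytes, tbl_rows, unsorted
FROM svv_table_info
ORDER BY size DESC;
"""
```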
staging_events | staging_songs |
---|---|
artist | num_songs |
auth | artist_id |
firstName | artist_latitude |
gender | artist_longitude |
itemInSession | artist_location |
lastName | artist_name |
length | song_id |
level | title |
location | duration |
method | year |
page | |
registration | |
sessionId | |
song | |
status | |
ts | |
userAgent | |
userId |
songplay | Reference Table |
---|---|
songplay_id | PRIMARY KEY |
start_time | time table |
user_id | user table |
level | |
song_id | song table |
artist_id | artist table |
session_id | |
location | |
user_agent |
user | | song | | artist | | time | |
---|---|---|---|---|---|---|---|
user_id | PRIMARY KEY | song_id | PRIMARY KEY | artist_id | PRIMARY KEY | start_time | PRIMARY KEY |
first_name | | title | | name | | hour | |
last_name | | artist_id | | location | | day | |
gender | | year | | latitude | | week | |
level | | duration | | longitude | | month | |
 | | | | | | year | |
 | | | | | | weekday | |
- Connect to the database in the cluster
- COPY to load data in staging tables
Example:
staging_songs_copy = (""" copy staging_songs
from 's3://udacity-dend/song_data'
credentials 'aws_iam_role={}'
region 'us-west-2' compupdate off
JSON 'auto' truncatecolumns;
""").format(config.get('IAM_ROLE', 'ARN'))
- INSERT data into the fact and dimension tables.
Example: query to insert data into `dimUser`:
user_table_insert = ("""INSERT INTO dimUser(user_id, first_name, last_name, gender, level)
SELECT DISTINCT userId AS user_id,
firstName AS first_name,
lastName AS last_name,
gender,
level
FROM staging_events
WHERE userId IS NOT NULL;
""")
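For the fact table, the two staging tables are joined as described earlier; a sketch of how that insert might look (the millisecond `ts` conversion and the `page = 'NextSong'` filter are assumptions about the log format):

```python
# Sketch: populate factSongplay by joining the two staging tables.
# Assumes ts is a Unix epoch in milliseconds and that only 'NextSong'
# events represent song plays.
songplay_table_insert = ("""INSERT INTO factSongplay (start_time, user_id, level, song_id,
                                                      artist_id, session_id, location, user_agent)
SELECT DISTINCT TIMESTAMP 'epoch' + e.ts / 1000 * INTERVAL '1 second' AS start_time,
       e.userId    AS user_id,
       e.level,
       s.song_id,
       s.artist_id,
       e.sessionId AS session_id,
       e.location,
       e.userAgent AS user_agent
FROM staging_events AS e
JOIN staging_songs  AS s
  ON e.song = s.title
 AND e.artist = s.artist_name
WHERE e.page = 'NextSong';
""")
```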
- Connect to the database
- Explore data in the `udacity-dend` bucket
- Explore and check staging_songs and staging_events
- Queries
- Explore the dimension and fact tables
- Queries for the analytic team
- Queries to play with Sortkey and Distkey
Example: The average number of sessions per week per user
query = """
SELECT user_id, AVG(Events) AS AVG_sessionUserWeek
FROM (SELECT t.week AS Week,
sp.user_id,
COUNT(*) AS Events
FROM factSongplay AS sp
JOIN dimTime as t
ON sp.start_time=t.start_time
GROUP BY 1, 2) sub
GROUP BY user_id
ORDER BY AVG_sessionUserWeek DESC
LIMIT 5;
"""
pd.read_sql(query, conn_string).style.hide_index()
user_id | avg_sessionuserweek |
---|---|
49 | 8 |
97 | 8 |
80 | 7 |
44 | 5 |
88 | 5 |
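The `conn_string` passed to `pd.read_sql` above is not shown in this section; a minimal sketch of how it could be built from `dwh.cfg` (the `[CLUSTER]` section and its key names are assumptions beyond the snippet shown earlier):

```python
# Sketch: build a SQLAlchemy-style connection URL for pd.read_sql from dwh.cfg.
# The [CLUSTER] section and its keys are assumptions; adapt them to the real file.
import configparser

config = configparser.ConfigParser()
config.read("dwh.cfg")

conn_string = "postgresql://{}:{}@{}:{}/{}".format(
    config.get("CLUSTER", "DB_USER"),
    config.get("CLUSTER", "DB_PASSWORD"),
    config.get("CLUSTER", "HOST"),
    config.get("CLUSTER", "DB_PORT"),
    config.get("CLUSTER", "DB_NAME"),
)
```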