Skip to content

Commit

Permalink
refactored app and added some charts
Browse files Browse the repository at this point in the history
  • Loading branch information
ttomasz committed Jan 31, 2023
1 parent 6e723c4 commit fa9850b
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 48 deletions.
23 changes: 23 additions & 0 deletions datastructures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from datetime import datetime
from typing import NamedTuple

import pandas as pd


class DatasetStats(NamedTuple):
    """Overall time range of the changeset dataset.

    Carries the executed SQL text alongside the values so the UI can
    show the query that produced them (e.g. inside an expander).
    """

    # SQL query that was executed to obtain the values below
    query: str
    # earliest changeset opening timestamp found in the dataset
    min_opened_date: datetime
    # latest changeset opening timestamp found in the dataset
    max_opened_date: datetime


class StatsForYear(NamedTuple):
    """Aggregate changeset statistics for a single year.

    Every field is Optional because an all-``None`` sentinel instance is
    constructed when there is no previous year to compare against
    (``StatsForYear(None, None, None, None, None)``).
    """

    # SQL query that was executed (None for the sentinel instance)
    query: str | None
    number_of_changesets: int | None
    number_of_unique_users: int | None
    number_of_object_changes: int | None
    number_of_comments: int | None


class DataFrameResult(NamedTuple):
    """A query result: the SQL text paired with the DataFrame it produced."""

    # SQL query that was executed
    query: str
    # result set materialized as a pandas DataFrame
    df: pd.DataFrame
146 changes: 98 additions & 48 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,15 @@
import os
from datetime import datetime
from typing import NamedTuple

import duckdb
import pandas as pd
import streamlit as st


class DatasetStats(NamedTuple):
query: str
min_opened_date: datetime
max_opened_date: datetime


class StatsForYear(NamedTuple):
query: str
number_of_changesets: int
number_of_unique_users: int
number_of_object_changes: int
number_of_comments: int


class DataFrameResult(NamedTuple):
query: str
df: pd.DataFrame


# init and prepare duckdb 🦆
db = duckdb.connect(database=":memory:")
db.execute("install 'httpfs'; load 'httpfs'; set s3_region='eu-central-1';")

url_template = os.environ.get("url_template", "s3://tt-osm-changesets/full_by_year/{year}.zstd.parquet")


def get_delta(current_value: int, previous_value: int | None) -> str | None:
if previous_value is None:
return None
return f"{(current_value - previous_value):+,d}"
from datastructures import StatsForYear, DataFrameResult, DatasetStats
from utils import get_delta, paths_for_years


# ------------------------------------------------------------------------------------------------------
# functions
# ------------------------------------------------------------------------------------------------------
@st.experimental_memo
def fetch_one(query: str) -> tuple:
return db.execute(query).fetchone() # type: ignore
Expand All @@ -50,7 +21,7 @@ def fetch_df(query: str) -> DataFrameResult:
return DataFrameResult(query, result.df())


def min_max_timestamps() -> DatasetStats:
def min_max_timestamps(url_template: str) -> DatasetStats:
query = f"""
SELECT
min(created_at) start_range,
Expand All @@ -61,7 +32,7 @@ def min_max_timestamps() -> DatasetStats:
return DatasetStats(query, *result)


def year_stats(year: int) -> StatsForYear:
def year_stats(year: int, url_template: str) -> StatsForYear:
query = f"""
SELECT
count(*) number_of_changesets,
Expand All @@ -74,7 +45,7 @@ def year_stats(year: int) -> StatsForYear:
return StatsForYear(query, *result)


def most_popular_editors(year: int) -> DataFrameResult:
def most_popular_editors(year: int, url_template: str) -> DataFrameResult:
query = f"""
SELECT
CASE
Expand Down Expand Up @@ -114,7 +85,7 @@ def most_popular_editors(year: int) -> DataFrameResult:
return fetch_df(query)


def get_sample_data(year: int) -> DataFrameResult:
def get_sample_data(year: int, url_template: str) -> DataFrameResult:
query = f"""
SELECT *
FROM '{url_template.format(year=year)}'
Expand All @@ -123,7 +94,7 @@ def get_sample_data(year: int) -> DataFrameResult:
return fetch_df(query)


def most_reported_locale(year: int) -> DataFrameResult:
def most_reported_locale(year: int, url_template: str) -> DataFrameResult:
query = f"""
SELECT
coalesce(locale, '<unknown>') reported_locale,
Expand All @@ -140,10 +111,71 @@ def most_reported_locale(year: int) -> DataFrameResult:
return fetch_df(query)


st.markdown("""
def new_users(year: int, url_template: str) -> DataFrameResult:
    """Split the distinct editors of ``year`` into first-time vs returning users.

    Anti-joins the distinct uids of the selected year against the distinct
    uids of all *earlier* years and counts both groups.

    BUG FIX: the "previous years" scan used ``paths_for_years(2005, year, ...)``,
    which is inclusive of ``year`` itself, so every current-year uid matched
    the LEFT JOIN and zero users were ever reported as new. The scan now
    stops at ``year - 1``.

    :param year: year under analysis; must be > 2005 so at least one
        preceding year of data exists (an empty path list would produce
        invalid SQL).
    :param url_template: URL template with a ``{year}`` placeholder pointing
        at the per-year parquet files.
    :raises ValueError: if ``year`` <= 2005.
    """
    if year <= 2005:
        raise ValueError("new_users requires a year > 2005 (needs at least one preceding year of data)")
    paths_for_previous_years = paths_for_years(2005, year - 1, url_template)
    # separator keeps each path on its own (indented) line in the displayed SQL
    sep = ",\n" + " " * 8
    query = f"""
WITH
users_who_previously_edited as (
    SELECT DISTINCT uid
    FROM parquet_scan([
        {sep.join(paths_for_previous_years)}
    ])
),
users_who_edited_current_year as (
    SELECT DISTINCT uid
    FROM parquet_scan('{url_template.format(year=year)}')
)
SELECT
    CASE
        WHEN pe.uid IS NULL THEN true
        ELSE false
    END users_who_did_not_edit_before,
    count(*) number_of_users
FROM users_who_edited_current_year cu
LEFT JOIN users_who_previously_edited pe USING(uid)
GROUP BY 1
""".strip()
    return fetch_df(query)


def stats_over_years(url_template: str, start_year: int = 2005, end_year: int = 2022) -> DataFrameResult:
    """Yearly totals (object changes, changesets, unique users) across all files.

    Scans one parquet file per year and groups by the year, which is
    recovered from the file name via DuckDB's ``FILENAME = 1`` option.

    Generalized: the year range used to be hard-coded to 2005-2022 even
    though the app's year selector goes up to 2023; it is now a pair of
    parameters whose defaults preserve the original behavior.

    :param url_template: URL template with a ``{year}`` placeholder.
    :param start_year: first year to include (inclusive).
    :param end_year: last year to include (inclusive).
    """
    # separator keeps each path on its own (indented) line in the displayed SQL
    sep = ",\n" + " " * 4
    query = f"""
SELECT
    regexp_extract(filename, '([0-9]{{4}})', 1) as "year",
    sum(num_changes)::bigint as number_of_changes,
    count(*) as number_of_changesets,
    count(distinct uid) as number_of_unique_users
FROM parquet_scan([
    {sep.join(paths_for_years(start_year, end_year, url_template))}
], FILENAME = 1)
GROUP BY 1
ORDER BY 1
""".strip()
    return fetch_df(query)


# ------------------------------------------------------------------------------------------------------
# init
# ------------------------------------------------------------------------------------------------------
# create in-memory duckdb database 🦆
db = duckdb.connect(database=":memory:")
db.execute("install 'httpfs'; load 'httpfs'; set s3_region='eu-central-1';")

data_url_template = os.environ.get("url_template", "s3://tt-osm-changesets/full_by_year/{year}.zstd.parquet")

# ------------------------------------------------------------------------------------------------------
# beginning of the app
# ------------------------------------------------------------------------------------------------------
st.markdown(
"""
# OpenStreetMap changesets
This Streamlit app queries remote Parquet files with info from changeset dump downloaded from planet.osm.org.
This Streamlit app queries remote Parquet files with info from
[OpenStreetMap changeset](https://wiki.openstreetmap.org/wiki/Changeset)
dump downloaded from [planet.osm.org](https://planet.osm.org).
I described conversion process here: https://ttomasz.github.io/2023-01-30/spark-read-xml
Thanks to DuckDB we can query files hosted on S3 storage without having to download everything (~7 GB).
Although for running larger analyses this is a better course of action since it's order of magnitude faster.
Expand All @@ -158,29 +190,37 @@ def most_reported_locale(year: int) -> DataFrameResult:
First let's see range of timestamps in our files. Give it some time to load the data.
""")
dataset_stats = min_max_timestamps()
dataset_stats = min_max_timestamps(data_url_template)
with st.expander("SQL query", expanded=False):
st.code(dataset_stats.query, language="sql")
st.metric("Minimum changeset opening datetime", dataset_stats.min_opened_date.isoformat())
st.metric("Maximum changeset opening datetime", dataset_stats.max_opened_date.isoformat())

st.write("Let's see some charts")
stats = stats_over_years(data_url_template)
stats.df.columns = stats.df.columns.map(lambda x: str(x).replace("_", " ").title())
with st.expander("SQL query", expanded=False):
st.code(stats.query, language="sql")
st.line_chart(data=stats.df, x="year", y="number_of_changes".replace("_", " ").title())
st.line_chart(data=stats.df, x="year", y="number_of_changesets".replace("_", " ").title())
st.line_chart(data=stats.df, x="year", y="number_of_unique_users".replace("_", " ").title())

# year = st.slider("Select year for analysis:", min_value=2005, max_value=2023, value=2023, step=1)
year_options = tuple(range(2005, 2024))
selected_year = st.selectbox("Select year for analysis:", options=year_options, index=len(year_options)-1)
selected_year = st.selectbox("Select year for analysis:", options=year_options, index=len(year_options) - 1)
previous_year = selected_year - 1 if selected_year > 2005 else None

st.write("A sample of data from Parquet files:")
sample_data = get_sample_data(selected_year)
sample_data = get_sample_data(selected_year, data_url_template)
with st.expander("SQL query", expanded=False):
st.code(sample_data.query, language="sql")
st.dataframe(data=sample_data.df, use_container_width=True)

st.markdown(f"## In {selected_year} there were:")

stats_for_year = year_stats(selected_year)
stats_for_year = year_stats(selected_year, data_url_template)
if previous_year is not None:
stats_for_previous_year = year_stats(previous_year)
stats_for_previous_year = year_stats(previous_year, data_url_template)
else:
stats_for_previous_year = StatsForYear(None, None, None, None, None)
with st.expander("SQL query", expanded=False):
Expand Down Expand Up @@ -208,15 +248,25 @@ def most_reported_locale(year: int) -> DataFrameResult:
)

st.markdown("### Most popular editors")
editor_result = most_popular_editors(selected_year)
editor_result = most_popular_editors(selected_year, data_url_template)
editor_result.df.columns = editor_result.df.columns.map(lambda x: str(x).replace("_", " ").title())
with st.expander("SQL query", expanded=False):
st.code(editor_result.query, language="sql")
st.dataframe(data=editor_result.df, use_container_width=True)

st.markdown("### Most reported locale")
locale_result = most_reported_locale(selected_year)
locale_result = most_reported_locale(selected_year, data_url_template)
locale_result.df.columns = locale_result.df.columns.map(lambda x: str(x).replace("_", " ").title())
with st.expander("SQL query", expanded=False):
st.code(locale_result.query, language="sql")
st.dataframe(data=locale_result.df, use_container_width=True)

# The earliest dataset year (2005) has no prior years to compare against,
# so the new-vs-returning breakdown is only rendered for later years.
if selected_year <= 2005:
    st.empty()
else:
    st.markdown("### New vs old users")
    new_users_result = new_users(selected_year, data_url_template)
    # prettify column names: snake_case -> Title Case
    new_users_result.df.columns = [str(col).replace("_", " ").title() for col in new_users_result.df.columns]
    with st.expander("SQL query", expanded=False):
        st.code(new_users_result.query, language="sql")
    st.dataframe(data=new_users_result.df, use_container_width=True)
12 changes: 12 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
def get_delta(current_value: int, previous_value: int | None) -> str | None:
if previous_value is None:
return None
return f"{(current_value - previous_value):+,d}"


def stringify(path: str) -> str:
    """Wrap *path* in single quotes so it can be spliced into a SQL path list."""
    return "'{}'".format(path)


def paths_for_years(start_year: int, end_year: int, url_template: str) -> list[str]:
    """Build single-quoted parquet paths for every year in [start_year, end_year].

    Both bounds are inclusive; an empty list is returned when
    ``end_year < start_year``.
    """
    return [f"'{url_template.format(year=year)}'" for year in range(start_year, end_year + 1)]

0 comments on commit fa9850b

Please sign in to comment.