# data_access.py
from typing import Optional

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, Future

# Address of the Dask scheduler; adjust to match your deployment.
DASK_CLUSTER = "localhost:8786"


async def get_data(country: Optional[str] = None) -> pd.DataFrame:
    """Load user records, optionally filtered to a single country."""
    async with Client(DASK_CLUSTER, asynchronous=True) as client:
        path = "data/*.parquet"
        columns = [
            "id",
            "first_name",
            "last_name",
            "country",
            "registration_dttm",
            "email",
            "gender",
            "ip_address",
            "cc",
            "birthdate",
            "salary",
            "title",
        ]
        if country:
            # Push the country filter down to the Parquet reader so only
            # matching row groups are read from disk.
            predicates = [
                ("country", "==", country.capitalize()),
            ]
            df: dd.DataFrame = dd.read_parquet(
                path,
                engine="pyarrow-dataset",
                columns=columns,
                filters=predicates,
            )
        else:
            df: dd.DataFrame = dd.read_parquet(
                path,
                engine="pyarrow-dataset",
                columns=columns,
            )
        future: Future = client.compute(df)
        return await future


async def get_data_for_pie(group_by: str) -> pd.DataFrame:
    """Count the TOP_N most frequent values of `group_by` for a pie chart."""
    TOP_N = 10
    async with Client(DASK_CLUSTER, asynchronous=True) as client:
        path = "data/*.parquet"
        df: dd.DataFrame = dd.read_parquet(
            path,
            engine="pyarrow-dataset",
        )
        grouped_df = df[group_by].value_counts().nlargest(TOP_N).to_frame()
        result = await client.compute(grouped_df)
        # Expose the grouped values (the index) as a regular column so the
        # caller gets labels and counts side by side.
        return result.reset_index()


async def get_salary_data() -> pd.DataFrame:
    """Bucket salaries into coarse bands and count records per band."""
    async with Client(DASK_CLUSTER, asynchronous=True) as client:
        path = "data/*.parquet"
        df: dd.DataFrame = dd.read_parquet(
            path,
            engine="pyarrow-dataset",
        )
        # Nine bin edges define eight intervals, one per label below;
        # salaries outside [5000, 500000] fall into no band (NaN).
        bins = [5000, 10000, 20000, 50000, 100000, 200000, 300000, 400000, 500000]
        groups = [
            "< 10k",
            "< 20k",
            "< 50k",
            "< 100k",
            "< 200k",
            "< 300k",
            "< 400k",
            "< 500k",
        ]
        # The full dataset is computed into pandas first; binning then
        # happens locally with pd.cut.
        result = await client.compute(df)
        result["grouped_salary"] = pd.cut(result["salary"], bins, labels=groups)
        grouped_df = result["grouped_salary"].value_counts().to_frame()
        # Expose the salary bands (the index) as a regular column.
        return grouped_df.reset_index()
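

# A minimal usage sketch, assuming a Dask scheduler is already listening on
# localhost:8786 and that files matching "data/*.parquet" exist. The
# coroutines above are from this module; the country value below is a
# placeholder, purely illustrative.
if __name__ == "__main__":
    import asyncio

    async def main() -> None:
        people = await get_data("Ireland")  # hypothetical country value
        pie = await get_data_for_pie("country")
        salaries = await get_salary_data()
        print(people.head(), pie, salaries, sep="\n\n")

    asyncio.run(main())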