-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathnew_pipeline.py
146 lines (108 loc) · 3.22 KB
/
new_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import pandas as pd
import requests
import folium
import luigi
# Maps each year (2012-2016) to the URL of that year's CSV export on
# opendata.dc.gov.
YEARS = {
    2012: 'http://opendata.dc.gov/datasets/5f4ea2f25c9a45b29e15e53072126739_7.csv',
    2013: 'http://opendata.dc.gov/datasets/4911fcf3527246ae9bf81b5553a48c4d_6.csv',
    2014: 'http://opendata.dc.gov/datasets/d4891ca6951947538f6707a6b07ae225_5.csv',
    2015: 'http://opendata.dc.gov/datasets/981c105beef74af38cc4090992661264_25.csv',
    2016: 'http://opendata.dc.gov/datasets/5d14ae7dcd1544878c54e61edda489c3_24.csv',
}
class DownloadTask(luigi.ExternalTask):
    """
    Downloads one year's data from the portal.

    Skeleton only: both ``run`` and ``output`` are placeholders that do
    nothing and return None.
    """

    # NOTE(review): luigi.ExternalTask is normally used for outputs produced
    # outside of luigi; this class also defines run(), so confirm whether
    # luigi.Task is the intended base class.

    # Year of data to download (defaults to 2012).
    year = luigi.IntParameter(default=2012)

    def output(self):
        """Placeholder: no output target is declared yet."""
        return None

    def run(self):
        """Placeholder: nothing is downloaded yet."""
        return None
class cleanCSV(luigi.Task):
    """
    Cleans one year's CSV into the format we'd like for analysis.

    You'll want to grab the Ward, Fees, Permit, and Geospatial fields.

    Skeleton only: ``run`` and ``output`` are placeholders that return None.
    """

    # Year of data to clean. Declared as a luigi parameter so the task can be
    # instantiated as cleanCSV(year); defaults to 2012.
    year = luigi.IntParameter(default=2012)

    def requires(self):
        """Depend on the download task for the same year."""
        # The original read the non-existent attribute ``self.param``.
        return DownloadTask(year=self.year)

    def run(self):
        """Placeholder: no cleaning is performed yet."""
        pass

    def output(self):
        """Placeholder: no output target is declared yet."""
        pass
class mergeDatasets(luigi.Task):
    """
    Merges the cleaned per-year datasets.

    Skeleton only: ``run`` and ``output`` are placeholders that return None.
    """

    def requires(self):
        """Return one cleanCSV task per year listed in YEARS, in year order."""
        # Derive the years from YEARS rather than a hard-coded range so the
        # URL table and the dependency list cannot drift apart.
        return [cleanCSV(year) for year in sorted(YEARS)]

    def run(self):
        """Placeholder: no merging is performed yet."""
        pass

    def output(self):
        """Placeholder: no output target is declared yet."""
        pass
class importIntoPandasDF(luigi.Task):
    """
    Converts the CSVs into pandas dataframes, saves as pickle file.

    Skeleton only: ``run`` and ``output`` are placeholders that return None.
    """

    def requires(self):
        """Depend on the merged dataset."""
        # Must be a task *instance*; the original returned the class object
        # itself (``mergeDatasets`` without parentheses).
        return mergeDatasets()

    def run(self):
        """Placeholder: no conversion is performed yet."""
        pass

    def output(self):
        """Placeholder: no output target is declared yet."""
        pass
class computeWards(luigi.Task):
    """
    Computes the development by ward per year and saves it.

    The idea is to call value_counts() on the ward field, i.e. count how
    many permits were issued per ward.

    Skeleton only: ``run`` and ``output`` are placeholders that return None.
    """

    def requires(self):
        """Depend on the pandas import step."""
        upstream = importIntoPandasDF()
        return upstream

    def output(self):
        """Placeholder: no output target is declared yet."""
        return None

    def run(self):
        """Placeholder: no counts are computed yet."""
        return None
class makeMap(luigi.Task):
    """
    Makes a map of development.

    Skeleton only: ``run`` builds a sample map but does not save it, and
    ``output`` is a placeholder that returns None.
    """

    def requires(self):
        """Depend on the pandas import step."""
        upstream = importIntoPandasDF()
        return upstream

    def output(self):
        """Placeholder: no output target is declared yet."""
        return None

    def run(self):
        """
        Build a Folium map and drop a single example marker on it.

        This is starter code showing how to get a map object and how to
        plot a marker; the map is neither saved nor returned.
        """
        # Create the base map.
        centre = [38.9072, -77.0369]
        dc_map = folium.Map(location=centre, zoom_start=12)

        # Example marker (Mt. Hood Meadows); its coordinates are far from
        # the map's centre.
        sample_marker = folium.Marker([45.3288, -121.6625],
                                      popup='Mt. Hood Meadows')
        sample_marker.add_to(dc_map)
class makePredictions(luigi.Task):
    """
    Makes predictions for next year's amount of development per ward.

    Given the ward sums, predict the values using a simple regression.
    NB: This is not good modeling; the point is to demonstrate plumbing.

    Skeleton only: ``run`` and ``output`` are placeholders that return None.
    """

    def requires(self):
        """Depend on the per-ward counts."""
        upstream = computeWards()
        return upstream

    def output(self):
        """Placeholder: no output target is declared yet."""
        return None

    def run(self):
        """Placeholder: no predictions are made yet."""
        return None
class makeReport(luigi.Task):
    """
    Gathers info and saves it to a report in HTML.

    You'll want to add the predictions, counts and map (via iframe) to a
    text file that is valid HTML.

    Skeleton only: ``run`` and ``output`` are placeholders that return None.
    """

    def requires(self):
        """Depend on the predictions and the map, returned as a tuple."""
        predictions = makePredictions()
        development_map = makeMap()
        return (predictions, development_map)

    def output(self):
        """Placeholder: no output target is declared yet."""
        return None

    def run(self):
        """Placeholder: no report is written yet."""
        return None
if __name__ == '__main__':
    # Script entry point: run DownloadTask, passing luigi the
    # --local-scheduler flag.
    luigi.run(['DownloadTask', '--local-scheduler'])