# get_counts.py
# Forked from ZeLonewolf/osm-overpass-scripts.
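"""Count OSM objects carrying given tags, per country, over a date range.

For every (country, date) pair the script fills the Overpass QL template in
queries/count_tags.op, posts it to the configured Overpass server, and appends
the returned counts to a per-date CSV file. Finished rows are detected and
skipped, so an interrupted run can safely be restarted.
"""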
import csv
import datetime
import random
import time
from multiprocessing.pool import ThreadPool

import requests
def get_defaults():
    """Parse KEY=VALUE lines from defaults.sh into a dict."""
    with open('defaults.sh') as f:
        # strip() drops the trailing newline the raw lines carry; without it
        # every value (including DEFAULT_SERVER) would end in '\n'.
        pairs = (line.strip().split('=', 1) for line in f if '=' in line)
        return dict(pairs)

defaults = get_defaults()
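# defaults.sh is expected to hold one KEY=VALUE pair per line, for example
# (illustrative values, not taken from the source):
#   DEFAULT_SERVER=https://overpass-api.de
#   DEFAULT_TAG1=highway=motorway
#   DEFAULT_TAG2=highway=trunk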
with open('queries/count_tags.op') as f:
    op_template = f.read()
url = defaults['DEFAULT_SERVER'] + '/api/interpreter'
s = requests.Session()  # one shared session keeps the HTTP connection alive
def log(*args):
    msg = f"{datetime.datetime.now().time().isoformat()[:8]} " + ' '.join(map(str, args))
    print(msg)
    try:
        with open('log.txt', 'a', encoding='utf8') as f:
            print(msg, file=f)
    except Exception:
        # Plain print here: calling log() again would recurse forever
        # while the log file stays unwritable.
        print("Error logging previous message")
def get_query(rel_id, t1, t2, date=None):
    if date:
        date = date.strftime('[date:"%FT%H:00:00Z"]')
    else:
        date = ''
    # Normalize tag filters to the quoted "key"="value" form, whatever
    # mix of quotes they arrived with.
    t1 = '"' + t1.strip('"').replace('"=', '=').replace('="', '=').replace('=', '"="') + '"'
    t2 = '"' + t2.strip('"').replace('"=', '=').replace('="', '=').replace('=', '"="') + '"'
    # Overpass area ids for relations are the relation id plus 3600000000.
    return (op_template.replace('#AREA', str(int(rel_id) + 3600000000))
            .replace('#TAG1', t1).replace('#TAG2', t2).replace('#DATE', date))
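# queries/count_tags.op (not shown here) is an Overpass QL template with four
# placeholders that get_query() fills in: #AREA (Overpass area id), #TAG1 and
# #TAG2 (quoted tag filters) and #DATE (optional attic-data setting). It is
# assumed to return a single CSV line of counts, parsed in run_single_request().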
def run_query(query, area_name="", date=""):
    strt = time.time()
    log(f"Starting for {area_name} on {date}.")
    # Client timeout just over an hour, presumably so a server-side timeout fires first.
    resp = s.post(url, data=query, timeout=3630)
    if resp.status_code >= 400:
        # Show only the <body> of the Overpass HTML error page.
        msg = resp.content.decode()
        print(msg[msg.find('<body') + 6:msg.find('</body')].strip())
        resp.raise_for_status()
    log(f"Request for {area_name} on {date} took {round(time.time() - strt, 1)}s")
    return resp.text
def check_if_already_done(filename, data_row):
    """Return True if the output CSV already has a row for this country
    (data_row[1], the country code, is the first column of every row)."""
    try:
        with open(filename, 'r', encoding='utf8') as f:
            t = f.read()
    except FileNotFoundError:
        return False
    return '\n' + data_row[1] + ',' in t or t.startswith(data_row[1] + ',')
def run_single_request(data_row, t1, t2, filename, date=None):
    # data_row example: [51701, 'CH', 'Switzerland']
    area_name = ','.join(data_row[1:])
    rel_id = data_row[0]
    if check_if_already_done(filename, data_row):
        return
    q = get_query(rel_id, t1, t2, date)
    try:
        # The response ends with a newline, hence the strip().
        txt = run_query(q, area_name, str(date)[:10]).strip().split(',')
    except requests.HTTPError as err:
        log(f"ERROR: {err} with args {t1}, {t2}, {date}, {area_name}")
        return
    with open(filename, mode='a', buffering=256, newline='', encoding='utf8') as f:
        csv.writer(f).writerow(data_row[1:] + txt)
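# For data_row [51701, 'CH', 'Switzerland'] and a response of, say, "1234,567"
# (illustrative counts), the appended CSV row becomes: CH,Switzerland,1234,567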
def prepare_inputs(tag1, tag2, datafile, date_start, date_end, date_step, fname_template):
    """Given tags and a date range, generate batches of requests to process.

    Parameters:
        datafile (str): Name of a file containing the output of all_country_names_ids
        date_start (datetime.datetime): Start of date range
        date_end (datetime.datetime): End of date range
        date_step (datetime.timedelta): Step between two dates
        fname_template (str): Output CSV filename. MUST contain the phrase #DATE,
            which is replaced with the actual ISO date.

    Yields:
        Generator functions producing argument tuples for run_single_request,
        one generator per date.
    """
    # Quick basic checks on the arguments
    assert '#DATE' in fname_template
    assert date_start - date_step < date_end
    date_start = datetime.datetime.combine(date_start.date(), datetime.datetime.min.time())
    c = 0
    total = (date_end - date_start).days // date_step.days + 1
    while date_start < date_end:
        c += 1
        fname = fname_template.replace('#DATE', date_start.strftime('%F'))
        log(f"Date {c} of {total}")
        def generator_day(date=date_start, fname=fname):
            # Default arguments bind the current values; otherwise the closure
            # would see whatever the loop variables hold when it finally runs.
            with open(datafile, encoding='utf8') as f:
                for row in csv.reader(f):
                    yield (row, tag1, tag2, fname, date)
        yield generator_day
        date_start += date_step
def prepare_inputs_2(tag1, tag2, datafile, date_start, date_end, date_step, fname_template):
    """
    Alternative generator of generators that iterates over countries
    instead of dates, for more uniform time consumption per batch.
    """
    # Quick basic checks on the arguments
    assert '#DATE' in fname_template
    assert date_start - date_step < date_end
    with open(datafile, encoding='utf8') as f:
        for row in csv.reader(f):
            log(f"Processing {row[2]}")
            def generator_country(row=row):
                # Default argument binds the current row (see prepare_inputs).
                date = datetime.datetime.combine(date_start.date(), datetime.datetime.min.time())
                while date < date_end:
                    yield (row, tag1, tag2, fname_template.replace('#DATE', date.strftime('%F')), date)
                    date += date_step
            yield generator_country
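# Either prepare_inputs (one batch per date, all countries) or prepare_inputs_2
# (one batch per country, all dates) can feed perform_web_requests below; the
# main block uses the latter.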
def rsr_wrap(args):
    # Stagger worker start-up a little so requests do not hit the server at once.
    time.sleep(round(random.random() / 3, 2))
    try:
        run_single_request(*args)
    except Exception as err:
        print(err)
def perform_web_requests(inputs_generator, pool_size):
    pool = ThreadPool(pool_size)
    # imap() feeds tasks to the pool; close() + join() then block until
    # every submitted task has finished.
    pool.imap(rsr_wrap, inputs_generator())
    log("Waiting for requests to complete.")
    pool.close()
    pool.join()
log("Process started")
inputs=prepare_inputs_2(defaults['DEFAULT_TAG1'],
defaults['DEFAULT_TAG2'],
'sample list of countries.txt',
datetime.datetime(2020,12,1),
datetime.datetime(2021,12,16),
datetime.timedelta(7),
r'history\tags-#DATE.csv')
for days_requests in inputs:
perform_web_requests(days_requests, 3)
# Ideally it should wait to complete with day before moving on.
log("Process finished successfully")
#rsr_wrap(list(inputs)[0])
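# 'sample list of countries.txt' is expected to be a headerless CSV, one
# relation id, country code and name per row, e.g.: 51701,CH,Switzerland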