forked from info-beamer/package-kvv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice
executable file
·151 lines (125 loc) · 4.42 KB
/
service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python
import re
import traceback
import time
import json
import urllib2
import pytz
from datetime import datetime, timedelta
from itertools import islice
from hosted import CONFIG, NODE
# NOTE(review): presumably asks the hosted helper to restart this service
# whenever the package configuration changes -- confirm against hosted.py.
CONFIG.restart_on_update()
# Relative departure times such as "5 min"; group 1 is the number of minutes.
minutes = re.compile("([0-9]+) min")
# Absolute departure times such as "12:34"; groups 1 and 2 are hour and minute.
exact = re.compile("([0-9]+):([0-9]+)")
def current_time():
    """Return the current Europe/Berlin wall-clock time as a naive datetime.

    The UTC "now" is tagged as UTC, converted to the Berlin zone and then
    stripped of its tzinfo, so the result carries Berlin local time without
    any timezone attached.
    """
    berlin = pytz.timezone("Europe/Berlin")
    utc_now = datetime.utcnow().replace(tzinfo=pytz.utc)
    # Drop tzinfo after the conversion so callers receive a naive value.
    return utc_now.astimezone(berlin).replace(tzinfo=None)
def to_unixtimestamp(dt):
    """Convert the datetime *dt* into an integer timestamp via time.mktime.

    The time tuple of *dt* is handed to ``time.mktime`` and the resulting
    float is truncated to an int.
    """
    seconds = time.mktime(dt.timetuple())
    return int(seconds)
def regenerate():
    """Fetch the departures for the configured stop and write departures.json.

    Downloads the live departure list for ``CONFIG['stop']``, converts every
    entry into a dictionary (colour, line symbol, departure timestamp,
    direction, formatted time, stop name), sorts the entries by departure
    time and direction, links each departure to the following run of the
    same line/direction/stop, drops entries flagged as duplicates and writes
    the result as UTF-8 encoded JSON to ``departures.json``.

    Errors from the download, the JSON decoding or the file write are not
    handled here and propagate to the caller.
    """
    now = current_time()

    def from_html(h):
        """Convert an "rrggbb" hex string into an (r, g, b) tuple of floats in 0..1."""
        r = round(1.0 / 255 * int(h[0:2], 16), 2)
        g = round(1.0 / 255 * int(h[2:4], 16), 2)
        b = round(1.0 / 255 * int(h[4:6], 16), 2)
        return r, g, b

    # Display colour per line symbol; unknown lines fall back to black below.
    mapping = {
        "1": from_html("ed1c24"),
        "2": from_html("0071bc"),
        "4": from_html("ffcb04"),
        "5": from_html("00c0f3"),
        "6": from_html("80c342"),
        "6E": from_html("80c342"),
        "S1": from_html("00a76d"),
        "S11": from_html("00a76d"),
        "S2": from_html("a068aa"),
        "S4": from_html("9f184c"),
        "S41": from_html("9f184c"),
        "S5": from_html("f8aca5"),
        "S7": from_html("ffff33"),
        "S8": from_html("4d4f23"),
    }

    def parse(stop_info):
        """Turn one decoded stop response into a list of (sort_key, info) pairs."""
        departures = []
        for dep in stop_info['departures']:
            when = dep['time']
            # Take the groups from the search() result itself: match() only
            # anchors at the start of the string and would return None for a
            # value such as "in 5 min" even though search() succeeded.
            in_minutes = minutes.search(when)
            at_clock = exact.search(when)
            if when == '0':
                delta = 0
            elif in_minutes:
                delta = 60 * int(in_minutes.group(1))
            elif at_clock:
                abfahrt = now.replace(
                    hour=int(at_clock.group(1)),
                    minute=int(at_clock.group(2)),
                    second=0,
                )
                # A clock time earlier than "now" refers to the next day.
                if abfahrt < now:
                    abfahrt += timedelta(days=1)
                delta = (abfahrt - now).seconds
            else:
                # Unknown time format: skip the entry instead of failing with
                # a NameError or silently reusing the previous entry's delta.
                print("skipping departure with unparseable time: %r" % (when,))
                continue

            date = now + timedelta(seconds=delta)
            direction = dep['destination']
            r, g, b = mapping.get(dep['route'], (0, 0, 0))
            departures.append(((date, direction), dict(
                color_r=r,
                color_g=g,
                color_b=b,
                symbol=dep['route'],
                date=to_unixtimestamp(date),
                direction=direction,
                nice_date=date.strftime('%H:%M'),
                more="",
                stop=stop_info['stopName'],
            )))
        return departures

    def download(stop):
        """Return the raw response body of the departures endpoint for *stop*."""
        # NOTE(review): the API key is hard-coded in the URL.
        url = "https://live.kvv.de/webapp/departures/bystop/%s?maxInfos=0&key=377d840e54b59adbe53608ba1aad70e8" % stop
        # A timeout keeps a stalled connection from blocking the service
        # (and with it the clock updates) indefinitely.
        response = urllib2.urlopen(url, timeout=30)
        try:
            return response.read()
        finally:
            response.close()

    departures = parse(json.loads(download(CONFIG['stop'])))
    # Sort on the (date, direction) key only, so entries with equal keys never
    # fall back to comparing the info dictionaries.
    departures.sort(key=lambda item: item[0])
    departures = [info for sort_key, info in departures]

    # find next run
    for n, dep in enumerate(departures):
        for follow in islice(departures, n + 1, None):
            if (dep['direction'] == follow['direction'] and
                    dep['stop'] == follow['stop'] and
                    dep['symbol'] == follow['symbol']):
                dep['next_date'] = follow['date']
                dep['next_nice_date'] = follow['nice_date']
                break

    # find duplicates: same line and direction reported for a different stop
    # within five minutes
    for n, dep in enumerate(departures):
        for follow in islice(departures, n + 1, None):
            if (dep['direction'] == follow['direction'] and
                    dep['stop'] != follow['stop'] and
                    dep['symbol'] == follow['symbol'] and
                    abs(dep['date'] - follow['date']) < 5 * 60):
                print("duplicate:")
                print(dep)
                print(follow)
                print("")
                dep['duplicate'] = True
                break

    departures = [dep for dep in departures if 'duplicate' not in dep]

    with open("departures.json", "wb") as f:
        f.write(json.dumps(departures, ensure_ascii=False).encode("utf8"))
def send_clock():
    """Send the current time, as an integer timestamp, to the node's /clock/set path."""
    set_clock = NODE['/clock/set']
    timestamp = to_unixtimestamp(current_time())
    set_clock(timestamp)
def idle(seconds):
    """Keep sending the clock until roughly *seconds* seconds have passed.

    Each round sends the clock and then sleeps five seconds; the deadline is
    only checked before a round starts. A non-positive *seconds* returns
    immediately without sending anything.
    """
    deadline = time.time() + seconds
    while True:
        if not time.time() < deadline:
            return
        send_clock()
        time.sleep(5)
def main():
    """Run the service loop forever.

    Each round regenerates the departures file and then idles for 360
    seconds. Any Exception raised during a round is printed with its
    traceback, followed by a 30 second pause before the next attempt.
    """
    refresh_pause = 360
    error_pause = 30
    while True:
        try:
            regenerate()
            idle(refresh_pause)
        except Exception:
            # Keep the service alive: report the failure and retry later.
            traceback.print_exc()
            time.sleep(error_pause)
# Start the service loop only when this file is executed as a script.
if __name__ == "__main__":
    main()