-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathmain.py
295 lines (241 loc) · 8.29 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#! /usr/bin/python3
import re
import time
import threading
import subprocess
import logging
from streaming import Streamer, set_volume
import database
from display import Display
from positional_encoders import *
from ui_manager import UI_Manager
from rgb_led import RGB_LED
from scheduler import Scheduler
# First argument handed to every Streamer created by the main loop
AUDIO_SERVICE = "pulse"
# Amount added to / subtracted from `volume` for each "Volume" UI event
VOLUME_INCREMENT = 5
# Name of the current main-loop state; the loop handles "start", "tuning",
# "playing", "shutdown_confirm" and "shutdown"
state = "start"
# True while the volume should be shown on the display; reset by
# Clear_Volume_Display(), which is scheduled 3 s after each volume change
volume_display = False
# Current volume; applied with set_volume() at start-up and on every change
volume = 95
# Station selector changed by "Jog" UI events, and the last value the
# "playing" state acted upon
jog = 0
last_jog = 0
# True when `state` has just changed, so the new state runs its one-off setup
state_entry = True
# Timestamped log lines at INFO level and above
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
# Source of UI events; polled once per main-loop pass by Process_UI_Events()
ui_manager = UI_Manager()
def Look_Around(latitude: int, longitude: int, fuzziness: int, resolution: int = None) -> list:
    """Return the encoder coordinates to search around a given position.

    This is used to increase the size of the area searched around the coords.
    For example, fuzziness 2, latitude 50 and longitude 0 will result in a
    search square 48,1022 to 52,2 (with encoder resolution 1024).

    Args:
        latitude: Encoder reading used as the centre of the first coordinate.
        longitude: Encoder reading used as the centre of the second coordinate.
        fuzziness: Number of extra layers around the centre; 0 means only the
            given coords.
        resolution: Modulus the coordinates wrap around at. Defaults to
            ENCODER_RESOLUTION when omitted.

    Returns:
        A list of unique ``[x, y]`` pairs. The centre comes first, then each
        larger square in turn, so nearer coordinates precede farther ones.
    """
    if resolution is None:
        resolution = ENCODER_RESOLUTION

    # Offset fuzziness, so 0 means only the given coords
    layers = fuzziness + 1

    search_coords = []
    # Tuples already emitted; gives O(1) duplicate checks while the list
    # preserves the discovery order that callers rely on
    seen = set()

    # With each 'layer' of fuzziness we need a starting point. 70% of people are right-eye dominant and
    # the globe is likely to be below the user, so go down and left first then scan horizontally, moving up
    for layer in range(layers):
        # Layer n is a (2n + 1)-wide square, so its corner is n steps from the centre
        side = (layer * 2) + 1
        for y in range(side):
            coord_y = (longitude + y - layer) % resolution
            for x in range(side):
                coord_x = (latitude + x - layer) % resolution
                key = (coord_x, coord_y)
                if key not in seen:
                    seen.add(key)
                    search_coords.append([coord_x, coord_y])

    return search_coords
def Back_To_Tuning():
    """Switch the main loop to the "tuning" state.

    Does nothing when the state is already "tuning"; otherwise it also raises
    the entry flag so the tuning state performs its one-off setup.
    """
    global state, state_entry

    if state == "tuning":
        return

    state, state_entry = "tuning", True
def Clear_Volume_Display():
    """Reset the flag that makes the main loop show the volume on the display."""
    global volume_display

    volume_display = False
def Process_UI_Events():
    """Collect pending UI events from ui_manager and act on each of them.

    Each event is a sequence whose first item is the event name ("Jog",
    "Volume", "Random", "Shutdown", "Calibrate" or "Confirm"). "Jog" and
    "Volume" events carry a direction of 1 or -1 as their second item.
    Results are communicated through the module-level state variables.
    """
    global state, state_entry
    global volume, volume_display
    global jog
    global ui_manager, encoders_thread, rgb_led

    def change_volume(step, report_format):
        # Apply the step, show the new level for 3 s and flash the LED blue
        global volume, volume_display
        volume += step
        volume = set_volume(volume)
        volume_display = True
        scheduler.attach_timer(Clear_Volume_Display, 3)
        rgb_led.set_static("BLUE", timeout_sec=0.5, restore_previous_on_timeout=True)
        print((report_format).format(volume))

    pending = []
    ui_manager.update(pending)

    for event in pending:
        kind = event[0]

        if kind == "Jog":
            direction = event[1]
            if direction == 1:
                # Next station
                jog += 1
            elif direction == -1:
                # Previous station
                jog -= 1
            print(jog)

        elif kind == "Volume":
            direction = event[1]
            if direction == 1:
                change_volume(VOLUME_INCREMENT, "Volume up: {}%")
            elif direction == -1:
                # While the shutdown prompt is showing, volume-down means "cancel"
                if state == "shutdown_confirm":
                    Back_To_Tuning()
                else:
                    change_volume(-VOLUME_INCREMENT, "Volume down: {}%")

        elif kind == "Random":
            print("Toggle jog mode - not implemented")

        elif kind == "Shutdown":
            state = "shutdown_confirm"
            state_entry = True

        elif kind == "Calibrate":
            # Zero the positional encoders and persist the resulting offsets
            offsets = encoders_thread.zero()
            database.Save_Calibration(offsets[0], offsets[1])
            rgb_led.set_static("GREEN", timeout_sec=0.5, restore_previous_on_timeout=True)
            print("Calibrated")
            display_thread.message(
                line_1="",
                line_2="Calibrated!",
                line_3="",
                line_4="")
            # Hold the message on screen before the main loop redraws
            time.sleep(1)

        elif kind == "Confirm":
            # Only meaningful while the shutdown prompt is showing
            if state == "shutdown_confirm":
                state = "shutdown"
                state_entry = True
# PROGRAM START
# Load the station map and the saved encoder calibration before anything is started
database.Load_Map()
encoder_offsets = database.Load_Calibration()

# Positional encoders - used to select latitude and longitude
encoders_thread = Positional_Encoders(2, "Encoders", encoder_offsets[0], encoder_offsets[1])
encoders_thread.start()

display_thread = Display(3, "Display")
display_thread.start()

rgb_led = RGB_LED(20, "RGB_LED")
rgb_led.start()

scheduler = Scheduler(50, "SCHEDULER")
scheduler.start()

set_volume(volume)

# The stream that is currently playing, or None. Tracked from before the loop
# so that a stream left running by a state change can always be stopped.
streamer = None

# Main state machine. Each state runs its setup once (state_entry) and then
# its normal operation on subsequent passes; UI events are handled at the
# end of every pass.
while True:
    if state == "start":
        # Entry - setup state
        if state_entry:
            state_entry = False
            display_thread.message(
                line_1="Radio Globe",
                line_2="Made for DesignSpark",
                line_3="by Jude Pullen and",
                line_4="Donald Robson, 2020")
            # Leave the splash screen after 3 s
            scheduler.attach_timer(Back_To_Tuning, 3)

    elif state == "tuning":
        # Entry - setup state
        if state_entry:
            state_entry = False
            # "playing" can be left without going through its unlatch branch
            # (shutdown prompt cancelled, Back_To_Tuning timer firing), which
            # would otherwise leave the old stream running alongside a new one
            if streamer is not None:
                streamer.stop()
                streamer = None
            rgb_led.set_blink("WHITE")
            display_thread.clear()

        # Normal operation
        else:
            coordinates = encoders_thread.get_readings()
            search_area = Look_Around(coordinates[0], coordinates[1], fuzziness=3)
            location_name = ""
            stations_list = []
            url_list = []

            # Check the search area. Saving the first location name encountered
            # and all radio stations in the area, in order encountered
            for ref in search_area:
                index = database.index_map[ref[0]][ref[1]]

                # 0xFFFF marks a map cell with no location
                if index != 0xFFFF:
                    encoders_thread.latch(coordinates[0], coordinates[1], stickiness=3)
                    state = "playing"
                    state_entry = True
                    location = database.Get_Location_By_Index(index)
                    if location_name == "":
                        location_name = location

                    for station in database.stations_data[location]["urls"]:
                        stations_list.append(station["name"])
                        url_list.append(station["url"])

            # Provide 'helper' coordinates
            latitude = round((360 * coordinates[0] / ENCODER_RESOLUTION - 180), 2)
            longitude = round((360 * coordinates[1] / ENCODER_RESOLUTION - 180), 2)

            volume_disp = volume if volume_display else 0

            display_thread.update(latitude, longitude,
                                  "Tuning...", volume_disp, "", False)

    elif state == "playing":
        # Entry - setup
        if state_entry:
            state_entry = False
            jog = 0
            last_jog = 0
            rgb_led.set_static("RED", timeout_sec=3.0)

            # Never start a second stream on top of one that is still running
            if streamer is not None:
                streamer.stop()
                streamer = None

            # Get display coordinates - from file, so there's no jumping about.
            # Use the location whose name is displayed (the first one found),
            # not whichever location the search loop happened to visit last.
            latitude = database.stations_data[location_name]["coords"]["n"]
            longitude = database.stations_data[location_name]["coords"]["e"]

            # Play the top station
            streamer = Streamer(AUDIO_SERVICE, url_list[jog])
            streamer.play()

        # Exit back to tuning state if latch has 'come unstuck'
        elif not encoders_thread.is_latched():
            streamer.stop()
            streamer = None
            state = "tuning"
            state_entry = True

        # If the jog dial is used, stop the stream and restart with the new url
        elif jog != last_jog:
            # Restrict the jog dial value to the bounds of stations_list
            jog %= len(stations_list)
            last_jog = jog

            streamer.stop()
            streamer = Streamer(AUDIO_SERVICE, url_list[jog])
            streamer.play()

        # Idle operation - just keep display updated
        else:
            volume_disp = volume if volume_display else 0

            # Add arrows to the display if there is more than one station here
            if stations_list:
                display_thread.update(latitude, longitude, location_name, volume_disp,
                                      stations_list[jog], len(stations_list) > 1)

    elif state == "shutdown_confirm":
        if state_entry:
            state_entry = False
            display_thread.clear()
            time.sleep(0.1)
            display_thread.message(
                line_1="Really shut down?",
                line_2="<- Press mid button ",
                line_3="to confirm or",
                line_4="<- bottom to cancel.")

            # Auto-cancel in 5s
            # NOTE(review): this timer is not cancelled when the prompt is
            # dismissed earlier, so it may still fire later - confirm against
            # the Scheduler implementation.
            scheduler.attach_timer(Back_To_Tuning, 5)

    elif state == "shutdown":
        if state_entry:
            state_entry = False
            display_thread.clear()
            time.sleep(0.1)
            display_thread.message(
                line_1="Shutting down...",
                line_2="Please wait 10 sec",
                line_3="before disconnecting",
                line_4="power.")
            subprocess.run(["sudo", "poweroff"])

    else:
        # Just in case! Unknown state: recover to tuning and make sure its
        # entry setup runs.
        state = "tuning"
        state_entry = True

    Process_UI_Events()

    # Avoid unnecessarily high polling
    time.sleep(0.1)

# Clean up threads
# NOTE(review): unreachable as written - the loop above never breaks.
encoders_thread.join()