-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbangMTY.py
executable file
·364 lines (322 loc) · 12.1 KB
/
bangMTY.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
#! /usr/bin/env python
# -*- coding: latin-1 -*-
## References:
# - GPIO: http://bit.ly/JTlFE3 (elinux.org wiki)
# http://bit.ly/QI8sAU (wiring pi)
# http://bit.ly/MDEJVo (wiringpi-python)
# - TWITTER: https://github.com/tweepy/tweepy
# https://github.com/ryanmcgrath/twython
# - GRAPHICS: http://bit.ly/96VoEC (pygame)
# http://bit.ly/XmX8gA (display.set_mode)
# http://bit.ly/Ld5NXV (auto-login)
# - REGEXP: http://bit.ly/5UuJA (Py RegExp Testing Tool)
import os, sys, platform, time
import re
import Queue
import pygame
from pygame.locals import *
from twython import Twython
if not pygame.font: print 'Warning, fonts disabled'
if not pygame.mixer: print 'Warning, sound disabled'
## import wiringpi lib if available
HAS_WIRINGPI = True
try:
import wiringpi
except ImportError:
HAS_WIRINGPI = False
## setup GPIO object (real or fake)
if (HAS_WIRINGPI):
gpio = wiringpi.GPIO(wiringpi.GPIO.WPI_MODE_SYS)
else:
# define fake gpio class with gpio.LOW, gpio.HIGH, gpio.digitalWrite, etc
class Gpio(object):
def __init__(self):
self.LOW = False
self.HIGH = True
self.OUTPUT = 0
def digitalWrite(self,pin=0, val=False):
anEmptyStatement = ""
def pinMode(self,pin=0, val=False):
anEmptyStatement = ""
gpio = Gpio()
######### GPIO pins
MOTOR_PIN = [17, 18]
LIGHT_PIN = [22, 23]
## for setting up the GPIO pins
def setUpGpio():
os.system("gpio export "+str(MOTOR_PIN[0])+" out 2> /dev/null")
os.system("gpio export "+str(MOTOR_PIN[1])+" out 2> /dev/null")
os.system("gpio export "+str(LIGHT_PIN[0])+" out 2> /dev/null")
os.system("gpio export "+str(LIGHT_PIN[1])+" out 2> /dev/null")
gpio.pinMode(MOTOR_PIN[0],gpio.OUTPUT)
gpio.pinMode(MOTOR_PIN[1],gpio.OUTPUT)
gpio.pinMode(LIGHT_PIN[0],gpio.OUTPUT)
gpio.pinMode(LIGHT_PIN[1],gpio.OUTPUT)
gpio.digitalWrite(MOTOR_PIN[0],gpio.LOW)
gpio.digitalWrite(MOTOR_PIN[1],gpio.LOW)
gpio.digitalWrite(LIGHT_PIN[0],gpio.LOW)
gpio.digitalWrite(LIGHT_PIN[1],gpio.LOW)
## for cleaning up the GPIO pins on exit
def cleanUpGpio():
print "Cleaning up GPIO"
gpio.digitalWrite(MOTOR_PIN[0],gpio.LOW)
gpio.digitalWrite(MOTOR_PIN[1],gpio.LOW)
gpio.digitalWrite(LIGHT_PIN[0],gpio.LOW)
gpio.digitalWrite(LIGHT_PIN[1],gpio.LOW)
######### time constants
## how often to check twitter (in seconds)
TWITTER_CHECK_PERIOD = 6
## how often to check queue for new tweets to be processed (in seconds)
## or, how much time between bang routines
QUEUE_CHECK_PERIOD = 1
## how long to keep motors on (in seconds)
MOTOR_ON_PERIOD = 0.5
## how long to wait between turning on motors (in seconds)
MOTOR_OFF_PERIOD = 0.3
## how long to keep each light on (in seconds)
LIGHT_ON_PERIOD = [0.25, 0.333]
## number of bangs per routine
## (this can be dynamic, based on length of tweet, for example)
NUMBER_OF_BANGS = 10
## font size (height in pixels)
FONT_SIZE = 210
## how long to keep text in same position before moving it (in seconds)
TEXT_SCROLL_PERIOD = 0.1
######### state machine
## different states motors can be in
(STATE_WAITING, STATE_BANGING_FORWARD, STATE_BANGING_BACK,
STATE_PAUSE_FORWARD, STATE_PAUSE_BACK) = range(5)
## state/time variables
currentMotorState = STATE_WAITING
bangsLeft = 0
currentLightState = [False, False]
lastTwitterCheck = time.time()
lastMotorUpdate = time.time()
lastLightUpdate = [time.time(), time.time()]
lastTextUpdate = time.time()
## tweet queue
tweetQueue = Queue.Queue()
######### twitter init
twitter = None
twitterAuthenticated = False
twitterResults = None
## get tweets that come after this post made on 2013/02/23
largestTweetId = 305155172542324700
tweetSplit = re.compile("^(.{0,70}) (.{0,100})$")
## with these terms
SEARCH_TERM = "#bangMTY #BangMTY #bangMty #BangMty #bangmty #Bangmty #BANGMTY"
## read secrets from file
inFile = open('oauth.txt', 'r')
secrets = {}
for line in inFile:
(k,v) = line.split()
secrets[k] = v
## get largest Id for tweets that came before starting the program
def getLargestTweetId():
global largestTweetId
if (not twitterResults is None):
for tweet in twitterResults["statuses"]:
print ("Tweet %s from @%s at %s" %
(tweet['id'],
tweet['user']['screen_name'],
tweet['created_at']))
print tweet['text'],"\n"
if (int(tweet['id']) > largestTweetId):
largestTweetId = int(tweet['id'])
## authenticate
def authenticateTwitter():
global twitter, twitterAuthenticated
try:
twitter = Twython(twitter_token = secrets['CONSUMER_KEY'],
twitter_secret = secrets['CONSUMER_SECRET'],
oauth_token = secrets['ACCESS_TOKEN'],
oauth_token_secret = secrets['ACCESS_SECRET'])
twitterAuthenticated = True
## assume connected
searchTwitter()
getLargestTweetId()
except:
twitter = None
twitterAuthenticated = False
## query twitter
def searchTwitter():
global twitterResults
if ((not twitterAuthenticated) or (twitter is None)):
authenticateTwitter()
try:
twitterResults = twitter.search(q=SEARCH_TERM,
include_entities="false",
count="50", result_type="recent",
since_id=largestTweetId)
except:
twitterResults = None
######### Windowing stuff
screen = None
background = None
font = None
textAndPos = None
def setUpWindowing():
global screen, background, font, textAndPos
flags = pygame.FULLSCREEN|pygame.DOUBLEBUF|pygame.HWSURFACE
pygame.init()
screen = pygame.display.set_mode((0, 0),flags)
pygame.display.set_caption('#bangMTY')
pygame.mouse.set_visible(False)
background = pygame.Surface(screen.get_size())
background = background.convert()
background.fill((0,0,0))
font = pygame.font.Font("./otis.ttf", FONT_SIZE)
textAndPos = [{'text':font.render("", 1, (250,0,0)),
'pos':Rect(-2,0,-1,0)}]
screen.blit(background, (0, 0))
pygame.display.flip()
######### Setup
def setup():
setUpGpio()
setUpWindowing()
searchTwitter()
######### Loop
def loop():
global currentMotorState, bangsLeft, currentLightState
global lastTwitterCheck, lastMotorUpdate, lastLightUpdate, lastTextUpdate
global largestTweetId, textAndPos
## handle events
for event in pygame.event.get():
if (event.type == MOUSEBUTTONDOWN):
tweetQueue.put("MÉSSÃGÉD !!!")
elif ((event.type == QUIT) or
(event.type == KEYDOWN and event.key == K_ESCAPE)):
cleanUpGpio()
sys.exit()
## graphics stuff
if ((textAndPos[0]['pos'].right > -1) and
(time.time()-lastTextUpdate > TEXT_SCROLL_PERIOD)):
background.fill((0,0,0))
for tpd in textAndPos:
tpd['pos'].right -= FONT_SIZE/7
background.blit(tpd['text'], tpd['pos'])
screen.blit(background, (0,0))
pygame.display.flip()
lastTextUpdate = time.time()
## twitter check.
if ((currentMotorState==STATE_WAITING) and
(time.time()-lastTwitterCheck > TWITTER_CHECK_PERIOD) and
(textAndPos[0]['pos'].right < 0)):
# check twitter
searchTwitter()
## parse results, print stuff, push on queue
if (not twitterResults is None):
for tweet in twitterResults["statuses"]:
## print
print ("pushing %s from @%s" %
(tweet['text'],
tweet['user']['screen_name']))
## push
tweetQueue.put(tweet['text'])
## update largestTweetId for next searches
if (int(tweet['id']) > largestTweetId):
largestTweetId = int(tweet['id'])
## update timer
lastTwitterCheck = time.time()
## state machine for motors
## if motor is idle, no text is scrolling and there are
## tweets to process, then start dance
if ((currentMotorState==STATE_WAITING) and
(time.time()-lastMotorUpdate > QUEUE_CHECK_PERIOD) and
(textAndPos[0]['pos'].right < 0) and
(not tweetQueue.empty())):
print "BANG FWD"
tweetText = tweetQueue.get()
# create objects to display text
textObjects = []
if (len(tweetText) > 80):
regExpResult = tweetSplit.search(tweetText)
if (not regExpResult is None):
textObjects = regExpResult.groups()
else:
textObjects = [tweetText]
if (len(textObjects) > 0):
textAndPos = []
currentWidth = 0
for subTweet in textObjects:
t = font.render(subTweet+" ", 1, (250,0,0))
p = t.get_rect(left=background.get_width()+currentWidth,
centery=background.get_height()/2)
currentWidth += p.width
textAndPos.insert(0,{'text':t,'pos':p})
# set pins
gpio.digitalWrite(MOTOR_PIN[0],gpio.HIGH)
gpio.digitalWrite(MOTOR_PIN[1],gpio.LOW)
currentMotorState = STATE_BANGING_FORWARD
lastMotorUpdate = time.time()
bangsLeft = NUMBER_OF_BANGS
## if motor0 has been on for a while, pause, then reverse direction
elif ((currentMotorState==STATE_BANGING_FORWARD) and
(time.time()-lastMotorUpdate > MOTOR_ON_PERIOD) and
(bangsLeft > 0)):
gpio.digitalWrite(MOTOR_PIN[0],gpio.LOW)
gpio.digitalWrite(MOTOR_PIN[1],gpio.LOW)
currentMotorState = STATE_PAUSE_BACK
lastMotorUpdate = time.time()
## if we're done pausing, proceed
elif ((currentMotorState==STATE_PAUSE_BACK) and
(time.time()-lastMotorUpdate > MOTOR_OFF_PERIOD) and
(bangsLeft > 0)):
print "BANG BACK"
gpio.digitalWrite(MOTOR_PIN[0],gpio.LOW)
gpio.digitalWrite(MOTOR_PIN[1],gpio.HIGH)
currentMotorState = STATE_BANGING_BACK
lastMotorUpdate = time.time()
bangsLeft -= 1
## if motor1 has been on for a while, pause, then reverse direction
elif ((currentMotorState==STATE_BANGING_BACK) and
(time.time()-lastMotorUpdate > MOTOR_ON_PERIOD) and
(bangsLeft > 0)):
gpio.digitalWrite(MOTOR_PIN[0],gpio.LOW)
gpio.digitalWrite(MOTOR_PIN[1],gpio.LOW)
currentMotorState = STATE_PAUSE_FORWARD
lastMotorUpdate = time.time()
## if we're done pausing, proceed
elif ((currentMotorState==STATE_PAUSE_FORWARD) and
(time.time()-lastMotorUpdate > MOTOR_OFF_PERIOD) and
(bangsLeft > 0)):
print "BANG FWD"
gpio.digitalWrite(MOTOR_PIN[0],gpio.HIGH)
gpio.digitalWrite(MOTOR_PIN[1],gpio.LOW)
currentMotorState = STATE_BANGING_FORWARD
lastMotorUpdate = time.time()
## if no more bangs left
elif((currentMotorState != STATE_WAITING) and
(bangsLeft <= 0) and
(time.time()-lastMotorUpdate > MOTOR_ON_PERIOD)):
print "WAITING"
gpio.digitalWrite(MOTOR_PIN[0],gpio.LOW)
gpio.digitalWrite(MOTOR_PIN[1],gpio.LOW)
gpio.digitalWrite(LIGHT_PIN[0],gpio.LOW)
gpio.digitalWrite(LIGHT_PIN[1],gpio.LOW)
currentMotorState = STATE_WAITING
lastMotorUpdate = time.time()
## state machine for lights: if banging, flicker lights
if (currentMotorState != STATE_WAITING):
for i in range(0,2):
if (time.time()-lastLightUpdate[i] > LIGHT_ON_PERIOD[i]):
currentLightState[i] = not currentLightState[i]
gpio.digitalWrite(LIGHT_PIN[i],currentLightState[i])
lastLightUpdate[i] = time.time()
######### main
def main():
setup()
print "WAITING"
while True:
## keep it from looping faster than ~60 times per second
loopStart = time.time()
loop()
loopTime = time.time()-loopStart
if (loopTime < 0.017):
time.sleep(0.017 - loopTime)
######### magick
if __name__=="__main__":
try:
main()
except KeyboardInterrupt:
cleanUpGpio()