-
Notifications
You must be signed in to change notification settings - Fork 1
/
pircam
executable file
·367 lines (337 loc) · 16.6 KB
/
pircam
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
#!/usr/bin/python
# pircam, 2019 by Niklaus Fankhauser
# Motion detection sensor triggered capture and combination of MJPEG video and audio from D-Link network cameras.
import os, sys, time, threading, urllib2, subprocess, xmpp, signal, base64, SocketServer
from tinkerforge.ip_connection import IPConnection
from tinkerforge.bricklet_motion_detector import BrickletMotionDetector
from tinkerforge.bricklet_motion_detector_v2 import BrickletMotionDetectorV2
# Globals
config_file = '/opt/pircam_config.txt'
current_video_thread, current_audio_thread = {}, {} # One thread per camera, for video and audio
current_convert_thread = None # Only one convert thread, for video and audio
video_queue, audio_queue = {}, {} # One queue per camera
sensorAPI = {} # Motion detector dictionary
cbFuns = {} # Callback function dictionary
ipcon = {} # TCP/IP connections
cam_offline_time = {} # Dictionary for times in seconds cameras are offline
class GracefulKiller:
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self,signum, frame):
self.kill_now = True
def load_config(fn, section):
"""Load specified section of config file into dictionary."""
cdict, current_section = {}, ''
for i in open(fn):
if not i.strip(): continue
isp = i.strip().split('=')
if len(isp) == 1:
title = isp[0].strip()
if title in ['GLOBAL', 'CAMERA', 'SENSOR']: current_section = title
else: device = title
if current_section != section: continue
if len(isp) == 2:
key, value = isp[0].strip(), isp[1].strip()
if current_section == 'GLOBAL':
cdict[key] = value
else:
if not cdict.has_key(device): cdict[device] = {}
cdict[device][key] = value
return cdict
def logg(x):
"""Writes message and formated timestamp to logfile."""
x = '%s: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), x)
open(config['logfile'], 'a').write(x + '\n')
def urllib2_request(inURL):
if '@' in inURL and '//' in inURL and ':' in inURL:
username, password = inURL.split('@')[0].split('//')[1].split(':')
url = inURL.replace('%s:%s@' % (username, password), '')
request = urllib2.Request(url)
base64string = base64.b64encode('%s:%s' % (username, password))
request.add_header("Authorization", "Basic %s" % base64string)
return request
else: return inURL
def download_mjpeg(CAMID):
"""Download specified size from MJPEG stream by HTTP using urllib2 and append filename to video-queue."""
global video_queue
ofn = '%s/%s_mjpeg/%s.mjpeg' % (config['data_dir'], CAMID, time.strftime('%Y%m%d_%H%M%S'))
try:
stream = urllib2.urlopen(urllib2_request(cameras[CAMID]['mjpeg']), timeout=30).read(int(cameras[CAMID]['read']))
except:
logg('%s: Video download error!' % CAMID)
return
open(ofn, 'w').write(stream)
if not video_queue.has_key(CAMID): video_queue[CAMID] = []
video_queue[CAMID].append(ofn)
logg('%s: Downloaded %2.2f MB to %s' % (CAMID, len(stream)/1024.0/1024.0, ofn))
def log_and_run(cmd):
"""Run command using subprocess.call and only log in case of error."""
if config['verbose'] == 'True': logg(' '.join(cmd))
rcode = subprocess.call(cmd)
if rcode > 0: logg('%s (%d)' % (' '.join(cmd), rcode))
def download_audio(CAMID):
"""Download AAC audio in a MP4 container from RTSP using ffmpeg and add filename to audio-queue."""
global audio_queue
ofn = '%s/%s_mp4/%s.mp4' % (config['data_dir'], CAMID, time.strftime('%Y%m%d_%H%M%S'))
cmd = 'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y', '-i', cameras[CAMID]['audio'], '-vn', '-acodec', 'copy', '-t', config['rec_sec'], ofn
log_and_run(cmd)
if not os.path.isfile(ofn):
logg('%s: Audio download failed!' % CAMID)
return
if not audio_queue.has_key(CAMID): audio_queue[CAMID] = []
audio_queue[CAMID].append(ofn)
logg('%s: Downloaded %2.2f KB to %s' % (CAMID, os.stat(ofn).st_size/1024.0, ofn))
def extract_first_jpeg(mjpeg_data):
"""Exctracts first image from MJPEG stream."""
header = mjpeg_data.split('\r\n\r\n')[0]
if not 'Length: ' in header: return ''
content_length = header.split('Length: ')[1].split('\r\n')[0].strip()
try: content_length = int(content_length)
except: return ''
data_start = len(header) + 4
if len(mjpeg_data) <= data_start + content_length: return ''
return mjpeg_data[data_start:(data_start + content_length)]
def upload_to_webserver(fn, remote_path):
"""Run a rsync command to upload a file to a path on the webserver."""
cmd = 'rsync', fn, '%s:%s/' % (config['webserver'], remote_path)
log_and_run(cmd)
def filebase(x):
"""Returns filename without path and extension."""
return os.path.splitext(os.path.basename(x))[0]
def timeFormat(x):
"""Convert YYYYMMDD_HHMMSS to YYYY-MM-DD_HH:MM:SS"""
return time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(filebase(x), "%Y%m%d_%H%M%S"))
def index_element(fpath):
"""Generates variables required for an element in photoswipe."""
src = fpath.replace(config['data_dir'], '')
w, h = config['size_x'], config['size_y']
video = src.replace('jpg', 'ogg').strip('/')
#if True in [k in fpath for k, v in cameras.items() if 'pipir' in v]: video = video.replace('ogg', 'mp4')
title = timeFormat(fpath)
pid = filebase(fpath)
return "{src:'%s', w:%s, h:%s, title:'%s', video:'%s', pid:'%s'}" % (src, w, h, title, video, pid)
def generate_index_page():
"""Generates HTML page for the numLast recorded videos with photoswipe user interface."""
path = lambda x : '%s/%s' % (config['data_dir'], x)
ftup = [ (f, '%s/%s' % (path(c), f)) for c in cameras.keys() for f in os.listdir(path(c)) if f.endswith('jpg') ]
image_paths = [ fpath for (fn, fpath) in sorted(ftup) ]
if config['verbose'] == 'True': logg('Total %d image files' % len(image_paths))
elemlist = [ index_element(fpath) for fpath in reversed(image_paths) ]
photoswipe_script = 'var items = [\n%s\n];\n' % ',\n'.join(elemlist[:int(config['numLast'])])
scriptPath = '%s/pircam.js' % config['data_dir']
open(scriptPath, 'w').write(photoswipe_script)
if config['verbose'] == 'True': logg('List of %d files written to %s' % (int(config['numLast']), scriptPath))
def run_converter(CAMID):
"""Matches audio to video and combines using ffmpeg, also comnverting to ogg theora/vorbis.
Extracts first image, creates the index-page and uploads all to the webserver."""
global video_queue, audio_queue, cameras
local_video_queue, local_audio_queue = list(video_queue[CAMID]), list(audio_queue[CAMID]) # Copy queue
video_queue[CAMID], audio_queue[CAMID] = [], [] # Empty queue
if config['verbose'] == 'True': logg('%s: Converter started: %d video, %d audio files' % (CAMID, len(local_video_queue), len(local_audio_queue)))
unixtime = lambda x : time.mktime(time.strptime(x, "%Y%m%d_%H%M%S"))
audio_base = [ filebase(i) for i in local_audio_queue ]
fps = int(cameras[CAMID]['fps'])
video_sec = 0
for vfn in local_video_queue:
if not os.path.isfile(vfn):
logg('%s: Missing video!' % CAMID)
continue
jfn = '%s/%s/%s.jpg' % (config['data_dir'], CAMID, filebase(vfn))
ofn = '%s/%s/%s.ogg' % (config['data_dir'], CAMID, filebase(vfn))
mjpeg_data = open(vfn).read()
nb_frames = mjpeg_data.count('image/jpeg')
jpeg_data = extract_first_jpeg(mjpeg_data)
if len(jpeg_data) == 0:
logg('%s: Broken video!' % CAMID)
continue
open(jfn, 'w').write(jpeg_data)
if config['verbose'] == 'True': logg('%s: Image of %2.2f KB saved to %s' % (CAMID, len(jpeg_data)/1024.0, jfn))
video_sec = nb_frames / float(fps)
if config['verbose'] == 'True': logg('%s: %s contains %d frames -> %2.2f seconds' % (CAMID, vfn, nb_frames, video_sec))
if nb_frames < fps:
logg('%s: Truncated video!' % CAMID)
continue
if local_audio_queue:
video_time = unixtime(filebase(vfn))
audio_diff = [abs(unixtime(i) - video_time) for i in audio_base]
afn = local_audio_queue[audio_diff.index(min(audio_diff))]
cmd = 'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y', '-r', str(fps), '-fflags', 'discardcorrupt', '-i', vfn, '-i', afn, '-codec:v', 'libtheora', '-qscale:v', '7', ofn
else:
cmd = 'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y', '-r', str(fps), '-fflags', 'discardcorrupt', '-i', vfn, '-codec:v', 'libtheora', '-qscale:v', '7', ofn
log_and_run(cmd)
if not os.path.isfile(ofn):
logg('%s: Failed to create %s!' % (CAMID, ofn))
continue
logg('%s: Video of %2.2f MB created at %s' % (CAMID, os.stat(ofn).st_size/1024.0/1024.0, ofn))
upload_to_webserver(ofn, '%s/%s' % (config['data_dir'], CAMID))
upload_to_webserver(jfn, '%s/%s' % (config['data_dir'], CAMID))
if video_sec > 0: cameras[CAMID]['read'] = int(int(cameras[CAMID]['read']) * int(config['rec_sec']) / video_sec)
if cameras[CAMID]['read'] > int(config['max_down']): cameras[CAMID]['read'] = int(config['max_down'])
if config['verbose'] == 'True': logg('%s: New MJPEG read length is %2.2f MB' % (CAMID, cameras[CAMID]['read']/1024.0/1024.0))
generate_index_page()
upload_to_webserver('%s/pircam.js' % config['data_dir'], config['webRoot'])
upload_to_webserver(config['logfile'], config['webRoot'])
if config['verbose'] == 'True': logg('%s: Converter thread finished' % CAMID)
def send_jabber(CAMID):
"""Sends a jabber message containing activated cam to the recipient specified in config."""
jid = xmpp.protocol.JID(config['xmpp_from'])
cl = xmpp.Client(config['xmpp_server'], debug=[])
if not cl.connect(use_srv=False): return
if not cl.auth(jid.getNode(), config['xmpp_pass']): return
mid = cl.send(xmpp.protocol.Message(config['xmpp_to'], CAMID))
if config['verbose'] == 'True': logg('%s: Jabber message id %s sent' % (CAMID, str(mid)))
def cb_motion_detected(CAMID):
"""Callback function for motion detection starts threads for video/audio download, if possible."""
global current_video_thread, current_audio_thread, audio_queue
if os.path.isfile(config['pause_file']):
logg('%s: Motion, paused' % CAMID)
return
if current_video_thread.has_key(CAMID) and current_video_thread[CAMID].isAlive(): return
current_video_thread[CAMID] = threading.Thread(target=download_mjpeg, args=[CAMID])
current_video_thread[CAMID].start()
logg('%s: Motion, video thread started' % CAMID)
send_jabber(CAMID)
if cameras[CAMID]['audio'] == 'None':
audio_queue[CAMID] = []
return
if current_audio_thread.has_key(CAMID) and current_audio_thread[CAMID].isAlive(): return
current_audio_thread[CAMID] = threading.Thread(target=download_audio, args=[CAMID])
current_audio_thread[CAMID].start()
logg('%s: Motion, audio thread started' % CAMID)
def is_idle():
"""Returns True if no download or convert threads are running."""
for CAMID in cameras.keys():
if current_video_thread.has_key(CAMID) and current_video_thread[CAMID].isAlive():
if config['verbose'] == 'True': logg('%s: video isAlive' % CAMID)
return False
if current_audio_thread.has_key(CAMID) and current_audio_thread[CAMID].isAlive():
if config['verbose'] == 'True': logg('%s: audio isAlive' % CAMID)
return False
if current_convert_thread != None and current_convert_thread.isAlive():
if config['verbose'] == 'True': logg('Converter isAlive')
return False
return True
def configure_motion_detector(uid):
"""Turn off LED, set sensitivity and register motion detection callback function."""
global sensorAPI, cbFuns
try: sensorAPI[uid].set_status_led_config(0) # Turn off status LED (is on after bricklet reset)
except:
logg("Failed to configure sensor %s" % uid)
return
if sensors[uid]['vers'] == '2': sensorAPI[uid].set_sensitivity(sensors[uid]['sensi']) # Set sensitivity
cbFuns[uid] = lambda : cb_motion_detected(sensors[uid]['cam']) # Create callback function
sensorAPI[uid].register_callback(sensorAPI[uid].CALLBACK_MOTION_DETECTED, cbFuns[uid]) # Register callback function
logg('Callback for %s at %s to %s registered' % (uid, sensors[uid]['ip'], sensors[uid]['cam']))
def is_camera_online(CAMID):
try: stream = urllib2.urlopen(urllib2_request(cameras[CAMID]['mjpeg']), timeout=30).read(200)
except: return False
if 'jpeg' in stream: return True
return False
def check_camera_status():
global cam_offline_time
for CAMID in cameras.keys():
if not cameras[CAMID].has_key('mjpeg'): continue
now = time.time()
offline_sec = now - cam_offline_time.get(CAMID, now)
jtext = '%s offline for %2.1f hours!' % (CAMID, offline_sec / 3600)
if is_camera_online(CAMID) == True: cam_offline_time.pop(CAMID, None)
else:
if offline_sec >= int(config['warning_sec']):
send_jabber(jtext)
logg(jtext)
if not cam_offline_time.has_key(CAMID):
cam_offline_time[CAMID] = time.time()
class PiSignalHandler(SocketServer.BaseRequestHandler):
def handle(self):
ip_data = self.request.recv(50).strip()
logg('%s from %s' % (ip_data, self.client_address[0]))
if not ':' in ip_data: logg('Bad data'); return
if 'CAMID' in ip_data:
msg_id, CAMID = ip_data.split(':')
return cb_motion_detected(CAMID)
camid, fn_mp4 = ip_data.split(':')
if not cameras.has_key(camid): logg('Bad camera'); return
if not cameras[camid].has_key('pipir'): logg('Missing pipir path'); return
if not cameras[camid].has_key('addr'): logg('Missing pipir addr'); return
if cameras[camid]['addr'] != self.client_address[0]: logg('Bad client addr'); return
local_path = '%s/%s' % (config['data_dir'], camid)
path_mp4 = '%s/%s' % (local_path, fn_mp4)
path_jpg = '%s/%s' % (local_path, fn_mp4.replace('mp4', 'jpg').replace('ogg', 'jpg'))
cmd1 = '/usr/bin/rsync', '%s/%s' % (cameras[camid]['pipir'], fn_mp4), path_mp4
cmd2 = '/usr/bin/ffmpeg', '-hide_banner', '-loglevel', 'panic', '-i', path_mp4, '-vf', 'select=eq(n\,5)', path_jpg
cmd3 = '/usr/bin/mogrify', '-scale', '1280x720', path_jpg
for cmd in [cmd1, cmd2, cmd3]: log_and_run(cmd)
webserver_path = '%s/%s' % (config['data_dir'], camid)
upload_to_webserver(path_jpg, webserver_path)
upload_to_webserver(path_mp4, webserver_path)
generate_index_page()
upload_to_webserver('%s/pircam.js' % config['data_dir'], config['webRoot'])
upload_to_webserver(config['logfile'], config['webRoot'])
if not ':' in cameras[camid]['pipir']: return
client_id, client_path = cameras[camid]['pipir'].split(':')
cmd4 = '/usr/bin/ssh', client_id, 'mv', '%s/%s' % (client_path, fn_mp4), '%s_ok/%s' % (client_path, fn_mp4)
log_and_run(cmd4)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
pass
# Load configuration into dictionaries
sensors = load_config(config_file, 'SENSOR')
cameras = load_config(config_file, 'CAMERA')
config = load_config(config_file, 'GLOBAL')
logg('%d sensors, %d cameras' % (len(sensors), len(cameras)))
# Create IP connections to all bricks required by the sensors
for uid in sensors.keys():
ip = sensors[uid]['ip']
if not ipcon.has_key(ip):
ipcon[ip] = IPConnection() # Create IP connection
try: ipcon[ip].connect(ip, int(config['brick_port'])) # Connect to brickd
except:
logg('%s: Bricklet unreachable' % uid)
continue
# Create motion detector sensor device object using the IP connection
if sensors[uid]['vers'] == '1': sensorAPI[uid] = BrickletMotionDetector(uid, ipcon[ip])
if sensors[uid]['vers'] == '2': sensorAPI[uid] = BrickletMotionDetectorV2(uid, ipcon[ip])
configure_motion_detector(uid)
logg('IP connections: %d' % len(ipcon))
reconfigure_time = time.time()
check_time = time.time()
killer = GracefulKiller()
tcp_server = ThreadedTCPServer((config['server_addr'], 22333), PiSignalHandler)
server_thread = threading.Thread(target=tcp_server.serve_forever)
server_thread.start()
logg('TCP server running')
while True: # Main loop
threading.Event().wait(int(config['sleep_sec'])) # Sleep, but wake for events.
# Start convert thread per camera, if there are videos queued and system is idle.
idle_status = is_idle()
for CAMID in cameras.keys():
if not video_queue.has_key(CAMID): continue
if len(video_queue[CAMID]) == 0: continue
if idle_status or len(video_queue[CAMID]) > int(config['max_queue']):
if config['verbose'] == 'True':
tp = CAMID, idle_status, len(video_queue[CAMID]), int(config['max_queue'])
logg('%s: Starting converter thread, idle=%s, queue=%d, max_queue=%d' % tp)
current_convert_thread = threading.Thread(target=run_converter, args=[CAMID])
current_convert_thread.start()
break
if idle_status and killer.kill_now: break # Terminate
# Check for bricklet reset and reconfigure if needed (status LED on).
if reconfigure_time + int(config['reconf_sec']) < time.time():
for uid in sensors.keys():
try: led_status = sensorAPI[uid].get_status_led_config()
except:
logg('%s: Bricklet unreachable' % uid)
continue
if led_status > 0:
logg('%s: Bricklet reset' % uid)
configure_motion_detector(uid)
reconfigure_time = time.time()
# Check if all camera are online and send warning otherwise.
if idle_status and check_time + int(config['check_sec']) < time.time():
check_camera_status()
check_time = time.time()
for ip in ipcon.keys(): ipcon[ip].disconnect()
tcp_server.shutdown()
tcp_server.server_close()
logg('Finished')