-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathutils.py
150 lines (127 loc) · 4.52 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
'''
Created on 09.02.2023
@author: irimi
'''
import threading
import io
import socketserver
import logging
from http import server
class ThreadEvent(threading.Thread):
"""
helper class to control threads with stop events
"""
def __init__(self, parent=None):
threading.Thread.__init__(self)
self.timeout = None
self._parent = parent
self._stopEvent = threading.Event()
def run(self):
self._worker_()
self._stopEvent.wait(self.timeout)
self._shutdown_()
def _worker_(self):
pass
def _shutdown_(self):
if self._parent:
self._parent.child_down(self)
def child_down(self,child):
pass
def trigger_stop(self):
self._stopEvent.set()
class ThreadEvents(ThreadEvent):
"""
Simple Multi Thread Events Handler
"""
def __init__(self):
self.threads=list()
def addThread (self, newThread: ThreadEvent):
newThread.start()
#self.threads.update({tId, newThread})
self.threads.append(newThread)
def rmThread (self,rmThread: ThreadEvent ):
if rmThread in self.threads:
self.threads.remove(rmThread)
def stopAllThreads(self):
for t in self.threads:
t.trigger_stop()
# Mostly copied from
# https://github.com/raspberrypi/picamera2/blob/next/examples/mjpeg_server_2.py
class StreamingOutput(io.BufferedIOBase):
def __init__(self):
self.frame = None
self.condition = threading.Condition()
def write(self, buf):
with self.condition:
self.frame = buf
self.condition.notify_all()
# Mostly copied from
# https://github.com/raspberrypi/picamera2/blob/next/examples/mjpeg_server_2.py
class StreamingHandler(server.BaseHTTPRequestHandler):
output = None
pagefn = None
def do_GET(self):
if self.path == '/':
self.send_response(301)
self.send_header('Location', '/index.html')
self.end_headers()
elif self.path == '/index.html':
# content = PAGE.encode('utf-8')
content = self.getPage()
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
elif self.path == "/favicon.ico":
icon = io.open("www/pi_logo.ico", "rb").read()
self.send_response(200)
self.send_header('Content-type', 'mage/x-icon')
self.send_header('Content-length', len(icon))
self.end_headers()
self.wfile.write(icon)
elif self.path == '/stream.mjpg':
self.send_response(200)
self.send_header('Age', 0)
self.send_header('Cache-Control', 'no-cache, private')
self.send_header('Pragma', 'no-cache')
self.send_header('Content-Type',
'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
try:
while True:
with StreamingHandler.output.condition:
StreamingHandler.output.condition.wait()
frame = StreamingHandler.output.frame
self.wfile.write(b'--FRAME\r\n')
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', len(frame))
self.end_headers()
self.wfile.write(frame)
self.wfile.write(b'\r\n')
except Exception as e:
logging.warning(
'Removed streaming client %s: %s',
self.client_address, str(e))
else:
self.send_error(404)
self.end_headers()
def getPage(self):
try:
with open(StreamingHandler.pagefn) as f:
page = f.read()
f.close()
PAGE = page.encode('utf-8')
except OSError as e:
print(f"ERROR {type(e)}: {e}")
PAGE = f"<html>ERROR: {e}</html>"
return PAGE
# Mostly copied from
# https://github.com/raspberrypi/picamera2/blob/next/examples/mjpeg_server_2.py
class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
allow_reuse_address = True
daemon_threads = True
def __init__(self, address, handler, output, pagefn):
handler.output = output
handler.pagefn = pagefn
super().__init__(address, handler)