-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_server_pyk4a.py
94 lines (75 loc) · 2.64 KB
/
run_server_pyk4a.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import argparse
import pickle
from io import BytesIO
import imageio
import numpy as np
from aiohttp import web
import asyncio
import pyk4a
from pyk4a import Config, PyK4A
# Open the Kinect once, at import time; every request handler below reads
# from this single module-level device handle.
_kinect_config = Config(
    camera_fps=pyk4a.FPS.FPS_30,
    color_resolution=pyk4a.ColorResolution.RES_720P,
    depth_mode=pyk4a.DepthMode.NFOV_UNBINNED,
)
k4a = PyK4A(_kinect_config)
k4a.start()

# Begin with auto exposure switched off and a fixed exposure value of 8330
# (presumably microseconds, matching the `shutter_us` parameter accepted by
# the /exposure endpoint -- confirm against the pyk4a documentation).
k4a.exposure_mode_auto = False
k4a.exposure = 8330
_EXPOSURE_USAGE = 'Incorrect format! use format <host>:<port>/exposure?auto=False&shutter_us=8330'


def _parse_exposure_query(query):
    """Extract ``(auto, shutter_us)`` from a query mapping.

    Args:
        query: Mapping of query-string parameters (supports ``[]`` and ``.get``).

    Returns:
        A ``(bool, int)`` tuple, or ``None`` when ``auto`` is missing or
        ``shutter_us`` is not an integer. ``auto`` is true only for the exact
        string ``'True'``; ``shutter_us`` defaults to 8330 when omitted.
    """
    try:
        auto = query['auto'] == 'True'
        shutter_us = int(query.get('shutter_us', '8330'))
    except (KeyError, ValueError):
        return None
    return auto, shutter_us


async def exposure_handle(request):
    """Handle ``GET /exposure``: switch between auto and manual exposure.

    Query parameters:
        auto: required; ``True`` enables auto exposure, anything else disables it.
        shutter_us: optional integer applied when auto exposure is disabled.

    Returns:
        A 200 text response reporting the resulting settings, or a 400 text
        response with usage help when the query string is malformed.

    Only query-parsing errors are turned into the usage message. Device errors
    and task cancellation (which can be raised while awaiting the sleep) are
    deliberately left to propagate instead of being reported as a bad request.
    """
    parsed = _parse_exposure_query(request.query)
    if parsed is None:
        return web.Response(status=400, text=_EXPOSURE_USAGE)

    auto, exposure = parsed
    k4a.exposure_mode_auto = auto
    if auto:
        # Wait before reading the value back; presumably this gives the
        # camera's auto-exposure time to settle.
        await asyncio.sleep(0.5)
        exposure = k4a.exposure
    else:
        k4a.exposure = exposure
    return web.Response(text=f'Success! Auto exposure {auto}, shutt_us {exposure}')
def _compose_preview(color_img, depth_img):
    """Build the preview image: color frame stacked above a grayscale depth frame.

    Args:
        color_img: H x W x C array with at least three channels; the first
            three are reversed (BGR -> RGB, assuming pyk4a's default BGRA32
            color format -- confirm if the color format is changed).
        depth_img: H x W array aligned to the color frame.

    Returns:
        A (2H) x W x 3 ``uint8`` array. Depth is min-max normalised to 0..255;
        a constant depth frame renders as black rather than dividing by zero.
    """
    rgb = color_img[..., [2, 1, 0]].astype(np.float32) / 255.0

    depth = depth_img.astype(np.float32)
    depth -= np.min(depth)
    peak = np.max(depth)
    if peak > 0:
        # Skip the division for a flat frame: 0 / 0 would yield NaN and an
        # undefined uint8 conversion.
        depth /= peak
    gray = np.repeat(depth[:, :, np.newaxis], 3, axis=2)

    stacked = np.concatenate([rgb, gray], axis=0)
    return (stacked * 255).astype(np.uint8)


async def view_handle(request):
    """Handle ``GET /`` and ``GET /view``: return a JPEG preview of one capture.

    Returns:
        A JPEG response showing the color image above the normalised depth
        image, or a 503 text response when the capture lacks a color or
        depth frame.
    """
    capture = k4a.get_capture()
    color_img = capture.color
    depth_img = capture.transformed_depth
    if color_img is None or depth_img is None:
        return web.Response(status=503, text='Capture is missing a color or depth frame')

    img_buf = BytesIO()
    imageio.imwrite(img_buf, _compose_preview(color_img, depth_img), format='jpeg')
    # getvalue() hands the response its own bytes instead of a view into img_buf.
    return web.Response(body=img_buf.getvalue(), content_type='image/jpeg')
async def pickle_handle(request):
    """Handle ``GET /pickle``: return one capture as a pickled dict.

    The payload is ``{'color_img': ..., 'depth_img': ...}`` where the color
    image has its first three channels reversed (BGR -> RGB, assuming the
    default BGRA32 color format) and the depth image is float32 divided by
    1000 (presumably millimetres -> metres; confirm against the device docs).

    Returns:
        A binary response containing the pickle, or a 503 text response when
        the capture lacks a color or depth frame.

    NOTE(security): clients have to ``pickle.loads`` this body, which executes
    arbitrary code if the server is not trusted. Only use on a trusted network.
    """
    capture = k4a.get_capture()
    color_img = capture.color
    depth_raw = capture.transformed_depth
    if color_img is None or depth_raw is None:
        return web.Response(status=503, text='Capture is missing a color or depth frame')

    payload = {
        'color_img': color_img[..., [2, 1, 0]],
        'depth_img': depth_raw.astype(np.float32) / 1000.0,
    }
    return web.Response(body=pickle.dumps(payload))
async def intr_handle(request):
    """Handle ``GET /intr``: return the pickled color-camera matrix.

    The matrix is read from the device calibration for the COLOR camera and
    sent as a raw pickle body.
    """
    camera = pyk4a.calibration.CalibrationType.COLOR
    camera_matrix = k4a.calibration.get_camera_matrix(camera)
    return web.Response(body=pickle.dumps(camera_matrix))
# Application object plus its GET routes; '/' and '/view' both serve the
# JPEG preview.
app = web.Application()
app.add_routes([
    web.get(url_path, handler)
    for url_path, handler in (
        ('/', view_handle),
        ('/view', view_handle),
        ('/intr', intr_handle),
        ('/pickle', pickle_handle),
        ('/exposure', exposure_handle),
    )
])
if __name__ == '__main__':
    # Script entry point: read the listening port from the command line
    # (default 8080) and serve the application until interrupted.
    # "Kinect Server" is passed as the description; as a bare positional
    # argument it would replace the program name shown in the usage line.
    parser = argparse.ArgumentParser(description="Kinect Server")
    parser.add_argument("--port", type=int, default=8080, help="port")
    args = parser.parse_args()
    web.run_app(app, port=args.port)