forked from 18alantom/concurrent_inference
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_and_detect.py
64 lines (56 loc) · 2.13 KB
/
read_and_detect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time
import torch
import torchvision
from PIL import Image
from queue import Empty
from pathlib import Path
from output_handler import handle_output
# NOTE(review): sentinel value, not referenced anywhere in this file —
# presumably sent over a pipe by a sibling module to signal that reading
# has finished. Confirm against the consumer before relying on it.
COMPLETE = "READING_COMPLETE"
def transform(pil_image):
    """Default preprocessing: convert a PIL image to a ``torch.Tensor``.

    Per torchvision's ``to_tensor`` contract, the result is a float tensor
    in CHW layout with values scaled to [0, 1].
    """
    to_tensor = torchvision.transforms.functional.to_tensor
    return to_tensor(pil_image)
def read_images_into_q(images_path, queue, event, psend_pipe, ext="jpg",\
                       wait_time=0.05, transform=transform):
    """
    Reader process: recursively collects ``*.{ext}`` files under
    `images_path`, applies `transform` to each image, and puts
    ``(tensor, image_path)`` tuples onto `queue`.

    If `queue` is full it sleeps `wait_time` seconds and retries rather
    than blocking on ``put``.

    After the last image is queued it calls ``event.set()`` to signal
    downstream consumers, then blocks on ``queue.join()`` until every
    queued item has been marked done.

    `psend_pipe` receives ``(remaining_count, filename)`` progress
    updates, one per image.
    """
    image_list = list(Path(images_path).rglob(f"*.{ext}"))
    print(f"processing {len(image_list)} images... ")
    while image_list:
        if queue.full():
            # Queue is at capacity — back off briefly instead of
            # blocking inside put().
            time.sleep(wait_time)
            continue
        image_path = image_list.pop()
        # Context manager closes the underlying file handle; the
        # original leaked one open handle per image.
        with Image.open(image_path) as image:
            tensor = transform(image)
        queue.put((tensor, image_path))
        psend_pipe.send((len(image_list), image_path.name))
    event.set()
    # Block until consumers have task_done()'d every queued item.
    queue.join()
def detect_objects(queue, event, detector, device, lock, output_path):
    """
    Detector process: consumes ``(tensor, image_path)`` tuples from
    `queue`, runs `detector` on each image, and hands the result to
    ``handle_output`` along with `lock` and the open output file.

    Terminates once `event` is set (the reader has queued everything)
    AND the queue is empty. The short ``get`` timeout keeps the loop
    re-checking that exit condition instead of blocking forever.
    """
    detector.eval().to(device)
    # `with` guarantees the output file is closed even if an iteration
    # raises; the original leaked the handle on any exception.
    with open(output_path.as_posix(), "a") as file:
        while not (event.is_set() and queue.empty()):
            try:
                image, image_path = queue.get(block=True, timeout=0.1)
            except Empty:
                continue
            with torch.no_grad():
                # Detector expects a batch (list) of tensors; take the
                # single result back out.
                output = detector([image.to(device)])[0]
            queue.task_done()
            handle_output(image_path, output, lock, file)