-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.py
227 lines (191 loc) · 8.83 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from flask import Flask, request, jsonify
import cv2
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM, Dense
from ultralytics import YOLO
import yaml
import os
import logging
app = Flask(__name__)
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Ensure the 'uploads' directory exists
os.makedirs('uploads', exist_ok=True)
# Load class names from data.yaml
def load_class_names(yaml_file):
try:
with open(yaml_file, 'r') as stream:
data_loaded = yaml.safe_load(stream)
return data_loaded['names']
except Exception as e:
logger.error(f"Failed to load class names from {yaml_file}: {e}")
raise
# LSTM model for motion detection
def create_lstm_model(input_shape):
try:
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=input_shape))
model.add(LSTM(50))
model.add(Dense(1, activation='sigmoid'))
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
return model
except Exception as e:
logger.error(f"Failed to create LSTM model: {e}")
raise
# Extract features from bounding boxes
def extract_features_from_boxes(boxes):
try:
features = []
for box in boxes:
x1, y1, x2, y2 = box
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
width = x2 - x1
height = y2 - y1
features.append([center_x, center_y, width, height])
return np.array(features)
except Exception as e:
logger.error(f"Failed to extract features from bounding boxes: {e}")
raise
# Check if required personnel are near the ICU patient
def check_proximity(boxes, class_ids, personnel_classes, icu_patient_class_id, threshold=50):
try:
icu_patient_boxes = [boxes[i] for i in range(len(class_ids)) if class_ids[i] == icu_patient_class_id]
personnel_boxes = [boxes[i] for i in range(len(class_ids)) if class_ids[i] in personnel_classes]
for icu_box in icu_patient_boxes:
icu_x, icu_y, icu_w, icu_h = icu_box
for p_box in personnel_boxes:
p_x, p_y, p_w, p_h = p_box
if abs(icu_x - p_x) < threshold and abs(icu_y - p_y) < threshold:
return True
return False
except Exception as e:
logger.error(f"Failed to check proximity: {e}")
raise
# Draw bounding boxes on the frame
def draw_bounding_boxes(frame, boxes, confidences, class_ids, classes):
try:
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
label = f"{classes[class_ids[i]]}: {confidences[i]:.2f}"
color = (0, 255, 0)
cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
except Exception as e:
logger.error(f"Failed to draw bounding boxes: {e}")
raise
# Process video
def process_video(video_path):
try:
yaml_file = "data.yaml"
model_weights = "best.pt"
logger.info(f"Loading class names from: {yaml_file}")
class_names = load_class_names(yaml_file)
personnel_classes = [class_names.index(name) for name in ['Doctor', 'Nurse', 'Staff']]
icu_patient_class_id = class_names.index('ICU_Patient')
logger.info(f"Loading YOLO model with weights: {model_weights}")
yolo_model = YOLO(model_weights) # Load YOLOv8 model with custom weights
cap = cv2.VideoCapture(video_path)
lstm_model = create_lstm_model((10, 4)) # Assuming sequence length of 10 and 4 features per box
sequence_length = 10
feature_sequence = []
frame_count = 0
results_to_return = []
frame_confidences = []
# Define the codec and create VideoWriter object to save the output video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_path = "uploads/output_video.mp4"
out = cv2.VideoWriter(output_path, fourcc, 20.0, (int(cap.get(3)), int(cap.get(4))))
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# Perform object detection
results = yolo_model(frame)
logger.info(f"Processed frame {frame_count} with YOLO")
# Extract bounding boxes, confidences, and class IDs
boxes = results[0].boxes.xyxy.cpu().numpy() # Assuming the first element contains the bounding boxes
confidences = results[0].boxes.conf.cpu().numpy() # Assuming the first element contains the confidences
class_ids = results[0].boxes.cls.cpu().numpy().astype(int) # Assuming the first element contains the class IDs
draw_bounding_boxes(frame, boxes, confidences, class_ids, class_names)
features = extract_features_from_boxes(boxes)
if features.shape[0] < 10: # Ensure the sequence length is consistent
padding = np.zeros((10 - features.shape[0], 4))
features = np.concatenate((features, padding), axis=0)
else:
features = features[:10, :]
# Append each feature vector individually to the sequence
for feature in features:
feature_sequence.append(feature)
if len(feature_sequence) > sequence_length:
feature_sequence.pop(0)
if len(feature_sequence) == sequence_length:
feature_sequence_np = np.array(feature_sequence)
feature_sequence_np = feature_sequence_np.reshape((1, sequence_length, 4)) # Reshape to match model input
motion_detected = lstm_model.predict(feature_sequence_np)[0][0]
logger.info(f"Motion detection result for frame {frame_count}: {motion_detected}")
if motion_detected > 0.5 and not check_proximity(boxes, class_ids, personnel_classes, icu_patient_class_id):
logger.info(f"Motion detected on frame: {frame_count}")
logger.info("The doctor should visit as soon as possible")
# Save frame to file
frame_file = f"uploads/frame_{frame_count}.jpg"
cv2.imwrite(frame_file, frame)
frame_confidences.append((frame_count, frame_file, max(confidences))) # Store frame and confidence
# Write the frame into the output video file
out.write(frame)
cap.release()
out.release()
# Sort frames by confidence and return the top 10
frame_confidences.sort(key=lambda x: x[2], reverse=True)
top_10_results = frame_confidences[:10]
return [(frame[0], frame[1]) for frame in top_10_results], output_path
except Exception as e:
logger.error(f"Failed to process video: {e}")
raise
@app.route('/')
def index():
return "Welcome to the Motion Detection API. Use /upload to upload a video and /process to process it."
@app.route('/upload', methods=['POST'])
def upload_video():
try:
if 'video' not in request.files:
return jsonify({'error': 'No video file provided'}), 400
video = request.files['video']
video_path = os.path.join("uploads", video.filename)
video.save(video_path)
logger.info(f'Video uploaded successfully: {video_path}')
return jsonify({'message': 'Video uploaded successfully', 'video_path': video_path}), 200
except Exception as e:
logger.error(f"Failed to upload video: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/process', methods=['POST'])
def process_uploaded_video():
try:
data = request.json
video_path = data.get('video_path')
yaml_file = "data.yaml"
model_weights = "best.pt"
if not video_path or not os.path.exists(video_path):
return jsonify({'error': 'Invalid video file path'}), 400
if not yaml_file or not os.path.exists(yaml_file):
return jsonify({'error': 'Invalid YAML file path'}), 400
if not model_weights or not os.path.exists(model_weights):
return jsonify({'error': 'Invalid model weights file path'}), 400
results, output_video_path = process_video(video_path)
return jsonify({
'message': 'Processing complete',
'results': [
{'frame_number': frame_number, 'frame_image': frame_image}
for frame_number, frame_image in results
],
'output_video': output_video_path
}), 200
except Exception as e:
logger.error(f"Error during video processing: {e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
os.makedirs('uploads', exist_ok=True)
app.run(host='0.0.0.0', port=9000)