Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Initial updates on all files #1

Merged
merged 4 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
data/
output/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
112 changes: 112 additions & 0 deletions demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import os
import cv2
from PIL import Image

from face_detection import RetinaFace
from torchvision import transforms
import torch

import numpy as np
import time
import argparse

from utils import helpers
from models import resnet


def parse_args():
    """Build the demo's command-line parser and parse ``sys.argv``.

    Returns:
        argparse.Namespace: Parsed options with the attributes ``gpu``,
        ``cam``, ``snapshot`` and ``save_viz``.
    """
    cli = argparse.ArgumentParser(description='Head pose estimation using the 6DRepNet.')

    # (flag, add_argument keyword options), registered in declaration order
    # so that the generated --help text keeps the same layout.
    option_table = (
        ('--gpu', {'type': int, 'default': 0,
                   'help': 'GPU device id to use [0], set -1 to use CPU'}),
        ('--cam', {'type': int, 'default': 0,
                   'help': 'Camera device id to use [0]'}),
        ('--snapshot', {'type': str, 'default': '_epoch_12.tar',
                        'help': 'Name of model snapshot.'}),
        ('--save_viz', {'action': 'store_true',
                        'help': 'Save images with pose cube.'}),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    return cli.parse_args()


# Preprocessing pipeline for a PIL image: resize to 224, center-crop to
# 224x224, convert to a tensor, then normalize each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics --
# confirm they match what the checkpoint was trained with.
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]

transformations = transforms.Compose(
    [
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),
    ]
)


def main():
    """Run the live head-pose demo on a camera stream.

    Loads a ResNet-18 checkpoint, detects faces in every captured frame with
    RetinaFace, predicts a rotation for each confident detection, draws the
    pose cube onto the frame and shows it in a window. The loop ends when the
    stream stops delivering frames or ESC is pressed.

    Raises:
        IOError: If the camera selected by ``--cam`` cannot be opened.
    """
    args = parse_args()

    # Honor --gpu: a negative id (or a machine without CUDA) selects the CPU,
    # otherwise the requested CUDA device is used for the pose model.
    use_cuda = args.gpu >= 0 and torch.cuda.is_available()
    device = torch.device(f"cuda:{args.gpu}" if use_cuda else "cpu")

    model = resnet.resnet18(num_classes=6)
    weights = torch.load(args.snapshot, map_location=device)
    model.load_state_dict(weights['model_state_dict'])
    model.to(device)
    model.eval()  # Set model to evaluation mode

    print('Loading data.')

    # Keep the detector on the same device as the pose model.
    # NOTE(review): relies on RetinaFace treating gpu_id=-1 as "CPU" -- confirm
    # against the installed face_detection version.
    detector = RetinaFace(gpu_id=args.gpu if use_cuda else -1)

    # NOTE(review): --save_viz is parsed but nothing in this function writes
    # frames to disk yet.

    # Initialize video capture
    cap = cv2.VideoCapture(args.cam)
    if not cap.isOpened():
        raise IOError("Cannot open webcam")

    try:
        with torch.no_grad():
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                faces = detector(frame)
                for box, landmarks, score in faces:
                    # Ignore low-confidence detections.
                    if score < 0.95:
                        continue

                    x_min, y_min, x_max, y_max = map(int, box)
                    bbox_width = abs(x_max - x_min)
                    bbox_height = abs(y_max - y_min)

                    # Enlarge the box by 20% on every side, clamping the
                    # top-left corner to the frame.
                    # NOTE(review): x is padded by the box height and y by the
                    # box width; kept as-is -- confirm this matches the
                    # cropping used at training time.
                    x_min = max(0, x_min - int(0.2 * bbox_height))
                    y_min = max(0, y_min - int(0.2 * bbox_width))
                    x_max += int(0.2 * bbox_height)
                    y_max += int(0.2 * bbox_width)

                    img = frame[y_min:y_max, x_min:x_max]
                    if img.size == 0:
                        # Box lies outside the frame; nothing to classify.
                        continue

                    # OpenCV delivers BGR frames while PIL interprets the
                    # array as RGB, so swap the channels explicitly.
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = Image.fromarray(img)
                    img = transformations(img).unsqueeze(0).to(device)

                    start = time.time()
                    R_pred = model(img)
                    end = time.time()
                    print('Head pose estimation: %.2f ms' % ((end - start) * 1000))

                    # Radians -> degrees; columns are used as pitch, yaw, roll.
                    euler = helpers.compute_euler_angles_from_rotation_matrices(R_pred) * 180 / np.pi
                    p_pred_deg = euler[:, 0].cpu()
                    y_pred_deg = euler[:, 1].cpu()
                    r_pred_deg = euler[:, 2].cpu()

                    # Draw the cube centred on the (padded) face box.
                    helpers.plot_pose_cube(
                        frame, y_pred_deg, p_pred_deg, r_pred_deg,
                        x_min + int(0.5 * (x_max - x_min)),
                        y_min + int(0.5 * (y_max - y_min)),
                        size=bbox_width
                    )

                cv2.imshow("Demo", frame)
                if cv2.waitKey(1) == 27:  # Exit on ESC key
                    break
    finally:
        # Always give the camera and the GUI windows back, even when
        # detection or inference raises mid-stream.
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    # Run the demo exactly once when the file is executed as a script.
    main()
Empty file added models/__init__.py
Empty file.
237 changes: 237 additions & 0 deletions models/mobilenetv2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import torch
from torch import nn, Tensor
from torchvision.models import MobileNet_V2_Weights

from typing import Any, Callable, List, Optional

from utils import helpers


# Public API of this module: only the factory function is exported by
# ``from ... import *``.
__all__ = ["mobilenet_v2"]


def _make_divisible(v: float, divisor: int = 8) -> int:
    """Round ``v`` to the nearest multiple of ``divisor``.

    The result is never smaller than ``divisor``, and if rounding would lose
    more than 10% of ``v`` the next multiple up is returned instead.
    """
    nearest_multiple = int(v + divisor / 2) // divisor * divisor
    if nearest_multiple < divisor:
        nearest_multiple = divisor

    # Rounding down must not shrink the value by more than 10%.
    if nearest_multiple < 0.9 * v:
        return nearest_multiple + divisor
    return nearest_multiple


class Conv2dNormActivation(torch.nn.Sequential):
    """Sequential block: ``nn.Conv2d`` -> ``nn.BatchNorm2d`` -> optional activation."""

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int = 3,
        stride: int = 1,
        padding: Optional[int] = None,
        groups: int = 1,
        activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
        dilation: int = 1,
        inplace: Optional[bool] = True,
        bias: bool = False,
    ) -> None:
        """Assemble the convolution, batch norm and (optionally) activation.

        Args:
            in_channels: Input channels of the convolution.
            out_channels: Output channels of the convolution and batch norm.
            kernel_size: Convolution kernel size.
            stride: Convolution stride.
            padding: Convolution padding; when ``None`` it is derived as
                ``(kernel_size - 1) // 2 * dilation``.
            groups: Number of convolution groups.
            activation_layer: Factory for the trailing activation, or ``None``
                to end the block after the batch norm.
            dilation: Convolution dilation.
            inplace: Forwarded to the activation as ``inplace``; ``None``
                constructs the activation without arguments.
            bias: Whether the convolution has a bias term.
        """
        conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            # Only derive the padding when the caller did not supply one.
            padding=padding if padding is not None else (kernel_size - 1) // 2 * dilation,
            dilation=dilation,
            groups=groups,
            bias=bias,
        )
        norm = nn.BatchNorm2d(num_features=out_channels, eps=0.001, momentum=0.01)

        if activation_layer is None:
            super().__init__(conv, norm)
            return

        act_kwargs = {"inplace": inplace} if inplace is not None else {}
        super().__init__(conv, norm, activation_layer(**act_kwargs))


class InvertedResidual(nn.Module):
    """Inverted residual block.

    Layer sequence: an optional 1x1 expansion (skipped when ``expand_ratio``
    is 1), a 3x3 depthwise convolution, and a 1x1 projection followed by batch
    norm with no activation. The input is added back to the output only when
    the stride is 1 and input/output channel counts are equal.
    """

    def __init__(self, in_planes: int, out_planes: int, stride: int, expand_ratio: int) -> None:
        """Build the block.

        Args:
            in_planes: Number of input channels.
            out_planes: Number of output channels.
            stride: Stride of the depthwise convolution; must be 1 or 2.
            expand_ratio: Multiplier applied to ``in_planes`` to get the
                hidden (expanded) channel count.

        Raises:
            ValueError: If ``stride`` is neither 1 nor 2.
        """
        super().__init__()
        self.stride = stride
        if stride not in (1, 2):
            raise ValueError(f"stride should be 1 or 2 instead of {stride}")

        expanded = int(round(in_planes * expand_ratio))
        self.use_res_connect = self.stride == 1 and in_planes == out_planes

        # Pointwise expansion is only needed when the block actually widens.
        expansion: List[nn.Module] = []
        if expand_ratio != 1:
            expansion.append(
                Conv2dNormActivation(in_planes, expanded, kernel_size=1, activation_layer=nn.ReLU6)
            )

        # Depthwise: one group per channel.
        depthwise = Conv2dNormActivation(
            expanded,
            expanded,
            stride=stride,
            groups=expanded,
            activation_layer=nn.ReLU6,
        )
        # Linear pointwise projection (no activation after the batch norm).
        projection = nn.Conv2d(expanded, out_planes, 1, 1, 0, bias=False)
        projection_norm = nn.BatchNorm2d(out_planes)

        self.conv = nn.Sequential(*expansion, depthwise, projection, projection_norm)
        self.out_channels = out_planes
        self._is_cn = stride > 1

    def forward(self, x: Tensor) -> Tensor:
        """Apply the block, adding the skip connection when it is enabled."""
        out = self.conv(x)
        if self.use_res_connect:
            return x + out
        return out


class MobileNetV2(nn.Module):
    """MobileNetV2 feature extractor with a single linear regression head.

    The head output is passed through
    ``helpers.compute_rotation_matrix_from_ortho6d`` in ``forward``.
    """

    def __init__(
        self,
        num_classes: int = 1000,
        width_mult: float = 1.0,
        inverted_residual_setting: Optional[List[List[int]]] = None,
        round_nearest: int = 8,
        dropout: float = 0.2,
    ) -> None:
        """Build the network.

        Args:
            num_classes (int): Output size of the final linear layer.
            width_mult (float): Width multiplier - scales the number of
                channels in each layer by this amount.
            inverted_residual_setting: Network structure as a list of
                ``[t, c, n, s]`` rows (expand ratio, channels, repeats,
                stride of the first repeat). Defaults to the built-in table.
            round_nearest (int): Round the number of channels in each layer
                to a multiple of this number. Set to 1 to turn off rounding.
            dropout (float): Accepted for signature compatibility; it is not
                used by this implementation.

        Raises:
            ValueError: If ``inverted_residual_setting`` is empty or its
                first row does not have exactly four elements.
        """
        super().__init__()

        stem_channels = 32
        head_channels = 1280

        if inverted_residual_setting is None:
            inverted_residual_setting = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # Only the first row is validated; the remaining rows are assumed to
        # follow the same t, c, n, s layout.
        setting_ok = len(inverted_residual_setting) > 0 and len(inverted_residual_setting[0]) == 4
        if not setting_ok:
            raise ValueError(
                f"inverted_residual_setting should be non-empty or a 4-element list, got {inverted_residual_setting}"
            )

        # Stem convolution.
        in_ch = _make_divisible(stem_channels * width_mult, round_nearest)
        self.last_channel = _make_divisible(head_channels * max(1.0, width_mult), round_nearest)
        stages: List[nn.Module] = [
            Conv2dNormActivation(3, in_ch, stride=2, activation_layer=nn.ReLU6)
        ]

        # Inverted residual stages: only the first block of each stage uses
        # the stage stride, every later repeat uses stride 1.
        for expand, channels, repeats, first_stride in inverted_residual_setting:
            out_ch = _make_divisible(channels * width_mult, round_nearest)
            for repeat_idx in range(repeats):
                block_stride = first_stride if repeat_idx == 0 else 1
                stages.append(InvertedResidual(in_ch, out_ch, block_stride, expand_ratio=expand))
                in_ch = out_ch

        # Final 1x1 convolution up to ``last_channel``.
        stages.append(
            Conv2dNormActivation(in_ch, self.last_channel, kernel_size=1, activation_layer=nn.ReLU6)
        )

        self.features = nn.Sequential(*stages)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        # A single linear layer is used as the head here, in place of a
        # dropout + linear classifier.
        self.linear_reg = nn.Linear(self.last_channel, num_classes)

        # Weight initialization, by layer type.
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.zeros_(module.bias)
            elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode="fan_out")
                if module.bias is not None:
                    nn.init.zeros_(module.bias)

    def forward(self, x: Tensor) -> Tensor:
        """Run the backbone and head, then convert the head output.

        Returns:
            The result of ``helpers.compute_rotation_matrix_from_ortho6d``
            applied to the linear head output.
        """
        pooled = self.avgpool(self.features(x))
        # flatten (rather than squeeze) keeps the batch dimension when the
        # batch size is 1.
        flat = torch.flatten(pooled, 1)
        return helpers.compute_rotation_matrix_from_ortho6d(self.linear_reg(flat))


def load_filtered_state_dict(model, state_dict):
    """Load the compatible subset of ``state_dict`` into ``model``.

    An entry is copied only when the model already has an entry with the same
    key AND the same tensor shape. Unknown keys and shape-mismatched tensors
    are skipped, leaving the model's current values for them untouched, so a
    partially compatible checkpoint does not make ``load_state_dict`` raise
    on a size mismatch.

    Args:
        model: The model instance to update (must have `state_dict` and
            `load_state_dict` methods).
        state_dict: A dictionary of parameters to load into the model.
    """
    current_model_dict = model.state_dict()
    filtered_state_dict = {
        key: value
        for key, value in state_dict.items()
        if key in current_model_dict
        and getattr(value, "shape", None) == current_model_dict[key].shape
    }
    # Start from the model's own values so every expected key is present,
    # then overlay the entries that are safe to load.
    current_model_dict.update(filtered_state_dict)
    model.load_state_dict(current_model_dict)


def mobilenet_v2(*, pretrained: bool = True, progress: bool = True, **kwargs: Any) -> MobileNetV2:
    """Construct a ``MobileNetV2``, optionally seeded with pretrained weights.

    Args:
        pretrained: When true, fetch the ``MobileNet_V2_Weights.IMAGENET1K_V1``
            state dict and load the entries accepted by
            ``load_filtered_state_dict``.
        progress: Forwarded to ``get_state_dict`` as ``progress``.
        **kwargs: Forwarded unchanged to the ``MobileNetV2`` constructor.

    Returns:
        The constructed model.
    """
    pretrained_weights = MobileNet_V2_Weights.IMAGENET1K_V1 if pretrained else None

    net = MobileNetV2(**kwargs)
    if pretrained_weights is None:
        return net

    checkpoint = pretrained_weights.get_state_dict(progress=progress, check_hash=True)
    load_filtered_state_dict(net, checkpoint)
    return net
Loading