Auto backend now returns model on class creation #1669

Closed
wants to merge 1 commit
170 changes: 120 additions & 50 deletions boxmot/appearance/backends/base_backend.py
@@ -15,7 +15,31 @@


class BaseModelBackend:
def __init__(self, weights, device, half):
def __new__(cls, weights: str, device: torch.device, half: bool):
"""
Creates a new instance of the model and returns the initialized model directly.

Args:
weights (str): Path to the model weights file.
device (torch.device): Device to load the model on ('cpu' or 'cuda').
half (bool): Whether to use half precision.

Returns:
torch.nn.Module: Initialized model.
"""
instance = super(BaseModelBackend, cls).__new__(cls)
instance.__init__(weights, device, half)
return instance.model

def __init__(self, weights: str, device: torch.device, half: bool):
"""
Initializes the model backend with the specified weights, device, and precision.

Args:
weights (str): Path to the model weights file.
device (torch.device): Device to load the model on ('cpu' or 'cuda').
half (bool): Whether to use half precision.
"""
self.weights = weights[0] if isinstance(weights, list) else weights
self.device = device
self.half = half
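
For context, a minimal, self-contained sketch of the construction pattern this hunk introduces (TinyBackend and the Identity model are illustrative stand-ins, not the boxmot API): __new__ builds the instance, runs __init__ explicitly, and hands back the wrapped model, so the constructor call yields the model object rather than the backend instance.

import torch

class TinyBackend:
    def __new__(cls, weights, device, half):
        instance = super().__new__(cls)
        instance.__init__(weights, device, half)  # run init explicitly; Python skips the automatic call
        return instance.model                     # caller receives the model, not the backend

    def __init__(self, weights, device, half):
        self.model = torch.nn.Identity()          # stand-in for the loaded ReID model

obj = TinyBackend("weights.pt", torch.device("cpu"), half=False)
print(isinstance(obj, TinyBackend))      # False: construction returns the model itself
print(isinstance(obj, torch.nn.Module))  # True
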
@@ -34,50 +58,56 @@ def __init__(self, weights, device, half):
self.checker = RequirementsChecker()
self.load_model(self.weights)

def get_crops(self, xyxys, img):
def get_crops(self, xyxys: np.ndarray, img: np.ndarray) -> torch.Tensor:
"""
Extracts and preprocesses crops from the input image based on bounding boxes.

Args:
xyxys (np.ndarray): Bounding boxes in the format [x1, y1, x2, y2].
img (np.ndarray): The input image.

Returns:
torch.Tensor: Preprocessed crops as a batch of tensors.
"""
crops = []
h, w = img.shape[:2]
resize_dims = (128, 256)
interpolation_method = cv2.INTER_LINEAR
mean_array = np.array([0.485, 0.456, 0.406])
std_array = np.array([0.229, 0.224, 0.225])
# dets are of different sizes so batch preprocessing is not possible

for box in xyxys:
x1, y1, x2, y2 = box.astype('int')
x1 = max(0, x1)
y1 = max(0, y1)
x2 = min(w - 1, x2)
y2 = min(h - 1, y2)
crop = img[y1:y2, x1:x2]
# resize
crop = cv2.resize(
crop,
resize_dims, # from (x, y) to (128, 256) | (w, h)
interpolation=interpolation_method,
)

# (cv2) BGR 2 (PIL) RGB. The ReID models have been trained with this channel order
crop = cv2.resize(crop, resize_dims, interpolation=interpolation_method)
crop = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)

crop = torch.from_numpy(crop).float()
crops.append(crop)

# List of torch tensor crops to unified torch tensor
crops = torch.stack(crops, dim=0)

# Normalize the batch
crops = crops / 255.0

# Standardize the batch
crops = (crops - mean_array) / std_array

crops = torch.permute(crops, (0, 3, 1, 2))
crops = crops.to(dtype=torch.half if self.half else torch.float, device=self.device)

return crops
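
As a quick standalone check of the per-channel standardization above (the mean/std values are the ImageNet statistics hard-coded in the method; the pixel value is arbitrary):

import numpy as np

pixel = np.array([128, 128, 128]) / 255.0   # scale to [0, 1] -> ~[0.502, 0.502, 0.502]
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
print((pixel - mean) / std)                 # ~[0.074, 0.205, 0.426]
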

@torch.no_grad()
def get_features(self, xyxys, img):
def get_features(self, xyxys: np.ndarray, img: np.ndarray) -> np.ndarray:
"""
Extracts feature vectors for the input bounding boxes and image.

Args:
xyxys (np.ndarray): Bounding boxes in the format [x1, y1, x2, y2].
img (np.ndarray): The input image.

Returns:
np.ndarray: Normalized feature vectors.
"""
if xyxys.size != 0:
crops = self.get_crops(xyxys, img)
crops = self.inference_preprocess(crops)
@@ -88,61 +118,101 @@ def get_features(self, xyxys, img):
features = features / np.linalg.norm(features)
return features
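
One behavioural detail worth noting for reviewers: np.linalg.norm(features) with no axis argument returns the norm of the entire (N, D) batch, so the division above scales every embedding by a single scalar rather than unit-normalizing each row. A tiny sketch with a hypothetical 512-dim embedding size:

import numpy as np

features = np.random.rand(2, 512).astype(np.float32)  # stand-in for the model output
scaled = features / np.linalg.norm(features)           # one scalar norm for the whole batch
per_row = features / np.linalg.norm(features, axis=1, keepdims=True)  # row-wise alternative, shown for comparison
print(scaled.shape, per_row.shape)                      # (2, 512) (2, 512)
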

def warmup(self, imgsz=[(256, 128, 3)]):
# warmup model by running inference once
def warmup(self, imgsz: list = [(256, 128, 3)]):
"""
Warms up the model by performing a dummy forward pass.

Args:
imgsz (list): List of image size dimensions.
"""
if self.device.type != "cpu":
im = np.random.randint(0, 255, *imgsz, dtype=np.uint8)
crops = self.get_crops(xyxys=np.array(
[[0, 0, 64, 64], [0, 0, 128, 128]]),
img=im
)
crops = self.get_crops(np.array([[0, 0, 64, 64], [0, 0, 128, 128]]), img=im)
crops = self.inference_preprocess(crops)
self.forward(crops) # warmup
self.forward(crops)

def to_numpy(self, x):
def to_numpy(self, x: torch.Tensor) -> np.ndarray:
"""
Converts a torch tensor to a numpy array.

Args:
x (torch.Tensor): Input tensor.

Returns:
np.ndarray: Numpy array.
"""
return x.cpu().numpy() if isinstance(x, torch.Tensor) else x

def inference_preprocess(self, x):
def inference_preprocess(self, x: torch.Tensor) -> torch.Tensor:
"""
Preprocesses the input tensor for inference.

Args:
x (torch.Tensor): Input tensor.

Returns:
torch.Tensor: Preprocessed tensor.
"""
if self.half:
if isinstance(x, torch.Tensor):
if x.dtype != torch.float16:
x = x.half()
elif isinstance(x, np.ndarray):
if x.dtype != np.float16:
x = x.astype(np.float16)
x = x.half() if isinstance(x, torch.Tensor) and x.dtype != torch.float16 else x

if self.nhwc:
if isinstance(x, torch.Tensor):
x = x.permute(0, 2, 3, 1) # Convert from NCHW to NHWC
elif isinstance(x, np.ndarray):
x = np.transpose(x, (0, 2, 3, 1)) # Convert from NCHW to NHWC
x = x.permute(0, 2, 3, 1) if isinstance(x, torch.Tensor) else np.transpose(x, (0, 2, 3, 1))
return x
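
For reference, a one-liner showing what the NHWC branch above does to a typical crop batch:

import torch
x = torch.zeros(2, 3, 256, 128)     # NCHW batch as produced by get_crops
print(x.permute(0, 2, 3, 1).shape)  # torch.Size([2, 256, 128, 3]) -> NHWC
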

def inference_postprocess(self, features):

def inference_postprocess(self, features) -> np.ndarray:
"""
Postprocesses the feature output after inference.

Args:
features: Feature output from the model.

Returns:
np.ndarray: Postprocessed features as numpy arrays.
"""
if isinstance(features, (list, tuple)):
return (
self.to_numpy(features[0]) if len(features) == 1 else [self.to_numpy(x) for x in features]
)
return self.to_numpy(features[0]) if len(features) == 1 else [self.to_numpy(x) for x in features]
else:
return self.to_numpy(features)

@abstractmethod
def forward(self, im_batch):
def forward(self, im_batch: torch.Tensor):
"""
Forward pass through the model.

Args:
im_batch (torch.Tensor): Batch of input images.

Raises:
NotImplementedError: Needs to be implemented by subclass.
"""
raise NotImplementedError("This method should be implemented by subclasses.")

@abstractmethod
def load_model(self, w):
def load_model(self, w: str):
"""
Loads the model weights.

Args:
w (str): Path to model weights.

Raises:
NotImplementedError: Needs to be implemented by subclass.
"""
raise NotImplementedError("This method should be implemented by subclasses.")

def download_model(self, w):
def download_model(self, w: str):
"""
Downloads the model weights if not available locally.

Args:
w (str): Path to model weights.
"""
if w.suffix == ".pt":
model_url = get_model_url(w)
if not w.exists() and model_url is not None:
gdown.download(model_url, str(w), quiet=False)
elif not w.exists():
LOGGER.error(
f"No URL associated with the chosen StrongSORT weights ({w}). Choose between:"
)
LOGGER.error(f"No URL associated with the chosen StrongSORT weights ({w}).")
show_downloadable_models()
exit()
exit()
4 changes: 1 addition & 3 deletions boxmot/trackers/botsort/bot_sort.py
@@ -247,9 +247,7 @@ def __init__(

self.with_reid = with_reid
if self.with_reid:
self.model = ReidAutoBackend(
weights=reid_weights, device=device, half=half
).model
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half)

self.cmc = SOF()
self.fuse_first_associate = fuse_first_associate
5 changes: 2 additions & 3 deletions boxmot/trackers/deepocsort/deep_ocsort.py
@@ -291,9 +291,8 @@ def __init__(
self.Q_s_scaling = Q_s_scaling
KalmanBoxTracker.count = 1

self.model = ReidAutoBackend(
weights=reid_weights, device=device, half=half
).model
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half)

# "similarity transforms using feature point extraction, optical flow, and RANSAC"
self.cmc = get_cmc_method('sof')()
self.embedding_off = embedding_off
4 changes: 1 addition & 3 deletions boxmot/trackers/hybridsort/hybridsort.py
@@ -385,9 +385,7 @@ def __init__(self, reid_weights, device, half, det_thresh, per_class=False, max_
self.ECC: bool = False
KalmanBoxTracker.count = 0

self.model = ReidAutoBackend(
weights=reid_weights, device=device, half=half
).model
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half)
self.cmc = get_cmc_method('ecc')()

def camera_update(self, trackers, warp_matrix):
5 changes: 1 addition & 4 deletions boxmot/trackers/imprassoc/impr_assoc_tracker.py
@@ -256,10 +256,7 @@ def __init__(

self.with_reid = with_reid
if self.with_reid:
rab = ReidAutoBackend(
weights=reid_weights, device=device, half=half
)
self.model = rab.get_backend()
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half)

self.cmc = SOF()

4 changes: 1 addition & 3 deletions boxmot/trackers/strongsort/strong_sort.py
@@ -46,9 +46,7 @@
):

self.per_class = per_class
self.model = ReidAutoBackend(
weights=reid_weights, device=device, half=half
).model
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half)

self.tracker = Tracker(
metric=NearestNeighborDistanceMetric("cosine", max_cos_dist, nn_budget),
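
Taken together, the tracker-side edits all follow the same pattern: the previous call sites unwrapped the backend explicitly, while under this PR the constructor result is assigned directly. A hedged before/after sketch (tracker and argument names come from the diff; the behaviour of ReidAutoBackend's own construction is assumed to mirror the base-backend change above):

# before this PR
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half).model
# or, in impr_assoc_tracker.py
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half).get_backend()

# after this PR
self.model = ReidAutoBackend(weights=reid_weights, device=device, half=half)
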