v2.0.23: Added waymo eval
kylevedder committed Aug 22, 2024
1 parent 9b0b60d commit 844f05c
Showing 4 changed files with 222 additions and 5 deletions.
3 changes: 2 additions & 1 deletion bucketed_scene_flow_eval/datasets/waymoopen/dataset.py
@@ -42,6 +42,7 @@ class WaymoOpenCausalSceneFlow(CausalSeqLoaderDataset):
     def __init__(
         self,
         root_dir: Path,
+        flow_folder: Path | None = None,
         subsequence_length: int = 2,
         cache_root: Path = Path("/tmp/"),
         eval_type: str = "bucketed_epe",
@@ -51,7 +52,7 @@ def __init__(
         eval_args=dict(),
     ) -> None:
         self.sequence_loader = WaymoSupervisedSceneFlowSequenceLoader(
-            root_dir, log_subset=log_subset
+            root_dir, log_subset=log_subset, with_rgb=with_rgb, flow_dir=flow_folder
         )
         super().__init__(
             sequence_loader=self.sequence_loader,
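For reference, a minimal sketch (not part of this commit) of how the new flow_folder argument can be used; the paths are hypothetical, and all other constructor arguments keep the defaults shown in the diff above.

from pathlib import Path

from bucketed_scene_flow_eval.datasets import WaymoOpenCausalSceneFlow

# Hypothetical local paths; adjust to your preprocessed Waymo Open layout.
dataset = WaymoOpenCausalSceneFlow(
    root_dir=Path("/data/waymo_open/validation"),
    flow_folder=Path("/data/waymo_open/validation_est_flow"),  # None -> use the GT flow stored in the .pkl frames
    eval_type="bucketed_epe",
)
frame0, frame1 = dataset[0]  # subsequence_length defaults to 2, so each item is a pair of frames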
35 changes: 32 additions & 3 deletions
@@ -18,7 +18,7 @@
     AbstractAVLidarSequence,
     CachedSequenceLoader,
 )
-from bucketed_scene_flow_eval.utils import load_pickle
+from bucketed_scene_flow_eval.utils import load_feather, load_pickle
 
 CATEGORY_MAP = {
     0: "BACKGROUND",
@@ -30,9 +30,20 @@
 
 
 class WaymoSupervisedSceneFlowSequence(AbstractAVLidarSequence):
-    def __init__(self, sequence_folder: Path, verbose: bool = False):
+    def __init__(self, sequence_folder: Path, flow_folder: Path | None, verbose: bool = False):
         self.sequence_folder = Path(sequence_folder)
         self.sequence_files = sorted(self.sequence_folder.glob("*.pkl"))
+        if flow_folder is not None:
+            self.flow_folder = Path(flow_folder)
+            self.flow_files = sorted(self.flow_folder.glob("*.feather"))
+            assert len(self.sequence_files) == len(self.flow_files), (
+                f"number of frames in {self.sequence_folder} does not match number of frames in "
+                f"{self.flow_folder}"
+            )
+        else:
+            self.flow_folder = None
+            self.flow_files = None
+
         assert len(self.sequence_files) > 0, f"no frames found in {self.sequence_folder}"
 
     def __repr__(self) -> str:
@@ -49,6 +60,12 @@ def _load_idx(self, idx: int):
         pkl = load_pickle(pickle_path, verbose=False)
         pc = PointCloud(pkl["car_frame_pc"])
         flow = pkl["flow"]
+        if self.flow_folder is not None:
+            flow_path = self.flow_files[idx]
+            flow = load_feather(flow_path).to_numpy()
+            assert len(flow) == len(
+                pc
+            ), f"number of points in flow {len(flow)} does not match number of points in pc {len(pc)}"
         labels = pkl["label"]
         pose = SE3.from_array(pkl["pose"])
         return pc, flow, labels, pose
@@ -145,6 +162,7 @@ class WaymoSupervisedSceneFlowSequenceLoader(CachedSequenceLoader):
     def __init__(
         self,
         sequence_dir: Path,
+        flow_dir: Path | None = None,
         log_subset: Optional[list[str]] = None,
         verbose: bool = False,
         with_rgb: bool = False,
@@ -159,6 +177,16 @@ def __init__(
         sequence_dir_lst = sorted(self.dataset_dir.glob("*/"))
 
         self.log_lookup = {e.name: e for e in sequence_dir_lst}
+        if flow_dir is not None:
+            flow_dir = Path(flow_dir)
+            flow_dir_lst = sorted(flow_dir.glob("*/"))
+            assert len(sequence_dir_lst) == len(flow_dir_lst), (
+                f"number of sequences in {self.dataset_dir} does not match number of sequences in "
+                f"{flow_dir}; {len(sequence_dir_lst)} vs {len(flow_dir_lst)}"
+            )
+            self.flow_lookup: dict[str, Path | None] = {e.name: e for e in flow_dir_lst}
+        else:
+            self.flow_lookup = {k: None for k in self.log_lookup.keys()}
 
         # Intersect with log_subset
         if log_subset is not None:
@@ -179,7 +207,8 @@ def get_sequence_ids(self):
 
     def _load_sequence_uncached(self, log_id: str) -> WaymoSupervisedSceneFlowSequence:
         sequence_folder = self.log_lookup[log_id]
-        return WaymoSupervisedSceneFlowSequence(sequence_folder, verbose=self.verbose)
+        flow_folder = self.flow_lookup[log_id]
+        return WaymoSupervisedSceneFlowSequence(sequence_folder, flow_folder, verbose=self.verbose)
 
     @staticmethod
     def category_ids() -> list[int]:
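Based on the globs and asserts above, the flow directory mirrors the sequence directory: one sub-folder per log (looked up by name) and one .feather file per frame, where per-frame files are matched by sorted order and only their count is checked. A minimal sketch of producing such a layout (paths, column names, and the zero flow are illustrative assumptions, not part of the commit):

import numpy as np
import pandas as pd
from pathlib import Path

from bucketed_scene_flow_eval.utils import load_pickle

data_root = Path("/data/waymo_open/validation")          # hypothetical GT sequence root
est_root = Path("/data/waymo_open/validation_est_flow")  # hypothetical estimated-flow root

for log_dir in sorted(data_root.glob("*/")):
    # Flow folders are looked up by name, so they must mirror the sequence folder names.
    out_dir = est_root / log_dir.name
    out_dir.mkdir(parents=True, exist_ok=True)
    for frame_pkl in sorted(log_dir.glob("*.pkl")):
        # One .feather per frame, with one row per point in that frame's point cloud.
        num_points = len(load_pickle(frame_pkl, verbose=False)["car_frame_pc"])
        flow = np.zeros((num_points, 3), dtype=np.float32)  # replace with real per-point estimates
        # Column names are illustrative; the loader only checks the row count.
        pd.DataFrame(flow, columns=["x", "y", "z"]).to_feather(out_dir / f"{frame_pkl.stem}.feather")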
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -15,7 +15,7 @@ exclude = [
 
 [project]
 name = "bucketed_scene_flow_eval"
-version = "2.0.22"
+version = "2.0.23"
 authors = [
   { name="Kyle Vedder", email="[email protected]" },
 ]
187 changes: 187 additions & 0 deletions scripts/evals/waymo_eval.py
@@ -0,0 +1,187 @@
# Set OMP_NUM_THREADS=1 to avoid slamming the CPU
import os

os.environ["OMP_NUM_THREADS"] = "1"

import argparse
import multiprocessing
from pathlib import Path

import tqdm

from bucketed_scene_flow_eval.datasets import WaymoOpenCausalSceneFlow
from bucketed_scene_flow_eval.eval import Evaluator


def _make_range_shards(total_len: int, num_shards: int) -> list[tuple[int, int]]:
    """
    Return a list of tuples of (start, end) indices for each shard.
    The function divides the range specified by total_len into num_shards shards.
    Each shard is represented by a tuple of (start, end) indices.
    The division tries to distribute the elements as evenly as possible among the shards.
    """
    shards = []
    shard_len = total_len // num_shards
    remainder = total_len % num_shards

    start = 0
    for _ in range(num_shards):
        end = start + shard_len + (1 if remainder > 0 else 0)
        shards.append((start, min(end, total_len)))
        start = end
        remainder -= 1

    return shards


def _make_index_shards(
    dataset: WaymoOpenCausalSceneFlow, num_shards: int, every_kth_in_sequence: int
) -> list[list[int]]:
    dataset_valid_indices: list[int] = [
        dataset_idx
        for (
            _,
            (subsequence_start_idx, subsequence_end_idx),
        ), dataset_idx in dataset.sequence_subsequence_idx_to_dataset_idx.items()
        if (subsequence_start_idx % every_kth_in_sequence) == 0
    ]

    tuple_shards = _make_range_shards(len(dataset_valid_indices), num_shards)
    return [dataset_valid_indices[start:end] for start, end in tuple_shards]


def _work(
    shard_idx: int,
    shard_list: list[int],
    gt_dataset: WaymoOpenCausalSceneFlow,
    est_dataset: WaymoOpenCausalSceneFlow,
    evaluator: Evaluator,
    verbose: bool = True,
) -> Evaluator:
    # Set the tqdm bar on the row of the terminal corresponding to the shard index
    iterator = shard_list
    if verbose:
        iterator = tqdm.tqdm(shard_list, position=shard_idx + 1, desc=f"Shard {shard_idx}")

    for idx in iterator:
        gt_lst = gt_dataset[idx]
        est_lst = est_dataset[idx]
        assert len(gt_lst) == len(est_lst) == 2, f"GT and estimated lists must have length 2."
        gt_frame0, gt_frame1 = gt_lst
        est_frame0, est_frame1 = est_lst
        evaluator.eval(est_frame0.flow, gt_frame0)

    return evaluator


def _work_wrapper(
    args: tuple[int, list[int], WaymoOpenCausalSceneFlow, WaymoOpenCausalSceneFlow, Evaluator, bool]
) -> Evaluator:
    return _work(*args)


def run_eval(
    data_dir: Path,
    est_flow_dir: Path,
    output_path: Path,
    cpu_count: int,
    cache_root: Path,
    every_kth: int = 5,
    eval_type: str = "bucketed_epe",
    verbose: bool = True,
) -> None:
    assert data_dir.exists(), f"Data directory {data_dir} does not exist."
    assert est_flow_dir.exists(), f"Estimated flow directory {est_flow_dir} does not exist."

    # Make the output directory if it doesn't exist
    output_path.mkdir(parents=True, exist_ok=True)

    gt_dataset = WaymoOpenCausalSceneFlow(
        root_dir=data_dir,
        flow_folder=None,
        with_rgb=False,
        eval_type=eval_type,
        eval_args=dict(output_path=output_path),
        cache_root=cache_root,
    )

    est_dataset = WaymoOpenCausalSceneFlow(
        root_dir=data_dir,
        flow_folder=est_flow_dir,
        with_rgb=False,
        use_cache=False,
        eval_type=eval_type,
        cache_root=cache_root,
    )

    dataset_evaluator = gt_dataset.evaluator()

    assert len(gt_dataset) == len(
        est_dataset
    ), f"GT and estimated datasets must be the same length, but are {len(gt_dataset)} and {len(est_dataset)} respectively."

    # Shard the dataset into pieces for each CPU
    shard_lists = _make_index_shards(gt_dataset, cpu_count, every_kth)
    args_list = [
        (shard_idx, shard_list, gt_dataset, est_dataset, dataset_evaluator, verbose)
        for shard_idx, shard_list in enumerate(shard_lists)
    ]

    if cpu_count > 1:
        print(f"Running evaluation on {len(gt_dataset)} scenes using {cpu_count} CPUs.")
        # Run the evaluation in parallel
        with multiprocessing.Pool(cpu_count) as pool:
            sharded_evaluators = pool.map(_work_wrapper, args_list)
    else:
        print(f"Running evaluation on {len(gt_dataset)} scenes using 1 CPU.")
        # Run the evaluation serially
        sharded_evaluators = [_work_wrapper(args) for args in args_list]

    # Combine the sharded evaluators
    gathered_evaluator: Evaluator = sum(sharded_evaluators)
    gathered_evaluator.compute_results()


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    parser = argparse.ArgumentParser(
        description="Iterate over .feather files in a result zip file."
    )
    parser.add_argument("data_dir", type=Path, help="Path to the data_dir directory of the dataset")
    parser.add_argument("est_flow_dir", type=Path, help="Path to the estimated flow directory")
    parser.add_argument("output_path", type=Path, help="Path to save the results")
    parser.add_argument(
        "--cpu_count",
        type=int,
        default=multiprocessing.cpu_count(),
        help="Number of CPUs to use",
    )
    parser.add_argument(
        "--every_kth", type=int, default=5, help="Only evaluate every kth scene in a sequence"
    )
    parser.add_argument("--eval_type", type=str, default="bucketed_epe", help="Type of evaluation")
    parser.add_argument(
        "--cache_root",
        type=Path,
        default=Path("/tmp/av2_eval_cache/"),
        help="Path to the cache root directory",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress output",
    )

    args = parser.parse_args()

    run_eval(
        data_dir=args.data_dir,
        est_flow_dir=args.est_flow_dir,
        output_path=args.output_path,
        cpu_count=args.cpu_count,
        every_kth=args.every_kth,
        eval_type=args.eval_type,
        cache_root=args.cache_root,
        verbose=not args.quiet,
    )
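As a sanity check on the work-splitting logic in the script above, here is a stand-alone copy of _make_range_shards with one worked example (the numbers are illustrative only; per the positional arguments, the script itself is run with a data directory, an estimated-flow directory, and an output path).

# Stand-alone copy of the helper above, so the example can be run in isolation.
def _make_range_shards(total_len: int, num_shards: int) -> list[tuple[int, int]]:
    shards = []
    shard_len = total_len // num_shards
    remainder = total_len % num_shards
    start = 0
    for _ in range(num_shards):
        end = start + shard_len + (1 if remainder > 0 else 0)
        shards.append((start, min(end, total_len)))
        start = end
        remainder -= 1
    return shards

# 10 evaluation indices split across 4 worker processes: the first two shards
# absorb the remainder, so the shard sizes are 3, 3, 2, 2.
print(_make_range_shards(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]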
