From 844f05c9c6797359cdaf32c906378b0285b374de Mon Sep 17 00:00:00 2001
From: Kyle Vedder <kyle.c.vedder@gmail.com>
Date: Thu, 22 Aug 2024 14:11:45 -0400
Subject: [PATCH] v2.0.23: Added waymo eval

---
 .../datasets/waymoopen/dataset.py             |   3 +-
 .../waymoopen/waymo_supervised_flow.py        |  35 +++-
 pyproject.toml                                |   2 +-
 scripts/evals/waymo_eval.py                   | 187 ++++++++++++++++++
 4 files changed, 222 insertions(+), 5 deletions(-)
 create mode 100644 scripts/evals/waymo_eval.py

diff --git a/bucketed_scene_flow_eval/datasets/waymoopen/dataset.py b/bucketed_scene_flow_eval/datasets/waymoopen/dataset.py
index f04ff00..09ff1a8 100644
--- a/bucketed_scene_flow_eval/datasets/waymoopen/dataset.py
+++ b/bucketed_scene_flow_eval/datasets/waymoopen/dataset.py
@@ -42,6 +42,7 @@ class WaymoOpenCausalSceneFlow(CausalSeqLoaderDataset):
     def __init__(
         self,
         root_dir: Path,
+        flow_folder: Path | None = None,
         subsequence_length: int = 2,
         cache_root: Path = Path("/tmp/"),
         eval_type: str = "bucketed_epe",
@@ -51,7 +52,7 @@ def __init__(
         eval_args=dict(),
     ) -> None:
         self.sequence_loader = WaymoSupervisedSceneFlowSequenceLoader(
-            root_dir, log_subset=log_subset
+            root_dir, log_subset=log_subset, with_rgb=with_rgb, flow_dir=flow_folder
         )
         super().__init__(
             sequence_loader=self.sequence_loader,
diff --git a/bucketed_scene_flow_eval/datasets/waymoopen/waymo_supervised_flow.py b/bucketed_scene_flow_eval/datasets/waymoopen/waymo_supervised_flow.py
index 2c79eea..a1d8825 100644
--- a/bucketed_scene_flow_eval/datasets/waymoopen/waymo_supervised_flow.py
+++ b/bucketed_scene_flow_eval/datasets/waymoopen/waymo_supervised_flow.py
@@ -18,7 +18,7 @@
     AbstractAVLidarSequence,
     CachedSequenceLoader,
 )
-from bucketed_scene_flow_eval.utils import load_pickle
+from bucketed_scene_flow_eval.utils import load_feather, load_pickle
 
 CATEGORY_MAP = {
     0: "BACKGROUND",
@@ -30,9 +30,20 @@
 
 
 class WaymoSupervisedSceneFlowSequence(AbstractAVLidarSequence):
-    def __init__(self, sequence_folder: Path, verbose: bool = False):
+    def __init__(self, sequence_folder: Path, flow_folder: Path | None, verbose: bool = False):
         self.sequence_folder = Path(sequence_folder)
         self.sequence_files = sorted(self.sequence_folder.glob("*.pkl"))
+        if flow_folder is not None:
+            self.flow_folder = Path(flow_folder)
+            self.flow_files = sorted(self.flow_folder.glob("*.feather"))
+            assert len(self.sequence_files) == len(self.flow_files), (
+                f"number of frames in {self.sequence_folder} does not match number of frames in "
+                f"{self.flow_folder}"
+            )
+        else:
+            self.flow_folder = None
+            self.flow_files = None
+
         assert len(self.sequence_files) > 0, f"no frames found in {self.sequence_folder}"
 
     def __repr__(self) -> str:
@@ -49,6 +60,12 @@ def _load_idx(self, idx: int):
         pkl = load_pickle(pickle_path, verbose=False)
         pc = PointCloud(pkl["car_frame_pc"])
         flow = pkl["flow"]
+        if self.flow_folder is not None:
+            flow_path = self.flow_files[idx]
+            flow = load_feather(flow_path).to_numpy()
+            assert len(flow) == len(
+                pc
+            ), f"number of points in flow {len(flow)} does not match number of points in pc {len(pc)}"
         labels = pkl["label"]
         pose = SE3.from_array(pkl["pose"])
         return pc, flow, labels, pose
@@ -145,6 +162,7 @@ class WaymoSupervisedSceneFlowSequenceLoader(CachedSequenceLoader):
     def __init__(
         self,
         sequence_dir: Path,
+        flow_dir: Path | None = None,
         log_subset: Optional[list[str]] = None,
         verbose: bool = False,
         with_rgb: bool = False,
@@ -159,6 +177,16 @@ def __init__(
         sequence_dir_lst = sorted(self.dataset_dir.glob("*/"))
 
         self.log_lookup = {e.name: e for e in sequence_dir_lst}
+        if flow_dir is not None:
+            flow_dir = Path(flow_dir)
+            flow_dir_lst = sorted(flow_dir.glob("*/"))
+            assert len(sequence_dir_lst) == len(flow_dir_lst), (
+                f"number of sequences in {self.dataset_dir} does not match number of sequences in "
+                f"{flow_dir}; {len(sequence_dir_lst)} vs {len(flow_dir_lst)}"
+            )
+            self.flow_lookup: dict[str, Path | None] = {e.name: e for e in flow_dir_lst}
+        else:
+            self.flow_lookup = {k: None for k in self.log_lookup.keys()}
 
         # Intersect with log_subset
         if log_subset is not None:
@@ -179,7 +207,8 @@ def get_sequence_ids(self):
 
     def _load_sequence_uncached(self, log_id: str) -> WaymoSupervisedSceneFlowSequence:
         sequence_folder = self.log_lookup[log_id]
-        return WaymoSupervisedSceneFlowSequence(sequence_folder, verbose=self.verbose)
+        flow_folder = self.flow_lookup[log_id]
+        return WaymoSupervisedSceneFlowSequence(sequence_folder, flow_folder, verbose=self.verbose)
 
     @staticmethod
     def category_ids() -> list[int]:
diff --git a/pyproject.toml b/pyproject.toml
index dbb9a4a..700598a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,7 +15,7 @@ exclude = [
 
 [project]
 name = "bucketed_scene_flow_eval"
-version = "2.0.22"
+version = "2.0.23"
 authors = [
   { name="Kyle Vedder", email="kvedder@seas.upenn.edu" },
 ]
diff --git a/scripts/evals/waymo_eval.py b/scripts/evals/waymo_eval.py
new file mode 100644
index 0000000..9d75314
--- /dev/null
+++ b/scripts/evals/waymo_eval.py
@@ -0,0 +1,187 @@
+# Set OMP_NUM_THREADS=1 to avoid slamming the CPU
+import os
+
+os.environ["OMP_NUM_THREADS"] = "1"
+
+import argparse
+import multiprocessing
+from pathlib import Path
+
+import tqdm
+
+from bucketed_scene_flow_eval.datasets import WaymoOpenCausalSceneFlow
+from bucketed_scene_flow_eval.eval import Evaluator
+
+
+def _make_range_shards(total_len: int, num_shards: int) -> list[tuple[int, int]]:
+    """
+    Return a list of tuples of (start, end) indices for each shard.
+
+    The function divides the range specified by total_len into num_shards shards.
+    Each shard is represented by a tuple of (start, end) indices.
+    The division tries to distribute the elements as evenly as possible among the shards.
+    """
+    shards = []
+    shard_len = total_len // num_shards
+    remainder = total_len % num_shards
+
+    start = 0
+    for _ in range(num_shards):
+        end = start + shard_len + (1 if remainder > 0 else 0)
+        shards.append((start, min(end, total_len)))
+        start = end
+        remainder -= 1
+
+    return shards
+
+
+def _make_index_shards(
+    dataset: WaymoOpenCausalSceneFlow, num_shards: int, every_kth_in_sequence: int
+) -> list[list[int]]:
+    dataset_valid_indices: list[int] = [
+        dataset_idx
+        for (
+            _,
+            (subsequence_start_idx, subsequence_end_idx),
+        ), dataset_idx in dataset.sequence_subsequence_idx_to_dataset_idx.items()
+        if (subsequence_start_idx % every_kth_in_sequence) == 0
+    ]
+
+    tuple_shards = _make_range_shards(len(dataset_valid_indices), num_shards)
+    return [dataset_valid_indices[start:end] for start, end in tuple_shards]
+
+
+def _work(
+    shard_idx: int,
+    shard_list: list[int],
+    gt_dataset: WaymoOpenCausalSceneFlow,
+    est_dataset: WaymoOpenCausalSceneFlow,
+    evaluator: Evaluator,
+    verbose: bool = True,
+) -> Evaluator:
+    # Set tqdm bar on the row of the terminal corresponding to the shard index
+    iterator = shard_list
+    if verbose:
+        iterator = tqdm.tqdm(shard_list, position=shard_idx + 1, desc=f"Shard {shard_idx}")
+
+    for idx in iterator:
+        gt_lst = gt_dataset[idx]
+        est_lst = est_dataset[idx]
+        assert len(gt_lst) == len(est_lst) == 2, f"GT and estimated lists must have length 2."
+        gt_frame0, gt_frame1 = gt_lst
+        est_frame0, est_frame1 = est_lst
+        evaluator.eval(est_frame0.flow, gt_frame0)
+
+    return evaluator
+
+
+def _work_wrapper(
+    args: tuple[int, list[int], WaymoOpenCausalSceneFlow, WaymoOpenCausalSceneFlow, Evaluator, bool]
+) -> Evaluator:
+    return _work(*args)
+
+
+def run_eval(
+    data_dir: Path,
+    est_flow_dir: Path,
+    output_path: Path,
+    cpu_count: int,
+    cache_root: Path,
+    every_kth: int = 5,
+    eval_type: str = "bucketed_epe",
+    verbose: bool = True,
+) -> None:
+    assert data_dir.exists(), f"Data directory {data_dir} does not exist."
+    assert est_flow_dir.exists(), f"Estimated flow directory {est_flow_dir} does not exist."
+
+    # Make the output directory if it doesn't exist
+    output_path.mkdir(parents=True, exist_ok=True)
+
+    gt_dataset = WaymoOpenCausalSceneFlow(
+        root_dir=data_dir,
+        flow_folder=None,
+        with_rgb=False,
+        eval_type=eval_type,
+        eval_args=dict(output_path=output_path),
+        cache_root=cache_root,
+    )
+
+    est_dataset = WaymoOpenCausalSceneFlow(
+        root_dir=data_dir,
+        flow_folder=est_flow_dir,
+        with_rgb=False,
+        use_cache=False,
+        eval_type=eval_type,
+        cache_root=cache_root,
+    )
+
+    dataset_evaluator = gt_dataset.evaluator()
+
+    assert len(gt_dataset) == len(
+        est_dataset
+    ), f"GT and estimated datasets must be the same length, but are {len(gt_dataset)} and {len(est_dataset)} respectively."
+
+    # Shard the dataset into pieces for each CPU
+    shard_lists = _make_index_shards(gt_dataset, cpu_count, every_kth)
+    args_list = [
+        (shard_idx, shard_list, gt_dataset, est_dataset, dataset_evaluator, verbose)
+        for shard_idx, shard_list in enumerate(shard_lists)
+    ]
+
+    if cpu_count > 1:
+        print(f"Running evaluation on {len(gt_dataset)} scenes using {cpu_count} CPUs.")
+        # Run the evaluation in parallel
+        with multiprocessing.Pool(cpu_count) as pool:
+            sharded_evaluators = pool.map(_work_wrapper, args_list)
+    else:
+        print(f"Running evaluation on {len(gt_dataset)} scenes using 1 CPU.")
+        # Run the evaluation serially
+        sharded_evaluators = [_work_wrapper(args) for args in args_list]
+
+    # Combine the sharded evaluators
+    gathered_evaluator: Evaluator = sum(sharded_evaluators)
+    gathered_evaluator.compute_results()
+
+
+if __name__ == "__main__":
+    multiprocessing.set_start_method("spawn")
+    parser = argparse.ArgumentParser(
+        description="Iterate over .feather files in a result zip file."
+    )
+    parser.add_argument("data_dir", type=Path, help="Path to the data_dir directory of the dataset")
+    parser.add_argument("est_flow_dir", type=Path, help="Path to the estimated flow directory")
+    parser.add_argument("output_path", type=Path, help="Path to save the results")
+    parser.add_argument(
+        "--cpu_count",
+        type=int,
+        default=multiprocessing.cpu_count(),
+        help="Number of CPUs to use",
+    )
+    parser.add_argument(
+        "--every_kth", type=int, default=5, help="Only evaluate every kth scene in a sequence"
+    )
+    parser.add_argument("--eval_type", type=str, default="bucketed_epe", help="Type of evaluation")
+    parser.add_argument(
+        "--cache_root",
+        type=Path,
+        default=Path("/tmp/av2_eval_cache/"),
+        help="Path to the cache root directory",
+    )
+    parser.add_argument(
+        "--quiet",
+        action="store_true",
+        help="Suppress output",
+    )
+
+    args = parser.parse_args()
+
+    run_eval(
+        data_dir=args.data_dir,
+        est_flow_dir=args.est_flow_dir,
+        output_path=args.output_path,
+        cpu_count=args.cpu_count,
+        every_kth=args.every_kth,
+        eval_type=args.eval_type,
+        cache_root=args.cache_root,
+        verbose=not args.quiet,
+    )