Source code for vis4d.eval.nuscenes.detect3d

"""NuScenes 3D detection evaluation code."""

from __future__ import annotations

import json
import os
from collections.abc import Callable
from typing import Any

import numpy as np

from vis4d.common.array import array_to_numpy
from vis4d.common.imports import NUSCENES_AVAILABLE
from vis4d.common.logging import rank_zero_warn
from vis4d.common.typing import ArrayLike, DictStrAny, MetricLogs
from vis4d.data.datasets.nuscenes import (
    nuscenes_attribute_map,
    nuscenes_class_map,
)

from ..base import Evaluator

if NUSCENES_AVAILABLE:
    from nuscenes import NuScenes as NuScenesDevkit
    from nuscenes.eval.detection.config import config_factory
    from nuscenes.eval.detection.evaluate import NuScenesEval
    from nuscenes.utils.data_classes import Quaternion
else:
    raise ImportError("nuscenes-devkit is not installed.")


def _parse_high_level_metrics(
    mean_ap: float,
    tp_errors: dict[str, float],
    nd_score: float,
    eval_time: float,
) -> tuple[MetricLogs, list[str]]:
    """Collect high-level metrics."""
    log_dict: MetricLogs = {
        "mAP": mean_ap,
        "mATE": tp_errors["trans_err"],
        "mASE": tp_errors["scale_err"],
        "mAOE": tp_errors["orient_err"],
        "mAVE": tp_errors["vel_err"],
        "mAAE": tp_errors["attr_err"],
        "NDS": nd_score,
    }

    str_summary_list = ["\nHigh-level metrics:"]
    for k, v in log_dict.items():
        str_summary_list.append(f"{k}: {v:.4f}")

    str_summary_list.append(f"Eval time: {eval_time:.1f}s")

    return log_dict, str_summary_list


def _parse_per_class_metrics(
    str_summary_list: list[str],
    class_aps: dict[str, float],
    class_tps: dict[str, dict[str, float]],
) -> list[str]:
    """Collect per-class metrics."""
    str_summary_list.append("\nPer-class results:")
    str_summary_list.append("Object Class\tAP\tATE\tASE\tAOE\tAVE\tAAE")

    for class_name in class_aps.keys():
        tmp_str_list = [class_name]
        tmp_str_list.append(f"{class_aps[class_name]:.3f}")
        tmp_str_list.append(f"{class_tps[class_name]['trans_err']:.3f}")
        tmp_str_list.append(f"{class_tps[class_name]['scale_err']:.3f}")
        tmp_str_list.append(f"{class_tps[class_name]['orient_err']:.3f}")
        tmp_str_list.append(f"{class_tps[class_name]['vel_err']:.3f}")
        tmp_str_list.append(f"{class_tps[class_name]['attr_err']:.3f}")

        str_summary_list.append("\t".join(tmp_str_list))
    return str_summary_list
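
# Composition example (illustrative only): the two helpers above are combined
# in ``NuScenesDet3DEvaluator.evaluate``, roughly as sketched here, where
# ``metrics_summary`` is the dict returned by the devkit metrics' ``serialize()``:
#
#     log_dict, lines = _parse_high_level_metrics(
#         metrics_summary["mean_ap"],
#         metrics_summary["tp_errors"],
#         metrics_summary["nd_score"],
#         metrics_summary["eval_time"],
#     )
#     lines = _parse_per_class_metrics(
#         lines,
#         metrics_summary["mean_dist_aps"],
#         metrics_summary["label_tp_errors"],
#     )
#     str_summary = "\n".join(lines)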


class NuScenesDet3DEvaluator(Evaluator):
    """NuScenes 3D detection evaluation class."""

    inv_nuscenes_attribute_map = {
        v: k for k, v in nuscenes_attribute_map.items()
    }

    DefaultAttribute = {
        "car": "vehicle.parked",
        "pedestrian": "pedestrian.moving",
        "trailer": "vehicle.parked",
        "truck": "vehicle.parked",
        "bus": "vehicle.moving",
        "motorcycle": "cycle.without_rider",
        "construction_vehicle": "vehicle.parked",
        "bicycle": "cycle.without_rider",
        "barrier": "",
        "traffic_cone": "",
    }

    def __init__(
        self,
        data_root: str,
        version: str,
        split: str,
        save_only: bool = False,
        class_map: dict[str, int] | None = None,
        metadata: tuple[str, ...] = ("use_camera",),
        use_default_attr: bool = False,
        velocity_thres: float = 1.0,
    ) -> None:
        """Initialize NuScenes evaluator."""
        super().__init__()
        self.data_root = data_root
        self.version = version
        self.split = split
        self.save_only = save_only
        self.use_default_attr = use_default_attr
        self.velocity_thres = velocity_thres

        self.meta_data = {
            "use_camera": False,
            "use_lidar": False,
            "use_radar": False,
            "use_map": False,
            "use_external": False,
        }
        for m in metadata:
            self.meta_data[m] = True

        class_map = class_map or nuscenes_class_map
        self.inv_nuscenes_class_map = {v: k for k, v in class_map.items()}

        self.output_dir = ""
        self.detect_3d: DictStrAny = {}

        self.reset()

    def __repr__(self) -> str:
        """Concise representation of the dataset evaluator."""
        return "NuScenes 3D Detection Evaluator"

    @property
    def metrics(self) -> list[str]:
        """Supported metrics."""
        return ["detect_3d"]

    def gather(  # type: ignore
        self, gather_func: Callable[[Any], Any]
    ) -> None:
        """Gather variables in case of distributed setting (if needed).

        Args:
            gather_func (Callable[[Any], Any]): Gather function.
        """
        detect_3d_list = gather_func(self.detect_3d)
        if detect_3d_list is not None:
            collated_detect_3d: DictStrAny = {}
            for prediction in detect_3d_list:
                for k, v in prediction.items():
                    if k not in collated_detect_3d:
                        collated_detect_3d[k] = v
                    else:
                        collated_detect_3d[k].extend(v)
            self.detect_3d = collated_detect_3d

    def reset(self) -> None:
        """Reset evaluator."""
        self.detect_3d.clear()

    def get_attributes(self, name: str, velocity: list[float]) -> str:
        """Get nuScenes attributes."""
        if self.use_default_attr:
            return self.DefaultAttribute[name]
        if np.sqrt(velocity[0] ** 2 + velocity[1] ** 2) > self.velocity_thres:
            if name in {
                "car",
                "construction_vehicle",
                "bus",
                "truck",
                "trailer",
            }:
                attr = "vehicle.moving"
            elif name in {"bicycle", "motorcycle"}:
                attr = "cycle.with_rider"
            else:
                attr = self.DefaultAttribute[name]
        elif name in {"pedestrian"}:
            attr = "pedestrian.standing"
        elif name in {"bus"}:
            attr = "vehicle.stopped"
        else:
            attr = self.DefaultAttribute[name]
        return attr
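
    # Attribute heuristic examples (illustrative values only, assuming the
    # default ``velocity_thres`` of 1.0 m/s; only the first two velocity
    # components are used):
    #
    #     get_attributes("car", [2.0, 0.0, 0.0])      -> "vehicle.moving"
    #     get_attributes("bicycle", [1.5, 0.5, 0.0])  -> "cycle.with_rider"
    #     get_attributes("car", [0.1, 0.0, 0.0])      -> "vehicle.parked"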

    def _process_detect_3d(
        self,
        token: str,
        boxes_3d: ArrayLike,
        velocities: ArrayLike,
        scores_3d: ArrayLike,
        class_ids: ArrayLike,
        attributes: ArrayLike | None = None,
    ) -> None:
        """Process 3D detection results."""
        annos = []
        boxes_3d_np = array_to_numpy(boxes_3d, n_dims=None, dtype=np.float32)
        velocities_np = array_to_numpy(
            velocities, n_dims=None, dtype=np.float32
        )
        scores_3d_np = array_to_numpy(scores_3d, n_dims=None, dtype=np.float32)
        class_ids_np = array_to_numpy(class_ids, n_dims=None, dtype=np.int64)

        if len(boxes_3d_np) != 0:
            for i, (box_3d, velocity, score_3d, class_id) in enumerate(
                zip(boxes_3d_np, velocities_np, scores_3d_np, class_ids_np)
            ):
                category = self.inv_nuscenes_class_map[int(class_id)]
                translation = box_3d[0:3]
                dims = box_3d[3:6].tolist()
                dimension = [d if d >= 0 else 0.1 for d in dims]
                rotation = Quaternion(box_3d[6:].tolist())
                score = float(score_3d)
                velocity_list = velocity.tolist()

                if attributes is None:
                    attribute_name = self.get_attributes(
                        category, velocity_list
                    )
                else:
                    attribute = array_to_numpy(
                        attributes[i], n_dims=None, dtype=np.int64  # type: ignore # pylint: disable=line-too-long
                    )
                    attribute_name = self.inv_nuscenes_attribute_map[
                        int(attribute)
                    ]

                nusc_anno = {
                    "sample_token": token,
                    "translation": translation.tolist(),
                    "size": dimension,
                    "rotation": rotation.elements.tolist(),
                    "velocity": [velocity_list[0], velocity_list[1]],
                    "detection_name": category,
                    "detection_score": score,
                    "attribute_name": attribute_name,
                }
                annos.append(nusc_anno)

        self.detect_3d[token] = annos

    def process_batch(  # type: ignore # pylint: disable=arguments-differ
        self,
        tokens: list[str],
        boxes_3d: list[ArrayLike],
        velocities: list[ArrayLike],
        class_ids: list[ArrayLike],
        scores_3d: list[ArrayLike],
        attributes: list[ArrayLike] | None = None,
    ) -> None:
        """Process the results."""
        for i, token in enumerate(tokens):
            self._process_detect_3d(
                token,
                boxes_3d[i],
                velocities[i],
                scores_3d[i],
                class_ids[i],
                attributes[i] if attributes is not None else None,
            )

    def evaluate(self, metric: str) -> tuple[MetricLogs, str]:
        """Evaluate the results."""
        assert metric == "detect_3d"

        if self.save_only:
            return {}, "Results are saved to the json file."

        try:
            nusc = NuScenesDevkit(
                version=self.version,
                dataroot=self.data_root,
                verbose=False,
            )

            nusc_eval = NuScenesEval(
                nusc,
                config=config_factory("detection_cvpr_2019"),
                result_path=f"{self.output_dir}/detect_3d_predictions.json",
                eval_set=self.split,
                output_dir=os.path.join(self.output_dir, "detection"),
            )

            metrics, _ = nusc_eval.evaluate()
            metrics_summary = metrics.serialize()

            log_dict, str_summary_list = _parse_high_level_metrics(
                metrics_summary["mean_ap"],
                metrics_summary["tp_errors"],
                metrics_summary["nd_score"],
                metrics_summary["eval_time"],
            )

            class_aps = metrics_summary["mean_dist_aps"]
            class_tps = metrics_summary["label_tp_errors"]

            str_summary_list = _parse_per_class_metrics(
                str_summary_list, class_aps, class_tps
            )

            str_summary = "\n".join(str_summary_list)
        except Exception as e:  # pylint: disable=broad-except
            error_msg = "".join(e.args)
            rank_zero_warn(f"Evaluation error: {error_msg}")

            log_dict = {}
            str_summary = (
                "Evaluation failure might be raised due to the sanity check "
                "or all boxes being empty."
            )
            rank_zero_warn(str_summary)

        return log_dict, str_summary

    def save(self, metric: str, output_dir: str) -> None:
        """Save the results to json files."""
        assert metric == "detect_3d"

        nusc_annos = {"results": self.detect_3d, "meta": self.meta_data}

        result_file = f"{output_dir}/detect_3d_predictions.json"
        with open(result_file, mode="w", encoding="utf-8") as f:
            json.dump(nusc_annos, f)

        self.output_dir = output_dir
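

# Usage sketch (illustrative only; the data root, version, split, sample token
# and box values below are placeholders, not taken from a real dataset). Boxes
# are 10-dimensional: translation (3), dimensions (3), rotation quaternion (4),
# matching how ``_process_detect_3d`` slices them:
#
#     evaluator = NuScenesDet3DEvaluator(
#         data_root="data/nuscenes",
#         version="v1.0-mini",
#         split="mini_val",
#     )
#     evaluator.process_batch(
#         tokens=["<sample_token>"],
#         boxes_3d=[np.zeros((1, 10), dtype=np.float32)],
#         velocities=[np.zeros((1, 3), dtype=np.float32)],
#         class_ids=[np.zeros((1,), dtype=np.int64)],
#         scores_3d=[np.ones((1,), dtype=np.float32)],
#     )
#     evaluator.save("detect_3d", output_dir="outputs")
#     log_dict, summary = evaluator.evaluate("detect_3d")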