Source code for vis4d.eval.nuscenes.track3d

"""NuScenes 3D tracking evaluation code."""

from __future__ import annotations

import json
from collections.abc import Callable
from typing import Any

import numpy as np
from nuscenes.utils.data_classes import Quaternion

from vis4d.common.array import array_to_numpy
from vis4d.common.typing import ArrayLike, DictStrAny, MetricLogs
from vis4d.data.datasets.nuscenes import nuscenes_class_map

from ..base import Evaluator


[docs] class NuScenesTrack3DEvaluator(Evaluator): """NuScenes 3D tracking evaluation class.""" inv_nuscenes_class_map = {v: k for k, v in nuscenes_class_map.items()} tracking_cats = [ "bicycle", "motorcycle", "pedestrian", "bus", "car", "trailer", "truck", ] def __init__(self, metadata: tuple[str, ...] = ("use_camera",)) -> None: """Initialize NuScenes evaluator.""" super().__init__() self.meta_data = { "use_camera": False, "use_lidar": False, "use_radar": False, "use_map": False, "use_external": False, } for m in metadata: self.meta_data[m] = True self.tracks_3d: DictStrAny = {} self.reset()
[docs] def __repr__(self) -> str: """Concise representation of the dataset evaluator.""" return "NuScenes 3D Tracking Evaluator"
@property def metrics(self) -> list[str]: """Supported metrics.""" return ["track_3d"]
[docs] def gather( # type: ignore self, gather_func: Callable[[Any], Any] ) -> None: """Gather variables in case of distributed setting (if needed). Args: gather_func (Callable[[Any], Any]): Gather function. """ tracks_3d_list = gather_func(self.tracks_3d) if tracks_3d_list is not None: collated_track_3d: DictStrAny = {} for prediction in tracks_3d_list: for k, v in prediction.items(): if k not in collated_track_3d: collated_track_3d[k] = v else: collated_track_3d[k].extend(v) self.tracks_3d = collated_track_3d
[docs] def reset(self) -> None: """Reset evaluator.""" self.tracks_3d.clear()
def _process_track_3d( self, token: str, boxes_3d: ArrayLike, velocities: ArrayLike, scores_3d: ArrayLike, class_ids: ArrayLike, track_ids: ArrayLike, ) -> None: """Process 3D tracking results.""" annos = [] boxes_3d_np = array_to_numpy(boxes_3d, n_dims=None, dtype=np.float32) velocities_np = array_to_numpy( velocities, n_dims=None, dtype=np.float32 ) scores_3d_np = array_to_numpy(scores_3d, n_dims=None, dtype=np.float32) class_ids_np = array_to_numpy(class_ids, n_dims=None, dtype=np.int64) track_ids_np = array_to_numpy(track_ids, n_dims=None, dtype=np.int64) if len(boxes_3d_np) != 0: for box_3d, velocity, score_3d, class_id, track_id in zip( boxes_3d_np, velocities_np, scores_3d_np, class_ids_np, track_ids_np, ): category = self.inv_nuscenes_class_map[int(class_id)] if not category in self.tracking_cats: continue translation = box_3d[0:3] dimension = box_3d[3:6] rotation = Quaternion(box_3d[6:].tolist()) score = float(score_3d) velocity_list = velocity.tolist() nusc_anno = { "sample_token": token, "translation": translation.tolist(), "size": dimension.tolist(), "rotation": rotation.elements.tolist(), "velocity": [velocity_list[0], velocity_list[1]], "tracking_id": int(track_id), "tracking_name": category, "tracking_score": score, } annos.append(nusc_anno) self.tracks_3d[token] = annos
[docs] def process_batch( # type: ignore # pylint: disable=arguments-differ self, tokens: list[str], boxes_3d: list[ArrayLike], velocities: list[ArrayLike], class_ids: list[ArrayLike], scores_3d: list[ArrayLike], track_ids: list[ArrayLike], ) -> None: """Process the results.""" for i, token in enumerate(tokens): self._process_track_3d( token, boxes_3d[i], velocities[i], scores_3d[i], class_ids[i], track_ids[i], )
[docs] def evaluate(self, metric: str) -> tuple[MetricLogs, str]: """Evaluate the results.""" return {}, "Currently only save the json file."
[docs] def save(self, metric: str, output_dir: str) -> None: """Save the results to json files.""" nusc_annos = {"results": self.tracks_3d, "meta": self.meta_data} result_file = f"{output_dir}/track_3d_predictions.json" with open(result_file, mode="w", encoding="utf-8") as f: json.dump(nusc_annos, f)