Source code for vis4d.data.datasets.nuscenes

"""NuScenes multi-sensor video dataset."""

from __future__ import annotations

import os
from collections import defaultdict
from collections.abc import Sequence

import numpy as np
import torch
from scipy.spatial.transform import Rotation as R
from tqdm import tqdm

from vis4d.common.imports import NUSCENES_AVAILABLE
from vis4d.common.logging import rank_zero_info
from vis4d.common.time import Timer
from vis4d.common.typing import (
    ArgsType,
    DictStrAny,
    NDArrayBool,
    NDArrayF32,
    NDArrayI64,
)
from vis4d.data.const import AxisMode
from vis4d.data.const import CommonKeys as K
from vis4d.data.typing import DictData
from vis4d.op.geometry.projection import generate_depth_map
from vis4d.op.geometry.transform import (
    inverse_rigid_transform,
    transform_points,
)

from .base import VideoDataset, VideoMapping
from .util import CacheMappingMixin, im_decode, print_class_histogram

if NUSCENES_AVAILABLE:
    from nuscenes import NuScenes as NuScenesDevkit
    from nuscenes.can_bus.can_bus_api import NuScenesCanBus
    from nuscenes.eval.common.utils import quaternion_yaw
    from nuscenes.eval.detection.utils import category_to_detection_name
    from nuscenes.scripts.export_2d_annotations_as_json import (
        post_process_coords,
    )
    from nuscenes.utils.data_classes import Quaternion
    from nuscenes.utils.geometry_utils import (
        box_in_image,
        transform_matrix,
        view_points,
    )
    from nuscenes.utils.splits import create_splits_scenes
else:
    raise ImportError("nuscenes-devkit is not available.")

nuscenes_class_map = {
    "bicycle": 0,
    "motorcycle": 1,
    "pedestrian": 2,
    "bus": 3,
    "car": 4,
    "trailer": 5,
    "truck": 6,
    "construction_vehicle": 7,
    "traffic_cone": 8,
    "barrier": 9,
}

nuscenes_attribute_map = {
    "cycle.with_rider": 0,
    "cycle.without_rider": 1,
    "pedestrian.moving": 2,
    "pedestrian.standing": 3,
    "pedestrian.sitting_lying_down": 4,
    "vehicle.moving": 5,
    "vehicle.parked": 6,
    "vehicle.stopped": 7,
    "": 8,
}
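
# The empty-string key is the fallback attribute id assigned to annotations
# that carry no attribute tokens (see _load_annotations below).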

nuscenes_detection_range_map = {
    "bicycle": 40,
    "motorcycle": 40,
    "pedestrian": 40,
    "bus": 50,
    "car": 50,
    "trailer": 50,
    "truck": 50,
    "construction_vehicle": 50,
    "traffic_cone": 30,
    "barrier": 30,
}
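
# Example of translating a raw devkit category into the ids above: the
# devkit's category_to_detection_name collapses fine-grained categories such
# as "vehicle.car" into the detection names used as keys here. A sketch,
# assuming the nuscenes-devkit is installed:
#
#     >>> name = category_to_detection_name("vehicle.car")  # -> "car"
#     >>> nuscenes_class_map[name]  # -> 4
#     >>> nuscenes_detection_range_map[name]  # -> 50 (meters)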


def _get_extrinsics(
    ego_pose: DictStrAny, car_from_sensor: DictStrAny
) -> NDArrayF32:
    """Get NuScenes sensor to global extrinsics."""
    global_from_car = transform_matrix(
        ego_pose["translation"],
        Quaternion(ego_pose["rotation"]),
        inverse=False,
    )
    car_from_sensor_ = transform_matrix(
        car_from_sensor["translation"],
        Quaternion(car_from_sensor["rotation"]),
        inverse=False,
    )
    extrinsics = np.dot(global_from_car, car_from_sensor_).astype(np.float32)
    return extrinsics
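
# A minimal sketch of the composition above, using hypothetical pose records
# with identity rotations (under identity rotations the translations simply
# add, i.e. sensor-to-global translation = ego translation + sensor offset):
#
#     >>> ego_pose = {"translation": [100.0, 50.0, 0.0], "rotation": [1, 0, 0, 0]}
#     >>> car_from_sensor = {"translation": [1.0, 0.0, 1.8], "rotation": [1, 0, 0, 0]}
#     >>> _get_extrinsics(ego_pose, car_from_sensor)[:3, 3]  # -> [101., 50., 1.8]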


class NuScenes(CacheMappingMixin, VideoDataset):
    """NuScenes multi-sensor video dataset.

    This dataset loads both LiDAR and camera inputs from the NuScenes
    dataset into the Vis4D expected format for multi-sensor, video datasets.
    """

    DESCRIPTION = "NuScenes multi-sensor driving video dataset."
    HOMEPAGE = "https://www.nuscenes.org/"
    PAPER = "https://arxiv.org/abs/1903.11027"
    LICENSE = "https://www.nuscenes.org/license"

    KEYS = [
        K.images,
        K.original_hw,
        K.input_hw,
        K.intrinsics,
        K.extrinsics,
        K.timestamp,
        K.axis_mode,
        K.boxes2d,
        K.boxes2d_classes,
        K.boxes2d_track_ids,
        K.boxes3d,
        K.boxes3d_classes,
        K.boxes3d_track_ids,
    ]

    SENSORS = [
        "LIDAR_TOP",
        "CAM_FRONT",
        "CAM_FRONT_LEFT",
        "CAM_FRONT_RIGHT",
        "CAM_BACK",
        "CAM_BACK_LEFT",
        "CAM_BACK_RIGHT",
    ]

    CAMERAS = [
        "CAM_FRONT",
        "CAM_FRONT_LEFT",
        "CAM_FRONT_RIGHT",
        "CAM_BACK",
        "CAM_BACK_LEFT",
        "CAM_BACK_RIGHT",
    ]

    def __init__(
        self,
        data_root: str,
        keys_to_load: Sequence[str] = (
            K.images,
            K.boxes2d,
            K.boxes3d,
        ),
        sensors: Sequence[str] = (
            "LIDAR_TOP",
            "CAM_FRONT",
            "CAM_FRONT_LEFT",
            "CAM_FRONT_RIGHT",
            "CAM_BACK",
            "CAM_BACK_LEFT",
            "CAM_BACK_RIGHT",
        ),
        version: str = "v1.0-trainval",
        split: str = "train",
        max_sweeps: int = 10,
        skip_empty_samples: bool = False,
        point_based_filter: bool = False,
        distance_based_filter: bool = False,
        cache_as_binary: bool = False,
        cached_file_path: str | None = None,
        **kwargs: ArgsType,
    ) -> None:
        """Creates an instance of the class.

        Args:
            data_root (str): Root directory of nuscenes data in original
                format.
            keys_to_load (Sequence[str]): Keys to load from the dataset.
                Defaults to (K.images, K.boxes2d, K.boxes3d).
            sensors (Sequence[str]): Which sensors to load. Defaults to
                ("LIDAR_TOP", "CAM_FRONT", "CAM_FRONT_LEFT",
                "CAM_FRONT_RIGHT", "CAM_BACK", "CAM_BACK_LEFT",
                "CAM_BACK_RIGHT").
            version (str, optional): Version of the data to load. Defaults
                to "v1.0-trainval".
            split (str, optional): Split of the data to load. Defaults to
                "train".
            max_sweeps (int, optional): Maximum number of sweeps for a
                single key-frame to load. Defaults to 10.
            skip_empty_samples (bool, optional): Whether to skip samples
                without annotations. Defaults to False.
            point_based_filter (bool, optional): Whether to filter out
                samples based on the number of points in the point cloud.
                Defaults to False.
            distance_based_filter (bool, optional): Whether to filter out
                samples based on the distance of the object from the ego
                vehicle. Defaults to False.
            cache_as_binary (bool): Whether to cache the dataset as binary.
                Default: False.
            cached_file_path (str | None): Path to a cached file. If the
                cached file exists, it will be loaded instead of generating
                the data mapping. Default: None.
        """
        super().__init__(**kwargs)
        self.data_root = data_root
        self.keys_to_load = keys_to_load
        self.sensors = sensors

        self._check_version_and_split(version, split)

        self.max_sweeps = max_sweeps
        self.skip_empty_samples = skip_empty_samples
        self.point_based_filter = point_based_filter
        self.distance_based_filter = distance_based_filter

        # Load annotations
        self.samples, self.original_len = self._load_mapping(
            self._generate_data_mapping,
            self._filter_data,
            cache_as_binary=cache_as_binary,
            cached_file_path=cached_file_path,
        )

        # Generate video mapping
        self.video_mapping = self._generate_video_mapping()
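
    # After construction, self.samples holds one dict per key-frame with
    # per-sensor entries ("LIDAR_TOP", "CAM_FRONT", ...) plus metadata
    # ("scene_name", "token", "frame_ids", "can_bus"), and
    # self.video_mapping groups sample indices by scene for video sampling.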

    # Needed for CBGS
    def get_cat_ids(self, idx: int) -> list[int]:
        """Return the category ids of the sample at the given index."""
        return self.samples[idx]["LIDAR_TOP"]["annotations"][
            "boxes3d_classes"
        ]
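
    # Hedged sketch of how a CBGS-style sampler might consume get_cat_ids to
    # build a class-balanced index (illustrative, not part of vis4d):
    #
    #     from collections import defaultdict
    #     class_to_indices: dict[int, list[int]] = defaultdict(list)
    #     for i in range(len(dataset)):
    #         for cat in dataset.get_cat_ids(i):
    #             class_to_indices[int(cat)].append(i)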

    def _filter_data(self, data: list[DictStrAny]) -> list[DictStrAny]:
        """Remove empty samples."""
        if self.split == "test":
            return data

        samples = []
        frequencies = {cat: 0 for cat in nuscenes_class_map}
        inv_nuscenes_class_map = {v: k for k, v in nuscenes_class_map.items()}

        t = Timer()
        for sample in data:
            (
                _,
                boxes3d,
                boxes3d_classes,
                boxes3d_attributes,
                boxes3d_track_ids,
                boxes3d_velocities,
            ) = self._filter_boxes(sample["LIDAR_TOP"]["annotations"])

            sample["LIDAR_TOP"]["annotations"]["boxes3d"] = boxes3d
            sample["LIDAR_TOP"]["annotations"][
                "boxes3d_classes"
            ] = boxes3d_classes
            sample["LIDAR_TOP"]["annotations"][
                "boxes3d_attributes"
            ] = boxes3d_attributes
            sample["LIDAR_TOP"]["annotations"][
                "boxes3d_track_ids"
            ] = boxes3d_track_ids
            sample["LIDAR_TOP"]["annotations"][
                "boxes3d_velocities"
            ] = boxes3d_velocities

            for box3d_class in boxes3d_classes:
                frequencies[inv_nuscenes_class_map[box3d_class]] += 1

            for cam in NuScenes.CAMERAS:
                (
                    mask,
                    boxes3d,
                    boxes3d_classes,
                    boxes3d_attributes,
                    boxes3d_track_ids,
                    boxes3d_velocities,
                ) = self._filter_boxes(sample[cam]["annotations"])

                sample[cam]["annotations"]["boxes3d"] = boxes3d
                sample[cam]["annotations"]["boxes3d_classes"] = boxes3d_classes
                sample[cam]["annotations"][
                    "boxes3d_attributes"
                ] = boxes3d_attributes
                sample[cam]["annotations"][
                    "boxes3d_track_ids"
                ] = boxes3d_track_ids
                sample[cam]["annotations"][
                    "boxes3d_velocities"
                ] = boxes3d_velocities
                sample[cam]["annotations"]["boxes2d"] = sample[cam][
                    "annotations"
                ]["boxes2d"][mask]

            if self.skip_empty_samples:
                if len(sample["LIDAR_TOP"]["annotations"]["boxes3d"]) > 0:
                    samples.append(sample)
            else:
                samples.append(sample)

        rank_zero_info(
            f"Preprocessing {len(data)} frames takes {t.time():.2f}"
            " seconds."
        )

        print_class_histogram(frequencies)

        if self.skip_empty_samples:
            rank_zero_info(
                f"Filtered {len(data) - len(samples)} empty frames."
            )

        return samples

    def _check_version_and_split(self, version: str, split: str) -> None:
        """Check that the version and split are valid."""
        assert version in {
            "v1.0-trainval",
            "v1.0-test",
            "v1.0-mini",
        }, f"Invalid version {version} for NuScenes!"
        self.version = version

        if "mini" in version:
            valid_splits = {"mini_train", "mini_val"}
        elif "test" in version:
            valid_splits = {"test"}
        else:
            valid_splits = {"train", "val"}

        assert (
            split in valid_splits
        ), f"Invalid split {split} for NuScenes {version}!"
        self.split = split
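
    # Valid (version, split) combinations accepted by the check above:
    #     "v1.0-trainval" -> {"train", "val"}
    #     "v1.0-test"     -> {"test"}
    #     "v1.0-mini"     -> {"mini_train", "mini_val"}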

    def __repr__(self) -> str:
        """Concise representation of the dataset."""
        return f"NuScenesDataset {self.version} {self.split}"

    def _generate_video_mapping(self) -> VideoMapping:
        """Group dataset sample indices by their associated video ID.

        The sample index is an integer while video IDs are strings.

        Returns:
            VideoMapping: Mapping of video IDs to sample indices and frame
                IDs.
        """
        video_to_indices: dict[str, list[int]] = defaultdict(list)
        video_to_frame_ids: dict[str, list[int]] = defaultdict(list)
        for i, sample in enumerate(self.samples):  # type: ignore
            seq = sample["scene_name"]
            video_to_indices[seq].append(i)
            video_to_frame_ids[seq].append(sample["frame_ids"])

        return self._sort_video_mapping(
            {
                "video_to_indices": video_to_indices,
                "video_to_frame_ids": video_to_frame_ids,
            }
        )

    def _generate_data_mapping(self) -> list[DictStrAny]:
        """Generate data mapping.

        Returns:
            List[DictStrAny]: List of items required to load for a single
                dataset sample.
        """
        data = NuScenesDevkit(
            version=self.version, dataroot=self.data_root, verbose=False
        )
        can_bus_data = NuScenesCanBus(dataroot=self.data_root)

        frames = []
        instance_tokens: list[str] = []

        scene_names_per_split = create_splits_scenes()

        scenes = [
            scene
            for scene in data.scene
            if scene["name"] in scene_names_per_split[self.split]
        ]

        for scene in tqdm(scenes):
            scene_name = scene["name"]
            frame_ids = 0
            sample_token = scene["first_sample_token"]
            while sample_token:
                frame = {}
                sample = data.get("sample", sample_token)

                frame["scene_name"] = scene_name
                frame["token"] = sample["token"]
                frame["frame_ids"] = frame_ids

                sd_rec = data.get("sample_data", sample["data"]["LIDAR_TOP"])

                # Can bus data
                can_bus = self._load_can_bus_data(
                    scene_name, can_bus_data, sample["timestamp"]
                )
                pose_record = data.get("ego_pose", sd_rec["ego_pose_token"])
                rotation = Quaternion(pose_record["rotation"])
                translation = pose_record["translation"]

                can_bus[:3] = translation
                can_bus[3:7] = rotation

                patch_angle = quaternion_yaw(rotation) / np.pi * 180
                patch_angle += 360 if patch_angle < 0 else 0

                can_bus[-2] = patch_angle / 180 * np.pi
                can_bus[-1] = patch_angle

                frame["can_bus"] = can_bus

                # LIDAR data
                lidar_token = sample["data"]["LIDAR_TOP"]
                frame["LIDAR_TOP"] = self._load_lidar_data(data, lidar_token)

                if self.split != "test":
                    frame["LIDAR_TOP"]["annotations"] = self._load_annotations(
                        data,
                        frame["LIDAR_TOP"]["extrinsics"],
                        sample["anns"],
                        instance_tokens,
                        axis_mode=AxisMode.LIDAR,
                    )

                # obtain sweeps for a single key-frame
                sweeps: list[DictStrAny] = []
                while len(sweeps) < self.max_sweeps:
                    if sd_rec["prev"] != "":
                        sweep = self._load_lidar_data(data, sd_rec["prev"])
                        sweeps.append(sweep)
                        sd_rec = data.get("sample_data", sd_rec["prev"])
                    else:
                        break
                frame["LIDAR_TOP"]["sweeps"] = sweeps

                # Get the sample data for each camera
                for cam in self.CAMERAS:
                    cam_token = sample["data"][cam]
                    frame[cam] = self._load_cam_data(data, cam_token)
                    if self.split != "test":
                        frame[cam]["annotations"] = self._load_annotations(
                            data,
                            frame[cam]["extrinsics"],
                            sample["anns"],
                            instance_tokens,
                            axis_mode=AxisMode.OPENCV,
                            export_2d_annotations=True,
                            intrinsics=frame[cam]["intrinsics"],
                            image_hw=frame[cam]["image_hw"],
                        )

                # TODO add RADAR, Map
                frames.append(frame)
                sample_token = sample["next"]
                frame_ids += 1

        return frames

    def _load_can_bus_data(
        self,
        scene_name: str,
        can_bus_data: NuScenesCanBus,
        sample_timestamp: int,
    ) -> list[float]:
        """Load can bus data."""
        try:
            pose_list = can_bus_data.get_messages(scene_name, "pose")
        except:  # pylint: disable=bare-except
            # server scenes do not have can bus information.
            return [0.0] * 18

        # During each scene, the first timestamp of can_bus may be larger
        # than the first sample's timestamp.
        can_bus = []
        last_pose = pose_list[0]
        for pose in pose_list:
            if pose["utime"] > sample_timestamp:
                break
            last_pose = pose

        last_pose.pop("utime")
        pos = last_pose.pop("pos")
        rotation = last_pose.pop("orientation")

        can_bus.extend(pos)
        can_bus.extend(rotation)  # 16 elements

        for key in last_pose.keys():
            can_bus.extend(last_pose[key])
        can_bus.extend([0.0, 0.0])

        return can_bus

    def _load_lidar_data(
        self, data: NuScenesDevkit, lidar_token: str
    ) -> DictStrAny:
        """Load LiDAR data.

        Args:
            data (NuScenesDevkit): NuScenes toolkit.
            lidar_token (str): LiDAR token.

        Returns:
            DictStrAny: LiDAR data.
        """
        lidar_data = data.get("sample_data", lidar_token)
        sample_name = (
            lidar_data["filename"].split("/")[-1].replace(".pcd.bin", "")
        )
        lidar_path = os.path.join(self.data_root, lidar_data["filename"])

        calibration_lidar = data.get(
            "calibrated_sensor", lidar_data["calibrated_sensor_token"]
        )
        ego_pose = data.get("ego_pose", lidar_data["ego_pose_token"])
        extrinsics = _get_extrinsics(ego_pose, calibration_lidar)

        return {
            "sample_name": sample_name,
            "lidar_path": lidar_path,
            "extrinsics": extrinsics,
            "timestamp": lidar_data["timestamp"],
        }

    def _load_cam_data(
        self, data: NuScenesDevkit, cam_token: str
    ) -> DictStrAny:
        """Load camera data.

        Args:
            data (NuScenesDevkit): NuScenes toolkit.
            cam_token (str): Camera token.

        Returns:
            DictStrAny: Camera data containing the sample name, image path,
                image height and width, intrinsics, extrinsics, and
                timestamp.
        """
        cam_data = data.get("sample_data", cam_token)
        sample_name = (
            cam_data["filename"]
            .split("/")[-1]
            .replace(f".{cam_data['fileformat']}", "")
        )
        image_path = os.path.join(self.data_root, cam_data["filename"])

        calibration_cam = data.get(
            "calibrated_sensor", cam_data["calibrated_sensor_token"]
        )
        intrinsics = np.array(
            calibration_cam["camera_intrinsic"], dtype=np.float32
        )

        ego_pose = data.get("ego_pose", cam_data["ego_pose_token"])
        extrinsics = _get_extrinsics(ego_pose, calibration_cam)

        return {
            "sample_name": sample_name,
            "image_path": image_path,
            "image_hw": (cam_data["height"], cam_data["width"]),
            "intrinsics": intrinsics,
            "extrinsics": extrinsics,
            "timestamp": cam_data["timestamp"],
        }

    def _load_annotations(
        self,
        data: NuScenesDevkit,
        extrinsics: NDArrayF32,
        ann_tokens: list[str],
        instance_tokens: list[str],
        axis_mode: AxisMode = AxisMode.ROS,
        export_2d_annotations: bool = False,
        intrinsics: NDArrayF32 | None = None,
        image_hw: tuple[int, int] | None = None,
    ) -> DictStrAny:
        """Load annotations."""
        boxes3d = np.empty((1, 10), dtype=np.float32)[1:]
        boxes3d_classes = np.empty((1,), dtype=np.int64)[1:]
        boxes3d_attributes = np.empty((1,), dtype=np.int64)[1:]
        boxes3d_track_ids = np.empty((1,), dtype=np.int64)[1:]
        boxes3d_velocities = np.empty((1, 3), dtype=np.float32)[1:]
        boxes3d_num_lidar_pts = np.empty((1,), dtype=np.int64)[1:]
        boxes3d_num_radar_pts = np.empty((1,), dtype=np.int64)[1:]

        if export_2d_annotations:
            assert (
                axis_mode == AxisMode.OPENCV
            ), "2D annotations are only supported in camera coordinates."
            assert intrinsics is not None, "Intrinsics must be provided."
            boxes2d = np.empty((1, 4), dtype=np.float32)[1:]

        sensor_from_global = inverse_rigid_transform(
            torch.from_numpy(extrinsics)
        )
        translation = sensor_from_global[:3, 3].numpy()
        rotation = Quaternion(
            matrix=sensor_from_global[:3, :3].numpy(), atol=1e-5
        )

        for ann_token in ann_tokens:
            ann_info = data.get("sample_annotation", ann_token)

            box3d_class = category_to_detection_name(ann_info["category_name"])
            if box3d_class is None:
                continue

            # 3D box in global coordinates
            box3d = data.get_box(ann_info["token"])

            # Get 3D box velocity
            box3d.velocity = data.box_velocity(ann_info["token"])

            # Move 3D box to sensor coordinates
            box3d.rotate(rotation)
            box3d.translate(translation)

            if export_2d_annotations:
                assert (
                    image_hw is not None
                ), "Image height and width must be provided."
                if not box_in_image(
                    box3d, intrinsics, (image_hw[1], image_hw[0])
                ):
                    continue

            # Number of points in the 3D box
            boxes3d_num_lidar_pts = np.concatenate(
                [
                    boxes3d_num_lidar_pts,
                    np.array([ann_info["num_lidar_pts"]], dtype=np.int64),
                ]
            )
            boxes3d_num_radar_pts = np.concatenate(
                [
                    boxes3d_num_radar_pts,
                    np.array([ann_info["num_radar_pts"]], dtype=np.int64),
                ]
            )

            # Get 2D box
            if export_2d_annotations:
                corner_coords = (
                    view_points(box3d.corners(), intrinsics, True)
                    .T[:, :2]
                    .tolist()
                )
                boxes2d = np.concatenate(
                    [
                        boxes2d,
                        np.array(
                            [post_process_coords(corner_coords)],
                            dtype=np.float32,
                        ),
                    ]
                )

            # Get 3D box yaw. Use extrinsic rotation to align with PyTorch3D.
            if axis_mode == AxisMode.OPENCV:
                yaw = -box3d.orientation.yaw_pitch_roll[0]
                x, y, z, w = R.from_euler("XYZ", [0, yaw, 0]).as_quat()
            else:
                yaw = box3d.orientation.yaw_pitch_roll[0]
                x, y, z, w = R.from_euler("XYZ", [0, 0, yaw]).as_quat()
            orientation = Quaternion([w, x, y, z])

            boxes3d = np.concatenate(
                [
                    boxes3d,
                    np.array(
                        [[*box3d.center, *box3d.wlh, *orientation.elements]],
                        dtype=np.float32,
                    ),
                ]
            )

            # Get 3D box class id
            boxes3d_classes = np.concatenate(
                [
                    boxes3d_classes,
                    np.array(
                        [nuscenes_class_map[box3d_class]], dtype=np.int64
                    ),
                ]
            )

            # Get 3D box attribute id
            if len(ann_info["attribute_tokens"]) == 0:
                box3d_attr = ""
            else:
                box3d_attr = data.get(
                    "attribute", ann_info["attribute_tokens"][0]
                )["name"]
            boxes3d_attributes = np.concatenate(
                [
                    boxes3d_attributes,
                    np.array(
                        [nuscenes_attribute_map[box3d_attr]], dtype=np.int64
                    ),
                ]
            )

            # Get 3D box track id
            instance_token = data.get("sample_annotation", box3d.token)[
                "instance_token"
            ]
            if instance_token not in instance_tokens:
                instance_tokens.append(instance_token)
            track_id = instance_tokens.index(instance_token)

            boxes3d_track_ids = np.concatenate(
                [boxes3d_track_ids, np.array([track_id], dtype=np.int64)]
            )

            # 3D bounding box velocity
            velocity = box3d.velocity.astype(np.float32)
            if np.any(np.isnan(velocity)):
                velocity = np.zeros(3, dtype=np.float32)
            boxes3d_velocities = np.concatenate(
                [boxes3d_velocities, velocity[None]]
            )

        annotations = {
            "boxes3d": boxes3d,
            "boxes3d_classes": boxes3d_classes,
            "boxes3d_attributes": boxes3d_attributes,
            "boxes3d_track_ids": boxes3d_track_ids,
            "boxes3d_velocities": boxes3d_velocities,
            "boxes3d_num_lidar_pts": boxes3d_num_lidar_pts,
            "boxes3d_num_radar_pts": boxes3d_num_radar_pts,
        }

        if export_2d_annotations:
            annotations["boxes2d"] = boxes2d

        return annotations

    def _accumulate_sweeps(
        self,
        points: NDArrayF32,
        lidar2global: NDArrayF32,
        sweeps: list[DictStrAny],
    ) -> NDArrayF32:
        """Accumulate LiDAR sweeps."""
        if len(sweeps) == 0:
            return points

        global2lidar = inverse_rigid_transform(torch.from_numpy(lidar2global))

        points_sweeps = [torch.from_numpy(points)]
        for sweep in sweeps:
            points_bytes = self.data_backend.get(sweep["lidar_path"])
            lidar_points = np.frombuffer(
                bytearray(points_bytes), dtype=np.float32
            )
            lidar_points = lidar_points.reshape(-1, 5)[:, :3]

            # Transform LiDAR points to global frame
            global_lidar_points = transform_points(
                torch.from_numpy(lidar_points),
                torch.from_numpy(sweep["extrinsics"]),
            )

            # Transform LiDAR points to current LiDAR frame
            current_lidar_points = transform_points(
                global_lidar_points, global2lidar
            )

            points_sweeps.append(current_lidar_points)

        return torch.cat(points_sweeps).numpy()

    def _load_depth_map(
        self,
        points_lidar: NDArrayF32,
        lidar2global: NDArrayF32,
        cam2global: NDArrayF32,
        intrinsics: NDArrayF32,
        image_hw: tuple[int, int],
    ) -> NDArrayF32:
        """Load depth map.

        Args:
            points_lidar (NDArrayF32): LiDAR points.
            lidar2global (NDArrayF32): LiDAR to global extrinsics.
            cam2global (NDArrayF32): Camera to global extrinsics.
            intrinsics (NDArrayF32): Camera intrinsic matrix.
            image_hw (tuple[int, int]): Image height and width.

        Returns:
            NDArrayF32: Depth map.
        """
        cam2global_ = torch.from_numpy(cam2global)
        lidar2global_ = torch.from_numpy(lidar2global)
        intrinsics_ = torch.from_numpy(intrinsics)
        points_lidar_ = torch.from_numpy(np.copy(points_lidar))

        lidar2cam = torch.matmul(torch.inverse(cam2global_), lidar2global_)

        cam2img = torch.eye(4, 4)
        cam2img[:3, :3] = intrinsics_

        points_cam = points_lidar_[:, :3] @ (lidar2cam[:3, :3].T) + lidar2cam[
            :3, 3
        ].unsqueeze(0)

        depth_map = generate_depth_map(points_cam, intrinsics_, image_hw)

        return depth_map.numpy()

    def _filter_boxes(
        self, annotations: DictStrAny
    ) -> tuple[
        NDArrayBool, NDArrayF32, NDArrayI64, NDArrayI64, NDArrayI64, NDArrayF32
    ]:
        """Filter boxes."""
        valid_mask = np.full(annotations["boxes3d"].shape[0], True)

        if self.point_based_filter:
            boxes3d_num_lidar_pts = annotations["boxes3d_num_lidar_pts"]
            boxes3d_num_radar_pts = annotations["boxes3d_num_radar_pts"]
            valid_mask = np.logical_and(
                (boxes3d_num_lidar_pts + boxes3d_num_radar_pts) > 0, valid_mask
            )

        if self.distance_based_filter:
            raise NotImplementedError(
                "Distance based filter not implemented yet"
            )

        boxes3d = annotations["boxes3d"][valid_mask]
        boxes3d_classes = annotations["boxes3d_classes"][valid_mask]
        boxes3d_attributes = annotations["boxes3d_attributes"][valid_mask]
        boxes3d_track_ids = annotations["boxes3d_track_ids"][valid_mask]
        boxes3d_velocities = annotations["boxes3d_velocities"][valid_mask]

        return (
            valid_mask,
            boxes3d,
            boxes3d_classes,
            boxes3d_attributes,
            boxes3d_track_ids,
            boxes3d_velocities,
        )
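
    # The projection chain used in _load_depth_map, written out (a sketch;
    # all transforms are 4x4 homogeneous matrices):
    #
    #     lidar2cam = inv(cam2global) @ lidar2global
    #     points_cam = points_lidar @ lidar2cam[:3, :3].T + lidar2cam[:3, 3]
    #
    # LiDAR points are lifted to the global frame and brought into the camera
    # frame before generate_depth_map rasterizes them into a depth map.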

    def __len__(self) -> int:
        """Length."""
        return len(self.samples)

    def __getitem__(self, idx: int) -> DictData:
        """Get single sample.

        Args:
            idx (int): Index of sample.

        Returns:
            DictData: sample at index in Vis4D input format.
        """
        sample = self.samples[idx]
        data_dict: DictData = {}

        # metadata
        data_dict["token"] = sample["token"]
        data_dict[K.frame_ids] = sample["frame_ids"]
        data_dict[K.sequence_names] = sample["scene_name"]
        data_dict["can_bus"] = sample["can_bus"]

        if "LIDAR_TOP" in self.sensors:
            lidar_data = sample["LIDAR_TOP"]

            # load LiDAR frame
            data_dict["LIDAR_TOP"] = {
                K.sample_names: lidar_data["sample_name"],
                K.timestamp: lidar_data["timestamp"],
                K.extrinsics: lidar_data["extrinsics"],
                K.axis_mode: AxisMode.LIDAR,
            }

            if (
                K.points3d in self.keys_to_load
                or K.depth_maps in self.keys_to_load
            ):
                points_bytes = self.data_backend.get(lidar_data["lidar_path"])
                lidar_points = np.frombuffer(
                    bytearray(points_bytes), dtype=np.float32
                )
                lidar_points = lidar_points.reshape(-1, 5)[:, :3]
                lidar_points = self._accumulate_sweeps(
                    lidar_points,
                    lidar_data["extrinsics"],
                    lidar_data["sweeps"],
                )

                if K.points3d in self.keys_to_load:
                    data_dict["LIDAR_TOP"][K.points3d] = lidar_points

            if K.boxes3d in self.keys_to_load:
                data_dict["LIDAR_TOP"][K.boxes3d] = lidar_data["annotations"][
                    "boxes3d"
                ]
                data_dict["LIDAR_TOP"][K.boxes3d_classes] = lidar_data[
                    "annotations"
                ]["boxes3d_classes"]
                data_dict["LIDAR_TOP"][K.boxes3d_track_ids] = lidar_data[
                    "annotations"
                ]["boxes3d_track_ids"]
                data_dict["LIDAR_TOP"][K.boxes3d_velocities] = lidar_data[
                    "annotations"
                ]["boxes3d_velocities"]
                data_dict["LIDAR_TOP"]["attributes"] = lidar_data[
                    "annotations"
                ]["boxes3d_attributes"]

        # load camera frame
        for cam in NuScenes.CAMERAS:
            if cam in self.sensors:
                cam_data = sample[cam]

                data_dict[cam] = {K.timestamp: cam_data["timestamp"]}

                if K.images in self.keys_to_load:
                    im_bytes = self.data_backend.get(cam_data["image_path"])
                    image = np.ascontiguousarray(
                        im_decode(im_bytes, mode=self.image_channel_mode),
                        dtype=np.float32,
                    )[None]

                    data_dict[cam][K.images] = image
                    data_dict[cam][K.input_hw] = cam_data["image_hw"]
                    data_dict[cam][K.sample_names] = cam_data["sample_name"]
                    data_dict[cam][K.intrinsics] = cam_data["intrinsics"]
                    data_dict[cam][K.extrinsics] = cam_data["extrinsics"]
                    data_dict[cam][K.axis_mode] = AxisMode.OPENCV

                    if K.original_images in self.keys_to_load:
                        data_dict[cam][K.original_images] = image
                        data_dict[cam][K.original_hw] = cam_data["image_hw"]

                if (
                    K.boxes3d in self.keys_to_load
                    or K.boxes2d in self.keys_to_load
                ):
                    if K.boxes3d in self.keys_to_load:
                        data_dict[cam][K.boxes3d] = cam_data["annotations"][
                            "boxes3d"
                        ]
                        data_dict[cam][K.boxes3d_classes] = cam_data[
                            "annotations"
                        ]["boxes3d_classes"]
                        data_dict[cam][K.boxes3d_track_ids] = cam_data[
                            "annotations"
                        ]["boxes3d_track_ids"]
                        data_dict[cam][K.boxes3d_velocities] = cam_data[
                            "annotations"
                        ]["boxes3d_velocities"]
                        data_dict[cam]["attributes"] = cam_data["annotations"][
                            "boxes3d_attributes"
                        ]

                    if K.boxes2d in self.keys_to_load:
                        boxes2d = cam_data["annotations"]["boxes2d"]
                        data_dict[cam][K.boxes2d] = boxes2d
                        data_dict[cam][K.boxes2d_classes] = data_dict[cam][
                            K.boxes3d_classes
                        ]
                        data_dict[cam][K.boxes2d_track_ids] = data_dict[cam][
                            K.boxes3d_track_ids
                        ]

                if K.depth_maps in self.keys_to_load:
                    depth_maps = self._load_depth_map(
                        lidar_points,
                        lidar_data["extrinsics"],
                        cam_data["extrinsics"],
                        cam_data["intrinsics"],
                        cam_data["image_hw"],
                    )
                    data_dict[cam][K.depth_maps] = depth_maps

        return data_dict
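

# A minimal end-to-end usage sketch (assumes the nuScenes mini split and its
# CAN bus expansion have been downloaded to ./data/nuscenes; the paths and
# sensor subset are illustrative):
if __name__ == "__main__":
    dataset = NuScenes(
        data_root="./data/nuscenes",
        version="v1.0-mini",
        split="mini_train",
        keys_to_load=(K.images, K.boxes2d, K.boxes3d),
        sensors=("LIDAR_TOP", "CAM_FRONT"),
    )
    print(dataset)  # e.g. NuScenesDataset v1.0-mini mini_train
    sample = dataset[0]
    # Decoded camera frame: (1, H, W, 3) float32 array.
    print(sample["CAM_FRONT"][K.images].shape)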