Source code for vis4d.data.datasets.scalabel

"""Scalabel type dataset."""

from __future__ import annotations

import os
from collections import defaultdict
from collections.abc import Callable, Sequence
from typing import Union

import numpy as np
import torch

from vis4d.common.distributed import broadcast
from vis4d.common.imports import SCALABEL_AVAILABLE
from vis4d.common.logging import rank_zero_info
from vis4d.common.time import Timer
from vis4d.common.typing import (
    ArgsType,
    ListAny,
    NDArrayF32,
    NDArrayI64,
    NDArrayUI8,
)
from vis4d.data.const import AxisMode
from vis4d.data.const import CommonKeys as K
from vis4d.data.datasets.util import CacheMappingMixin, DatasetFromList
from vis4d.data.io import DataBackend
from vis4d.data.typing import DictData
from vis4d.op.geometry.rotation import (
    euler_angles_to_matrix,
    matrix_to_quaternion,
)

from .base import VideoDataset, VideoMapping
from .util import im_decode, ply_decode, print_class_histogram

if SCALABEL_AVAILABLE:
    from scalabel.label.io import load, load_label_config
    from scalabel.label.transforms import (
        box2d_to_xyxy,
        poly2ds_to_mask,
        rle_to_mask,
    )
    from scalabel.label.typing import Config
    from scalabel.label.typing import Dataset as ScalabelData
    from scalabel.label.typing import (
        Extrinsics,
        Frame,
        ImageSize,
        Intrinsics,
        Label,
    )
    from scalabel.label.utils import (
        check_crowd,
        check_ignored,
        get_leaf_categories,
        get_matrix_from_extrinsics,
        get_matrix_from_intrinsics,
    )
else:
    raise ImportError("scalabel is not installed.")


def load_intrinsics(intrinsics: Intrinsics) -> NDArrayF32:
    """Transform intrinsics from Scalabel to Vis4D."""
    return get_matrix_from_intrinsics(intrinsics).astype(np.float32)

def load_extrinsics(extrinsics: Extrinsics) -> NDArrayF32:
    """Transform extrinsics from Scalabel to Vis4D."""
    return get_matrix_from_extrinsics(extrinsics).astype(np.float32)

def load_image(
    url: str, backend: DataBackend, image_channel_mode: str
) -> NDArrayF32:
    """Load image tensor from url."""
    im_bytes = backend.get(url)
    image = im_decode(im_bytes, mode=image_channel_mode)
    return np.ascontiguousarray(image, dtype=np.float32)[None]

def load_pointcloud(url: str, backend: DataBackend) -> NDArrayF32:
    """Load pointcloud tensor from url."""
    assert url.endswith(".ply"), "Only PLY files are supported now."
    ply_bytes = backend.get(url)
    pointcloud = ply_decode(ply_bytes)
    return pointcloud.astype(np.float32)

def instance_ids_to_global(
    frames: list[Frame], local_instance_ids: dict[str, list[str]]
) -> None:
    """Use local (per video) instance ids to produce global ones."""
    video_names = list(local_instance_ids.keys())
    for frame_id, ann in enumerate(frames):
        if ann.labels is None:  # pragma: no cover
            continue
        for label in ann.labels:
            assert label.attributes is not None
            if not check_crowd(label) and not check_ignored(label):
                video_name = (
                    ann.videoName
                    if ann.videoName is not None
                    else "no-video-" + str(frame_id)
                )
                sum_previous_vids = sum(
                    len(local_instance_ids[v])
                    for v in video_names[: video_names.index(video_name)]
                )
                label.attributes["instance_id"] = (
                    sum_previous_vids
                    + local_instance_ids[video_name].index(label.id)
                )

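# A small worked example of the remapping above (video names and label ids
# are hypothetical): with
#
#     local_instance_ids = {"vid_a": ["a0", "a1"], "vid_b": ["b0"]}
#
# labels in "vid_a" keep instance ids 0 and 1, while label "b0" in "vid_b"
# receives the global instance id 2 (= number of ids in all previous videos
# plus its local index 0).
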
def add_data_path(data_root: str, frames: list[Frame]) -> None:
    """Add filepath to frame using data_root."""
    for ann in frames:
        assert ann.name is not None
        if ann.url is None:
            if ann.videoName is not None:
                ann.url = os.path.join(data_root, ann.videoName, ann.name)
            else:
                ann.url = os.path.join(data_root, ann.name)
        else:
            ann.url = os.path.join(data_root, ann.url)

def discard_labels_outside_set(
    dataset: list[Frame], class_set: list[str]
) -> None:
    """Discard labels outside given set of classes.

    Args:
        dataset (list[Frame]): List of frames to filter.
        class_set (list[str]): List of classes to keep.
    """
    for frame in dataset:
        remove_anns = []
        if frame.labels is not None:
            for i, ann in enumerate(frame.labels):
                if ann.category not in class_set:
                    remove_anns.append(i)
            for i in reversed(remove_anns):
                frame.labels.pop(i)

def remove_empty_samples(frames: list[Frame]) -> list[Frame]:
    """Remove empty samples."""
    new_frames = []
    for frame in frames:
        if frame.labels is None:
            continue
        labels_used = []
        for label in frame.labels:
            assert (
                label.attributes is not None and label.category is not None
            )
            if not check_crowd(label) and not check_ignored(label):
                labels_used.append(label)
        if len(labels_used) != 0:
            frame.labels = labels_used
            new_frames.append(frame)
    rank_zero_info(f"Filtered {len(frames) - len(new_frames)} empty frames.")
    del frames
    return new_frames

def prepare_labels(
    frames: list[Frame],
    class_list: list[str],
    global_instance_ids: bool = False,
) -> dict[str, int]:
    """Add category id and instance id to labels, return class frequencies.

    Args:
        frames (list[Frame]): List of frames.
        class_list (list[str]): List of classes.
        global_instance_ids (bool): Whether to use global instance ids.
            Defaults to False.
    """
    instance_ids: dict[str, list[str]] = defaultdict(list)
    frequencies = {cat: 0 for cat in class_list}
    for frame_id, ann in enumerate(frames):
        if ann.labels is None:  # pragma: no cover
            continue

        for label in ann.labels:
            attr: dict[str, bool | int | float | str] = {}
            if label.attributes is not None:
                attr = label.attributes

            if check_crowd(label) or check_ignored(label):
                continue

            assert label.category is not None
            frequencies[label.category] += 1
            video_name = (
                ann.videoName
                if ann.videoName is not None
                else "no-video-" + str(frame_id)
            )
            if label.id not in instance_ids[video_name]:
                instance_ids[video_name].append(label.id)
            attr["instance_id"] = instance_ids[video_name].index(label.id)
            label.attributes = attr

    if global_instance_ids:
        instance_ids_to_global(frames, instance_ids)

    return frequencies

def filter_frames_by_attributes(
    frames: list[Frame],
    attributes_to_load: Sequence[dict[str, str | float]] | None,
) -> list[Frame]:
    """Filter frames based on attributes."""
    if attributes_to_load is None:
        return frames

    filtered_frames: list[Frame] = []
    for frame in frames:
        for attribute_dict in attributes_to_load:
            if hasattr(frame, "attributes") and frame.attributes is not None:
                if all(
                    frame.attributes.get(key) == value
                    for key, value in attribute_dict.items()
                ):
                    filtered_frames.append(frame)
                    break
            else:
                raise ValueError(
                    "Attribute to load is specified but no attributes "
                    "are found in the frame."
                )
    return filtered_frames

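# Example of the expected filter format (attribute names and values are
# hypothetical; they depend on the dataset's annotations): passing
#
#     attributes_to_load=[{"weather": "clear"}, {"timeofday": "night"}]
#
# keeps a frame if all key/value pairs of at least one dictionary match the
# frame's attributes, i.e. clear-weather frames OR night frames.
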
# Not using | operator because of a bug in Python 3.9
# https://bugs.python.org/issue42233
CategoryMap = Union[dict[str, int], dict[str, dict[str, int]]]

class Scalabel(CacheMappingMixin, VideoDataset):
    """Scalabel type dataset.

    This class loads scalabel format data into Vis4D.
    """

    def __init__(
        self,
        data_root: str,
        annotation_path: str,
        keys_to_load: Sequence[str] = (K.images, K.boxes2d),
        category_map: None | CategoryMap = None,
        config_path: None | str | Config = None,
        global_instance_ids: bool = False,
        bg_as_class: bool = False,
        skip_empty_samples: bool = False,
        attributes_to_load: Sequence[dict[str, str | float]] | None = None,
        cache_as_binary: bool = False,
        cached_file_path: str | None = None,
        **kwargs: ArgsType,
    ) -> None:
        """Creates an instance of the class.

        Args:
            data_root (str): Root directory of the data.
            annotation_path (str): Path to the annotation json(s).
            keys_to_load (Sequence[str, ...], optional): Keys to load from
                the dataset. Defaults to (K.images, K.boxes2d).
            category_map (None | CategoryMap, optional): Mapping from a
                Scalabel category string to an integer index. If None, the
                standard mapping in the dataset config will be used.
                Defaults to None.
            config_path (None | str | Config, optional): Path to the dataset
                config, can be added if it is not provided together with the
                labels or should be modified. Defaults to None.
            global_instance_ids (bool): Whether to convert tracking IDs of
                annotations into dataset global IDs or stay with local,
                per-video IDs. Defaults to False.
            bg_as_class (bool): Whether to include background pixels as an
                additional class for masks.
            skip_empty_samples (bool): Whether to skip samples without
                annotations.
            attributes_to_load (Sequence[dict[str, str]]): List of attribute
                dictionaries to load. Each dictionary is a mapping from the
                attribute name to its desired value. If any of the attribute
                dictionaries is matched, the corresponding frame will be
                loaded. Defaults to None.
            cache_as_binary (bool): Whether to cache the dataset as binary.
                Default: False.
            cached_file_path (str | None): Path to a cached file. If the
                cached file exists, it will be loaded instead of generating
                the data mapping. Default: None.
        """
        super().__init__(**kwargs)
        assert SCALABEL_AVAILABLE, "Scalabel is not installed."
        self.data_root = data_root
        self.annotation_path = annotation_path
        self.keys_to_load = keys_to_load
        self.global_instance_ids = global_instance_ids
        self.bg_as_class = bg_as_class
        self.config_path = config_path
        self.skip_empty_samples = skip_empty_samples
        self.cats_name2id: dict[str, dict[str, int]] = {}
        self.category_map = category_map
        self.attributes_to_load = attributes_to_load

        self.frames, self.cfg = self._load_mapping(
            self._generate_mapping,
            remove_empty_samples,
            cache_as_binary=cache_as_binary,
            cached_file_path=cached_file_path,
        )

        assert self.cfg is not None, (
            "No dataset configuration found. Please provide a configuration "
            "via config_path."
        )
        if self.category_map is None:
            class_list = list(
                c.name for c in get_leaf_categories(self.cfg.categories)
            )
            self.category_map = {c: i for i, c in enumerate(class_list)}
        self._setup_categories()

        self.video_mapping = self._generate_video_mapping()

    def _generate_video_mapping(self) -> VideoMapping:
        """Group all dataset sample indices (int) by their video ID (str).

        Returns:
            VideoMapping: Mapping of video IDs to sample indices and frame
                IDs.
        """
        video_to_indices: dict[str, list[int]] = defaultdict(list)
        video_to_frame_ids: dict[str, list[int]] = defaultdict(list)
        for idx, frame in enumerate(self.frames):  # type: ignore
            if frame.videoName is not None:
                assert (
                    frame.frameIndex is not None
                ), "found videoName but no frameIndex!"
                video_to_indices[frame.videoName].append(idx)
                video_to_frame_ids[frame.videoName].append(frame.frameIndex)

        return self._sort_video_mapping(
            {
                "video_to_indices": video_to_indices,
                "video_to_frame_ids": video_to_frame_ids,
            }
        )

    def _setup_categories(self) -> None:
        """Setup categories."""
        assert self.category_map is not None
        for target in self.keys_to_load:
            if isinstance(list(self.category_map.values())[0], int):
                self.cats_name2id[target] = self.category_map  # type: ignore
            else:
                assert (
                    target in self.category_map
                ), f"Target={target} not specified in category_mapping"
                target_map = self.category_map[target]
                assert isinstance(target_map, dict)
                self.cats_name2id[target] = target_map

    def _load_mapping(  # type: ignore
        self,
        generate_map_func: Callable[[], ScalabelData],
        filter_func: Callable[[ListAny], ListAny] = lambda x: x,
        cache_as_binary: bool = True,
        cached_file_path: str | None = None,
    ) -> tuple[DatasetFromList, Config]:
        """Load cached mapping or generate if not exists."""
        timer = Timer()
        data = self._load_mapping_data(
            generate_map_func, cache_as_binary, cached_file_path
        )
        if data is not None:
            frames, cfg = data.frames, data.config
            add_data_path(self.data_root, frames)
            rank_zero_info(f"Loading {self} takes {timer.time():.2f} seconds.")

            if self.category_map is None:
                class_list = list(
                    c.name for c in get_leaf_categories(cfg.categories)
                )
                self.category_map = {c: i for i, c in enumerate(class_list)}
            else:
                class_list = list(self.category_map.keys())

            assert len(set(class_list)) == len(
                class_list
            ), "Class names are not unique!"
            discard_labels_outside_set(frames, class_list)

            frames = filter_frames_by_attributes(
                frames, self.attributes_to_load
            )

            if self.skip_empty_samples:
                frames = filter_func(frames)

            t = Timer()
            frequencies = prepare_labels(
                frames,
                class_list,
                global_instance_ids=self.global_instance_ids,
            )
            rank_zero_info(
                f"Preprocessing {len(frames)} frames takes {t.time():.2f}"
                " seconds."
            )
            print_class_histogram(frequencies)

            frames_dataset = DatasetFromList(frames)
        else:
            frames_dataset = None
            cfg = None

        frames_dataset = broadcast(frames_dataset)
        cfg = broadcast(cfg)
        assert frames_dataset is not None
        return frames_dataset, cfg

    def _generate_mapping(self) -> ScalabelData:
        """Generate data mapping."""
        data = load(self.annotation_path)
        if self.config_path is not None:
            if isinstance(self.config_path, str):
                data.config = load_label_config(self.config_path)
            else:
                data.config = self.config_path
        return data

    def _load_inputs(self, frame: Frame) -> DictData:
        """Load inputs given a scalabel frame."""
        data: DictData = {}
        if K.images in self.keys_to_load:
            assert frame.url is not None, "url is None!"
            image = load_image(
                frame.url, self.data_backend, self.image_channel_mode
            )
            input_hw = (image.shape[1], image.shape[2])
            data[K.images] = image
            data[K.input_hw] = input_hw

            # Original image
            data[K.original_images] = image
            data[K.original_hw] = input_hw

            data[K.axis_mode] = AxisMode.OPENCV
            data[K.frame_ids] = frame.frameIndex

        data[K.sample_names] = frame.name
        data[K.sequence_names] = frame.videoName

        if K.points3d in self.keys_to_load:
            assert frame.url is not None, "url is None!"
            data[K.points3d] = load_pointcloud(frame.url, self.data_backend)

        if frame.intrinsics is not None and K.intrinsics in self.keys_to_load:
            data[K.intrinsics] = load_intrinsics(frame.intrinsics)

        if frame.extrinsics is not None and K.extrinsics in self.keys_to_load:
            data[K.extrinsics] = load_extrinsics(frame.extrinsics)
        return data

    def _add_annotations(self, frame: Frame, data: DictData) -> None:
        """Add annotations given a scalabel frame and a data dictionary."""
        labels_used, instid_map = [], {}
        if frame.labels is not None:
            for label in frame.labels:
                assert (
                    label.attributes is not None
                    and label.category is not None
                )
                if not check_crowd(label) and not check_ignored(label):
                    labels_used.append(label)
                    if label.id not in instid_map:
                        instid_map[label.id] = int(
                            label.attributes["instance_id"]
                        )

        image_size = (
            ImageSize(height=data[K.input_hw][0], width=data[K.input_hw][1])
            if K.input_hw in data
            else frame.size
        )

        if K.boxes2d in self.keys_to_load:
            cats_name2id = self.cats_name2id[K.boxes2d]
            boxes2d, classes, track_ids = boxes2d_from_scalabel(
                labels_used, cats_name2id, instid_map
            )
            data[K.boxes2d] = boxes2d
            data[K.boxes2d_classes] = classes
            data[K.boxes2d_track_ids] = track_ids

        if K.instance_masks in self.keys_to_load:
            # NOTE: instance masks' mapping is consistent with boxes2d
            cats_name2id = self.cats_name2id[K.instance_masks]
            instance_masks = instance_masks_from_scalabel(
                labels_used, cats_name2id, image_size
            )
            data[K.instance_masks] = instance_masks

        if K.seg_masks in self.keys_to_load:
            sem_map = self.cats_name2id[K.seg_masks]
            semantic_masks = semantic_masks_from_scalabel(
                labels_used, sem_map, image_size, self.bg_as_class
            )
            data[K.seg_masks] = semantic_masks

        if K.boxes3d in self.keys_to_load:
            boxes3d, classes, track_ids = boxes3d_from_scalabel(
                labels_used, self.cats_name2id[K.boxes3d], instid_map
            )
            data[K.boxes3d] = boxes3d
            data[K.boxes3d_classes] = classes
            data[K.boxes3d_track_ids] = track_ids
    def __len__(self) -> int:
        """Length of dataset."""
        return len(self.frames)

    def __getitem__(self, index: int) -> DictData:
        """Get item from dataset at given index."""
        frame = self.frames[index]
        data = self._load_inputs(frame)

        # load annotations to input sample
        self._add_annotations(frame, data)
        return data

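# Minimal usage sketch for the dataset class above (paths, categories and the
# commented shapes are illustrative assumptions, not taken from a specific
# dataset):
#
#     dataset = Scalabel(
#         data_root="data/my_dataset/images",
#         annotation_path="data/my_dataset/labels.json",
#         keys_to_load=(K.images, K.boxes2d),
#         category_map={"car": 0, "pedestrian": 1},
#     )
#     sample = dataset[0]
#     sample[K.images].shape   # (1, H, W, 3), float32
#     sample[K.boxes2d].shape  # (N, 4), float32 boxes in xyxy format
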
def boxes2d_from_scalabel(
    labels: list[Label],
    class_to_idx: dict[str, int],
    label_id_to_idx: dict[str, int] | None = None,
) -> tuple[NDArrayF32, NDArrayI64, NDArrayI64]:
    """Convert from scalabel format to Vis4D.

    NOTE: The box definition in Scalabel includes x2y2 in the box area,
    whereas Vis4D and other software libraries like detectron2 and mmdet do
    not include this, which is why we convert via box2d_to_xyxy.

    Args:
        labels (list[Label]): list of scalabel labels.
        class_to_idx (dict[str, int]): mapping from class name to index.
        label_id_to_idx (dict[str, int] | None, optional): mapping from label
            id to index. Defaults to None.

    Returns:
        tuple[NDArrayF32, NDArrayI64, NDArrayI64]: boxes, classes, track_ids
    """
    box_list, cls_list, idx_list = [], [], []
    for i, label in enumerate(labels):
        box, box_cls, l_id = label.box2d, label.category, label.id
        if box is None:
            continue
        if box_cls in class_to_idx:
            cls_list.append(class_to_idx[box_cls])
        else:
            continue

        box_list.append(box2d_to_xyxy(box))
        idx = label_id_to_idx[l_id] if label_id_to_idx is not None else i
        idx_list.append(idx)

    if len(box_list) == 0:
        return (
            np.empty((0, 4), dtype=np.float32),
            np.empty((0,), dtype=np.int64),
            np.empty((0,), dtype=np.int64),
        )
    box_tensor = np.array(box_list, dtype=np.float32)
    class_ids = np.array(cls_list, dtype=np.int64)
    track_ids = np.array(idx_list, dtype=np.int64)
    return box_tensor, class_ids, track_ids

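# Illustrative call (the Label below is constructed by hand; real labels come
# from Scalabel annotations, and the exact pixel convention of the output is
# handled by box2d_to_xyxy):
#
#     from scalabel.label.typing import Box2D, Label
#
#     label = Label(
#         id="lbl-0", category="car", box2d=Box2D(x1=0, y1=0, x2=9, y2=9)
#     )
#     boxes, classes, track_ids = boxes2d_from_scalabel([label], {"car": 0})
#     # boxes: shape (1, 4) in xyxy format; classes == [0]; track_ids == [0]
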
def instance_masks_from_scalabel(
    labels: list[Label],
    class_to_idx: dict[str, int],
    image_size: ImageSize | None = None,
) -> NDArrayUI8:
    """Convert instance masks from scalabel format to Vis4D.

    Args:
        labels (list[Label]): list of scalabel labels.
        class_to_idx (dict[str, int]): mapping from class name to index.
        image_size (ImageSize, optional): image size. Defaults to None.

    Returns:
        NDArrayUI8: instance masks.
    """
    bitmask_list = []
    for _, label in enumerate(labels):
        if label.category not in class_to_idx:  # pragma: no cover
            continue  # skip unknown classes
        if label.poly2d is None and label.rle is None:
            continue
        if label.rle is not None:
            bitmask = rle_to_mask(label.rle)
        elif label.poly2d is not None:
            assert (
                image_size is not None
            ), "image size must be specified for masks with polygons!"
            bitmask_raw = poly2ds_to_mask(image_size, label.poly2d)
            bitmask: NDArrayUI8 = (bitmask_raw > 0).astype(  # type: ignore
                bitmask_raw.dtype
            )
        else:
            raise ValueError("No mask found in label.")
        bitmask_list.append(bitmask)
    if len(bitmask_list) == 0:  # pragma: no cover
        return np.empty((0, 0, 0), dtype=np.uint8)
    mask_array = np.array(bitmask_list, dtype=np.uint8)
    return mask_array

def nhw_to_hwc_mask(
    masks: NDArrayUI8, class_ids: NDArrayI64, ignore_class: int = 255
) -> NDArrayUI8:
    """Convert N binary HxW masks to HxW semantic mask.

    Args:
        masks (NDArrayUI8): Masks with shape [N, H, W].
        class_ids (NDArrayI64): Class IDs with shape [N, 1].
        ignore_class (int, optional): Ignore label. Defaults to 255.

    Returns:
        NDArrayUI8: Masks with shape [H, W], where each location indicates
            the class label.
    """
    hwc_mask = np.full(masks.shape[1:], ignore_class, dtype=masks.dtype)
    for mask, cat_id in zip(masks, class_ids):
        hwc_mask[mask > 0] = cat_id
    return hwc_mask

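# A minimal sketch of the collapse performed above: two binary 2x2 masks with
# class ids 1 and 3 become one HxW semantic mask, and untouched pixels keep
# the ignore label 255.
#
#     masks = np.array([[[1, 0], [0, 0]], [[0, 0], [0, 1]]], dtype=np.uint8)
#     class_ids = np.array([1, 3], dtype=np.int64)
#     nhw_to_hwc_mask(masks, class_ids)
#     # -> array([[  1, 255],
#     #           [255,   3]], dtype=uint8)
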
def semantic_masks_from_scalabel(
    labels: list[Label],
    class_to_idx: dict[str, int],
    image_size: ImageSize | None = None,
    bg_as_class: bool = False,
) -> NDArrayUI8:
    """Convert masks from scalabel format to Vis4D.

    Args:
        labels (list[Label]): list of scalabel labels.
        class_to_idx (dict[str, int]): mapping from class name to index.
        image_size (ImageSize, optional): image size. Defaults to None.
        bg_as_class (bool, optional): whether to include background as a
            class. Defaults to False.

    Returns:
        NDArrayUI8: semantic masks.
    """
    bitmask_list, cls_list = [], []
    if bg_as_class:
        foreground: NDArrayUI8 | None = None
    for _, label in enumerate(labels):
        if label.poly2d is None and label.rle is None:
            continue
        mask_cls = label.category
        if mask_cls in class_to_idx:
            cls_list.append(class_to_idx[mask_cls])
        else:  # pragma: no cover
            continue  # skip unknown classes
        if label.rle is not None:
            bitmask = rle_to_mask(label.rle)
        elif label.poly2d is not None:
            assert (
                image_size is not None
            ), "image size must be specified for masks with polygons!"
            bitmask_raw = poly2ds_to_mask(image_size, label.poly2d)
            bitmask: NDArrayUI8 = (bitmask_raw > 0).astype(  # type: ignore
                bitmask_raw.dtype
            )
        else:
            raise ValueError("No mask found in label.")
        bitmask_list.append(bitmask)
        if bg_as_class:
            foreground = (
                bitmask
                if foreground is None
                else np.logical_or(foreground, bitmask)
            )
    if bg_as_class:
        if foreground is None:  # pragma: no cover
            assert image_size is not None
            foreground = np.zeros(
                (image_size.height, image_size.width), dtype=np.uint8
            )
        bitmask_list.append(np.logical_not(foreground))
        assert "background" in class_to_idx, (
            '"bg_as_class" requires "background" class to be '
            "in category_mapping"
        )
        cls_list.append(class_to_idx["background"])
    if len(bitmask_list) == 0:  # pragma: no cover
        return np.empty((0, 0), dtype=np.uint8)
    mask_array = np.array(bitmask_list, dtype=np.uint8)
    class_ids = np.array(cls_list, dtype=np.int64)
    return nhw_to_hwc_mask(mask_array, class_ids)

def boxes3d_from_scalabel(
    labels: list[Label],
    class_to_idx: dict[str, int],
    label_id_to_idx: dict[str, int] | None = None,
) -> tuple[NDArrayF32, NDArrayI64, NDArrayI64]:
    """Convert 3D bounding boxes from scalabel format to Vis4D."""
    box_list, cls_list, idx_list = [], [], []
    for i, label in enumerate(labels):
        box, box_cls, l_id = label.box3d, label.category, label.id
        if box is None:
            continue
        if box_cls in class_to_idx:
            cls_list.append(class_to_idx[box_cls])
        else:
            continue

        quaternion = (
            matrix_to_quaternion(
                euler_angles_to_matrix(torch.tensor([box.orientation]))
            )[0]
            .numpy()
            .tolist()
        )
        box_list.append([*box.location, *box.dimension, *quaternion])
        idx = label_id_to_idx[l_id] if label_id_to_idx is not None else i
        idx_list.append(idx)

    if len(box_list) == 0:
        return (
            np.empty((0, 10), dtype=np.float32),
            np.empty((0,), dtype=np.int64),
            np.empty((0,), dtype=np.int64),
        )
    box_tensor = np.array(box_list, dtype=np.float32)
    class_ids = np.array(cls_list, dtype=np.int64)
    track_ids = np.array(idx_list, dtype=np.int64)
    return box_tensor, class_ids, track_ids