Source code for vis4d.eval.common.depth

"""Depth estimation evaluator."""

from __future__ import annotations

import itertools

import numpy as np

from vis4d.common.array import array_to_numpy
from vis4d.common.typing import (
    ArrayLike,
    GenericFunc,
    MetricLogs,
    NDArrayFloat,
)
from vis4d.eval.base import Evaluator

from ..metrics.depth import (
    absolute_error,
    absolute_relative_error,
    delta_p,
    log_10_error,
    root_mean_squared_error,
    root_mean_squared_error_log,
    scale_invariant_log,
    squared_relative_error,
)


[docs] class DepthEvaluator(Evaluator): """Depth estimation evaluator.""" METRIC_DEPTH = "Depth" KEY_DELTA05 = "d05" KEY_DELTA1 = "d1" KEY_DELTA2 = "d2" KEY_DELTA3 = "d3" KEY_ABS_REL = "AbsRel" KEY_ABS_ERR = "AbsErr" KEY_SQ_REL = "SqRel" KEY_RMSE = "RMSE" KEY_RMSE_LOG = "RMSELog" KEY_SILOG = "SILog" KEY_LOG10 = "Log10" def __init__( self, min_depth: float = 0.0, max_depth: float = 80.0, scale: float = 1.0, epsilon: float = 1e-3, ) -> None: """Initialize the optical flow evaluator. Args: min_depth (float): Minimum depth to evaluate. Defaults to 0.001. max_depth (float): Maximum depth to evaluate. Defaults to 80.0. scale (float): Scale factor for depth. Defaults to 1.0. epsilon (float): Small value to avoid logarithms of small values. Defaults to 1e-3. """ super().__init__() self.min_depth = min_depth self.max_depth = max_depth self.epsilon = epsilon self.scale = scale self._metrics_list: list[dict[str, float]] = []
[docs] def __repr__(self) -> str: """Concise representation of the evaluator.""" return "Common Depth Evaluator"
@property def metrics(self) -> list[str]: """Supported metrics.""" return [self.METRIC_DEPTH]
[docs] def reset(self) -> None: """Reset evaluator for new round of evaluation.""" self._metrics_list = []
[docs] def gather(self, gather_func: GenericFunc) -> None: """Accumulate predictions across processes.""" all_metrics = gather_func(self._metrics_list) if all_metrics is not None: self._metrics_list = list(itertools.chain(*all_metrics))
def _apply_mask( self, prediction: NDArrayFloat, target: NDArrayFloat ) -> tuple[NDArrayFloat, NDArrayFloat]: """Apply mask to prediction and target.""" mask = (target > self.min_depth) & (target <= self.max_depth) return prediction[mask], target[mask]
[docs] def process_batch( # type: ignore # pylint: disable=arguments-differ self, prediction: ArrayLike, groundtruth: ArrayLike ) -> None: """Process a batch of data. Args: prediction (np.array): Prediction optical flow, in shape (H, W, 2). groundtruth (np.array): Target optical flow, in shape (H, W, 2). """ preds = ( array_to_numpy(prediction, n_dims=None, dtype=np.float32) * self.scale ) gts = array_to_numpy(groundtruth, n_dims=None, dtype=np.float32) for pred, gt in zip(preds, gts): pred, gt = self._apply_mask(pred, gt) self._metrics_list.append( { self.KEY_ABS_REL: absolute_relative_error(pred, gt), self.KEY_ABS_ERR: absolute_error(pred, gt), self.KEY_SQ_REL: squared_relative_error(pred, gt), self.KEY_RMSE: root_mean_squared_error(pred, gt), self.KEY_RMSE_LOG: root_mean_squared_error_log(pred, gt), self.KEY_SILOG: scale_invariant_log(pred, gt), self.KEY_DELTA05: delta_p(pred, gt, 0.5), self.KEY_DELTA1: delta_p(pred, gt, 1.0), self.KEY_DELTA2: delta_p(pred, gt, 2.0), self.KEY_DELTA3: delta_p(pred, gt, 3.0), self.KEY_LOG10: log_10_error(pred, gt), } )
[docs] def evaluate(self, metric: str) -> tuple[MetricLogs, str]: """Evaluate predictions. Returns a dict containing the raw data and a short description string containing a readablae result. Args: metric (str): Metric to use. See @property metric Returns: metric_data, description tuple containing the metric data (dict with metric name and value) as well as a short string with shortened information. Raises: RuntimeError: if no data has been registered to be evaluated. ValueError: if metric is not supported. """ if len(self._metrics_list) == 0: raise RuntimeError( """No data registered to calculate metric. Register data using .process() first!""" ) metric_data: MetricLogs = {} short_description = "\n" if metric == self.METRIC_DEPTH: abs_rel = np.mean( [x[self.KEY_ABS_REL] for x in self._metrics_list] ) metric_data[self.KEY_ABS_REL] = float(abs_rel) short_description += f"Absolute relative error: {abs_rel:.3f}\n" abs_err = np.mean( [x[self.KEY_ABS_ERR] for x in self._metrics_list] ) metric_data[self.KEY_ABS_ERR] = float(abs_err) short_description += f"Absolute error: {abs_err:.3f}\n" sq_rel = np.mean([x[self.KEY_SQ_REL] for x in self._metrics_list]) metric_data[self.KEY_SQ_REL] = float(sq_rel) short_description += f"Squared relative error: {sq_rel:.3f}\n" rmse = np.mean([x[self.KEY_RMSE] for x in self._metrics_list]) metric_data[self.KEY_RMSE] = float(rmse) short_description += f"RMSE: {rmse:.3f}\n" rmse_log = np.mean( [x[self.KEY_RMSE_LOG] for x in self._metrics_list] ) metric_data[self.KEY_RMSE_LOG] = float(rmse_log) short_description += f"RMSE log: {rmse_log:.3f}\n" silog = np.mean([x[self.KEY_SILOG] for x in self._metrics_list]) metric_data[self.KEY_SILOG] = float(silog) short_description += f"SILog: {silog:.3f}\n" delta05 = np.mean( [x[self.KEY_DELTA05] for x in self._metrics_list] ) metric_data[self.KEY_DELTA05] = float(delta05) short_description += f"Delta 0.5: {delta05:.3f}\n" delta1 = np.mean([x[self.KEY_DELTA1] for x in self._metrics_list]) metric_data[self.KEY_DELTA1] = float(delta1) short_description += f"Delta 1: {delta1:.3f}\n" delta2 = np.mean([x[self.KEY_DELTA2] for x in self._metrics_list]) metric_data[self.KEY_DELTA2] = float(delta2) short_description += f"Delta 2: {delta2:.3f}\n" delta3 = np.mean([x[self.KEY_DELTA3] for x in self._metrics_list]) metric_data[self.KEY_DELTA3] = float(delta3) short_description += f"Delta 3: {delta3:.3f}\n" log10 = np.mean([x[self.KEY_LOG10] for x in self._metrics_list]) metric_data[self.KEY_LOG10] = float(log10) short_description += f"Log10 error: {log10:.3f}\n" else: raise ValueError( f"Unsupported metric: {metric}" ) # pragma: no cover return metric_data, short_description