Source code for ska_sdp_instrumental_calibration.data_managers.solution_interval

from typing import Literal, Union

import numpy as np
import xarray as xr


[docs] class SolutionIntervals: """ A group of intervals over time data of visibility. The INST pipeline performs calibration of each interval individually. Parameters ---------- time: np.ndarray The numpy array containing time values of visibility timeslice: float or "full", optional Determines the width of each solution interval """ def __init__( self, time: np.ndarray, timeslice: Union[float, Literal["full"], None] = None, ): dim_name = "time" time_xdr = xr.DataArray(time, coords={dim_name: time}) if timeslice == "full": nbins = 1 elif (timeslice is None) or (timeslice <= 0.0): nbins = time_xdr.size else: # Determine number of equal width bins # TODO: Should bins always be of equal interval? nbins = min( max( 1, int( np.ceil( ( time_xdr[dim_name].data.max() - time_xdr[dim_name].data.min() ) / timeslice ) ), ), time_xdr[dim_name].size, ) self._time_bins = time_xdr.groupby_bins(dim_name, nbins, squeeze=False) @property def size(self): """ Returns number of solution intervals Returns ------- int """ return len(self._time_bins) @property def solution_time(self): """ Returns time values for each solution interval Returns ------- np.ndarray """ return self._time_bins.mean().data @property def indices(self): """ Returns list of slice objects corresponding to indices of each solution interval If converting to slice is not possible, it returns the indices Returns ------- list[slice | list[int]] """ return [ self.__indices_to_slice(idx) for idx in self._time_bins.groups.values() ] @property def intervals(self): """ Returns numpy array containing the width of each solution interval Returns ------- np.ndarray """ return np.array( [(iv.right - iv.left) for iv in self._time_bins.groups.keys()] ) def __indices_to_slice( self, index_array: np.ndarray | list[int], ) -> slice | np.ndarray: """ Convert a 1D integer index array to a slice **if** it represents an arithmetic progression: same step between all consecutive elements. Otherwise return the original index array. Parameters ---------- index_array : array_like of int Returns ------- slice or numpy array """ index_array = np.asarray(index_array) # Empty input if index_array.size == 0: return index_array # Must be integer-valued if not np.issubdtype(index_array.dtype, np.integer): return index_array # One element → trivial slice if index_array.size == 1: i = index_array[0] return slice(i, i + 1, 1) # Sort (groupby_bins etc. may produce unordered indices) sorted_idx = np.sort(index_array) # Compute steps between elements steps = np.diff(sorted_idx) # Step must be consistent step = steps[0] if np.all(steps == step): start = sorted_idx[0] stop = sorted_idx[-1] + step return slice(start, stop, step) # Not a simple arithmetic progression → return indices return index_array