Source code for ska_oso_pdm._shared.target

"""
The ska_oso_pdm._shared.target module defines a Python representation of the
target of the observation.
"""
import operator
from abc import ABC, abstractmethod
from typing import Any, ClassVar, List, Literal, Optional, Tuple, Union

from astropy import units
from astropy.coordinates import Angle, SkyCoord, get_body
from astropy.time import Time
from astropy.units import Unit
from pydantic import Field, field_validator, model_validator
from typing_extensions import Annotated, Self

from ska_oso_pdm._shared import PdmObject, TargetID, TerseStrEnum
from ska_oso_pdm._shared.custom_types import QuantityModel

UnitInputType = Union[str, List[str], Tuple[str]]

# -----


[docs] class RadialVelocityDefinition(TerseStrEnum): """ Enumeration of reference definitions supported by a RadialVelocity. The sky frequency (ν) at which we must observe a spectral line is derived from the rest frequency of the spectral line (ν₀), the line-of-sight velocity of the source (V), and the speed of light (c). The relativistic velocity, or true line-of-sight velocity, is related to the observed and rest frequencies by V= c * (ν₀²− ν²) / (v₀² + v²) This equation is a bit cumbersome to use; in astronomy two different approximations are typically used: Optical Velocity: Voptical = c * (λ − λ₀) / λ₀ = cz (z is the redshift of the source; λ and λ₀ are the corresponding observed and rest wavelengths, respectively) Radio Velocity: Vradio = c * (ν₀ − ν) / v₀ = c * (λ−λ₀) / λ The radio and optical velocities are not identical. Particularly, Voptical and Vradio diverge for large velocities. Optical velocities are commonly used for (Helio/Barycentric) extragalactic observations; (LSRK) radio velocities are typical for Galactic observations. Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line """ OPTICAL = "OPTICAL" RADIO = "RADIO" RELATIVISTIC = "RELATIVISTIC"
[docs] class RadialVelocityReferenceFrame(TerseStrEnum): """ Reference frame in which the radial velocity is defined. The Earth rotates, revolves around the Sun, rotates around the Galaxy, moves within the Local Group, and shows motion against the Cosmic Microwave Background. As for the convention above, any source velocity must therefore also always be specified relative to a reference frame. Various velocity rest frames are used in the literature. The following table lists their name, the motion that is corrected for, and the maximum amplitude of the velocity correction. Each rest frame correction is incremental to the preceding row: Velocity Rest Frame Correct for + max correction (km/s) =========================== ====================================== ====================================== Topocentric Telescope Nothing (0) Geocentric Earth Center Earth rotation (0.5) Earth-Moon Barycentric Earth+Moon center of mass Motion around Earth+Moon center of mass (0.013) Heliocentric Center of the Sun Earth orbital motion (30) Barycentric Earth+Sun center of mass Earth+Sun center of mass (0.012) Local Standard of Rest Center of Mass of local stars Solar motion relative to nearby stars (20) Galactocentric Center of Milky Way Milky Way Rotation (230) Local Group Barycentric Local Group center of mass Milky Way Motion (100) Virgocentric Center of the Local Virgo supercluster Local Group motion (300) Cosmic Microwave Background CMB Local Supercluster Motion (600) The velocity frame should be chosen based on the science. For most observations, however, one of the following three reference frames is commonly used: - Topocentric is the reference frame of the observatory (defining the sky frequency of the observations). Visibilities in a measurement set are typically stored in this frame. - Local Standard of Rest is the native output of images in CASA. Note that there are two varieties of LSR: the kinematic LSR (LSRK) and the dynamic (LSRD) definitions for the kinematic and dynamic centers, respectively. In almost all cases LSRK is being used and the less precise name LSR is usually used synonymously with the more modern LSRK definition. - Barycentric is a commonly used frame that has virtually replaced the older Heliocentric standard. Given the small difference between the Barycentric and Heliocentric frames, they are frequently used interchangeably. Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line """ TOPOCENTRIC = "TOPOCENTRIC" LSRK = "LSRK" BARYCENTRIC = "BARYCENTRIC"
[docs] class RadialVelocityUnits(TerseStrEnum): """ Units for radial velocities. """ # Values MUST be str of an AstroPy unit. M_PER_SEC = "m / s" KM_PER_SEC = "km / s"
[docs] class RadialVelocityQuantity(QuantityModel): """ Value/unit pair to describe the radial velocity measurement. """ value: float unit: RadialVelocityUnits = RadialVelocityUnits.KM_PER_SEC @model_validator(mode="after") def _ensure_constrained_units(self, v: Any) -> Self: # required to validate unit when the model is set to an existing # Quantity instances if isinstance(v, RadialVelocityUnits): return self # We now want to assert that the units are a value of the enumeration. # Creating an enum instance does this automatically, also including a # good error message on failure. We don't do anything with this enum # instance, we just want to take advantage of the checks already coded # in the enum module. RadialVelocityUnits(str(self.unit)) return self
[docs] class RadialVelocity(PdmObject): """ Radial velocity measures the line-of-sight velocity of an astronomical source. In principle, the radial velocity can be converted to and from the target redshift. However, these values are persisted separately to give the user the option of inputting either value. A velocity must also define the reference frame and definition that are applicable to the velocity. By default, these have values of: - definition = RADIO - reference_frame = LSRK - redshift = 0.0 """ quantity: Annotated[units.Quantity, RadialVelocityQuantity] = Field( default_factory=lambda: units.Quantity( value=0.0, unit=RadialVelocityUnits.KM_PER_SEC ), ) definition: RadialVelocityDefinition = RadialVelocityDefinition.RADIO reference_frame: RadialVelocityReferenceFrame = RadialVelocityReferenceFrame.LSRK redshift: float = 0.0
[docs] class PointingKind(TerseStrEnum): FIVE_POINT = "FivePointParameters" SINGLE_POINT = "SinglePointParameters" CROSS_SCAN = "CrossScanParameters" RASTER = "RasterParameters" STAR_RASTER = "StarRasterParameters"
[docs] class PointingPatternParameters(PdmObject, ABC): """ PointingPatternParameters is an abstract base class extended by classes that define receptor pointing patterns. """ @classmethod @property @abstractmethod def kind(cls) -> str: """ Discriminator field used to distinguish the subclass type of instances that extend PointingPatternParameters. This field is a requirement of the JSON representation, so that the correct class can be instantiated as the JSON is unmarshalled. kind is an abstract class property and must be defined in any subclass. """ # 'raise NotImplementedError' causes an error in docs pass # pylint: disable=unnecessary-pass
[docs] class FivePointParameters(PointingPatternParameters): """ FivePointParameters defines the properties of an observing pattern that uses a five-point observing pattern centred on a reference position. """ kind: Literal[PointingKind.FIVE_POINT] = PointingKind.FIVE_POINT offset_arcsec: float = 0.0
[docs] class CrossScanParameters(PointingPatternParameters): """ CrossScanParameters defines the properties of an observing pattern that uses a cross scan observing pattern, typically used for pointing calibrations. """ kind: Literal[PointingKind.CROSS_SCAN] = PointingKind.CROSS_SCAN offset_arcsec: float = 0.0
[docs] class SinglePointParameters(PointingPatternParameters): """ SinglePointParameters defines the properties for an observing pattern consisting of a single receptor pointing with an optional offset from the reference position. """ kind: Literal[PointingKind.SINGLE_POINT] = PointingKind.SINGLE_POINT offset_x_arcsec: float = 0.0 offset_y_arcsec: float = 0.0
[docs] class RasterParameters(PointingPatternParameters): """ RasterParameters defines the properties of an observing pattern that uses a raster pattern centred on a reference position. """ kind: Literal[PointingKind.RASTER] = PointingKind.RASTER row_length_arcsec: float = 0.0 row_offset_arcsec: float = 0.0 n_rows: int = 1 pa: float = 0.0 unidirectional: bool = False
[docs] class StarRasterParameters(PointingPatternParameters): """ StarRasterParameters defines the properties of an observing pattern that uses a star raster pattern centred on a reference position. """ kind: Literal[PointingKind.STAR_RASTER] = PointingKind.STAR_RASTER row_length_arcsec: float = 0.0 n_rows: int = 1 row_offset_angle: float = 0.0 unidirectional: bool = False
PointingParametersUnion = Annotated[ Union[ FivePointParameters, CrossScanParameters, SinglePointParameters, RasterParameters, StarRasterParameters, ], Field(discriminator="kind"), ]
[docs] class PointingPattern(PdmObject): """ PointingPattern holds the user-configured pointing patterns and current active pattern for receptor pointing patterns associated with a target. One of each pointing pattern type can be held in the parameters list. Only the active pattern will be used for observing; the remainder provide an easy way to recover previously edited observing parameters for the target. """ active: PointingKind = PointingKind.SINGLE_POINT parameters: list[PointingParametersUnion] = Field( default_factory=lambda: [SinglePointParameters()], min_length=1 ) @model_validator(mode="before") @classmethod def active_xor_parameters(cls, data: Any) -> Any: if (data.get("active") is None) ^ (data.get("parameters") is None): raise ValueError("Must provide active and parameters. Only one specified") return data @model_validator(mode="after") def active_must_be_among_params(self): # pylint: disable=not-an-iterable parameter_kinds = {p.kind for p in self.parameters} # complain if active not in the given parameters or duplicate detected if self.active not in parameter_kinds: raise ValueError( f"Invalid pointing parameters state: active={self.active} parameters={self.parameters}" ) if len(parameter_kinds) != len(self.parameters): raise ValueError(f"Duplicate parameter types in input: {self.parameters}") return self def __eq__(self, other): if not isinstance(other, PointingPattern): return False self_parameters_by_kind = sorted( self.parameters, key=operator.attrgetter("kind") ) other_parameters_by_kind = sorted( other.parameters, key=operator.attrgetter("kind") ) return ( other.active == self.active and self_parameters_by_kind == other_parameters_by_kind )
[docs] class CoordinateKind(TerseStrEnum): EQUATORIAL = "equatorial" HORIZONTAL = "horizontal" SSO = "sso" # SolarSystem Object
[docs] class Coordinates(PdmObject, ABC, arbitrary_types_allowed=True): """ Coordinates is an abstract base class for pointing coordinates. """ EQUALITY_TOLERANCE: ClassVar[Angle] = Angle( 6e-17, unit="rad" ) # Arbitrary small angle kind: Literal _coord: SkyCoord def __str__(self): return repr(self) # Do we really need all this? Could we just # pass through whatever the caller sends and trust astropy to # throw an appropriate error from SkyCoord()? @field_validator("unit", check_fields=False) @classmethod def handle_units(cls, unit: UnitInputType) -> Tuple[str, str]: """ Validates unit names with Astropy Unit() and converts strings to tuples. """ if isinstance(unit, str): unit = unit.split(",") if len(unit) == 1: alpha, beta = unit[0], unit[0] elif len(unit) == 2: alpha, beta = unit else: raise ValueError("unit= must be one unit (for both axes) or two units") return Unit(alpha).name, Unit(beta).name def __eq__(self, other): if not isinstance(other, self.__class__): return False # Please replace this with a more elegant way of dealing with differences # due to floating point arithmetic when comparing targets # defined in different ways. return ( self._coord.frame.name == other._coord.frame.name and self._coord.separation(other._coord) < self.EQUALITY_TOLERANCE )
[docs] class EquatorialCoordinatesReferenceFrame(TerseStrEnum): """ Enumeration of reference frames supported by an EquatorialCoordinates """ ICRS = "icrs" FK5 = "fk5"
[docs] class EquatorialCoordinates(Coordinates): """ SiderealTarget represents the argument for SKA scheduling block. """ kind: Literal[CoordinateKind.EQUATORIAL] = CoordinateKind.EQUATORIAL ra: Optional[float | str] = 0.0 dec: Optional[float | str] = 0.0 reference_frame: Optional[ EquatorialCoordinatesReferenceFrame ] = EquatorialCoordinatesReferenceFrame.ICRS unit: UnitInputType = ("hourangle", "deg") @model_validator(mode="after") def _set_coord(self): self._coord = SkyCoord( self.ra, self.dec, unit=self.unit, frame=self.reference_frame.value ) return self def __repr__(self): raw_ra = self._coord.ra.value raw_dec = self._coord.dec.value unit = (self._coord.ra.unit.name, self._coord.dec.unit.name) return f"EquatorialCoordinates(ra={raw_ra}, dec={raw_dec}, unit={unit}, frame={self.reference_frame!r})" def __str__(self): return ( f"<EquatorialCoordinates: " f'({self._coord.to_string(style="hmsdms")} {self.reference_frame.name})>' )
[docs] class HorizontalCoordinatesReferenceFrame(TerseStrEnum): """ Enumeration of reference frames supported by a HorizontalCoordinates. """ ALTAZ = "altaz"
[docs] class HorizontalCoordinates(Coordinates): """ DriftScanTarget defines AltAz target for SKA scheduling block. """ kind: Literal[CoordinateKind.HORIZONTAL] = CoordinateKind.HORIZONTAL az: float el: float unit: UnitInputType = ("deg", "deg") reference_frame: Optional[ HorizontalCoordinatesReferenceFrame ] = HorizontalCoordinatesReferenceFrame.ALTAZ @model_validator(mode="after") def _set_coord(self): self._coord = SkyCoord( az=self.az, alt=self.el, unit=self.unit, frame=self.reference_frame.value ) return self def __repr__(self): raw_az = self._coord.az.value raw_alt = self._coord.alt.value unit = (self._coord.az.unit.name, self._coord.alt.unit.name) return f"HorizontalCoordinates(az={raw_az}, el={raw_alt}, unit={unit}, reference_frame={self.reference_frame!r})>" def __str__(self): return f"<HorizontalCoordinates: ({self._coord.to_string()} {self.reference_frame.name})>"
[docs] class SolarSystemObjectName(TerseStrEnum): """ SolarSystemObjectName represents name of the solar system object. """ MERCURY = "mercury" VENUS = "venus" MARS = "mars"
[docs] class SolarSystemObject(Coordinates): """ Planet represents the argument for SKA scheduling block. """ kind: Literal[CoordinateKind.SSO] = CoordinateKind.SSO name: SolarSystemObjectName @property def coord(self) -> SkyCoord: return get_body(self.name.value, Time.now()) def __eq__(self, other): if not isinstance(other, SolarSystemObject): return False return self.name == other.name
CoordinatesUnion = Annotated[ Union[ EquatorialCoordinates, SolarSystemObject, HorizontalCoordinates, ], Field(discriminator="kind"), ]
[docs] class Target(PdmObject): """ Target represents the receptor pointing for an SKA observation, consisting of a reference position and a pointing pattern to be used when observing the target. Default pointing patterns and equatorial coordinates will be set if not provided. """ target_id: TargetID = "" pointing_pattern: PointingPattern = Field(default_factory=PointingPattern) reference_coordinate: CoordinatesUnion = Field( default_factory=EquatorialCoordinates ) radial_velocity: RadialVelocity = Field(default_factory=RadialVelocity) def __str__(self): # pylint: disable=no-member, useless-suppression kind = { "SinglePointParameters": "single point", "FivePointParameters": "five-point", "CrossScanParameters": "cross scan", }.get(self.pointing_pattern.active, self.pointing_pattern.active) return f"<Target={self.target_id} | {kind} on {self.reference_coordinate._coord.to_string()}>"