"""
The ska_oso_pdm._shared.target module defines a Python representation of the
target of the observation.
"""
import operator
from abc import ABC, abstractmethod
from typing import Any, ClassVar, List, Literal, Optional, Tuple, Union
from astropy import units
from astropy.coordinates import Angle, SkyCoord, get_body
from astropy.time import Time
from astropy.units import Unit
from pydantic import Field, field_validator, model_validator
from typing_extensions import Annotated, Self
from ska_oso_pdm._shared import PdmObject, TargetID, TerseStrEnum
from ska_oso_pdm._shared.custom_types import QuantityModel
UnitInputType = Union[str, List[str], Tuple[str]]
# -----
[docs]
class RadialVelocityDefinition(TerseStrEnum):
"""
Enumeration of reference definitions supported by a RadialVelocity.
The sky frequency (ν) at which we must observe a spectral line is derived
from the rest frequency of the spectral line (ν₀), the line-of-sight
velocity of the source (V), and the speed of light (c). The relativistic
velocity, or true line-of-sight velocity, is related to the observed and
rest frequencies by
V= c * (ν₀²− ν²) / (v₀² + v²)
This equation is a bit cumbersome to use; in astronomy two different
approximations are typically used:
Optical Velocity:
Voptical = c * (λ − λ₀) / λ₀ = cz
(z is the redshift of the source; λ and λ₀ are the corresponding observed
and rest wavelengths, respectively)
Radio Velocity:
Vradio = c * (ν₀ − ν) / v₀ = c * (λ−λ₀) / λ
The radio and optical velocities are not identical. Particularly, Voptical
and Vradio diverge for large velocities. Optical velocities are commonly
used for (Helio/Barycentric) extragalactic observations; (LSRK) radio
velocities are typical for Galactic observations.
Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line
"""
OPTICAL = "OPTICAL"
RADIO = "RADIO"
RELATIVISTIC = "RELATIVISTIC"
[docs]
class RadialVelocityReferenceFrame(TerseStrEnum):
"""
Reference frame in which the radial velocity is defined.
The Earth rotates, revolves around the Sun, rotates around the Galaxy,
moves within the Local Group, and shows motion against the Cosmic
Microwave Background. As for the convention above, any source velocity
must therefore also always be specified relative to a reference frame.
Various velocity rest frames are used in the literature. The following
table lists their name, the motion that is corrected for, and the maximum
amplitude of the velocity correction. Each rest frame correction is
incremental to the preceding row:
Velocity Rest Frame Correct for + max correction (km/s)
=========================== ====================================== ======================================
Topocentric Telescope Nothing (0)
Geocentric Earth Center Earth rotation (0.5)
Earth-Moon Barycentric Earth+Moon center of mass Motion around Earth+Moon center of mass (0.013)
Heliocentric Center of the Sun Earth orbital motion (30)
Barycentric Earth+Sun center of mass Earth+Sun center of mass (0.012)
Local Standard of Rest Center of Mass of local stars Solar motion relative to nearby stars (20)
Galactocentric Center of Milky Way Milky Way Rotation (230)
Local Group Barycentric Local Group center of mass Milky Way Motion (100)
Virgocentric Center of the Local Virgo supercluster Local Group motion (300)
Cosmic Microwave Background CMB Local Supercluster Motion (600)
The velocity frame should be chosen based on the science. For most
observations, however, one of the following three reference frames is
commonly used:
- Topocentric is the reference frame of the observatory (defining the sky
frequency of the observations). Visibilities in a measurement set are
typically stored in this frame.
- Local Standard of Rest is the native output of images in CASA. Note that
there are two varieties of LSR: the kinematic LSR (LSRK) and the dynamic
(LSRD) definitions for the kinematic and dynamic centers, respectively.
In almost all cases LSRK is being used and the less precise name LSR is
usually used synonymously with the more modern LSRK definition.
- Barycentric is a commonly used frame that has virtually replaced the
older Heliocentric standard. Given the small difference between the
Barycentric and Heliocentric frames, they are frequently used
interchangeably.
Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line
"""
TOPOCENTRIC = "TOPOCENTRIC"
LSRK = "LSRK"
BARYCENTRIC = "BARYCENTRIC"
[docs]
class RadialVelocityUnits(TerseStrEnum):
"""
Units for radial velocities.
"""
# Values MUST be str of an AstroPy unit.
M_PER_SEC = "m / s"
KM_PER_SEC = "km / s"
[docs]
class RadialVelocityQuantity(QuantityModel):
"""
Value/unit pair to describe the radial velocity measurement.
"""
value: float
unit: RadialVelocityUnits = RadialVelocityUnits.KM_PER_SEC
@model_validator(mode="after")
def _ensure_constrained_units(self, v: Any) -> Self:
# required to validate unit when the model is set to an existing
# Quantity instances
if isinstance(v, RadialVelocityUnits):
return self
# We now want to assert that the units are a value of the enumeration.
# Creating an enum instance does this automatically, also including a
# good error message on failure. We don't do anything with this enum
# instance, we just want to take advantage of the checks already coded
# in the enum module.
RadialVelocityUnits(str(self.unit))
return self
[docs]
class RadialVelocity(PdmObject):
"""
Radial velocity measures the line-of-sight velocity of an astronomical
source.
In principle, the radial velocity can be converted to and from the target
redshift. However, these values are persisted separately to give the user
the option of inputting either value.
A velocity must also define the reference frame and definition that are
applicable to the velocity. By default, these have values of:
- definition = RADIO
- reference_frame = LSRK
- redshift = 0.0
"""
quantity: Annotated[units.Quantity, RadialVelocityQuantity] = Field(
default_factory=lambda: units.Quantity(
value=0.0, unit=RadialVelocityUnits.KM_PER_SEC
),
)
definition: RadialVelocityDefinition = RadialVelocityDefinition.RADIO
reference_frame: RadialVelocityReferenceFrame = RadialVelocityReferenceFrame.LSRK
redshift: float = 0.0
[docs]
class PointingKind(TerseStrEnum):
FIVE_POINT = "FivePointParameters"
SINGLE_POINT = "SinglePointParameters"
CROSS_SCAN = "CrossScanParameters"
RASTER = "RasterParameters"
STAR_RASTER = "StarRasterParameters"
[docs]
class PointingPatternParameters(PdmObject, ABC):
"""
PointingPatternParameters is an abstract base class extended by classes
that define receptor pointing patterns.
"""
@classmethod
@property
@abstractmethod
def kind(cls) -> str:
"""
Discriminator field used to distinguish the subclass type of instances that
extend PointingPatternParameters. This field is a requirement of the JSON
representation, so that the correct class can be instantiated as the JSON is
unmarshalled.
kind is an abstract class property and must be defined in any subclass.
"""
# 'raise NotImplementedError' causes an error in docs
pass # pylint: disable=unnecessary-pass
[docs]
class FivePointParameters(PointingPatternParameters):
"""
FivePointParameters defines the properties of an observing pattern that
uses a five-point observing pattern centred on a reference position.
"""
kind: Literal[PointingKind.FIVE_POINT] = PointingKind.FIVE_POINT
offset_arcsec: float = 0.0
[docs]
class CrossScanParameters(PointingPatternParameters):
"""
CrossScanParameters defines the properties of an observing pattern that
uses a cross scan observing pattern, typically used for pointing
calibrations.
"""
kind: Literal[PointingKind.CROSS_SCAN] = PointingKind.CROSS_SCAN
offset_arcsec: float = 0.0
[docs]
class SinglePointParameters(PointingPatternParameters):
"""
SinglePointParameters defines the properties for an observing pattern
consisting of a single receptor pointing with an optional offset from
the reference position.
"""
kind: Literal[PointingKind.SINGLE_POINT] = PointingKind.SINGLE_POINT
offset_x_arcsec: float = 0.0
offset_y_arcsec: float = 0.0
[docs]
class RasterParameters(PointingPatternParameters):
"""
RasterParameters defines the properties of an observing pattern that
uses a raster pattern centred on a reference position.
"""
kind: Literal[PointingKind.RASTER] = PointingKind.RASTER
row_length_arcsec: float = 0.0
row_offset_arcsec: float = 0.0
n_rows: int = 1
pa: float = 0.0
unidirectional: bool = False
[docs]
class StarRasterParameters(PointingPatternParameters):
"""
StarRasterParameters defines the properties of an observing pattern that
uses a star raster pattern centred on a reference position.
"""
kind: Literal[PointingKind.STAR_RASTER] = PointingKind.STAR_RASTER
row_length_arcsec: float = 0.0
n_rows: int = 1
row_offset_angle: float = 0.0
unidirectional: bool = False
PointingParametersUnion = Annotated[
Union[
FivePointParameters,
CrossScanParameters,
SinglePointParameters,
RasterParameters,
StarRasterParameters,
],
Field(discriminator="kind"),
]
[docs]
class PointingPattern(PdmObject):
"""
PointingPattern holds the user-configured pointing patterns and current active
pattern for receptor pointing patterns associated with a target.
One of each pointing pattern type can be held in the parameters list. Only the
active pattern will be used for observing; the remainder provide an easy way to
recover previously edited observing parameters for the target.
"""
active: PointingKind = PointingKind.SINGLE_POINT
parameters: list[PointingParametersUnion] = Field(
default_factory=lambda: [SinglePointParameters()], min_length=1
)
@model_validator(mode="before")
@classmethod
def active_xor_parameters(cls, data: Any) -> Any:
if (data.get("active") is None) ^ (data.get("parameters") is None):
raise ValueError("Must provide active and parameters. Only one specified")
return data
@model_validator(mode="after")
def active_must_be_among_params(self):
# pylint: disable=not-an-iterable
parameter_kinds = {p.kind for p in self.parameters}
# complain if active not in the given parameters or duplicate detected
if self.active not in parameter_kinds:
raise ValueError(
f"Invalid pointing parameters state: active={self.active} parameters={self.parameters}"
)
if len(parameter_kinds) != len(self.parameters):
raise ValueError(f"Duplicate parameter types in input: {self.parameters}")
return self
def __eq__(self, other):
if not isinstance(other, PointingPattern):
return False
self_parameters_by_kind = sorted(
self.parameters, key=operator.attrgetter("kind")
)
other_parameters_by_kind = sorted(
other.parameters, key=operator.attrgetter("kind")
)
return (
other.active == self.active
and self_parameters_by_kind == other_parameters_by_kind
)
[docs]
class CoordinateKind(TerseStrEnum):
EQUATORIAL = "equatorial"
HORIZONTAL = "horizontal"
SSO = "sso" # SolarSystem Object
[docs]
class Coordinates(PdmObject, ABC, arbitrary_types_allowed=True):
"""
Coordinates is an abstract base class for pointing coordinates.
"""
EQUALITY_TOLERANCE: ClassVar[Angle] = Angle(
6e-17, unit="rad"
) # Arbitrary small angle
kind: Literal
_coord: SkyCoord
def __str__(self):
return repr(self)
# Do we really need all this? Could we just
# pass through whatever the caller sends and trust astropy to
# throw an appropriate error from SkyCoord()?
@field_validator("unit", check_fields=False)
@classmethod
def handle_units(cls, unit: UnitInputType) -> Tuple[str, str]:
"""
Validates unit names with Astropy Unit() and converts strings to
tuples.
"""
if isinstance(unit, str):
unit = unit.split(",")
if len(unit) == 1:
alpha, beta = unit[0], unit[0]
elif len(unit) == 2:
alpha, beta = unit
else:
raise ValueError("unit= must be one unit (for both axes) or two units")
return Unit(alpha).name, Unit(beta).name
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
# Please replace this with a more elegant way of dealing with differences
# due to floating point arithmetic when comparing targets
# defined in different ways.
return (
self._coord.frame.name == other._coord.frame.name
and self._coord.separation(other._coord) < self.EQUALITY_TOLERANCE
)
[docs]
class EquatorialCoordinatesReferenceFrame(TerseStrEnum):
"""
Enumeration of reference frames supported by an EquatorialCoordinates
"""
ICRS = "icrs"
FK5 = "fk5"
[docs]
class EquatorialCoordinates(Coordinates):
"""
SiderealTarget represents the argument for SKA scheduling block.
"""
kind: Literal[CoordinateKind.EQUATORIAL] = CoordinateKind.EQUATORIAL
ra: Optional[float | str] = 0.0
dec: Optional[float | str] = 0.0
reference_frame: Optional[
EquatorialCoordinatesReferenceFrame
] = EquatorialCoordinatesReferenceFrame.ICRS
unit: UnitInputType = ("hourangle", "deg")
@model_validator(mode="after")
def _set_coord(self):
self._coord = SkyCoord(
self.ra, self.dec, unit=self.unit, frame=self.reference_frame.value
)
return self
def __repr__(self):
raw_ra = self._coord.ra.value
raw_dec = self._coord.dec.value
unit = (self._coord.ra.unit.name, self._coord.dec.unit.name)
return f"EquatorialCoordinates(ra={raw_ra}, dec={raw_dec}, unit={unit}, frame={self.reference_frame!r})"
def __str__(self):
return (
f"<EquatorialCoordinates: "
f'({self._coord.to_string(style="hmsdms")} {self.reference_frame.name})>'
)
[docs]
class HorizontalCoordinatesReferenceFrame(TerseStrEnum):
"""
Enumeration of reference frames supported by a HorizontalCoordinates.
"""
ALTAZ = "altaz"
[docs]
class HorizontalCoordinates(Coordinates):
"""
DriftScanTarget defines AltAz target for SKA scheduling block.
"""
kind: Literal[CoordinateKind.HORIZONTAL] = CoordinateKind.HORIZONTAL
az: float
el: float
unit: UnitInputType = ("deg", "deg")
reference_frame: Optional[
HorizontalCoordinatesReferenceFrame
] = HorizontalCoordinatesReferenceFrame.ALTAZ
@model_validator(mode="after")
def _set_coord(self):
self._coord = SkyCoord(
az=self.az, alt=self.el, unit=self.unit, frame=self.reference_frame.value
)
return self
def __repr__(self):
raw_az = self._coord.az.value
raw_alt = self._coord.alt.value
unit = (self._coord.az.unit.name, self._coord.alt.unit.name)
return f"HorizontalCoordinates(az={raw_az}, el={raw_alt}, unit={unit}, reference_frame={self.reference_frame!r})>"
def __str__(self):
return f"<HorizontalCoordinates: ({self._coord.to_string()} {self.reference_frame.name})>"
[docs]
class SolarSystemObjectName(TerseStrEnum):
"""
SolarSystemObjectName represents name of the solar system object.
"""
MERCURY = "mercury"
VENUS = "venus"
MARS = "mars"
[docs]
class SolarSystemObject(Coordinates):
"""
Planet represents the argument for SKA scheduling block.
"""
kind: Literal[CoordinateKind.SSO] = CoordinateKind.SSO
name: SolarSystemObjectName
@property
def coord(self) -> SkyCoord:
return get_body(self.name.value, Time.now())
def __eq__(self, other):
if not isinstance(other, SolarSystemObject):
return False
return self.name == other.name
CoordinatesUnion = Annotated[
Union[
EquatorialCoordinates,
SolarSystemObject,
HorizontalCoordinates,
],
Field(discriminator="kind"),
]
[docs]
class Target(PdmObject):
"""
Target represents the receptor pointing for an SKA observation, consisting
of a reference position and a pointing pattern to be used when observing
the target.
Default pointing patterns and equatorial coordinates will be set if not
provided.
"""
target_id: TargetID = ""
pointing_pattern: PointingPattern = Field(default_factory=PointingPattern)
reference_coordinate: CoordinatesUnion = Field(
default_factory=EquatorialCoordinates
)
radial_velocity: RadialVelocity = Field(default_factory=RadialVelocity)
def __str__(self):
# pylint: disable=no-member, useless-suppression
kind = {
"SinglePointParameters": "single point",
"FivePointParameters": "five-point",
"CrossScanParameters": "cross scan",
}.get(self.pointing_pattern.active, self.pointing_pattern.active)
return f"<Target={self.target_id} | {kind} on {self.reference_coordinate._coord.to_string()}>"