# pylint: disable=no-member
import copy
from dataclasses import dataclass
import numpy as np
from astropy import units as u
from ska_oso_pdm import (
CoordinateKind,
FivePointParameters,
SinglePointParameters,
SolarSystemObjectName,
SpecialCoordinates,
Target,
TelescopeType,
)
from ska_oso_pdm._shared import PointedMosaicParameters
from ska_oso_pdm.sb_definition import LSTConstraint, ObservingConstraints, ScanDefinition
from ska_oso_services.common.static.constants import (
LOW_LOCATION,
MID_LOCATION,
SOLAR_TO_SIDEREAL_CONVERSION_FACTOR,
low_maximum_elevation,
low_minimum_elevation,
mid_maximum_elevation,
mid_minimum_elevation,
)
from ska_oso_services.validation.model import (
ValidationContext,
ValidationIssue,
ValidationIssueType,
check_relevant_context_contains,
validate,
validator,
)
[docs]
@dataclass
class TargetLSTConstraints:
    """
    Pairs a Target with the LST window that applies to one of its scans.

    Instances are created by create_target_lst_list, one per scan definition.
    """

    # the target being observed in the scan
    target: Target
    # the LST window derived for that scan
    lst_constraint: LSTConstraint
[docs]
@dataclass
class TargetElevation:
    """
    Elevations of a target implied by an LST window, as produced by
    calculate_elevation_implied_from_lst_constraint.
    """

    # the lower of the elevations at the start and at the end of the window
    min: u.Quantity
    # the higher of the elevations at the start and at the end of the window
    max: u.Quantity
    # the elevation at the midpoint of the window
    mid: u.Quantity
[docs]
@validator
def validate_constraints(
    constraints_context: ValidationContext[ObservingConstraints],
) -> list[ValidationIssue]:
    """
    Apply every relevant validator to an ObservingConstraints object.

    :param constraints_context: a ValidationContext containing the
        ObservingConstraints object to be validated
    :return: the collated list of ValidationIssues resulting from applying each
        of the relevant ObservingConstraints validators
    """
    applicable_validators = []
    # the elevation/LST check is only applied when the entity has an "lst" attribute
    if hasattr(constraints_context.primary_entity, "lst"):
        applicable_validators.append(
            validate_icrs_galactic_target_elevation_limits_are_within_their_lst_constraint
        )
    # the separation check is always applied
    applicable_validators.append(validate_sso_targets_do_not_have_separation_constraints)
    return validate(constraints_context, applicable_validators)
[docs]
@validator
def validate_icrs_galactic_target_elevation_limits_are_within_their_lst_constraint(
    constraints_context: ValidationContext,
) -> list[ValidationIssue]:
    """
    Check that the targets in the scans stay within the elevation limits set
    in the constraints throughout their individual LST windows.

    The constraints held by the context are only read, never modified: any
    missing elevation limit is resolved into a local value.

    :param constraints_context: a ValidationContext containing an Observing
        Constraint to be validated; its relevant context must contain
        "targets" and "scan_definitions"
    :return: a list of validation issues, one per ICRS/Galactic target whose
        implied elevation is incompatible with the elevation limits
    """
    check_relevant_context_contains(["targets", "scan_definitions"], constraints_context)
    targets = constraints_context.relevant_context["targets"]
    scan_definitions = constraints_context.relevant_context["scan_definitions"]
    constraints = constraints_context.primary_entity

    # without an LST constraint there is nothing to cross-check
    if constraints.lst is None:
        return []

    # In theory the elevation constraint might have a max but no min, a min but
    # no max, or be absent altogether. Any missing limit falls back to the
    # telescope default. The limits are kept in local variables so that the
    # altitude object owned by the caller is never written to.
    is_mid = constraints_context.telescope == TelescopeType.SKA_MID
    altitude_max = getattr(constraints.altitude, "max", None)
    if altitude_max is None:
        altitude_max = mid_maximum_elevation() if is_mid else low_maximum_elevation()
    altitude_min = getattr(constraints.altitude, "min", None)
    if altitude_min is None:
        altitude_min = mid_minimum_elevation() if is_mid else low_minimum_elevation()

    target_lst_constraints = create_target_lst_list(
        targets,
        scan_definitions,
        constraints.lst,
    )

    validation_issues = []
    for target_lst_constraint in target_lst_constraints:
        target = target_lst_constraint.target
        # this check is only for Galactic or ICRS targets
        if target.reference_coordinate.kind not in (
            CoordinateKind.ICRS,
            CoordinateKind.GALACTIC,
        ):
            continue

        target_elevation = calculate_elevation_implied_from_lst_constraint(
            constraints_context.telescope,
            target,
            target_lst_constraint.lst_constraint,
        )
        # the window edges must respect the limits, and the target must not be
        # below the horizon at the midpoint of the window
        if (
            target_elevation.min < altitude_min
            or target_elevation.max > altitude_max
        ) or target_elevation.mid < 0:
            validation_issues.append(
                ValidationIssue(
                    level=ValidationIssueType.ERROR,
                    message="Elevation and LST constraints are incompatible "
                    f"for target {target.name}",
                )
            )
    return validation_issues
[docs]
@validator
def validate_sso_targets_do_not_have_separation_constraints(
    constraints_context: ValidationContext[ObservingConstraints],
) -> list[ValidationIssue]:
    """
    Check that e.g. an observation of Jupiter does not have a Jupiter
    separation constraint.

    :param constraints_context: a ValidationContext containing an Observing
        Constraint to be validated; its relevant context must contain "targets"
    :return: a list of validation issues, one per target whose separation
        constraint is incompatible with observing that target
    """
    check_relevant_context_contains(["targets"], constraints_context)
    targets = constraints_context.relevant_context["targets"]
    constraints = constraints_context.primary_entity

    validation_issues = []
    for target in targets:
        if not target_is_jupiter_sun_or_moon(target):
            continue
        coordinate = target.reference_coordinate
        if not has_an_incompatible_constraint(coordinate, constraints):
            continue
        # use the enum *value* in both places: formatting the enum member
        # itself is not guaranteed to give the same text on every Python version
        sso_name = coordinate.name.value
        validation_issues.append(
            ValidationIssue(
                level=ValidationIssueType.ERROR,
                message=f"{sso_name} Avoidance Zone "
                "must be 0.0 degrees for a Scheduling Block with "
                f"{sso_name} as a target",
            )
        )
    return validation_issues
def has_an_incompatible_constraint(
    coordinate: SpecialCoordinates, constraints: ObservingConstraints
) -> bool:
    """
    Report whether the constraints hold a non-zero minimum separation for the
    solar system object named by the coordinate.

    :param coordinate: the special coordinate naming the solar system object
    :param constraints: the observing constraints to inspect
    :return: True if the object is the Sun, the Moon or Jupiter and the
        matching separation constraint has a minimum other than 0.0,
        otherwise False
    """
    # the attribute is the lower-cased object name followed by "_separation"
    separation_attribute = f"{coordinate.name.lower()}_separation"

    constrainable_objects = (
        SolarSystemObjectName.SUN,
        SolarSystemObjectName.MOON,
        SolarSystemObjectName.JUPITER,
    )
    if coordinate.name not in constrainable_objects:
        return False
    return getattr(constraints, separation_attribute).min != 0.0
[docs]
def calculate_elevation_implied_from_lst_constraint(
    telescope: TelescopeType, target: Target, lst_constraint: LSTConstraint
) -> TargetElevation:
    """
    Calculate the elevation of a target, as seen from the given telescope, at
    the start, the end and the midpoint of an LST window.

    :param telescope: the telescope whose site latitude is used
    :param target: the target whose reference coordinate is evaluated
    :param lst_constraint: the LST window to evaluate
    :return: a TargetElevation holding the lower and higher of the start/end
        elevations, plus the elevation at the midpoint of the window
    :raises ValueError: if the telescope is neither SKA_LOW nor SKA_MID
    """
    if telescope == TelescopeType.SKA_LOW:
        site_latitude = LOW_LOCATION.lat
    elif telescope == TelescopeType.SKA_MID:
        site_latitude = MID_LOCATION.lat
    else:
        raise ValueError(f"Telescope {telescope} not supported")

    lat_rad = float(site_latitude.to(u.rad).value)
    icrs_coord = target.reference_coordinate.to_sky_coord().icrs
    dec_rad = float(icrs_coord.dec.to(u.rad).value)
    right_ascension = icrs_coord.ra

    def elevation_at(lst):
        # an hour angle is implied from the target R.A. and the LST
        hour_angle = (lst - right_ascension).to(u.rad)
        return np.arcsin(
            np.sin(lat_rad) * np.sin(dec_rad)
            + np.cos(lat_rad) * np.cos(dec_rad) * np.cos(hour_angle)
        )

    start_elevation = elevation_at(lst_constraint.start)
    end_elevation = elevation_at(lst_constraint.end)

    # the elevation at the middle of the window is also needed, to make sure
    # the target isn't below the ground part-way through
    window_midpoint = __midpoint_hour_angle(lst_constraint.start, lst_constraint.end)
    midpoint_elevation = elevation_at(window_midpoint)

    return TargetElevation(
        min=min(start_elevation, end_elevation),
        max=max(start_elevation, end_elevation),
        mid=midpoint_elevation,
    )
[docs]
def create_target_lst_list(
targets: list[Target],
scan_definitions: list[ScanDefinition] | list[list[ScanDefinition]],
sbd_lst_constraint: LSTConstraint,
) -> list[TargetLSTConstraints]:
"""
the LST constraint in the SBD is for *starting* the SBD, therefore
each target has its own LST window during which it must be validated;
a target that might not have risen when the SBD is executed may well be
valid when its own scan commences. This function is to create an LSTConstraint
for each target, based on the window when its scan will be executed
"""
# the scan definitions passed to this function could either be a list of
# ScanDefinitions (if the SBD is for MID or LOW with 1 subarray beam) OR
# a list of lists of ScanDefinitions. If the former, we want to manipulate
# it to the latter.
if isinstance(scan_definitions[0], ScanDefinition):
scan_definitions = [scan_definitions]
target_lst_constraints_list = []
for scans in scan_definitions:
cumulative_execution_time = 0 * u.ms
for scan in scans:
# extracting the target
target = next(target for target in targets if target.target_id == scan.target_ref)
# The LST constraint in the SBD is the time the SBD must be *started* as such every
# target with a scan in the SBD has its own LST window during which it must exceed
# elevation limit that is equal to:
# start: lst_constraint_min + cumulative_execution
# end: lst_constraint_max + target_scan_duration + cumulative_execution
# calculating the scan_duration
pointing_parameters = target.pointing_pattern.parameters[0]
# getting the pointing pattern
match pointing_parameters:
case PointedMosaicParameters():
n_scans = len(pointing_parameters.offsets)
case FivePointParameters():
n_scans = 5
case SinglePointParameters():
n_scans = 1
case _:
raise ValueError(
f"pointing pattern {pointing_parameters.kind.value} not supported"
)
# calculating the total scan duration in LST and converting to an angle
# sadly astropy can't handle this natively so fudging
total_scan_duration_lst = (
scan.scan_duration * n_scans * SOLAR_TO_SIDEREAL_CONVERSION_FACTOR
).to(u.hour).value * u.hourangle
cumulative_execution_time_lst = (
cumulative_execution_time * SOLAR_TO_SIDEREAL_CONVERSION_FACTOR
).to(u.hour).value * u.hourangle
# creating the target specific LSTConstraint Object
target_lst_constraint = LSTConstraint(
start=__normalise_hourangle(
sbd_lst_constraint.start + cumulative_execution_time_lst
),
end=__normalise_hourangle(
sbd_lst_constraint.end
+ cumulative_execution_time_lst
+ total_scan_duration_lst
),
)
target_lst_constraints_list.append(
TargetLSTConstraints(
target=target,
lst_constraint=target_lst_constraint,
)
)
# finally - adding the scan duration to the cumulative scan time
cumulative_execution_time += scan.scan_duration * n_scans
return target_lst_constraints_list
def target_is_jupiter_sun_or_moon(target: Target) -> bool:
    """
    Report whether the target's reference coordinate is a special coordinate
    naming the Sun, the Moon or Jupiter.
    """
    coordinate = target.reference_coordinate
    # the name is only inspected for special coordinates
    return coordinate.kind == CoordinateKind.SPECIAL and coordinate.name in (
        SolarSystemObjectName.SUN,
        SolarSystemObjectName.MOON,
        SolarSystemObjectName.JUPITER,
    )
def __normalise_hourangle(hourangle: u.Quantity) -> u.Quantity:
    """
    Wrap an angle into the range [0, 24) hourangle.

    The python modulus operator can't be used with Quantities directly, so the
    value is taken out, wrapped, and the unit re-attached.

    :param hourangle: an angular Quantity; it is converted to hourangle first,
        so it does not have to be expressed in hourangle already
    :return: the wrapped angle as a Quantity in hourangle
    """
    # to_value converts to hourangle explicitly; a bare .value would silently
    # use whatever unit the quantity happens to carry
    return (hourangle.to_value(u.hourangle) % 24.0) * u.hourangle
def __midpoint_hour_angle(start: u.Quantity, end: u.Quantity) -> u.Quantity:
    """
    Find the midpoint of the window running forwards from start to end,
    allowing for a window that wraps past 24 hourangle.

    :param start: the start of the window, as an angular Quantity
    :param end: the end of the window, as an angular Quantity
    :return: the midpoint as a Quantity in hourangle, in the range [0, 24)
    """
    # convert explicitly so that start and the window length share one unit
    start_hours = start.to_value(u.hourangle)
    # the modulus makes the length positive when the window crosses 24h
    window_length = (end - start).to_value(u.hourangle) % 24
    midpoint = (start_hours + window_length / 2.0) % 24
    return midpoint * u.hourangle