Pointing
This notebook and more can be downloaded from the repository's notebook directory.
In [16]:
# Install dependencies
!pip3 install pytango
!pip3 install ska_mid_dish_manager --extra-index=https://artefact.skao.int/repository/pypi-internal/simple
Set up the environment¶
In [17]:
import os
import time
from ska_mid_dish_manager.models.dish_enums import TrackTableLoadMode
# Export TANGO_HOST for this process before any device proxy is created.
os.environ.update({"TANGO_HOST": "databaseds-tango-base-test:10000"})
Confirm device subscriptions¶
In [18]:
import tango
# Create one client proxy per device used in this notebook.
dish_manager_device_proxy = tango.DeviceProxy("mid-dish/dish-manager/ska001")
ds_manager_device_proxy = tango.DeviceProxy("mid-dish/ds-manager/SKA001")
spfrx_device_proxy = tango.DeviceProxy("mid-dish/simulator-spfrx/SKA001")
spfc_device_proxy = tango.DeviceProxy("mid-dish/simulator-spfc/SKA001")

# Ping every device in turn and print what each ping returns; a failing ping
# raises and stops the cell.
for device_proxy in (
    dish_manager_device_proxy,
    ds_manager_device_proxy,
    spfrx_device_proxy,
    spfc_device_proxy,
):
    print(device_proxy.ping())
Check the Modes¶
In [19]:
# Check the deployment: print the current mode reported by each device.
# Attributes are read one at a time, immediately before they are printed.
for label, device_proxy, mode_attribute in (
    ("Dish Mode:\t", dish_manager_device_proxy, "dishMode"),
    ("DS Manager:\t", ds_manager_device_proxy, "operatingMode"),
    ("SPFRx:\t", spfrx_device_proxy, "operatingMode"),
    ("SPFC:\t", spfc_device_proxy, "operatingMode"),
):
    print(label, getattr(device_proxy, mode_attribute).name)
Set Up Subscriptions to print output as values change¶
In [20]:
from ska_mid_dish_manager.models.dish_enums import PointingState
# Best-effort cleanup so that re-running this cell does not print every event
# twice: try to drop subscription ids 0-9 on both manager proxies.
for subscription_id in range(10):
    for device_proxy in (dish_manager_device_proxy, ds_manager_device_proxy):
        try:
            device_proxy.unsubscribe_event(subscription_id)
        except KeyError:
            # Presumably no subscription with this id exists on this proxy;
            # nothing to remove, so move on.
            continue
def print_az_el(ev):
    """Event callback: print the time, azimuth and elevation carried by an event.

    The three numbers are read from positions 0, 1 and 2 of
    ``ev.attr_value.value``. Events whose ``attr_value`` is falsy are ignored.
    """
    reading = ev.attr_value
    if not reading:
        return
    timestamp = reading.value[0]
    azimuth = reading.value[1]
    elevation = reading.value[2]
    print(f"DishManager achievedPointing: Time: {timestamp} AZ: {azimuth} EL: {elevation}")
def print_pointing_state(ev):
    """Event callback: print the ``PointingState`` member name for an event value.

    The raw ``ev.attr_value.value`` is converted through ``PointingState`` so
    the printed text is the enum member's name. Events whose ``attr_value`` is
    falsy are ignored.
    """
    reading = ev.attr_value
    if not reading:
        return
    state_name = PointingState(reading.value).name
    print(f"DishManager PointingState {state_name}")
def print_dm_event(ev):
    """Event callback: print the attribute name and value of a DishManager event.

    Events whose ``attr_value`` is falsy are ignored.
    """
    reading = ev.attr_value
    if not reading:
        return
    print(f"DishManager {reading.name} {reading.value}")
def print_dsm_event(ev):
    """Event callback: print the attribute name and value of a DS manager event.

    Events whose ``attr_value`` is falsy are ignored.
    """
    reading = ev.attr_value
    if not reading:
        return
    print(f"DishStructureManager {reading.name} {reading.value}")
# Register a change-event callback for every attribute of interest, in the
# order listed. Each returned subscription id is assigned to the throwaway
# name `_`, so only the last one remains available afterwards.
for device_proxy, attribute_name, callback in (
    (dish_manager_device_proxy, "achievedPointing", print_az_el),
    (dish_manager_device_proxy, "longrunningcommandprogress", print_dm_event),
    (dish_manager_device_proxy, "longrunningcommandstatus", print_dm_event),
    (dish_manager_device_proxy, "longRunningCommandResult", print_dm_event),
    (dish_manager_device_proxy, "pointingState", print_pointing_state),
    (ds_manager_device_proxy, "longrunningcommandprogress", print_dsm_event),
    (ds_manager_device_proxy, "longrunningcommandstatus", print_dsm_event),
    (ds_manager_device_proxy, "longrunningcommandresult", print_dsm_event),
):
    _ = device_proxy.subscribe_event(
        attribute_name,
        tango.EventType.CHANGE_EVENT,
        callback,
    )
Transition to Full Power¶
In [21]:
# Request the transition only when dishMode currently reads STANDBY_LP;
# in any other mode this cell does nothing.
current_dish_mode = dish_manager_device_proxy.dishMode.name
if current_dish_mode == "STANDBY_LP":
    dish_manager_device_proxy.SetStandbyFPMode()
Configure a band¶
In [22]:
# Ask the dish manager to configure band 2. The call is deliberately left as the
# cell's last bare expression so that its return value is shown as the cell output.
# NOTE(review): the meaning of the `True` argument is not visible in this
# notebook - confirm against the ConfigureBand2 command documentation.
dish_manager_device_proxy.ConfigureBand2(True)
Out[22]:
Transition to Operate mode¶
In [23]:
# Request operate mode only when dishMode currently reads STANDBY_FP;
# in any other mode this cell does nothing.
current_dish_mode = dish_manager_device_proxy.dishMode.name
if current_dish_mode == "STANDBY_FP":
    dish_manager_device_proxy.SetOperateMode()
Build a track table¶
In [24]:
def get_current_tai_timestamp():
    """Return the current time as a TAI offset from the SKA epoch.

    The value is ``time.time()`` shifted down by a fixed offset of
    946684763.0 seconds.
    """
    # NOTE(review): presumably the Unix timestamp that corresponds to zero on
    # the SKA (TAI) timescale - confirm against the SKA epoch definition.
    ska_epoch_offset_s = 946684763.0
    return time.time() - ska_epoch_offset_s
In [25]:
def build_track_table(
    number_of_coordinates, time_delta, az_delta, el_delta, start_time, start_az, start_el
):
    """Build a flat track table that steps linearly away from a starting entry.

    The table opens with the starting entry and is followed by
    ``number_of_coordinates`` further entries. Each further entry is the
    previous one advanced by ``time_delta``, ``az_delta`` and ``el_delta``.

    Args:
        number_of_coordinates: how many entries to generate after the first.
        time_delta: amount added to the time value for each new entry.
        az_delta: amount added to the azimuth value for each new entry.
        el_delta: amount added to the elevation value for each new entry.
        start_time: time value of the first entry.
        start_az: azimuth value of the first entry.
        start_el: elevation value of the first entry.

    Returns:
        A flat list ``[time, az, el, time, az, el, ...]`` holding
        ``number_of_coordinates + 1`` entries.
    """
    track_table = [start_time, start_az, start_el]
    current_time, current_az, current_el = start_time, start_az, start_el
    for _ in range(number_of_coordinates):
        # Step cumulatively from the previous entry.
        current_time += time_delta
        current_az += az_delta
        current_el += el_delta
        track_table.extend((current_time, current_az, current_el))
    return track_table
In [26]:
# Build a table whose first entry is 5s in the future
track_table = build_track_table(
    number_of_coordinates=5,
    time_delta=1,
    az_delta=0.5,
    el_delta=1.5,
    start_time=get_current_tai_timestamp() + 5,
    start_az=100,
    start_el=50,
)
# Select the NEW load mode first, then write the table to the dish manager
dish_manager_device_proxy.trackTableLoadMode = TrackTableLoadMode.NEW
dish_manager_device_proxy.programTrackTable = track_table
print("Inititalized table with: ", track_table)
Kick off tracking¶
In [27]:
# Issue the Track command on the dish manager. The call is left as the cell's
# last bare expression so that its return value is shown as the cell output.
dish_manager_device_proxy.Track()
Out[27]:
In [ ]:
# Build more coordinates, with the first entry 30s in the future
track_table = build_track_table(
    number_of_coordinates=20,
    time_delta=1,
    az_delta=0.5,
    el_delta=1.5,
    start_time=get_current_tai_timestamp() + 30,
    start_az=100,
    start_el=50,
)
print("Appended: ", track_table)
# Select the APPEND load mode first, then write the table to the dish manager
dish_manager_device_proxy.trackTableLoadMode = TrackTableLoadMode.APPEND
dish_manager_device_proxy.programTrackTable = track_table
# If the table has been consumed already, then start tracking again
current_pointing_state = dish_manager_device_proxy.pointingstate
if current_pointing_state == PointingState.READY:
    dish_manager_device_proxy.Track()