Source code for ska.sdp.msadd.uvw

"""
This module deals with the UVW calculation. Essentially taking the antenna locations held by the AntennaTable
and turning them into J2000 UVW.

The machinery used for this is the python wrappers to the casacore measures module.

"""

import math
import numpy
from os import path
import datetime

from astropy.coordinates import Angle
from astropy import units as u
from astropy.time import Time

from casacore.measures import measures
from casacore.quanta import quantity
from casacore import tables

from .utils import _subtable, get_antenna_uvw
from .antenna import AntennaTable


# TODO Clean up these methods and make some private

[docs]class UpdateUvwColumn(AntennaTable): """ A simple class that updates the UVW column in the main table - derived from ANTENNA locations in the ANTENNA table """ def __init__(self, input_table_name=None, json_file_location=None, output_table_name=None, swap_baselines=False, delay_ra: Angle = None, delay_dec: Angle = None): """ :param str input_table_name: The name of the reference table - only used for the antenna locations :param str json_file_location: Location of json antenna location file (can be URL) :param str output_table_name: The name of the main table to be updated :param Bool swap_baselines: default is ant 2 - ant 1 - this flag reverses that assumption :param float delay_ra: astropy Angle for RA - this can come from the MS :param float delay_dec: astropy Angle for Dec - this can come from the MS """ super().__init__(input_table_name=input_table_name, json_file_location=json_file_location) self.uvw_table = [] self.delay_dir = [] self.output_table_name = output_table_name self.input_table_name = input_table_name if output_table_name: self.load_from_output_table() if delay_ra == None or delay_dec == None: self.get_delay_dir_from_table() else: target_ra = Angle(delay_ra * u.rad) target_dec = Angle(delay_dec * u.rad) self.delay_dir = [target_ra, target_dec] else: self.output_table = None self.swap_baselines = swap_baselines self.position_frame = 'itrf'
[docs] def load_from_output_table(self): """ If the input table name exists - set the input table """ if path.exists(self.output_table_name): self.load_columns_from_ms(reference_table_name=self.output_table_name) else: raise FileExistsError
[docs] def get_antenna1_from_row(self, row): """ Get the antenna index of antenna 1 for the given row :param int row: row index """ antenna1 = self.uvw_table[row].get('antenna1') return antenna1
[docs] def get_antenna2_from_row(self, row): """ Get the antenna index of antenna 2 for the given row :param int row: row index """ antenna2 = self.uvw_table[row].get('antenna2') return antenna2
[docs] def get_antenna_position_from_index(self, index): """ Get the antenna position from the internal representation of the antenna table :param int index: index """ return self.get_antenna_pos(index)
[docs] def get_time_from_row(self, row): """ Get the time from the internal representation of the main table row :param int row: row index :return time: :rtype: astropy.Time """ time_centroid = self.uvw_table[row].get('time') # Time in MJD Secs time = Time(float(time_centroid) / (3600. * 24), format='mjd') # Time in MJD format return time
[docs] def get_delay_dir_from_table(self, row=0): """ Pull the delay direction from the reference measurement set :param int row: row to take it from (0) :return: a list of form [ra,dec] in radian measure :rtype: [target_ra, target_dec] """ field_id = self.uvw_table[row].get('field_id') field = _subtable(self.output_table_name, 'FIELD') target = field.getcol('DELAY_DIR', field_id, 1, 1) # direction in rad target_ra = Angle(target[0][0][0] * u.rad) target_dec = Angle(target[0][0][1] * u.rad) self.delay_dir = [target_ra, target_dec]
[docs] def calculate_uvw_for_row(self, row): """ Calculate the UVW for the row of internal representation of the table. Note this does not update the row in the measurement set. The geocentric UVW of each antenna is calculated separately using its position - this is probably better than doing it to some reference position. :param int row: The row to calculate :return uvw: The UVW in J2000 epoch :rtype: [u,v,w] """ ant1_index = self.get_antenna1_from_row(row) ant2_index = self.get_antenna2_from_row(row) ant1_pos = self.get_antenna_position_from_index(ant1_index) ant2_pos = self.get_antenna_position_from_index(ant2_index) row_time = self.get_time_from_row(row) target_ra = self.delay_dir[0] target_dec = self.delay_dir[1] uvw_1 = get_antenna_uvw(ant1_pos, row_time, target_ra, target_dec, swap_baselines=self.swap_baselines, position_frame=self.position_frame) uvw_2 = get_antenna_uvw(ant2_pos, row_time, target_ra, target_dec, swap_baselines=self.swap_baselines, position_frame=self.position_frame) return uvw_2 - uvw_1
[docs] def update_uvw_for_all_rows(self): """ Calculate the UVW for the all the rows of internal representation of the table. This does update the measurement set on disk """ uvw = [] for r in range(len(self.uvw_table)): uvw.append(self.calculate_uvw_for_row(r)) col_to_put = numpy.array(uvw) with tables.table(self.output_table_name, readonly=False) as table: assert len(col_to_put) == table.nrows() table.putcol('UVW', col_to_put, 0, table.nrows(), 1)
[docs] def load_columns_from_ms(self, reference_table_name): """ If the input table name exists - set the input table load the values into a dictionary """ if not path.exists(reference_table_name): raise FileExistsError with tables.table(reference_table_name, readonly=True) as table: antenna1 = table.col("ANTENNA1") antenna2 = table.col('ANTENNA2') time = table.col("TIME") uvw = table.col("UVW") field_id = table.col("FIELD_ID") for row in range(table.nrows()): baseline = {"antenna1": antenna1[row], "antenna2": antenna2[row], "time": time[row], "uvw": uvw[row], "field_id": field_id[row]} self.uvw_table.append(baseline)