Calibration Store Database Connection

This module implements the connection to the calibration database.

class CalibrationStoreDatabaseConnection(logger: Logger, communication_state_callback: Callable[[ska_control_model.CommunicationStatus], None], selection_manager: SelectionManager, timeout: float = 10, connection_max_tries: int = 5, window_failure_size: float = 15.0, window_failure_threshold: int = 1, circuit_open_timeout: float = 5.0)

A connection to a postgres database for the calibration store.

__init__(logger: Logger, communication_state_callback: Callable[[ska_control_model.CommunicationStatus], None], selection_manager: SelectionManager, timeout: float = 10, connection_max_tries: int = 5, window_failure_size: float = 15.0, window_failure_threshold: int = 1, circuit_open_timeout: float = 5.0) → None

Initialise a new instance of a database connection.

Parameters:
  • logger – a logger for this object to use

  • communication_state_callback – callback to be called when the status of the communications channel between the component manager and its component changes

  • selection_manager – the SelectionManager that the device should use.

  • timeout – the timeout for database operations

  • connection_max_tries – the maximum number of attempts to connect to the database

  • window_failure_size – the time window, in seconds, over which failures are evaluated.

  • window_failure_threshold – the number of failures within a window before it is classed as a database failure.

  • circuit_open_timeout – the time to block database connections after a database failure is detected.
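
The last three parameters describe a circuit-breaker pattern. The following is a minimal sketch of how such a breaker could behave, not the module's actual implementation. The class name SlidingWindowBreaker and its methods are hypothetical, the clock is injected so the behaviour can be checked without sleeping, and the sketch assumes the circuit opens once the failure count in the window reaches the threshold.

```python
import time
from collections import deque
from typing import Callable, Optional


class SlidingWindowBreaker:
    """Hypothetical breaker: opens after too many failures in a time window."""

    def __init__(
        self,
        window_failure_size: float = 15.0,
        window_failure_threshold: int = 1,
        circuit_open_timeout: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._window = window_failure_size
        self._threshold = window_failure_threshold
        self._open_timeout = circuit_open_timeout
        self._clock = clock
        self._failures = deque()  # timestamps of recent failures
        self._opened_at: Optional[float] = None

    def record_failure(self) -> None:
        """Note a failure; open the circuit if the window holds enough of them."""
        now = self._clock()
        self._failures.append(now)
        # Drop failures that have fallen out of the evaluation window.
        while self._failures and now - self._failures[0] > self._window:
            self._failures.popleft()
        if len(self._failures) >= self._threshold:
            self._opened_at = now

    def allow_request(self) -> bool:
        """Return False while the circuit is open, True otherwise."""
        if self._opened_at is None:
            return True
        if self._clock() - self._opened_at >= self._open_timeout:
            # Timeout elapsed: close the circuit and start a fresh window.
            self._opened_at = None
            self._failures.clear()
            return True
        return False
```

With the defaults shown in the signature (a threshold of 1), a single failure would open the circuit in this sketch, and requests would be refused until circuit_open_timeout has elapsed.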

check_calibration_id(cal_id: str) → bool

Check if the given cal id is in the database already.

Parameters:

cal_id – the cal ID to check.

Returns:

True if the cal ID is already in the database

close_pool() → None

Close the database connection pool safely.

Can be called multiple times. If the pool is already closed, does nothing.
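
One common way to make a close operation safe to repeat is to swap the pool reference for None under a lock and close only what was swapped out. The sketch below illustrates that idea only; FakePool and PoolOwner are hypothetical stand-ins, and nothing here is taken from the module's source.

```python
import threading
from typing import Optional


class FakePool:
    """Hypothetical stand-in for a connection pool; counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1


class PoolOwner:
    """Hypothetical owner that closes its pool at most once."""

    def __init__(self, pool: FakePool) -> None:
        self._pool: Optional[FakePool] = pool
        self._lock = threading.Lock()

    def close_pool(self) -> None:
        # Swap the reference out under the lock so concurrent callers
        # cannot both see a live pool.
        with self._lock:
            pool, self._pool = self._pool, None
        if pool is not None:
            pool.close()
```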

execute_db(fn: Callable, timeout: float | None = None) → Any

Execute a command.

Parameters:
  • fn – the function to execute

  • timeout – an optional timeout.

Returns:

the response from the function being executed.

Raises:

RuntimeError – when DB is unavailable
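
Because execute_db raises RuntimeError when the database is unavailable, callers may want a fallback value. The helper below is a caller-side sketch under stated assumptions: run_or_default is a hypothetical name, execute_db is passed in as a plain callable, and the arguments that the real method passes to fn are not documented here, so the sketch makes no assumption about them.

```python
from typing import Any, Callable, Optional


def run_or_default(
    execute_db: Callable[..., Any],
    fn: Callable[..., Any],
    default: Any,
    timeout: Optional[float] = None,
) -> Any:
    """Call execute_db(fn, timeout); return default if the DB is unavailable."""
    try:
        return execute_db(fn, timeout)
    except RuntimeError:
        # Documented failure mode: the database is unavailable.
        return default
```

Catching only RuntimeError keeps other exceptions, such as programming errors inside fn, visible to the caller.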

get_calibration_ids() → list[str]

Get all calibration_ids.

Returns:

A list of all calibration_ids

get_fit_params(station_id: int, calibration_id: str | None = None) → tuple[list[float], list[float], list[bool] | None] | None

Get per-antenna phase fit parameters for the given station.

Parameters:
  • station_id – the id of the station to get fit params for.

  • calibration_id – optional unique identifier for the calibration.

Returns:

a tuple of the phase list, the phase_gradient list and an optional list of booleans, or None if not found.
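
The declared return type is a three-element tuple or None, so callers need to handle both the missing result and the optional third element. The sketch below shows one way to do that. The helper name unmasked_phases is hypothetical, and treating the boolean list as a mask in which True marks a flagged antenna is an assumption, not something this page states.

```python
from typing import Optional

FitParams = Optional[tuple[list[float], list[float], Optional[list[bool]]]]


def unmasked_phases(result: FitParams) -> list[float]:
    """Return phases for antennas not flagged in the mask; [] if no result."""
    if result is None:
        return []
    phase, _phase_gradient, mask = result
    if mask is None:
        return list(phase)
    # Assumption: True in the mask means the antenna is flagged (excluded).
    return [p for p, flagged in zip(phase, mask) if not flagged]
```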

get_latest_preferred_job_id(station_id: int) → str

Get the user_friendly_name of the most recent preferred job for a station.

Parameters:

station_id – the station to query for.

Returns:

the user_friendly_name of the most recent job where preferred = TRUE for the given station, or an empty string if none exists.

get_solution(station_id: int, frequency_channel: int, calibration_id: str | None = None) → list[float]

Get a solution for the provided frequency and station id.

At present, this returns the most recently stored solution for the given inputs.

Parameters:
  • frequency_channel – the frequency channel of the desired solution.

  • station_id – the id of the station to get a solution for.

  • calibration_id – the optional id of the calibration we want.

Returns:

a calibration solution from the database, or an empty list if a solution could not be read from the artefact.

mark_calibration_job_preferred(user_friendly_name: str) → tuple[list[ska_control_model.ResultCode], list[str]]

Mark a calibration job as preferred in the database.

Sets preferred = TRUE on the named calibration_job row.

Parameters:

user_friendly_name – the unique name of the calibration job to mark.

Returns:

tuple of result code and message.

store_calibration_job(**kwargs: Any) → tuple[ska_control_model.ResultCode, int]

Store the provided calibration job in the database.

Parameters:

kwargs – fields to populate in database.

Returns:

tuple of result code and message.

store_frequency_sweep(**kwargs: Any) → tuple[ska_control_model.ResultCode, int]

Store the provided frequency sweep in the database.

Parameters:

kwargs – fields to populate in database.

Returns:

tuple of result code and message.

store_phase_fit_params(**kwargs: Any) → tuple[list[ska_control_model.ResultCode], list[str]]

Store per-antenna phase fit parameters in calibration_per_sweep.

Parameters:

kwargs – fields to populate in database.

Returns:

tuple of result code and message.

store_solution(**kwargs: Any) → tuple[list[ska_control_model.ResultCode], list[str]]

Store the provided solution in the database.

Parameters:

kwargs – fields to populate in database.

Raises:

Exception – if we can't generate loading instructions

Returns:

tuple of result code and message.

update_frequency_sweep(**kwargs: Any) → tuple[list[ska_control_model.ResultCode], list[str]]

Update the provided frequency sweep in the database.

Parameters:

kwargs – fields to update in database.

Returns:

tuple of result code and message.

class DatabaseCalibrationJob(sweep_id: int, initial_mask: list[float] | None = None, calibration_parameters: str | None = None, user_friendly_name: str | None = None, preferred: bool | None = None)

Class to hold a calibration job.

This class holds a calibration job and offers some notion of checking types and mandatory keys before attempting to load into the calibration_job table.

__init__(sweep_id: int, initial_mask: list[float] | None = None, calibration_parameters: str | None = None, user_friendly_name: str | None = None, preferred: bool | None = None) → None

generate_loading_instruction() → tuple[psycopg.sql.Composed, tuple]

Generate instruction needed to load into calibration_job.

Returns:

Templated information to load into the database. Templating here improves protection against SQL injection attacks.
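
Returning a statement template separately from its values lets the driver bind the values as parameters instead of splicing them into the SQL text. The following is a minimal sketch of that idea using the standard-library sqlite3 module rather than psycopg, so the `?` placeholders and the plain-string template stand in for psycopg's placeholders and psycopg.sql.Composed. The JobRow class and the two table columns are hypothetical.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class JobRow:
    """Hypothetical row holder; only sweep_id is mandatory."""

    sweep_id: int
    user_friendly_name: Optional[str] = None

    def generate_loading_instruction(self) -> tuple[str, tuple[Any, ...]]:
        # Basic type check on the mandatory key before building the statement.
        if not isinstance(self.sweep_id, int):
            raise TypeError("sweep_id must be an int")
        # Column names are fixed in code; only values travel as parameters.
        template = (
            "INSERT INTO calibration_job (sweep_id, user_friendly_name) "
            "VALUES (?, ?)"
        )
        return template, (self.sweep_id, self.user_friendly_name)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE calibration_job (sweep_id INTEGER, user_friendly_name TEXT)"
)
# A hostile-looking name is stored verbatim because it is bound, not spliced.
hostile = "x'); DROP TABLE calibration_job; --"
row = JobRow(sweep_id=7, user_friendly_name=hostile)
template, values = row.generate_loading_instruction()
conn.execute(template, values)
stored = conn.execute(
    "SELECT sweep_id, user_friendly_name FROM calibration_job"
).fetchall()
```

In this sketch the table still exists after the insert and holds the name exactly as given, which is the property that parameter binding is meant to provide.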

class DatabaseFrequencySweep(station_id: int | None = None, channel_start: int | None = None, channel_stop: int | None = None, path_to_data: str | None = None, start_time: str | None = None, nof_samples: int | None = None, initial_static_delays: list[float] | None = None, sweep_id: int | None = None)

Class to hold a frequency sweep.

This class holds a frequency sweep and offers some notion of checking types and mandatory keys before attempting to load into the frequency_sweep table.

__init__(station_id: int | None = None, channel_start: int | None = None, channel_stop: int | None = None, path_to_data: str | None = None, start_time: str | None = None, nof_samples: int | None = None, initial_static_delays: list[float] | None = None, sweep_id: int | None = None) → None

generate_loading_instruction() → tuple[psycopg.sql.Composed, tuple]

Generate instruction needed to load into frequency_sweep.

Returns:

Templated information to load into the database. Templating here improves protection against SQL injection attacks.

generate_update_instructions() → tuple[psycopg.sql.Composed, tuple]

Generate instruction needed to update a frequency sweep entry.

Raises:

ValueError – if the sweep_id is not given, or no fields to update provided.

Returns:

Templated information to load into the database. Templating here improves protection against SQL injection attacks.
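
The two ValueError conditions suggest an update built only from the fields that were actually supplied. The sketch below shows one way such a builder could work. It is an illustration, not the module's code: build_update and UPDATABLE are hypothetical names, the column names are assumed to match the constructor arguments, and a plain string with `%s` placeholders stands in for psycopg.sql.Composed.

```python
from typing import Any, Optional

# Hypothetical whitelist of updatable columns, assumed to mirror the
# DatabaseFrequencySweep constructor arguments.
UPDATABLE = (
    "station_id",
    "channel_start",
    "channel_stop",
    "path_to_data",
    "start_time",
    "nof_samples",
    "initial_static_delays",
)


def build_update(
    sweep_id: Optional[int], **fields: Any
) -> tuple[str, tuple[Any, ...]]:
    """Build an UPDATE template and its values from the non-None fields."""
    if sweep_id is None:
        raise ValueError("sweep_id is required to update a frequency sweep")
    # Keep only known columns that were actually supplied.
    updates = {
        k: v for k, v in fields.items() if k in UPDATABLE and v is not None
    }
    if not updates:
        raise ValueError("no fields to update were provided")
    # Column names come from the whitelist, so only values are parameters.
    assignments = ", ".join(f"{column} = %s" for column in updates)
    template = f"UPDATE frequency_sweep SET {assignments} WHERE sweep_id = %s"
    return template, (*updates.values(), sweep_id)
```

Restricting column names to a fixed whitelist is what makes it safe to format them into the template here; the values themselves are never interpolated.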

class DatabasePhaseFitParams(job_id: int, phase: list[float], phase_gradient: list[float], phase_fit_cost: list[float], phase_fit_covariance: list[list[list[float]]], final_mask: list[bool])

Class to hold per-antenna phase fit parameters.

Holds the output of fit_gains and offers type checking before loading into the calibration_per_sweep table.

__init__(job_id: int, phase: list[float], phase_gradient: list[float], phase_fit_cost: list[float], phase_fit_covariance: list[list[list[float]]], final_mask: list[bool]) → None

generate_loading_instruction() → tuple[psycopg.sql.Composed, tuple]

Generate instruction needed to load into calibration_per_sweep.

Returns:

Templated information to load into the database.

class DatabaseSolution(acquisition_time: int, frequency_channel: int, station_id: int, preferred: bool, solution: list[float], calibration_path: str, corrcoeff: list[float] | None = None, residual_max: list[float] | None = None, residual_std: list[float] | None = None, xy_phase: list[float] | None = None, n_masked_initial: int | None = None, n_masked_final: int | None = None, lst: float | None = None, galactic_centre_elevation: float | None = None, sun_elevation: float | None = None, sun_adjustment_factor: float | None = None, masked_antennas: list[int] | None = None, job_id: int | None = None, solution_type: str | None = None)

Class to hold a solution.

This class holds a solution and offers some notion of checking types and mandatory keys before attempting to load into the calibration_per_channel table.

__init__(acquisition_time: int, frequency_channel: int, station_id: int, preferred: bool, solution: list[float], calibration_path: str, corrcoeff: list[float] | None = None, residual_max: list[float] | None = None, residual_std: list[float] | None = None, xy_phase: list[float] | None = None, n_masked_initial: int | None = None, n_masked_final: int | None = None, lst: float | None = None, galactic_centre_elevation: float | None = None, sun_elevation: float | None = None, sun_adjustment_factor: float | None = None, masked_antennas: list[int] | None = None, job_id: int | None = None, solution_type: str | None = None) → None

generate_loading_instruction() → tuple[psycopg.sql.Composed, tuple]

Generate instruction needed to load into calibration_per_channel.

Returns:

Templated information to load into the database. Templating here improves protection against SQL injection attacks.