# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
# Copyright (c) 2019 National Research Council of Canada
from __future__ import annotations
import logging
from typing import Callable, Optional, Tuple
import tango
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import PowerMode, SimulationMode
from ska_mid_cbf_mcs.component.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.power_switch.apc_pdu_driver import ApcPduDriver
from ska_mid_cbf_mcs.power_switch.apc_snmp_driver import ApcSnmpDriver
from ska_mid_cbf_mcs.power_switch.dli_pro_switch_driver import (
DLIProSwitchDriver,
)
from ska_mid_cbf_mcs.power_switch.power_switch_simulator import (
PowerSwitchSimulator,
)
from ska_mid_cbf_mcs.power_switch.st_switched_pro2_driver import (
STSwitchedPRO2Driver,
)
__all__ = ["PowerSwitchComponentManager"]
[docs]class PowerSwitchComponentManager(CbfComponentManager):
"""
A component manager for the power switch. Calls either the power
switch driver or the power switch simulator based on the value of simulation
mode.
:param simulation_mode: simulation mode identifies if the real power switch
driver or the simulator should be used
:param protocol: Connection protocol (HTTP or HTTPS) for the power switch
:param ip: IP address of the power switch
:param login: Login username of the power switch
:param password: Login password for the power switch
:param logger: a logger for this object to use
"""
def __init__(
self: PowerSwitchComponentManager,
model: str,
ip: str,
login: str,
password: str,
logger: logging.Logger,
push_change_event_callback: Optional[Callable],
communication_status_changed_callback: Callable[
[CommunicationStatus], None
],
component_power_mode_changed_callback: Callable[[PowerMode], None],
component_fault_callback: Callable[[bool], None],
simulation_mode: SimulationMode = SimulationMode.TRUE,
) -> None:
"""
Initialize a new instance.
:param model: Name of the power switch model
:param ip: IP address of the power switch
:param login: Login username of the power switch
:param password: Login password for the power switch
:param logger: a logger for this object to use
:param push_change_event: method to call when the base classes
want to send an event
:param communication_status_changed_callback: callback to be
called when the status of the communications channel between
the component manager and its component changes
:param component_power_mode_changed_callback: callback to be
called when the component power mode changes
:param component_fault_callback: callback to be
called when the component has faulted
:param simulation_mode: simulation mode identifies if the real power switch
driver or the simulator should be used
"""
self.connected = False
self._simulation_mode = simulation_mode
self.power_switch_driver = self.get_power_switch_driver(
model=model, ip=ip, login=login, password=password, logger=logger
)
self.power_switch_simulator = PowerSwitchSimulator(
model=model, logger=logger
)
super().__init__(
logger=logger,
push_change_event_callback=push_change_event_callback,
communication_status_changed_callback=communication_status_changed_callback,
component_power_mode_changed_callback=component_power_mode_changed_callback,
component_fault_callback=component_fault_callback,
)
@property
def num_outlets(self: PowerSwitchComponentManager) -> int:
"""
Get number of outlets present in this power switch.
:return: number of outlets
"""
if self.simulation_mode:
return self.power_switch_simulator.num_outlets
else:
return self.power_switch_driver.num_outlets
@property
def is_communicating(self: PowerSwitchComponentManager) -> bool:
"""
Returns whether or not the power switch can be communicated with.
:return: whether the power switch is communicating
"""
# If we haven't started communicating yet, don't check power switch
# communication status
if self.connected is False:
return False
# If we have started communicating, check the actual communication
# status of the power switch
if self.simulation_mode:
return self.power_switch_simulator.is_communicating
else:
return self.power_switch_driver.is_communicating
@property
def simulation_mode(self: PowerSwitchComponentManager) -> SimulationMode:
"""
Get the simulation mode of the component manager.
:return: simulation mode of the component manager
"""
return self._simulation_mode
@simulation_mode.setter
def simulation_mode(
self: PowerSwitchComponentManager, value: SimulationMode
) -> None:
"""
Set the simulation mode of the component manager.
:param value: value to set simulation mode to
"""
self._simulation_mode = value
[docs] def start_communicating(self: PowerSwitchComponentManager) -> None:
"""
Perform any setup needed for communicating with the power switch.
"""
if self.connected:
self._logger.info("Already communicating.")
return
super().start_communicating()
if not self._simulation_mode:
self.power_switch_driver.initialize()
self.update_communication_status(CommunicationStatus.ESTABLISHED)
self.update_component_power_mode(PowerMode.ON)
self.connected = True
[docs] def stop_communicating(self: PowerSwitchComponentManager) -> None:
"""Stop communication with the component."""
super().stop_communicating()
self.update_component_power_mode(PowerMode.UNKNOWN)
self.connected = False
if not self._simulation_mode:
self.power_switch_driver.stop()
[docs] def get_outlet_power_mode(
self: PowerSwitchComponentManager, outlet: str
) -> PowerMode:
"""
Get the power mode of a specific outlet.
:param outlet: outlet ID
:return: power mode of the outlet
:raise AssertionError: if outlet ID is out of bounds
"""
if self.simulation_mode:
return self.power_switch_simulator.get_outlet_power_mode(outlet)
else:
return self.power_switch_driver.get_outlet_power_mode(outlet)
[docs] def turn_on_outlet(
self: PowerSwitchComponentManager, outlet: str
) -> Tuple[ResultCode, str]:
"""
Tell the power switch to turn on a specific outlet.
:param outlet: outlet ID to turn on
:return: a tuple containing a return code and a string
message indicating status
:raise AssertionError: if outlet ID is out of bounds
"""
if self.simulation_mode:
return self.power_switch_simulator.turn_on_outlet(outlet)
else:
return self.power_switch_driver.turn_on_outlet(outlet)
[docs] def turn_off_outlet(
self: PowerSwitchComponentManager, outlet: str
) -> Tuple[ResultCode, str]:
"""
Tell the power switch to turn off a specific outlet.
:param outlet: outlet ID to turn off
:return: a tuple containing a return code and a string
message indicating status
:raise AssertionError: if outlet ID is out of bounds
"""
if self.simulation_mode:
return self.power_switch_simulator.turn_off_outlet(outlet)
else:
return self.power_switch_driver.turn_off_outlet(outlet)
[docs] def get_power_switch_driver(
self: PowerSwitchComponentManager,
model: str,
ip: str,
login: str,
password: str,
logger: logging.Logger,
):
# The text must match the powerswitch.yaml
if model == "DLI LPC9":
return DLIProSwitchDriver(
ip=ip, login=login, password=password, logger=logger
)
elif model == "Server Technology Switched PRO2":
return STSwitchedPRO2Driver(
ip=ip, login=login, password=password, logger=logger
)
elif model == "APC AP8681 SSH":
return ApcPduDriver(
ip=ip, login=login, password=password, logger=logger
)
elif model == "APC AP8681 SNMP":
return ApcSnmpDriver(
ip=ip, login=login, password=password, logger=logger
)
else:
err = f"Model name {model} is not supported."
logger.error(err)
tango.Except.throw_exception(
"PowerSwitch_CreateDriverFailed",
err,
"get_power_switch_driver()",
)