Source code for ska_pst.lmc.component.grpc_lmc_client

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the gRPC LMC client to external processes."""

from __future__ import annotations

import functools
import logging
from threading import Event
from typing import Any, Callable, Dict, Generator, NoReturn, Type, TypeVar, cast

import grpc
from grpc import Channel, StatusCode
from ska_control_model import ObsState
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    AbortRequest,
    AsciiHeaderProto,
    ConfigureBeamRequest,
    ConfigureScanRequest,
    ConnectionRequest,
    ConnectionResponse,
    DeconfigureBeamRequest,
    DeconfigureScanRequest,
    EnvValue,
    ErrorCode,
    GetBeamConfigurationRequest,
    GetBeamConfigurationResponse,
    GetEnvironmentRequest,
    GetLogLevelRequest,
    GetScanConfigurationRequest,
    GetScanConfigurationResponse,
    GetStateRequest,
    GetStateResponse,
    GoToFaultRequest,
    HealthCheckRequest,
    HealthCheckResponse,
    LogLevel,
    MonitorRequest,
    MonitorResponse,
    ResetRequest,
    RestartRequest,
    SetLogLevelRequest,
    StartScanRequest,
    Status,
    StopScanRequest,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2_grpc import PstLmcServiceStub
from ska_pst.lmc.health_check import HealthCheckState
from ska_pst.lmc.util.timeout_iterator import TimeoutIterator
from ska_pydada import AsciiHeader

# Trailing-metadata key that ``_handle_pst_error`` looks for on a failed call; the value stored
# under this key is parsed as a protobuf ``Status`` message to recover the PST error code.
GRPC_STATUS_DETAILS_METADATA_KEY = "grpc-status-details-bin"


def ascii_header_to_protobuf(ascii_header: AsciiHeader) -> AsciiHeaderProto:
    """
    Build the protobuf counterpart of an ``AsciiHeader`` instance.

    The protobuf message is a thin wrapper (NewType pattern) around a single string
    field, which keeps the header type safe on both the Python and C++ side of the LMC.

    :param ascii_header: the header to convert into a protobuf message.
    :type ascii_header: AsciiHeader
    :return: a protobuf message whose ``header`` field holds ``str(ascii_header)``.
    :rtype: AsciiHeaderProto
    """
    header_text = str(ascii_header)
    return AsciiHeaderProto(header=header_text)
def protobuf_to_ascii_header(msg: AsciiHeaderProto) -> AsciiHeader:
    """
    Build a Python ``AsciiHeader`` from its protobuf message form.

    :param msg: the protobuf message whose ``header`` string is to be parsed.
    :type msg: AsciiHeaderProto
    :return: the ``AsciiHeader`` produced by ``AsciiHeader.from_str`` for the message's header.
    :rtype: AsciiHeader
    """
    header_text = msg.header
    return AsciiHeader.from_str(header_text)
class BaseGrpcException(Exception):
    """Base exception to capture gRPC related exceptions."""

    def __init__(self: BaseGrpcException, message: str) -> None:
        """
        Initialise exception.

        :param message: the human readable description of the error.
        :type message: str
        """
        self.message = message
        # Forward the message so that ``Exception.args`` is populated as well; generic
        # tooling that inspects ``args`` then sees the same text as ``str(exc)``.
        super().__init__(message)

    def __str__(self: BaseGrpcException) -> str:
        """
        Get the exception message.

        This is used in Python f-strings to get the message of the base exception. The
        SKA base classes expect a string representation of the exception.

        :return: the exception message
        :rtype: str
        """
        return self.message

    def __repr__(self: BaseGrpcException) -> str:
        """
        Get a string representation of the exception.

        This method is different to the ``__str__`` method as it also includes the class
        name of the exception.

        :return: a string representation of the exception.
        :rtype: str
        """
        cls = self.__class__.__name__
        return f"{cls}('{self.message}')"


class AlreadyScanningException(BaseGrpcException):
    """
    Exception for when the process is already scanning.

    Raised when the server is already scanning and is in the SCANNING ObsState state.
    If this exception is raised it is likely due to a mismatch in the state model of the
    LMC and the server, which could be the case if a command line interface has
    interacted with the server directly.

    The LMC can recover from this as it should only be raised when the scan command is
    called. The LMC should log this happened but can safely go into SCANNING state.
    """


class NotScanningException(BaseGrpcException):
    """
    Exception for when trying to end scan but component is not scanning.

    Raised when the server is not in a scanning state but received an end scan command.
    Just like :py:class:`AlreadyScanningException` it is possible for the LMC to recover
    from this as this exception is only raised during stop_scan. The LMC should log this
    happened but can safely go into a READY state.
    """


class ResourcesAlreadyAssignedException(BaseGrpcException):
    """
    Exception for when resources were already assigned.

    Raised when the server is already in an assigned resources state and the request
    should not have been called.
    """


class ResourcesNotAssignedException(BaseGrpcException):
    """
    Exception for when resources have not been assigned.

    Raised when the server does not have any resources assigned. This request should not
    have been called.
    """


class ScanConfiguredAlreadyException(BaseGrpcException):
    """
    Exception for when scan has already been configured.

    Raised when the server is in a READY state and is already configured for scan. This
    request should have not been made.
    """


class NotConfiguredForScanException(BaseGrpcException):
    """
    Exception for when server has no scan configuration.

    Raised when the server does not have a scan configuration but a request to
    deconfigure, scan, or get the scan configuration was made.
    """


class FaultOccurredWhileAborting(BaseGrpcException):
    """
    Exception for when server went into a FAULT state while being commanded to ABORT.

    Both ABORT and FAULT states are resettable, as such we don't want the LMC command to
    fail but provide a warning that this occurred.
    """


class InvalidRequestException(BaseGrpcException):
    """
    Exception with the actual request parameters.

    This is raised when the server validates the request and request is not correct,
    such as the assign resources message has a protobuf Oneof field for resources and
    the incorrect one was applied.
    """


class ServerError(BaseGrpcException):
    """
    Exception when an exception on the server side happens.

    The server raised an exception during the processing of the request and the logs of
    the server should be checked. The client is not expected to handle this exception.
    """

    def __init__(self: ServerError, error_code: int, message: str, **kwargs: Any) -> None:
        """
        Initialise exception.

        :param error_code: the gRPC status code reported for the failed call.
        :type error_code: int
        :param message: the human readable description of the error.
        :type message: str
        :param kwargs: accepted for subclass compatibility and otherwise ignored.
        """
        self.error_code = error_code
        super().__init__(message)


class ServiceUnavailable(ServerError):
    """Exception raised when the gRPC service is not available."""


class TimeoutException(ServerError):
    """Exception raised when the gRPC service takes longer than expected."""

    def __init__(self: TimeoutException, *, timeout: float, **kwargs: Any) -> None:
        """
        Initialise exception.

        :param timeout: the timeout, in seconds, that was in force for the request.
        :type timeout: float
        :param kwargs: forwarded to :py:class:`ServerError` (``error_code`` and ``message``).
        """
        self.timeout = timeout
        super().__init__(**kwargs)


class UnknownGrpcException(BaseGrpcException):
    """
    An unknown gRPC exception.

    This error occurs due to gRPC itself. The client is not expected to handle this
    request.
    """

    def __init__(self: UnknownGrpcException, error_code: int, message: str) -> None:
        """
        Initialise exception.

        :param error_code: the gRPC status code of the error, or ``-1`` when the
            underlying exception did not expose one.
        :type error_code: int
        :param message: the human readable description of the error.
        :type message: str
        """
        self.error_code = error_code
        super().__init__(message)


# Maps the PST error code carried in a ``Status`` message to the exception type raised for it.
ERROR_CODE_EXCEPTION_MAP: Dict[ErrorCode, Type[BaseGrpcException]] = {
    ErrorCode.ALREADY_SCANNING: AlreadyScanningException,
    ErrorCode.NOT_SCANNING: NotScanningException,
    ErrorCode.INVALID_REQUEST: InvalidRequestException,
    ErrorCode.CONFIGURED_FOR_BEAM_ALREADY: ResourcesAlreadyAssignedException,
    ErrorCode.NOT_CONFIGURED_FOR_BEAM: ResourcesNotAssignedException,
    ErrorCode.CONFIGURED_FOR_SCAN_ALREADY: ScanConfiguredAlreadyException,
    ErrorCode.NOT_CONFIGURED_FOR_SCAN: NotConfiguredForScanException,
    ErrorCode.FAULT_OCCURRED_WHILE_ABORTING: FaultOccurredWhileAborting,
}

# gRPC status codes that are handled by ``_handle_pst_error`` rather than being reported as a
# generic ``ServerError`` by ``_handle_server_error``.
_PST_ERROR_STATUS_CODES = (
    StatusCode.FAILED_PRECONDITION,
    StatusCode.INTERNAL,
    StatusCode.INVALID_ARGUMENT,
)

Wrapped = TypeVar("Wrapped", bound=Callable[..., Any])


def grpc_request(func: Wrapped) -> Wrapped:
    """
    Return a decorated function that handles gRPC requests.

    Methods decorated with this decorator will perform a gRPC call and this decorator
    will handle mapping gRPC errors into the PST specific errors. The decorator also
    resolves the ``timeout`` keyword argument, replacing a missing (or falsy) value with
    the client's ``default_timeout`` before calling the wrapped method.

    :param func: the wrapped function
    :type func: Wrapped
    :return: the wrapped function
    """

    @functools.wraps(func)
    def _wrapper(
        client: PstGrpcLmcClient,
        *args: Any,
        timeout: float | None = None,
        **kwargs: Any,
    ) -> Any:
        # NB: ``or`` means a timeout of 0 is treated the same as "not supplied".
        timeout = timeout or client.default_timeout
        try:
            return func(client, *args, timeout=timeout, **kwargs)
        except Exception as e:
            # every failure is translated into a ``BaseGrpcException`` subclass
            _handle_grpc_error(e, timeout=timeout)

    return cast(Wrapped, _wrapper)


def _handle_server_error(error: Exception, timeout: float) -> None:
    """
    Raise a :py:class:`ServerError` for errors that are not PST specific.

    Nothing is raised when the error has no gRPC status code or when the status code is
    one of the codes reserved for PST specific errors, as those are left for
    ``_handle_pst_error``.

    :param error: the exception raised while performing the request.
    :param timeout: the timeout, in seconds, that was used for the request.
    :raises ServiceUnavailable: if the status code is ``UNAVAILABLE``.
    :raises TimeoutException: if the status code is ``DEADLINE_EXCEEDED``.
    :raises ServerError: for any other status code that is not PST specific.
    """
    if not hasattr(error, "code"):
        # not a gRPC call error, there is no status code to classify it by.
        return

    error_code = error.code()
    if error_code in _PST_ERROR_STATUS_CODES:
        return

    message = error.details() if hasattr(error, "details") else "Unknown"

    if error_code == StatusCode.UNAVAILABLE:
        raise ServiceUnavailable(error_code=error_code, message=message) from error
    if error_code == StatusCode.DEADLINE_EXCEEDED:
        raise TimeoutException(timeout=timeout, error_code=error_code, message=message) from error

    raise ServerError(error_code=error_code, message=message) from error


def _handle_pst_error(error: Exception) -> None:
    """
    Raise the PST specific exception described by the error's trailing metadata.

    The trailing metadata is searched for the ``GRPC_STATUS_DETAILS_METADATA_KEY`` entry,
    which is parsed as a ``Status`` message. Nothing is raised when the status code is
    not one reserved for PST errors, when there is no such metadata entry, or when the
    PST error code is not in ``ERROR_CODE_EXCEPTION_MAP``.

    :param error: the exception raised while performing the request.
    :raises BaseGrpcException: the mapped subclass for the PST error code, if any.
    """
    grpc_error_code = error.code() if hasattr(error, "code") else StatusCode.UNKNOWN
    if grpc_error_code not in _PST_ERROR_STATUS_CODES:
        return

    if not hasattr(error, "trailing_metadata"):
        return

    for key, value in error.trailing_metadata():
        if key != GRPC_STATUS_DETAILS_METADATA_KEY:
            continue

        status = Status()
        status.ParseFromString(value)
        if status.code in ERROR_CODE_EXCEPTION_MAP:
            raise ERROR_CODE_EXCEPTION_MAP[status.code](status.message) from error


def _handle_grpc_error(error: Exception, timeout: float) -> NoReturn:
    """
    Convert an exception raised during a gRPC request into a PST exception.

    This always raises. Errors that neither ``_handle_server_error`` nor
    ``_handle_pst_error`` recognise are logged and raised as an
    :py:class:`UnknownGrpcException`.

    :param error: the exception raised while performing the request. This is normally a
        ``grpc.RpcError`` but ``grpc_request`` passes any exception through here.
    :param timeout: the timeout, in seconds, that was used for the request.
    :raises BaseGrpcException: always, as the most specific subclass that applies.
    """
    _handle_server_error(error, timeout)
    _handle_pst_error(error)

    # if here, we have an unknown error
    error_code = error.code() if hasattr(error, "code") else -1
    # Exceptions that did not come from gRPC have no ``details()``; fall back to their
    # string form so the original failure is reported rather than an AttributeError.
    details = error.details() if hasattr(error, "details") else str(error)

    logging.warning(
        f"Unknown exception found. {error_code=}, error.details()={details!r}", exc_info=True
    )
    raise UnknownGrpcException(error_code, details) from error
class PstGrpcLmcClient:
    """
    The client API that connects to a remote gRPC service.

    This client is a wrapper around the :py:class:`PstLmcServiceStub` that is generated
    from the gRPC/Protobuf bindings.

    Once fully implemented this class will be able to be used by any of the LMC
    components :py:class:`PstProcessApi` implementations.

    Methods decorated with ``grpc_request`` have gRPC failures translated into the PST
    exception hierarchy and have a missing ``timeout`` replaced by ``default_timeout``.
    """

    _client_id: str
    _channel: Channel
    _endpoint: str
    _service: PstLmcServiceStub
    _logger: logging.Logger
    # both are only known after ``connect`` has received a response from the service
    _service_name: str | None
    _service_uuid: str | None

    def __init__(
        self: PstGrpcLmcClient,
        client_id: str,
        endpoint: str,
        default_timeout: float = 60.0,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise gRPC client.

        :param client_id: the client id to allow for tracing in logs.
        :type client_id: str
        :param endpoint: the gRPC endpoint to send requests to.
        :type endpoint: str
        :param default_timeout: the default timeout, in seconds, to use for all
            requests, defaults to 60.0 (i.e. 1 minute)
        :type default_timeout: float, optional
        :param logger: the logger to use for logging information from client, defaults
            to None in which case a module level logger is used
        :type logger: logging.Logger | None, optional
        :param kwargs: accepted for call-site compatibility and otherwise ignored.
        """
        self._logger = logger or logging.getLogger(__name__)
        self._client_id = client_id
        self._endpoint = endpoint
        self._logger.info(f"Connecting '{client_id}' to remote endpoint '{endpoint}'")
        # NOTE(review): ``wait_for_ready`` is normally a per-call argument in gRPC Python;
        # confirm that passing it as a channel option has the intended effect.
        self._channel = grpc.insecure_channel(endpoint, options=[("wait_for_ready", True)])
        self._service = PstLmcServiceStub(channel=self._channel)
        self._service_name = None
        self._service_uuid = None
        self.default_timeout = default_timeout

    @grpc_request
    def connect(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Connect client to the remote gRPC service.

        This is used to let the server know that a client has connected. The service
        name and UUID in the response are stored on the client for later use by the
        health check.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Connect called for client {self._client_id}")
        request = ConnectionRequest(client_id=self._client_id)
        response: ConnectionResponse = self._service.connect(request, timeout=timeout)
        self._logger.info(f"Connected to {response.service_name} identified with UUID={response.uuid}")
        self._service_name = response.service_name
        self._service_uuid = response.uuid

    @grpc_request
    def configure_beam(
        self: PstGrpcLmcClient, request: ConfigureBeamRequest, timeout: float | None = None, **kwargs: Any
    ) -> None:
        """
        Call configure_beam on remote gRPC service.

        :param request: the configure beam request object.
        :type request: ConfigureBeamRequest
        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        if request.dry_run:
            self._logger.debug(f"Validating beam configuration on '{self._client_id}'.")
        else:
            self._logger.debug(f"Configuring beam on '{self._client_id}'.")
        self._service.configure_beam(request, timeout=timeout)

    @grpc_request
    def deconfigure_beam(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Call deconfigure_beam on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Deconfiguring beam on '{self._client_id}'")
        self._service.deconfigure_beam(DeconfigureBeamRequest(), timeout=timeout)

    @grpc_request
    def get_beam_configuration(
        self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any
    ) -> GetBeamConfigurationResponse:
        """
        Call get_beam_configuration on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        :return: the current beam configuration on the remote gRPC service.
        :rtype: GetBeamConfigurationResponse
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Getting beam configuration from '{self._client_id}'.")
        return self._service.get_beam_configuration(GetBeamConfigurationRequest(), timeout=timeout)

    @grpc_request
    def configure_scan(
        self: PstGrpcLmcClient, request: ConfigureScanRequest, timeout: float | None = None, **kwargs: Any
    ) -> None:
        """
        Call configure_scan on remote gRPC service.

        :param request: the configure scan request object for the remote gRPC service.
        :type request: ConfigureScanRequest
        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        if request.dry_run:
            self._logger.debug(f"Validating scan configuration on '{self._client_id}'.")
        else:
            self._logger.debug(f"Configuring scan on '{self._client_id}'.")
        self._service.configure_scan(request, timeout=timeout)

    @grpc_request
    def deconfigure_scan(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Call deconfigure_scan on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Deconfiguring scan on '{self._client_id}'.")
        self._service.deconfigure_scan(DeconfigureScanRequest(), timeout=timeout)

    @grpc_request
    def get_scan_configuration(
        self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any
    ) -> GetScanConfigurationResponse:
        """
        Call get_scan_configuration on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        :return: the current scan configuration on the remote gRPC service
        :rtype: GetScanConfigurationResponse
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Getting scan configuration from '{self._client_id}'.")
        return self._service.get_scan_configuration(GetScanConfigurationRequest(), timeout=timeout)

    @grpc_request
    def start_scan(
        self: PstGrpcLmcClient, request: StartScanRequest, timeout: float | None = None, **kwargs: Any
    ) -> None:
        """
        Call start_scan on remote gRPC service.

        :param request: the start scan request to send to the remote gRPC service
        :type request: StartScanRequest
        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._service.start_scan(request, timeout=timeout)

    @grpc_request
    def stop_scan(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Call stop_scan on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Stopping scan on '{self._client_id}'.")
        self._service.stop_scan(StopScanRequest(), timeout=timeout)

    @grpc_request
    def go_to_fault(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Put the gRPC service in to a FAULT state.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Forcing '{self._client_id}' into FAULT state.")
        self._service.go_to_fault(GoToFaultRequest(), timeout=timeout)

    @grpc_request
    def get_state(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> ObsState:
        """
        Call get_state on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        :return: the current observation state of the remote gRPC service
        :rtype: ObsState
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling get_state for '{self._client_id}'.")
        result: GetStateResponse = self._service.get_state(GetStateRequest(), timeout=timeout)
        return ObsState(result.state)

    @grpc_request
    def get_env(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> dict:
        """
        Get the environment values from the remote gRPC service.

        This will map the Protobuf ``EnvValue`` objects to the appropriate Python types.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        :return: the environment values from the remote gRPC service.
        :rtype: dict
        """
        timeout = timeout or self.default_timeout

        def _map_value(value: EnvValue) -> Any:
            # return whichever field of the message is populated; when none of the first
            # three is set the unsigned integer value is used.
            if value.HasField("string_value"):
                return value.string_value
            elif value.HasField("float_value"):
                return value.float_value
            elif value.HasField("signed_int_value"):
                return value.signed_int_value
            else:
                return value.unsigned_int_value

        response = self._service.get_env(GetEnvironmentRequest(), timeout=timeout)
        values = response.values
        return {k: _map_value(values[k]) for k in values}

    @grpc_request
    def abort(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Abort scanning.

        This method is to be used by the LMC device that needs to abort a long running
        action, in particular scan. The ObsState model allows for this to be called if
        in IDLE (resources assigned), CONFIGURING (configuring a scan), READY
        (configured for a scan but not scanning), SCANNING (a scan is running), or
        RESETTING (is trying to reset from ABORTED/FAULT state).

        After this call the state of the service should be ABORTED.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling Abort on '{self._client_id}'.")
        self._service.abort(AbortRequest(), timeout=timeout)

    @grpc_request
    def reset(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Reset service.

        This method is to be used by the LMC device that is currently in an ABORTED or
        FAULT state to reset the service. After this call the state of the service
        should be in IDLE (resources assigned and not configured for a scan).

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling Reset on '{self._client_id}'.")
        self._service.reset(ResetRequest(), timeout=timeout)

    @grpc_request
    def restart(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Restart service.

        This method is to be used by the LMC device to restart a service regardless of
        its ObsState. After this call the state of the service should be in IDLE
        (resources assigned and not configured for a scan).

        A ``grpc.RpcError`` raised by the request is logged as a warning and is not
        propagated to the caller.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        try:
            self._logger.warning(f"Restart called for {self._client_id}.")
            self._service.restart(RestartRequest(), timeout=timeout)
        except grpc.RpcError:
            # deliberate best effort: the error is swallowed here so the ``grpc_request``
            # error mapping never sees it.
            self._logger.warning(f"Service {self._client_id} is unavailable")

    def monitor(
        self: PstGrpcLmcClient,
        abort_event: Event,
        *,
        polling_rate: int = DEFAULT_MONITORING_INTERVAL_MS,
    ) -> Generator[MonitorResponse, None, None]:
        """
        Call monitor on remote gRPC service.

        This method is not decorated with the ``grpc_request`` as it handles timeout
        differently but this does handle the gRPC errors just like the ``grpc_request``
        decorator does.

        :param abort_event: a :py:class:`threading.Event` that can be used to signal to
            stop monitoring.
        :type abort_event: Event
        :param polling_rate: the rate, in milliseconds, at which the monitoring should
            poll, defaults to ``DEFAULT_MONITORING_INTERVAL_MS``.
        :type polling_rate: int, optional
        :yield: the current monitoring state from the remote gRPC service
        :rtype: Generator[MonitorResponse, None, None]
        :raises BaseGrpcException: if the stream fails with a ``grpc.RpcError``.
        """
        self._logger.debug(f"Starting background monitoring on '{self._client_id}'.")
        # have timeout set to max of 5s or 2 times the polling interval
        timeout = max(5.0, 2.0 * polling_rate / 1000.0)

        try:
            monitor_stream = self._service.monitor(MonitorRequest(polling_rate=polling_rate))
            self._monitor_stream = TimeoutIterator(
                iterator=monitor_stream,
                timeout=timeout,
                abort_event=abort_event,
                expected_period=polling_rate / 1000.0,
            )
            while not abort_event.is_set():
                try:
                    for t in self._monitor_stream:
                        yield t
                except TimeoutError:
                    if abort_event.is_set():
                        # received timeout during abort event being set.
                        continue
                    self._logger.warning(
                        f"received timeout during monitoring for '{self._client_id}' before abort event set."
                    )
        except grpc.RpcError as e:
            _handle_grpc_error(e, timeout=timeout)

    @grpc_request
    def set_log_level(
        self: PstGrpcLmcClient, request: SetLogLevelRequest, timeout: float | None = None, **kwargs: Any
    ) -> None:
        """
        Set the LogLevel of the remote gRPC service.

        :param request: The request containing LogLevel to be set on the remote gRPC
            service.
        :type request: SetLogLevelRequest
        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Setting log level on '{self._client_id}'. log_level={request.log_level}")
        self._service.set_log_level(request=request, timeout=timeout)

    @grpc_request
    def get_log_level(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> LogLevel:
        """
        Get the LogLevel of the remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to
            ``self.default_timeout``
        :type timeout: float | None, optional
        :returns: The current LogLevel of the remote gRPC service.
        :rtype: LogLevel
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling get_log_level for '{self._client_id}'.")
        return self._service.get_log_level(request=GetLogLevelRequest(), timeout=timeout)

    def perform_health_check(
        self: PstGrpcLmcClient,
        abort_event: Event,
        *,
        health_check_interval: int = DEFAULT_HEALTH_CHECK_INTERVAL_MS,
    ) -> Generator[HealthCheckState, None, None]:
        """
        Perform health check on remote gRPC service.

        This method is not decorated with the ``grpc_request`` as it handles timeout
        differently but this does handle the gRPC errors just like the ``grpc_request``
        decorator does.

        Each response received from the service is converted into a
        :py:class:`HealthCheckState` before being yielded.

        :param abort_event: a :py:class:`threading.Event` that can be used to signal to
            stop performing the health check of the remote gRPC service.
        :type abort_event: Event
        :param health_check_interval: the interval, in milliseconds, to expect a
            response back from the remote gRPC service, defaults to
            ``DEFAULT_HEALTH_CHECK_INTERVAL_MS``
        :type health_check_interval: int, optional
        :yield: the current health check state
        :rtype: Generator[HealthCheckState, None, None]
        :raises BaseGrpcException: if the stream fails with a ``grpc.RpcError``.
        """

        def _map_response(grpc_response: HealthCheckResponse) -> HealthCheckState:
            obs_state = ObsState(grpc_response.obs_state)
            # the fault message is only reported while the service is in FAULT
            fault_message: str | None = None
            if obs_state == ObsState.FAULT:
                fault_message = grpc_response.fault_message

            return HealthCheckState(
                service_name=self._service_name,  # type: ignore
                service_uuid=self._service_uuid,  # type: ignore
                actual_service_uuid=grpc_response.uuid,
                obs_state=obs_state,
                fault_message=fault_message,
            )

        self._logger.debug(f"Starting background health check on '{self._client_id}'.")
        # have timeout set to max of 5s or 2 times the health check interval
        timeout = max(5.0, 2.0 * health_check_interval / 1000.0)

        try:
            health_check_stream = self._service.health_check(
                HealthCheckRequest(health_check_interval=health_check_interval)
            )
            self._health_check_stream = TimeoutIterator(
                iterator=health_check_stream,
                timeout=timeout,
                abort_event=abort_event,
                expected_period=health_check_interval / 1000.0,
                # this allows getting state changes as they occur
                background_thread_wait=False,
            )
            while not abort_event.is_set():
                try:
                    for t in self._health_check_stream:
                        yield _map_response(t)
                except TimeoutError:
                    if abort_event.is_set():
                        # received timeout during abort event being set.
                        continue
                    self._logger.warning(
                        f"received timeout during health check for '{self._client_id}' "
                        f"before abort event set."
                    )
        except grpc.RpcError as e:
            _handle_grpc_error(e, timeout=timeout)

    def __repr__(self: PstGrpcLmcClient) -> str:
        """
        Get a string representation of instance.

        :return: a string representation of instance
        :rtype: str
        """
        return f"PstGrpcLmcClient(client_id='{self._client_id}', endpoint='{self._endpoint}')"