Source code for ska_pact_tango.pact

import json
import os
from unittest import mock

from ska_pact_tango.provider import Provider


[docs]class Pact: """ Represents a contract between a consumer and provider. Provides Python context handlers to configure the Pact mock service to perform tests with a Tango device (consumer). Also generates and writes a pact file. """ def __init__(self, providers, consumer_name, consumer_class=None, pact_dir=None, pact_file_name=""): """ Constructor for Pact. This class is usually constructed by `Consumer.has_pact_with`. For example: pact = Consumer("test/consumer/1", consumer_cls=ConsumerDevice).has_pact_with( providers=[ Provider("test/provider/1").add_interaction( Interaction() .given("The provider is in Init State", "Init") .upon_receiving("A read attribute request for the attribute random_float") .with_request("read_attribute", "random_float") .will_respond_with(DeviceAttribute, device_attribute_response) ) ] ) :param providers: A list of providers that this contract has :type provider: List[pact.Provider] :param consumer_name: The name of the consumer :type consumer_name: str :param consumer_class: The consumer class :type consumer_class: Optional[class] :param pact_dir: Directory where the resulting pact files will be written. Defaults to the current directory. :type pact_dir: str :param pact_file_name: The name of the pact file for this interaction. Defaults to <consumer_name>-pact.json :type pact_file_name: str """ self.version = "1.0" self.consumer_name = consumer_name self.consumer_class = consumer_class self.providers = providers self.pact_dir = pact_dir or os.getcwd() self._mock_handler = None if pact_file_name: self.pact_file_name = pact_file_name else: self.pact_file_name = f"{self.consumer_name}-pact.json" if not os.path.exists(self.pact_dir): raise ValueError(f"Pact destination directory {pact_dir} does not exist") self.pact_json_file_path = os.path.join(self.pact_dir, self.pact_file_name)
[docs] def get_interactions_for_provider(self, provider_name: str) -> list: """Fetch all the interactions associated with the device name passed in. :param provider_name: The provider name :type provider_name: str :return: Interactions for the provider :rtype: list """ providers = [provider for provider in self.providers if provider.device_name == provider_name] if not providers: return [] return providers[0].interactions
[docs] def to_dict(self): """Construct a pact dictionary :return: A dictionary of the Pact :rtype: dict """ return { "consumer": {"name": self.consumer_name}, "providers": [provider.to_dict() for provider in self.providers], "metadata": {"specification": {"version": self.version}}, }
[docs] def write_pact(self): """Write the pact to disk""" with open(self.pact_json_file_path, "w") as outfile: json.dump(self.to_dict(), outfile)
[docs] @classmethod def from_dict(cls, pact_dict: dict): """Generate an instance of Pact from a dictionary :param pact_dict: Pact in a dictionary format :type pact_dict: dict :return: an instance of Pact :rtype: Pact """ consumer_name = pact_dict["consumer"]["name"] providers = [] for provider in pact_dict["providers"]: providers.append(Provider.from_dict(provider["provider"])) pact = Pact(providers, consumer_name=consumer_name) return pact
[docs] @classmethod def from_file(cls, pact_file_path: str): """Parses a pact file and returns an instance of Pact :param pact_file_path: Path to a Pact file :type pact_file_path: str :return: an instance of Pact :rtype: Pact """ assert os.path.isfile(pact_file_path), f"No file at path {pact_file_path}" with open(pact_file_path, "r") as pact_file: pact_data = json.load(pact_file) return Pact.from_dict(pact_data)
def __enter__(self): """Set up the mock response and start mocking""" def _side_effect(*args, **_): # pylint: disable=W0613 """Set up the mocked proxy""" mock_proxy = mock.MagicMock() interactions = self.get_interactions_for_provider(args[0]) if not interactions: providers = [provider.device_name for provider in self.providers] raise ValueError( (f"There are no interactions for provider [{args[0]}], " f"available providers: {providers}") ) for interaction in interactions: request_type = interaction.request["type"] request_name = interaction.request["name"] response = interaction.response["response"] # Set up the mock object if request_type in ["command", "method"]: # E.g provider_device.no_arg_command() # provider_device.with_arg_command(1) # provider_device.write_attribute("random_float", 5.0) setattr(mock_proxy, request_name, mock.MagicMock(return_value=response)) elif request_type == "read_attribute": # E.g provider_device.read_attribute("random_float") setattr(mock_proxy, request_type, mock.MagicMock(return_value=response)) else: # E.g provider_device.random_float setattr(mock_proxy, request_name, response) return mock_proxy mod_device_proxy = f"{self.consumer_class.__module__}.tango.DeviceProxy" self._mock_handler = mock.patch(mod_device_proxy, side_effect=_side_effect) self._mock_handler.start() def __exit__(self, exc_type, exc_val, exc_tb): """Stop the mock handler""" self._mock_handler.stop() def __eq__(self, other_pact): """Check equality""" if not isinstance(other_pact, Pact): return False if self.version != other_pact.version: return False if self.consumer_name != other_pact.consumer_name: return False if len(self.providers) != len(other_pact.providers): return False for this_pact_provider, other_pact_provider in list(zip(self.providers, other_pact.providers)): if this_pact_provider.to_dict() != other_pact_provider.to_dict(): return False return True