import json
import os
from unittest import mock
from ska_pact_tango.provider import Provider
[docs]class Pact:
"""
Represents a contract between a consumer and provider.
Provides Python context handlers to configure the Pact mock service to
perform tests with a Tango device (consumer).
Also generates and writes a pact file.
"""
def __init__(self, providers, consumer_name, consumer_class=None, pact_dir=None, pact_file_name=""):
"""
Constructor for Pact.
This class is usually constructed by `Consumer.has_pact_with`. For example:
pact = Consumer("test/consumer/1", consumer_cls=ConsumerDevice).has_pact_with(
providers=[
Provider("test/provider/1").add_interaction(
Interaction()
.given("The provider is in Init State", "Init")
.upon_receiving("A read attribute request for the attribute random_float")
.with_request("read_attribute", "random_float")
.will_respond_with(DeviceAttribute, device_attribute_response)
)
]
)
:param providers: A list of providers that this contract has
:type provider: List[pact.Provider]
:param consumer_name: The name of the consumer
:type consumer_name: str
:param consumer_class: The consumer class
:type consumer_class: Optional[class]
:param pact_dir: Directory where the resulting pact files will be
written. Defaults to the current directory.
:type pact_dir: str
:param pact_file_name: The name of the pact file for this interaction.
Defaults to <consumer_name>-pact.json
:type pact_file_name: str
"""
self.version = "1.0"
self.consumer_name = consumer_name
self.consumer_class = consumer_class
self.providers = providers
self.pact_dir = pact_dir or os.getcwd()
self._mock_handler = None
if pact_file_name:
self.pact_file_name = pact_file_name
else:
self.pact_file_name = f"{self.consumer_name}-pact.json"
if not os.path.exists(self.pact_dir):
raise ValueError(f"Pact destination directory {pact_dir} does not exist")
self.pact_json_file_path = os.path.join(self.pact_dir, self.pact_file_name)
[docs] def get_interactions_for_provider(self, provider_name: str) -> list:
"""Fetch all the interactions associated with the device name passed in.
:param provider_name: The provider name
:type provider_name: str
:return: Interactions for the provider
:rtype: list
"""
providers = [provider for provider in self.providers if provider.device_name == provider_name]
if not providers:
return []
return providers[0].interactions
[docs] def to_dict(self):
"""Construct a pact dictionary
:return: A dictionary of the Pact
:rtype: dict
"""
return {
"consumer": {"name": self.consumer_name},
"providers": [provider.to_dict() for provider in self.providers],
"metadata": {"specification": {"version": self.version}},
}
[docs] def write_pact(self):
"""Write the pact to disk"""
with open(self.pact_json_file_path, "w") as outfile:
json.dump(self.to_dict(), outfile)
[docs] @classmethod
def from_dict(cls, pact_dict: dict):
"""Generate an instance of Pact from a dictionary
:param pact_dict: Pact in a dictionary format
:type pact_dict: dict
:return: an instance of Pact
:rtype: Pact
"""
consumer_name = pact_dict["consumer"]["name"]
providers = []
for provider in pact_dict["providers"]:
providers.append(Provider.from_dict(provider["provider"]))
pact = Pact(providers, consumer_name=consumer_name)
return pact
[docs] @classmethod
def from_file(cls, pact_file_path: str):
"""Parses a pact file and returns an instance of Pact
:param pact_file_path: Path to a Pact file
:type pact_file_path: str
:return: an instance of Pact
:rtype: Pact
"""
assert os.path.isfile(pact_file_path), f"No file at path {pact_file_path}"
with open(pact_file_path, "r") as pact_file:
pact_data = json.load(pact_file)
return Pact.from_dict(pact_data)
def __enter__(self):
"""Set up the mock response and start mocking"""
def _side_effect(*args, **_): # pylint: disable=W0613
"""Set up the mocked proxy"""
mock_proxy = mock.MagicMock()
interactions = self.get_interactions_for_provider(args[0])
if not interactions:
providers = [provider.device_name for provider in self.providers]
raise ValueError(
(f"There are no interactions for provider [{args[0]}], " f"available providers: {providers}")
)
for interaction in interactions:
request_type = interaction.request["type"]
request_name = interaction.request["name"]
response = interaction.response["response"]
# Set up the mock object
if request_type in ["command", "method"]:
# E.g provider_device.no_arg_command()
# provider_device.with_arg_command(1)
# provider_device.write_attribute("random_float", 5.0)
setattr(mock_proxy, request_name, mock.MagicMock(return_value=response))
elif request_type == "read_attribute":
# E.g provider_device.read_attribute("random_float")
setattr(mock_proxy, request_type, mock.MagicMock(return_value=response))
else:
# E.g provider_device.random_float
setattr(mock_proxy, request_name, response)
return mock_proxy
mod_device_proxy = f"{self.consumer_class.__module__}.tango.DeviceProxy"
self._mock_handler = mock.patch(mod_device_proxy, side_effect=_side_effect)
self._mock_handler.start()
def __exit__(self, exc_type, exc_val, exc_tb):
"""Stop the mock handler"""
self._mock_handler.stop()
def __eq__(self, other_pact):
"""Check equality"""
if not isinstance(other_pact, Pact):
return False
if self.version != other_pact.version:
return False
if self.consumer_name != other_pact.consumer_name:
return False
if len(self.providers) != len(other_pact.providers):
return False
for this_pact_provider, other_pact_provider in list(zip(self.providers, other_pact.providers)):
if this_pact_provider.to_dict() != other_pact_provider.to_dict():
return False
return True