"""
The entities.common.procedures module defines a Python representation of the
procedures listed in the activities of the SKA scheduling block.
"""
from typing import Dict, List, Optional
# Names exported by a star-import of this module: only the three concrete
# script classes are listed; PythonArguments and PythonProcedure are not.
__all__ = ["EmbeddedScript", "FileSystemScript", "GitScript"]
class PythonArguments:
    """
    Represents the arguments for a PythonProcedure in an SKA scheduling block.

    :param args: positional arguments; an empty list is used when omitted
    :param kwargs: keyword arguments; an empty dict is used when omitted
    """

    def __init__(self, args: Optional[List] = None, kwargs: Optional[Dict] = None):
        # None is the "not supplied" marker so that every instance gets its
        # own fresh container instead of sharing a mutable default.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs

    def __eq__(self, other):
        # An unrelated object has no args/kwargs attributes; report inequality
        # instead of letting the attribute access raise AttributeError.
        if not isinstance(other, PythonArguments):
            return False
        return self.args == other.args and self.kwargs == other.kwargs

    def __repr__(self):
        return f"<PythonArguments(args={self.args}, kwargs={self.kwargs})>"
class PythonProcedure:
    """
    Represents a PythonProcedure to be run as an activity in an SKA scheduling block.

    Holds the ``default_args`` mapping supplied at construction time.
    """

    def __init__(self, default_args: Dict[str, PythonArguments] = None):
        # A missing mapping becomes a new, empty dict owned by this instance.
        self.default_args = {} if default_args is None else default_args
class EmbeddedScript(PythonProcedure):
    """
    Represents an EmbeddedScript to be run as an activity in an SKA scheduling block.

    :param content: value stored unchanged as ``self.content``
    :param default_args: mapping of string keys to PythonArguments, passed on
        to the PythonProcedure initialiser
    """

    def __init__(
        self,
        content: str,
        default_args: Optional[Dict[str, PythonArguments]] = None,
    ):
        self.content = content
        super().__init__(default_args)

    def __eq__(self, other):
        # Only another EmbeddedScript (or subclass instance) can be equal.
        if not isinstance(other, EmbeddedScript):
            return False
        return self.content == other.content and self.default_args == other.default_args

    def __repr__(self):
        return (
            f"<EmbeddedScript("
            f"default_args={self.default_args}, "
            f"content={self.content})>"
        )
class FileSystemScript(PythonProcedure):
    """
    Represents a FileSystemScript to be run as an activity in an SKA scheduling block.

    :param path: value stored unchanged as ``self.path``
    :param default_args: mapping of string keys to PythonArguments, passed on
        to the PythonProcedure initialiser
    """

    def __init__(
        self,
        path: str,
        default_args: Optional[Dict[str, PythonArguments]] = None,
    ):
        self.path = path
        super().__init__(default_args)

    def __eq__(self, other):
        # Only another FileSystemScript (or subclass instance) can be equal.
        if not isinstance(other, FileSystemScript):
            return False
        return self.path == other.path and self.default_args == other.default_args

    def __repr__(self):
        return (
            f"<FileSystemScript("
            f"default_args={self.default_args}, "
            f"path={self.path})>"
        )
class GitScript(FileSystemScript):
    """
    Represents a GitScript to be run as an activity in an SKA scheduling block.

    :param repo: value stored unchanged as ``self.repo``
    :param path: passed on to the FileSystemScript initialiser
    :param branch: optional value stored as ``self.branch``; defaults to None
    :param commit: optional value stored as ``self.commit``; defaults to None
    :param default_args: mapping of string keys to PythonArguments, passed on
        to the FileSystemScript initialiser
    """

    def __init__(
        self,
        repo: str,
        path: str,
        branch: Optional[str] = None,
        commit: Optional[str] = None,
        default_args: Optional[Dict[str, PythonArguments]] = None,
    ):  # pylint: disable=too-many-arguments
        self.repo = repo
        self.branch = branch
        self.commit = commit
        super().__init__(path, default_args)

    def __eq__(self, other):
        # A plain FileSystemScript is never equal to a GitScript, even when
        # path and default_args match.
        if not isinstance(other, GitScript):
            return False
        return (
            self.repo == other.repo
            and self.path == other.path
            and self.branch == other.branch
            and self.commit == other.commit
            and self.default_args == other.default_args
        )

    def __repr__(self):
        return (
            f"<GitScript("
            f"default_args={self.default_args}, "
            f"path={self.path}, "
            f"repo={self.repo}, "
            f"branch={self.branch}, "
            f"commit={self.commit})>"
        )