Source code for ska_telmodel.data.backend

import abc
import datetime
import io
import json
import logging
import os
import pathlib
import re
import shutil
import tempfile
import typing
import urllib.error
import urllib.parse
import urllib.request
import warnings
from collections.abc import Iterable
from contextlib import contextmanager

from . import cache
from .sources import DEFAULT_FRAGMENT, DEFAULT_NETLOC, DEFAULT_PATH

# Module-wide logger, named after this module so its output can be
# filtered/configured independently by applications.
LOGGING = logging.getLogger(name=__name__)


class TMDataBackend(metaclass=abc.ABCMeta):
    """
    Base class for telescope model data backends

    Sub-classes should override :py:meth:`backend_name`, then utilise
    :py:meth:`telmodel_backend` to register the telescope model data
    backend. A minimal implementation should furthermore provide
    :py:meth:`list_keys` and :py:meth:`get`.
    """

    # Filenames must have an extension
    FILE_RE = r"[A-Za-z][A-Za-z_\-\.0-9]*\.[a-z_\-0-9]+"
    # Full key: any number of dot-free directory segments ("dir/"),
    # followed by a file name with an extension
    KEY_RE = re.compile(r"([A-Za-z][A-Za-z_\-0-9]*/)*" + FILE_RE)
    # Prefix: any number of dot-free directory segments, optionally
    # followed by one dot-free name or a complete file name
    PREFIX_RE = re.compile(
        r"([A-Za-z][A-Za-z_\-0-9]*/)*([A-Za-z][A-Za-z_\-0-9]*|"
        + FILE_RE
        + r")?"
    )

    @classmethod
    def valid_key(cls, key: str) -> bool:
        """Check whether this is a valid key we could store data for

        For this to be valid, it needs to:

        * Have every path segment start with a letter
        * Have no dot in directory names, and a dot in file name

        :param key: Key to check
        :returns: Validity of key
        """
        return cls.KEY_RE.fullmatch(key) is not None

    @classmethod
    def valid_prefix(cls, key: str) -> bool:
        """Check whether argument could be a valid prefix to a key

        For this to be valid, it needs to:

        * Have every path segment start with a letter
        * Have no dot in directory names; the last segment may be a
          file name (with a dot) or omitted entirely

        :param key: Prefix to check
        :returns: Validity of prefix
        """
        return cls.PREFIX_RE.fullmatch(key) is not None

    @classmethod
    @abc.abstractmethod
    def backend_name(cls) -> str:
        """Returns the name of the backend.

        Will be used for the scheme in URIs to identify the backend type
        of a telescope model data source.
        """
        pass  # pragma: no cover

    def __init__(self, uri: str, update: bool = False):
        # Base implementation only remembers the URI; ``update`` is
        # accepted for interface compatibility with sub-classes.
        self._uri = uri  # pragma: no cover

    def get_uri(self, pinned: bool) -> str:  # pragma: no cover
        """Returns URI for this telescope model data backend

        :param pinned: Attempt to return an URI that will continue to
           refer to this specific version of telescope model data
        :returns: URI identifying data source
        :raises ValueError: If ``pinned`` is requested, which the base
           implementation cannot provide
        """
        if pinned:
            raise ValueError(
                f"Backend {self.backend_name()} does not support pinned URIs!"
            )
        return self._uri

    @abc.abstractmethod
    def list_keys(self, key_prefix: str = "") -> Iterable[str]:
        """List children keys

        Yields all keys with prefix "{key_prefix}/" in ascending
        order. Exception is if the path is empty, in which case all
        available keys are listed.

        :param key_prefix: Path to query
        """
        pass  # pragma: no cover

    def exists(self, key: str) -> bool:
        """Check whether a given key exists.

        :param key: Key to query
        :returns: True if key exists
        """
        # Inefficient default implementation: loads the data
        return self.get(key) is not None

    @abc.abstractmethod
    def get(self, key: str) -> typing.Optional[bytes]:
        """Get the data stored with the given key

        :param key: Key to query
        :returns: Data stored at key, or None if it doesn't exist
        """
        pass  # pragma: no cover

    def open(self, key: str) -> typing.IO[bytes]:
        """Access data at given key as a file-like object

        Raises `KeyError` if the key does not exist

        :param key: Key to query
        :returns: Binary, in-memory file object holding the data
        """
        # Inefficient default implementation: buffers the whole value
        data = self.get(key)
        if data is None:  # pragma: no cover
            raise KeyError(
                f"Key {key} does not exist in telescope model data!"
            )
        return io.BytesIO(data)

    def copy(self, key: str, dest: str):
        """Write key contents to a file.

        Raises `KeyError` if the key does not exist

        :param key: Key to query
        :param dest: Path of destination file
        """
        # Note: ``open`` below is the builtin, not this class's method
        with self.open(key) as fsrc, open(dest, "wb") as fdest:
            shutil.copyfileobj(fsrc, fdest)
# Registry of known telescope model data backends, keyed by the URI
# scheme each class reports via its ``backend_name()`` classmethod.
TELMODEL_BACKENDS: typing.Dict[str, type] = {}


def telmodel_backend(cls):
    """Class decorator registering a telescope model data backend.

    The class is stored in ``TELMODEL_BACKENDS`` under the name returned
    by its ``backend_name()`` classmethod, replacing any class that was
    previously registered under the same name.

    :param cls: Backend class to register
    :returns: ``cls`` itself, unchanged, so this works as a decorator
    """
    scheme = cls.backend_name()
    TELMODEL_BACKENDS[scheme] = cls
    return cls
@telmodel_backend
class MemoryBackend(TMDataBackend):
    """
    Represents in-memory data.

    URIs should look as follows::

       mem://?[key1]=[value1]&[key2]=[value2]

    This will directly set the given telescope model data keys to the
    given values. Useful for testing, and overriding single values in
    telescope model data.
    """

    @classmethod
    def backend_name(cls) -> str:
        return "mem"

    def __init__(self, uri: str, update: bool = False):
        """Parse the key/value pairs from the URI query string.

        :param uri: ``mem://`` URI carrying the data as query parameters
        :param update: Ignored (nothing to refresh for in-memory data)
        :raises ValueError: If the URI has a network location or a path
           other than ``/``, or if the query string cannot be parsed
        """
        self._uri = uri
        parsed = urllib.parse.urlparse(uri)
        if parsed.netloc or (parsed.path and parsed.path != "/"):
            raise ValueError(
                f"Memory TMData backend got path, this is bogus: {uri}"
            )

        # Parse query parameters (the actual "data")
        qs = urllib.parse.parse_qs(
            parsed.query, keep_blank_values=True, strict_parsing=True
        )

        # Concatenate any duplicated entries; every value ends up
        # followed by a newline (join with a trailing empty element)
        self._contents = {
            k: "\n".join(vs + [""]).encode("utf8") for k, vs in qs.items()
        }
        for k in self._contents:
            if not TMDataBackend.valid_key(k):
                LOGGING.warning('"%s" is not a valid telescope model key!', k)

    def get_uri(self, pinned: bool) -> str:
        """Returns URI for this telescope model data backend

        :param pinned: Attempt to return an URI that will continue to
           refer to this specific version of telescope model data
        :returns: URI identifying data source
        """
        # Inherently pinned: the URI contains all the data
        return self._uri

    def list_keys(self, key_prefix: str = "") -> Iterable[str]:
        """List children keys

        Yields all keys with prefix "{key_prefix}/" in ascending order

        :param key_prefix: Path to query (empty lists all keys)
        :returns: Sorted list of matching keys
        """
        # Keys are stored in URI order, so sort explicitly to honour the
        # ascending-order contract. An empty prefix matches every key.
        return sorted(k for k in self._contents if k.startswith(key_prefix))

    def get(self, key: str) -> typing.Optional[bytes]:
        """Get the data stored with the given key

        :param key: Key to query
        :returns: Bytes stored at key, or None if it doesn't exist
        """
        return self._contents.get(key)
@telmodel_backend
class FilesystemBackend(TMDataBackend):
    """
    Retrieves data from a locally accessible file system.

    URI format::

       file://[absolute path]

    Note that changes to the file system are outside of our
    control. Consistency must be ensured externally.
    """

    @classmethod
    def backend_name(cls) -> str:
        return "file"

    def __init__(self, uri: str, update: bool = False):
        """Resolve the base directory named by the ``file://`` URI.

        :param uri: ``file://`` URI of the base directory
        :param update: Ignored (the file system is always current)
        :raises ValueError: If the base path is not an existing directory
        """
        parsed = urllib.parse.urlparse(uri)
        self.base_path = pathlib.Path(parsed.netloc + parsed.path).resolve()
        # is_dir() is False for non-existent paths as well
        if not self.base_path.is_dir():
            raise ValueError(f"Base path does not exist: {self.base_path}")
        self._uri = self.base_path.as_uri()

    def get_uri(self, pinned: bool) -> str:
        """Returns URI for this telescope model data backend

        :param pinned: Attempt to return an URI that will continue to
           refer to this specific version of telescope model data
        :returns: URI identifying data source
        """
        # About as permanent as the file system...
        return self._uri

    def _prefix_to_path(self, key: str) -> pathlib.Path:
        """Map a (validated) key prefix to a path below the base path."""
        if not TMDataBackend.valid_prefix(key):
            raise ValueError(f"Not a valid key: {key}")  # pragma: no cover
        return pathlib.Path(self.base_path, key)

    def _key_to_path(self, key: str) -> pathlib.Path:
        """Map a (validated) key to a path below the base path."""
        if not TMDataBackend.valid_key(key):
            raise ValueError(f"Not a valid key: {key}")  # pragma: no cover
        return pathlib.Path(self.base_path, key)

    def _path_to_key(self, path: pathlib.Path) -> str:
        """Map a path below the base path back to a key."""
        # Keys always use "/" separators, independent of the host OS
        return path.relative_to(self.base_path).as_posix()

    def list_keys(self, key_prefix: str = "") -> Iterable[str]:
        """List children keys

        Yields all keys with prefix "{key_prefix}/" in ascending order

        :param key_prefix: Path to query
        """
        # Get identified path
        if key_prefix:
            path_to_list = self._prefix_to_path(key_prefix)
        else:
            path_to_list = self.base_path
        if not path_to_list.exists():
            return []
        if path_to_list.is_file():
            return [self._path_to_key(path_to_list)]
        return self._list_path(path_to_list)

    def _list_path(self, path: pathlib.Path) -> Iterable[str]:
        """Recursively yield keys for a path."""
        for nested in sorted(path.iterdir()):
            # Hidden entries (files *and* directories such as ".git")
            # can never form valid keys, as every key segment must start
            # with a letter - skip them without warning.
            if nested.name.startswith("."):
                continue
            # A directory? recurse
            if nested.is_dir():
                yield from self._list_path(nested)
                continue
            key = self._path_to_key(nested)
            if TMDataBackend.valid_key(key):
                yield key
            else:
                LOGGING.warning("File name no valid key, ignored: %s", key)

    def exists(self, key: str) -> bool:
        """Check whether a given key exists.

        :param key: Key to query
        :returns: True if key exists (as a regular file)
        """
        # is_file() implies exists()
        return self._key_to_path(key).is_file()

    def get(self, key: str) -> typing.Optional[bytes]:
        """Get the data stored with the given key

        :param key: Key to query
        :returns: Bytes stored at key, or None if it doesn't exist
        """
        path = self._key_to_path(key)
        if not path.is_file():
            return None  # pragma: no cover
        # Otherwise read the (entire) file
        with open(path, "rb") as f:
            return f.read()

    def open(self, key: str, binary: bool = True) -> typing.IO[bytes]:
        """Access data at given key as a file-like object

        Raises `KeyError` if the key does not exist

        :param key: Key to query
        :param binary: Open in binary mode (default) rather than text mode
        """
        path = self._key_to_path(key)
        # Treat a directory at the key path as missing, consistent with
        # get()/exists(), rather than failing with IsADirectoryError
        if not path.is_file():
            raise KeyError(
                f"Key {key} does not exist in telescope model data!"
            )  # pragma: no cover
        # Only allow read access (builtin ``open``, not this method)
        return open(path, "rb" if binary else "r")

    def copy(self, key: str, dest: str):
        """Write key contents to a file.

        Raises `KeyError` if the key does not exist

        :param key: Key to query
        :param dest: Path of destination file
        :returns: Destination path, as returned by ``shutil.copyfile``
        """
        path = self._key_to_path(key)
        if not path.is_file():
            raise KeyError(
                f"Key {key} does not exist in telescope model data!"
            )  # pragma: no cover
        return shutil.copyfile(path, dest)
@telmodel_backend
class GitlabBackend(TMDataBackend):
    """Represents data in a GitLab repository.

    URI format::

      gitlab://[gitlab server]/[project name]?[branch]#[directory]

    So for instance::

      gitlab://gitlab.com/ska-telescope/ska-telmodel?master#tmdata

    Would refer to data contained in the ``ska-telmodel`` repository
    itself.

    Repositories accessed in this way should make sure to activate the
    ``tmdata`` standard continuous integration stages (see
    https://gitlab.com/ska-telescope/templates-repository ) to ensure
    that telescope model data is cached in the SKAO central artefact
    repository. Once that has been done, this library will never
    actually query GitLab directly.

    Furthermore, this backend will cache all loaded data locally,
    including resolved Gitlab references (like ``master`` in the
    example above). This especially means that once instantiated, the
    version of data will be "pinned" even between different instances
    (and processes). Use the ``update`` parameter to
    :py:class:`ska_telmodel.data.TMData` or :py:class:`GitlabBackend`
    respectively to refresh the local cache.
    """

    @classmethod
    def backend_name(cls) -> str:
        return "gitlab"

    def __init__(
        self,
        uri: str,
        update: bool = False,
        gl: "gitlab.Gitlab" = None,  # noqa: F821
        try_nexus: bool = True,
        nexus_url: str = None,
        env=None,
    ):
        """Resolve the commit and file list for a GitLab data source.

        :param uri: ``gitlab://`` URI of the data source
        :param update: Disregard cached ref -> commit resolution
        :param gl: Pre-configured ``gitlab.Gitlab`` client to use
        :param try_nexus: Attempt to load data from Nexus (CAR) first
        :param nexus_url: Nexus repository URL; if not given, taken from
           the ``SKA_TELMODEL_NEXUS``, ``CAR_TMDATA_REPOSITORY_URL`` or
           ``CAR_RAW_REPOSITORY_URL`` environment variables
        :param env: Environment mapping to use instead of ``os.environ``
        :raises ValueError: If python-gitlab is not installed
        """
        parsed = urllib.parse.urlparse(uri)
        self._server = parsed.netloc
        self._project_name = parsed.path[1:]
        self._path = parsed.fragment
        self._ref = parsed.query
        self._uri = uri
        self._try_nexus = try_nexus
        self._env = env or os.environ

        # Determine Nexus URL to try
        self._nexus_url = nexus_url
        if self._nexus_url is None:
            self._nexus_url = self._env.get("SKA_TELMODEL_NEXUS")
        if self._nexus_url is None:
            self._nexus_url = self._env.get("CAR_TMDATA_REPOSITORY_URL")
        if self._nexus_url is None:
            self._nexus_url = self._env.get("CAR_RAW_REPOSITORY_URL")
            # Derive the telmodel repository from the raw "-internal" one
            if self._nexus_url is not None:
                if self._nexus_url.endswith("-internal"):
                    self._nexus_url = (
                        self._nexus_url[: -len("-internal")] + "-telmodel"
                    )
        if self._nexus_url is None:  # pragma: no cover
            self._nexus_url = (
                "https://artefact.skao.int/repository/raw-telmodel"
            )

        # (Try to) instantiate Gitlab API
        try:
            import gitlab
        except ModuleNotFoundError as err:  # pragma: no cover
            raise ValueError(
                "python-gitlab must be installed to use "
                "Gitlab telescope model backend!"
            ) from err  # pragma: no cover

        # Get project and branch
        if gl is None:
            self._gitlab = gitlab.Gitlab(url=f"https://{self._server}")
        else:
            self._gitlab = gl
        self._project = None

        # Load commit ID, then list of files. Actual file checkout
        # will be done on demand if required.
        self._load_commit_id(update)
        self._load_tree()
        self._file_backend = None

    def get_uri(self, pinned: bool) -> str:
        """Returns URI for this telescope model data backend

        :param pinned: Attempt to return an URI that will continue to
           refer to this specific version of telescope model data
        :returns: URI identifying data source
        """
        if pinned:
            # "~" marks the ref as a commit ID that needs no lookup
            pinned_uri = urllib.parse.ParseResult(
                scheme=self.backend_name(),
                netloc=self._server,
                path=self._project_name,
                params="",
                query="~" + self._commit_id,
                fragment=self._path,
            )
            return urllib.parse.urlunparse(pinned_uri)
        return self._uri

    def _disable_nexus(self):
        """Stop querying Nexus from now on, warning about it once."""
        if self._try_nexus:
            warning = (
                "gitlab://"
                f"{self._server}/{self._project_name}?{self._ref}#{self._path}"
                " not cached in SKA CAR - make sure to add tmdata CI!"
            )
            # Loudly if it looks like the kind of ref that should
            # really be cached.
            tag_re = re.compile(r"v?[\.0-9]+")
            if self._ref in ["main", "master"] or tag_re.match(self._ref):
                warnings.warn(warning)
            else:
                LOGGING.info(warning)  # pragma: no cover

        # Stop using Nexus, GitLab is source of truth now
        self._try_nexus = False

    def _find_project(self):
        """Return the GitLab project object, looking it up on first use.

        :raises RuntimeError: If the project cannot be accessed
        """
        import gitlab

        if self._project is not None:
            return self._project
        try:
            self._project = self._gitlab.projects.get(self._project_name)
        except gitlab.exceptions.GitlabGetError as err:
            raise RuntimeError(
                f"Could not access {self._server} "
                f"project {self._project_name}!"
            ) from err
        return self._project

    def _get_default_branch(self) -> str:
        """
        Identify default branch of project

        Only used if not explicitly given

        :returns: Name of the default branch
        :raises RuntimeError: If the project has no branches at all
        """
        branches = self._find_project().branches.list()
        for branch in branches:
            if branch.attributes["default"]:
                return branch.name

        # Fall back to any branch... Shouldn't really happen
        if not branches:  # pragma: no cover
            raise RuntimeError(
                f"{self._server} project {self._project_name} has no branches!"
            )
        LOGGING.warning(
            "No default branch found, using first branch from the list"
        )  # pragma: no cover
        # Callers use the result as a ref string, so return the *name*
        return branches[0].name  # pragma: no cover

    def _cache_path(self, *sub_path):
        """
        Compose a path for storing something from this telescope model
        data source
        """
        return pathlib.Path(
            self._server, self._project_name, self._path, *sub_path
        )

    def _load_commit_id(self, update: bool = False):
        """
        Populates self._commit_id with the hash of the commit pointed to
        by self._ref. This tries, in order:

        1. If the ref is of form ~abc, then "abc" is the hash
        2. If the resolved ref is in local cache, we use that (but
           might continue to check whether it's stale)
        3. We check the artefact repository
        4. We check GitLab

        :param update: Disregard cached commit ID
        """
        # Do we have to look up the ref? This is only skipped if we
        # have a "pinned" ref. We utilise "~" to mark references that
        # do not need to be looked up - because otherwise there would
        # always be the possibility that what we are looking at is
        # actually a branch name in disguise.
        self._commit_id = None
        if self._ref.startswith("~"):
            self._commit_id = self._ref[1:]
            return
        if not self._ref:
            self._ref = "~default~"

        # Have cached ref? Use
        self._tree = None
        ref_cache_name = self._cache_path(self._ref)
        cached_commit_id = None
        if cache.cache_exists(ref_cache_name, self._env) and not update:
            with open(cache.cache_path(ref_cache_name, self._env)) as f:
                cached_commit_id = f.readline().strip()

            # If cache is fresh just return
            cache_time = cache.get_cache_time(ref_cache_name, self._env)
            if (datetime.datetime.now() - cache_time).days < 1:
                self._commit_id = cached_commit_id
                return
            else:
                # Otherwise fall through and re-check. We touch the
                # cache so this doesn't happen too often
                cache.cache_path(
                    ref_cache_name, self._env
                ).touch()  # pragma: no cover

        try:
            # Attempt to look up ref using Nexus
            if self._try_nexus and self._commit_id is None:
                with self._open_nexus(self._ref) as f:
                    if f is not None:
                        self._commit_id = f.readline().decode().strip()

            # Finally try Gitlab
            if self._commit_id is None:
                self._disable_nexus()
                if self._ref == "~default~":
                    self._ref = self._get_default_branch()
                    parsed = urllib.parse.urlparse(self._uri)
                    suggested = urllib.parse.ParseResult(
                        scheme=parsed.scheme,
                        netloc=parsed.netloc,
                        path=parsed.path,
                        params=parsed.params,
                        query=self._ref,
                        fragment=parsed.fragment,
                    )
                    LOGGING.warning(
                        "Telescope model Gitlab source has no ref, using "
                        + urllib.parse.urlunparse(suggested)
                    )
                self._find_project()
                commit = self._project.commits.get(self._ref)
                self._commit_id = commit.id

            # Disagreement with cache?
            if cached_commit_id != self._commit_id:
                # No cache? Write
                if cached_commit_id is None:
                    path = cache.cache_path(ref_cache_name, self._env)
                    path.parent.mkdir(parents=True, exist_ok=True)
                    with open(path, "w") as f:
                        f.write(self._commit_id)
                else:  # pragma: no cover
                    # Out of date? Warn, use cached commit ID
                    warnings.warn(
                        f"{self._uri} has been updated to "
                        f"{self._commit_id[:6]} "
                        f"(local: {cached_commit_id[:6]}). "
                        "Consider 'ska-telmodel --update pin'"
                    )
                    self._commit_id = cached_commit_id

        except urllib.error.URLError as e:  # pragma: no cover
            # Have a cached commit ID? Then this is not fatal, convert
            # it into a warning
            if cached_commit_id is None:
                raise
            self._commit_id = cached_commit_id
            warnings.warn(f"{self._uri} could not be checked: {e}")

        # Internal invariant: every path above sets a commit ID or raises
        assert self._commit_id is not None

    def _load_tree(self):
        """
        Load list of files from repository

        Tries local cache, then Nexus, then the GitLab API, caches the
        result, and populates ``self._keys`` with the sorted list of
        valid keys below ``self._path``.
        """
        # Have cached files? Use
        self._tree = None
        tree_cache_name = self._cache_path("~" + self._commit_id, "tree.json")
        if cache.cache_exists(tree_cache_name, self._env):
            with open(cache.cache_path(tree_cache_name, self._env)) as f:
                self._tree = json.load(f)

        # Try to query from Nexus
        if self._try_nexus and self._tree is None:
            with self._open_nexus("tmtree.json", self._commit_id) as json_file:
                if json_file is not None:
                    self._tree = json.load(json_file)

        # Load using Gitlab API
        if self._tree is None:
            # Stop using Nexus, GitLab is source of truth now
            self._disable_nexus()
            self._find_project()
            self._tree = self._project.repository_tree(
                path=self._path, ref=self._commit_id, recursive=True, all=True
            )

        # Write cache
        if not cache.cache_exists(tree_cache_name, self._env):
            cache.cache_path(tree_cache_name, self._env).parent.mkdir(
                parents=True, exist_ok=True
            )
            with open(cache.cache_path(tree_cache_name, self._env), "w") as f:
                json.dump(self._tree, f)

        # Filter + sort tree
        self._keys = []
        _path_prefix = self._path + "/" if self._path else ""
        for entry in self._tree:
            # Make sure it is a file
            if entry["type"] != "blob":
                continue
            # Strip path
            if not entry["path"].startswith(_path_prefix):
                continue
            key = entry["path"][len(_path_prefix) :]
            # Check that it is a valid key
            if not TMDataBackend.valid_key(key):
                LOGGING.debug(
                    "Gitlab file name no valid key, ignored: %s", key
                )
            else:
                self._keys.append(key)
        self._keys = sorted(self._keys)

    def _load_data(self):
        """
        Load files from repository

        Uses the local cache if present, otherwise the Nexus tarball,
        otherwise a GitLab archive; then sets up ``self._file_backend``
        on the resulting directory.
        """
        # Load + cache repository content, if not already done so
        cache_name = self._cache_path("~" + self._commit_id, "tmdata")
        if not cache.cache_exists(cache_name, self._env):
            # Attempt to get from nexus
            self._directory = None
            if self._try_nexus:
                filename_patterns = [
                    f"{self._path}/*",
                ]
                with self._open_nexus("tmdata.tar.gz", self._commit_id) as f:
                    if f is not None:
                        LOGGING.info("%s: Loading Nexus tarball...", self._uri)
                        self._directory = cache.create_from_tarball(
                            cache_name, f, self._env, filename_patterns
                        )

            # Fall back to loading directly from GitLab
            if self._directory is None:
                self._disable_nexus()
                self._directory = self._load_data_gitlab(cache_name)
        else:
            self._directory = cache.cache_path(cache_name, self._env)

        # Instantiate backend to access checkout
        self._file_backend = FilesystemBackend(f"file://{self._directory}/")

    @contextmanager
    def _open_nexus(
        self, file_name: str, commit_id: str = None
    ) -> typing.Iterator[typing.Optional[typing.IO[bytes]]]:
        """Download a file for this source from Nexus.

        Yields a binary file object for the downloaded data, or None if
        the server answered 404 or the URL error's reason is a
        ``FileNotFoundError``. Other errors are re-raised.

        :param file_name: Name of the file below the source's Nexus path
        :param commit_id: If given, look below the ``~{commit_id}/``
           sub-directory
        """
        # Download to temporary location
        with tempfile.NamedTemporaryFile(suffix=file_name) as temp:
            # Compose URL
            url = f"{self._nexus_url}/{self._server}/"
            url += f"{self._project_name}/{self._path}/"
            if commit_id:
                url += f"~{commit_id}/"
            url += file_name
            LOGGING.debug("Querying Nexus: GET %s", url)

            # Attempt to obtain data from nexus
            try:
                urllib.request.urlretrieve(url, temp.name)
            except urllib.error.HTTPError as err:
                if err.code == 404:
                    LOGGING.debug("... got %s (%s)", err.code, err.reason)
                    yield None
                    return
                raise  # pragma: no cover
            except urllib.error.URLError as err:
                if isinstance(err.reason, FileNotFoundError):
                    LOGGING.debug("... not found (%s)", err.reason)
                    yield None
                    return
                raise  # pragma: no cover

            # (Re)open file for reading
            with open(temp.name, "rb") as f:
                yield f

    def _load_data_gitlab(self, name: str):
        """
        Load tarball of repository content from GitLab and cache it
        locally

        :param name: Cache name to unpack the tarball to
        :returns: Result of ``cache.create_from_tarball``
        """
        LOGGING.info("%s: Loading GitLab tarball...", self._uri)

        # Download tarball
        tarball_fd, tarball_name = tempfile.mkstemp(
            suffix=".tar.gz", prefix="tmdata-"
        )
        try:
            with os.fdopen(tarball_fd, "w+b") as wfile:
                # Cannot use GitLab Python wrapper, as it does not
                # currently support passing "path". So compose the REST
                # call directly.
                result = self._gitlab.http_get(
                    f"/projects/{self._find_project().encoded_id}/"
                    "repository/archive",
                    query_data=dict(sha=self._commit_id, path=self._path),
                    raw=True,
                    streamed=True,
                )
                import gitlab.utils

                gitlab.utils.response_content(
                    result,
                    True,
                    action=wfile.write,
                    chunk_size=1024,
                    iterator=False,
                )

                # Unpack + cache
                project_name = pathlib.Path(self._project_name).name
                filename_patterns = [
                    # Gitlab tarballs have this prepended
                    f"{project_name}-{self._commit_id}-"
                    f"{self._commit_id}-{self._path.replace('/', '-')}"
                    f"/{self._path}/*",
                ]
                wfile.seek(0)
                return cache.create_from_tarball(
                    name, wfile, self._env, filename_patterns
                )
        finally:
            os.remove(tarball_name)

    def _checkout(self) -> "FilesystemBackend":
        """Return the backend for the local data checkout, loading it on
        first use."""
        if self._file_backend is None:
            self._load_data()
        return self._file_backend

    def list_keys(self, key_prefix: str = "") -> Iterable[str]:
        """List keys starting with ``key_prefix``, in ascending order.

        :param key_prefix: Path to query
        """
        for key in self._keys:
            if key.startswith(key_prefix):
                yield key

    def exists(self, key: str) -> bool:
        """Check whether a given key exists (without loading data).

        :param key: Key to query
        :returns: True if key exists
        """
        return key in self._keys

    def get(self, key: str) -> typing.Optional[bytes]:
        """Get the data stored with the given key

        :param key: Key to query
        :returns: Bytes stored at key, or None if it doesn't exist
        """
        return self._checkout().get(key)

    def open(self, key: str) -> typing.IO[bytes]:
        """Access data at given key as a file-like object

        Raises `KeyError` if the key does not exist

        :param key: Key to query
        """
        return self._checkout().open(key)

    def copy(self, key: str, dest: str):
        """Write key contents to a file.

        Raises `KeyError` if the key does not exist

        :param key: Key to query
        :param dest: Path of destination file
        """
        return self._checkout().copy(key, dest)
@telmodel_backend
class CARBackend(GitlabBackend):
    """Represents data in (a mirror of) the SKA central artefact repository.

    Permissible URI formats::

      car:[project name]?[branch]#[directory]
      car://[gitlab server]/[project name]?[branch]#[directory]

    So for instance::

      car:ska-telmodel?master
      car://gitlab.com/ska-telescope/ska-telmodel?master#tmdata

    The source of truth might still be Gitlab, yet this backend will
    only work with artefacts that have been uploaded to the CAR. The
    short form URI will be expanded into the long form automatically.
    """

    @classmethod
    def backend_name(cls) -> str:
        return "car"

    def __init__(self, uri: str, *args, **kwargs):
        """Expand a short-form URI, then defer to :py:class:`GitlabBackend`.

        :param uri: ``car:`` URI (short or long form)
        """
        # If we have no netloc: Assume that the prefix is the default
        # server / group (DEFAULT_NETLOC / DEFAULT_PATH)
        parsed = urllib.parse.urlparse(uri)
        if not parsed.netloc:
            parsed = parsed._replace(
                netloc=DEFAULT_NETLOC, path=DEFAULT_PATH + parsed.path
            )
        if not parsed.fragment:
            parsed = parsed._replace(fragment=DEFAULT_FRAGMENT)
        super().__init__(parsed.geturl(), *args, **kwargs)

    def _shorten_uri(self, uri: str):
        """
        Remove default netloc/path and fragment from URI, if
        appropriate.

        :param uri: Input URI
        :returns: Possibly shortened URI
        """
        parsed = urllib.parse.urlparse(uri)
        if parsed.netloc == DEFAULT_NETLOC and parsed.path.startswith(
            DEFAULT_PATH
        ):
            # Use "" rather than None: ParseResult fields are strings,
            # and geturl() omits empty components either way
            parsed = parsed._replace(
                netloc="", path=parsed.path[len(DEFAULT_PATH) :]
            )
        if parsed.fragment == DEFAULT_FRAGMENT:
            parsed = parsed._replace(fragment="")
        return parsed.geturl()

    def _disable_nexus(self):
        """Never fall back to GitLab: missing CAR artefacts are fatal.

        :raises RuntimeError: Always
        """
        raise RuntimeError(
            "gitlab://"
            f"{self._server}/{self._project_name}?{self._ref}#{self._path}"
            " not found in SKA CAR - make sure to add tmdata CI!"
        )

    def get_uri(self, pinned: bool) -> str:
        """Returns URI for this telescope model data backend

        :param pinned: Attempt to return an URI that will continue to
           refer to this specific version of telescope model data
        :returns: URI identifying data source (short form if possible)
        """
        return self._shorten_uri(super().get_uri(pinned))