# Source code for ska_db_oda.infrastructure.filesystem.repository

"""
This module contains implementations of the AbstractRepository class, using
the filesystem as the data store.
"""
import functools
import logging
import operator
import os
import re
from os import W_OK, PathLike, access, environ
from pathlib import Path
from typing import Dict, List, Optional, TypeVar, Union

from ska_oso_pdm import Metadata

from ska_db_oda.domain import (
    OSOEntity,
    get_identifier,
    get_identifier_or_fetch_from_skuid,
)
from ska_db_oda.domain.query import DateQuery, MatchType, QueryParams, UserQuery
from ska_db_oda.domain.repository import RepositoryBridge
from ska_db_oda.infrastructure.filesystem.mapping import FilesystemMapping

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# T: the entity type handled by a bridge (bound to OSOEntity).
T = TypeVar("T", bound=OSOEntity)
# U: the type of the identifier used to look entities up.
U = TypeVar("U")

class FilesystemBridge(RepositoryBridge[T, U]):
    """
    Implementation of the Repository bridge which persists entities to a filesystem.

    Entities will be stored under the following filesystem structure:
    `/<base_working_dir>/<entity_type_dir>/<entity_id>/<version>.json`

    For example, by default version 1 of an SBDefinition with sbd_id
    sbi-mvp01-20200325-00001 will be stored at:
    `/var/lib/oda/sbd/sbi-mvp01-20200325-00001/1.json`
    """

    def __init__(
        self,
        filesystem_mapping: FilesystemMapping,
        base_working_dir: Union[str, PathLike] = Path("/var/lib/oda"),
    ):
        """
        Set up the working directory for one entity type.

        :param filesystem_mapping: supplies the entity type directory name and
            the serialise / deserialise / entity_id_from_path callables
        :param base_working_dir: root directory of the store; overridden by the
            ODA_DATA_DIR environment variable when that is set
        :raises FileNotFoundError: if the base directory does not exist
        :raises PermissionError: if the base directory is not writable
        """
        base_working_dir = Path(environ.get("ODA_DATA_DIR", base_working_dir))
        LOGGER.info(
            "Initialising ODA filesystem backend. Working directory=%s",
            base_working_dir,
        )

        if not base_working_dir.is_dir():
            raise FileNotFoundError(f"Directory {base_working_dir} not found")
        if not access(base_working_dir, W_OK):
            raise PermissionError(f"Directory {base_working_dir} not writable")

        self._base_working_dir = base_working_dir
        self.working_dir = self._base_working_dir / filesystem_mapping.entity_type_dir
        self.working_dir.mkdir(parents=True, exist_ok=True)

        # Pending writes, keyed by destination path, holding serialised entities.
        self._transactions: Dict[Path, str] = {}

        self._serialise = filesystem_mapping.serialise
        self._deserialise = filesystem_mapping.deserialise
        self._entity_id_from_path = filesystem_mapping.entity_id_from_path

    def __len__(self):
        """
        Return the size of this repository.

        Note that this is a naive implementation that simply counts the number
        of JSON files in the working directory. It does not verify that each
        JSON file is a serialised, valid SB.
        """
        return sum(1 for _ in self.working_dir.rglob("*.json"))

    def __contains__(self, entity_id: U):
        """
        Return True if a version of an entity with the given ID is present in
        this repository.

        Only the filesystem is inspected: the entity directory must exist and
        must not be empty.

        :param entity_id: ID to search for
        """
        entity_dir = self._path_for_entity_id_dir(entity_id)
        return entity_dir.is_dir() and any(entity_dir.iterdir())

    def create(self, entity: T) -> T:
        """Implementation of the RepositoryBridge method.

        To mimic the real database, entities are added to a list of pending
        transactions and only written to the filesystem when the unit of work
        is committed.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.create`
        docstring for details
        """
        entity_id = get_identifier_or_fetch_from_skuid(entity)
        entity = self.update_metadata(entity)
        return self._stage(entity_id, entity)

    def read(self, entity_id: U) -> T:
        """
        Gets the latest version of the entity with the given entity_id.

        As this method will always be accessed in the context of a UnitOfWork,
        the pending transactions also need to be checked for a version to
        return. (Similar to with a database implementation where an entity that
        was added to a transaction but not committed would still be accessible
        inside the transaction.)

        :param entity_id: ID of the entity to fetch
        :raises KeyError: if no version of the entity is pending or on disk
        """
        LOGGER.debug("Getting entity with ID %s from the filesystem", entity_id)

        entity_dir_path = self._path_for_entity_id_dir(entity_id)

        # Match on the exact parent directory rather than a substring of the
        # path, so that an ID which is a substring of another ID (or of the
        # working directory) cannot pick up another entity's versions.
        pending_paths = [
            path for path in self._transactions if path.parent == entity_dir_path
        ]

        # The metadata checks will mean a version in the pending transactions
        # would always be a newer version than any in the filesystem, so check
        # the pending versions first.
        latest_pending = self._latest_version_path(pending_paths)
        if latest_pending is not None:
            return self._deserialise(self._transactions[latest_pending])

        if entity_dir_path.exists():
            # Filenames are of the form 1.json
            latest_stored = self._latest_version_path(
                list(entity_dir_path.glob("*.json"))
            )
            if latest_stored is not None:
                return self._deserialise(latest_stored.read_text())

        raise KeyError(
            f"Not found. The requested entity_id {entity_id} could not be found."
        )

    def update(self, entity: T) -> T:
        """Implementation of the RepositoryBridge method.

        To mimic the real database, entities are added to a list of pending
        transactions and only written to the filesystem when the unit of work
        is committed.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.update`
        docstring for details
        """
        entity_id = get_identifier_or_fetch_from_skuid(entity)
        entity = self._set_new_metadata(entity)
        return self._stage(entity_id, entity)

    def query(self, qry_params: QueryParams) -> List[T]:
        """
        Return the latest version of every entity that satisfies the query.

        :param qry_params: the query to evaluate
        :raises ValueError: if the query type is not recognised
        """
        # strategy for this implementation is to:
        #
        # 1. create a list of filter functions matching the requirements of the query
        # 2. for each entity in the repo, apply the filter functions
        # 3. if the entity passes each test, add it to the list of results
        #
        # With this strategy we can reuse filter functions to build compound
        # complex queries, e.g., entities created by user X after 1/1/2023
        filter_fns = QueryFilterFactory.filter_functions_for_query(qry_params)

        final_result = []
        for entity_id in self._all_entity_ids():
            entity = self.read(entity_id)
            if all(fn(entity) for fn in filter_fns):
                final_result.append(entity)
        return final_result

    def _stage(self, entity_id: U, entity: T) -> T:
        """
        Serialise the entity and record it in the pending transactions.

        The entity directory is created immediately; the JSON file itself is
        not written here.
        """
        self._path_for_entity_id_dir(entity_id).mkdir(parents=True, exist_ok=True)
        entity_path = self._path_for_entity(entity)
        LOGGER.debug(
            "Adding entity with ID %s to the filesystem transactions under path %s",
            entity_id,
            entity_path,
        )
        self._transactions[entity_path] = self._serialise(entity)
        return entity

    @staticmethod
    def _latest_version_path(paths: List[Path]) -> Optional[Path]:
        """
        Return the path with the highest integer version stem (`<version>.json`),
        or None if no paths are given.
        """
        return max(paths, key=lambda path: int(path.stem), default=None)

    def _get_latest_metadata(self, entity: T) -> Optional[Metadata]:
        """Implementation of the abstract MetaDataMixin method for a filesystem backend.

        See :func:`~ska_db_oda.domain.metadatamixin.MetadataMixin._get_latest_metadata`
        docstring for details
        """
        try:
            return self.read(get_identifier(entity)).metadata
        except KeyError:
            # No version of this entity exists yet.
            return None

    def _path_for_entity(self, entity: T) -> Path:
        """
        Returns the path where the serialised entity is stored, built from the
        working directory, the entity identifier and the metadata version, eg
        `sbd/sbi-mvp01-20200325-00001/2.json`
        """
        return (
            self.working_dir
            / get_identifier(entity)
            / f"{entity.metadata.version}.json"
        )

    def _path_for_entity_id_dir(self, entity_id: U) -> Path:
        """
        Returns the path of the directory that all versions of the entity with
        the given entity_id are stored under, eg
        `/var/lib/oda/sbd/sbi-mvp01-20200325-00001/`
        """
        return self.working_dir / str(entity_id)

    def _all_entity_ids(self) -> List[U]:
        """
        Return a list of entity IDs, one entity ID for each entity directory
        in the repository.
        """
        # path format is <entity ID>/<version>.json so the list of directories
        # in the working directory should give a list of entity IDs.
        # iterdir() gives paths whereas we want entity IDs of the correct type,
        # hence we call _entity_id_from_path for each directory
        return [
            self._entity_id_from_path(f)
            for f in self.working_dir.iterdir()
            if f.is_dir()
        ]
class QueryFilterFactory:
    """
    Factory class that returns a list of Python functions equivalent to a user
    query. Each function processes an entity, returning True if the entity
    passes the query test.
    """

    @staticmethod
    def filter_functions_for_query(query: QueryParams):
        """
        Build the list of filter functions for the given query.

        :param query: a UserQuery or DateQuery
        :raises ValueError: if the query is of any other type
        """
        if isinstance(query, UserQuery):
            return [QueryFilterFactory.match_editor(query)]
        if isinstance(query, DateQuery):
            return [QueryFilterFactory.filter_between_dates(query)]
        raise ValueError(f"Unrecognised query: {query}")

    @staticmethod
    def match_editor(query: UserQuery):
        """
        Returns a function that returns True if a document editor, or the
        entity identifier, matches a (sub)string.

        When ``query.user`` is set the returned function tests the entity's
        ``metadata.created_by`` and ``metadata.last_modified_by``; otherwise it
        tests the entity identifier against ``query.entity_id``.

        :param query: the user / entity ID query
        :raises ValueError: if neither user nor entity_id is given, or the
            match type is not recognised
        """
        if not query.user and not query.entity_id:
            raise ValueError(
                f"User or Entity match must to be specified. Got {query.user!r} or"
                f" {query.entity_id!r}"
            )

        # The same search term is used for every match type, so that CONTAINS
        # on a user query searches for the user rather than the entity ID.
        match_on_user = bool(query.user)
        needle = query.user if match_on_user else query.entity_id

        if query.match_type == MatchType.EQUALS:
            match_fn = functools.partial(operator.eq, needle)
        elif query.match_type == MatchType.STARTS_WITH:
            match_fn = operator.methodcaller("startswith", needle)
        elif query.match_type == MatchType.CONTAINS:
            # The search term is interpreted as a regular expression; compile
            # it once here rather than on every comparison.
            pattern = re.compile(str(needle))

            def match_fn(value):
                return pattern.search(value) is not None

        else:
            raise ValueError(f"Invalid match type: {query.match_type}")

        def matches(value) -> bool:
            # An unset field can never match.
            return value is not None and bool(match_fn(value))

        def match(obj):
            if match_on_user:
                return matches(obj.metadata.created_by) or matches(
                    obj.metadata.last_modified_by
                )
            # The identifier attribute differs per entity type (SBDefinition
            # has sbd_id, Project has prj_id), so it is resolved through
            # get_identifier.
            return matches(get_identifier(obj))

        return match

    @staticmethod
    def filter_between_dates(query: DateQuery):
        """
        Returns a function that returns True if a date is between a given range.

        The range is half-open: the date must be at or after ``query.start``
        and strictly before ``query.end``. A bound left as None is not applied.

        :param query: the date query
        :raises ValueError: if the query type is not recognised, if both bounds
            are None, or if start is not earlier than end
        """
        if query.query_type == DateQuery.QueryType.CREATED_BETWEEN:
            accessor = operator.attrgetter("metadata.created_on")
        elif query.query_type == DateQuery.QueryType.MODIFIED_BETWEEN:
            accessor = operator.attrgetter("metadata.last_modified_on")
        else:
            raise ValueError(f"Unrecognised date query type: {query.query_type}")

        if query.start is None and query.end is None:
            raise ValueError("Query start and query end can not be None")

        if (
            query.start is not None
            and query.end is not None
            and query.start >= query.end
        ):
            raise ValueError("Query end date must be later than query start date")

        def ge_start(value):
            # always match if no start date specified
            if query.start is None:
                return True
            return value.timestamp() >= query.start.timestamp()

        def lt_end(value):
            # always match if no end date specified
            if query.end is None:
                return True
            return value.timestamp() < query.end.timestamp()

        def match(obj):
            o = accessor(obj)
            return ge_start(o) and lt_end(o)

        return match