Source code for ska_ser_skallop.bdd_test_data_manager.data_manager

import os
import tempfile

import requests

USERNAME = os.environ.get("CAR_RAW_USERNAME")
PASSWORD = os.environ.get("CAR_RAW_PASSWORD")
HOST = os.environ.get("CAR_RAW_REPOSITORY_URL")


[docs]class FileAlreadyExists(Exception): """Enforces version control for the artefact"""
def _modify_artefact_directory(artefact_directory, path_format): # path/to/files/ if path_format == "path/to/files/": if artefact_directory.startswith("/"): artefact_directory = artefact_directory[1:] if not artefact_directory.endswith("/"): artefact_directory = artefact_directory + "/" # /path/to/files elif path_format == "/path/to/files": if not artefact_directory.startswith("/"): artefact_directory = "/" + artefact_directory if artefact_directory.endswith("/"): artefact_directory = artefact_directory[:-1] return artefact_directory
[docs]def upload_test_data(file_path: str, artefact_directory: str) -> bool: """ Upload the test data file to the repository. :param file_path: the absolute path of the file to be uploaded. :param artefact_directory: destination directory for files to be uploaded. e.g. path/to/files/ :return: An indication of the outcome of the request """ # Make sure artefact_directory has the expected number of slashes i.e. # path/to/files/ artefact_directory = _modify_artefact_directory( artefact_directory, "path/to/files/" ) file_name = file_path.split("/")[-1] # Uploading the same file is not supported; only the meta data is updated for an # already existing file but the content will remain the same. To prevent this, # inform the user # that the file already exists and abort request. test_data_file = search_for_file(file_name, artefact_directory) if f"{artefact_directory}{file_name}" in test_data_file: raise FileAlreadyExists( f"Artefact ({file_name}) already exists in repository " f"({artefact_directory})" ) with open(file_path, "rb") as open_file: response = requests.post( f"{HOST}/service/rest/v1/components", params={"repository": "raw"}, auth=requests.auth.HTTPBasicAuth(USERNAME, PASSWORD), files={ "raw.asset1": (file_name, open_file), "raw.directory": (None, artefact_directory), "raw.asset1.filename": (None, file_name), }, ) response.raise_for_status() return response.ok
[docs]def download_test_data(file_name: str, artefact_directory: str) -> str: """ Download the test data file from the repository. :param file_name: The name of the file on the repository. :param artefact_directory: Destination directory for specified file. e.g. path/to/files/ :return: The path to the downloaded file. """ # Make sure artefact_directory has the expected number of slashes i.e. # path/to/files/ artefact_directory = _modify_artefact_directory( artefact_directory, "path/to/files/" ) download_url = f"{HOST}/repository/raw/{artefact_directory}{file_name}" response = requests.get( download_url, auth=requests.auth.HTTPBasicAuth(USERNAME, PASSWORD) ) response.raise_for_status() with tempfile.NamedTemporaryFile("wb", delete=False) as tmp_file: tmp_file.write(response.content) return tmp_file.name
[docs]def search_for_file(filtering_string: str, artefact_directory: str) -> list[str]: """ Search a directory for a specified file. :param filtering_string: The name of the file on the repository. It can also be a wildcard which will have all files listed in that directory. :param artefact_directory: Directory to search for the test data. e.g. /path/to/files Note: contents of sub-directories in the path will not be shown for nexus. :return: A list of all file paths or a specified file path in the artefact directory. """ # Make sure artefact_directory has the expected number of slashes i.e. # /path/to/files artefact_directory = _modify_artefact_directory( artefact_directory, "/path/to/files" ) test_data_files = [] continuation_token_is_not_null = True params = { "repository": "raw", "group": artefact_directory, "name": f"{artefact_directory[1:]}/{filtering_string}", } while continuation_token_is_not_null: response = requests.get( f"{HOST}/service/rest/v1/search/assets", params, auth=requests.auth.HTTPBasicAuth(USERNAME, PASSWORD), ) response.raise_for_status() json_response = response.json() for item in json_response["items"]: test_data_files.append(item["path"]) if json_response["continuationToken"]: params["continuationToken"] = json_response["continuationToken"] else: continuation_token_is_not_null = False return test_data_files