import dataclasses
import multiprocessing
import os
import shutil
import sys
import venv
from ska_oso_oet.procedure.gitmanager import GitArgs, GitManager
PYPI_EXTRA_URLS = os.getenv(
"PYPI_EXTRA_URLS", "https://artefact.skao.int/repository/pypi-internal/simple"
).split(",")
[docs]
@dataclasses.dataclass
class Environment:
"""
Represents a Python virtual environment for script execution.
The creating and created Events are used to coordinate duplicate
environment creation prevention when multiple procedures request
the same environment.
These Events are created in the main process and inherited by
ScriptWorker child processes, allowing cross-process coordination
without pickling.
"""
env_id: str
creating: multiprocessing.Event
created: multiprocessing.Event
location: str
site_packages: str
[docs]
class EnvironmentManager:
"""
Manages Python virtual environments for script execution.
Creates the venv structure and Event objects in the main process.
The actual pip install happens in ScriptWorker processes, coordinated
via the creating/created Events.
"""
[docs]
def __init__(
self,
mp_context: multiprocessing.context.BaseContext | None = None,
base_dir: str = "/tmp/environments/",
):
if mp_context is None:
mp_context = multiprocessing.get_context()
self._mp_context = mp_context
self.base_dir = base_dir
self._envs: dict[str, Environment] = {}
[docs]
def create_env(self, git_args: GitArgs) -> Environment:
"""
Get an existing environment or create a new one.
Creates the venv structure but NOT the pip install - that happens
in ScriptWorker processes coordinated via the creating/created Events.
:param git_args: Git repository arguments
:return: Environment object with venv created but pip not yet installed
"""
if git_args.git_commit:
git_commit = git_args.git_commit
else:
git_commit = GitManager.get_commit_hash(git_args)
# Return existing environment if already created
if git_commit in self._envs:
return self._envs[git_commit]
# Create new environment (venv only, pip install happens later in ScriptWorker)
project_name = GitManager.get_project_name(git_args.git_repo)
# Create a new Python virtual environment
venv_dir = os.path.join(self.base_dir, project_name, git_commit)
venv.create(
env_dir=venv_dir,
clear=True,
with_pip=True,
symlinks=True,
)
# Calculate site-packages path
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
venv_site_pkgs = os.path.join(
venv_dir, "lib", f"python{python_version}", "site-packages"
)
environment = Environment(
env_id=git_commit,
creating=self._mp_context.Event(),
created=self._mp_context.Event(),
location=venv_dir,
site_packages=venv_site_pkgs,
)
self._envs[git_commit] = environment
return environment
[docs]
def delete_env(self, env_id: str) -> None:
"""
Delete an environment.
:param env_id: ID of environment to delete
"""
env = self._envs.get(env_id)
if env:
shutil.rmtree(env.location, ignore_errors=True)
del self._envs[env_id]