Source code for ska_sdp_e2e_batch_continuum_imaging.pipelines.environment

# pylint: disable=too-few-public-methods

import os
from contextlib import contextmanager


[docs] class EnvironmentContext: """ Class deals with environment context detection for the pipeline. """
[docs] @staticmethod def is_multinode() -> bool: """ Determine if the current execution environment is multinode. Returns ------- True if running in a multinode environment, False otherwise. """ return ( EnvironmentContext.slurm_job_id() is not None and EnvironmentContext.slurm_nodes() > 1 )
[docs] @staticmethod def slurm_job_id() -> str | None: """ Return job id of the slurm job if running inside a slurm resource allocation, else return None. """ return os.environ.get("SLURM_JOB_ID")
[docs] @staticmethod def slurm_nodes() -> int: """ Return total number of nodes in a slurm resource allocation, else return 1. """ return int(os.environ.get("SLURM_JOB_NUM_NODES", 1))
@contextmanager
def use_env_vars(env: dict):
    """
    Temporarily update the environment variables within a context.

    This function is intended to be used as a context manager. It
    updates the current process's environment variables with the
    provided dictionary for the duration of the context, and restores
    the environment to its state at entry afterwards, including when
    the context block raises.

    The restore is done in place on ``os.environ`` rather than by
    rebinding ``os.environ`` to a copy: the copy is a plain ``dict``, so
    rebinding would leave the real process environment (as seen by child
    processes) unchanged and would disconnect later ``os.environ``
    writes from it.

    Parameters
    ----------
    env : dict
        A dictionary of environment variables to set temporarily.

    Yields
    ------
    None
        Control is yielded to the context block.

    Examples
    --------
    >>> with use_env_vars({'MY_VAR': 'value'}):
    ...     # 'MY_VAR' is set within this block
    ...     pass
    # 'MY_VAR' is removed/restored to its previous value after the block
    """
    snapshot = dict(os.environ)
    try:
        # Inside the try so that a failure part-way through the update
        # (e.g. a non-string value) is rolled back as well.
        os.environ.update(env)
        yield
    finally:
        # Drop variables that did not exist when the context was entered.
        for name in set(os.environ).difference(snapshot):
            del os.environ[name]
        # Put back every original value that was changed or removed.
        for name, value in snapshot.items():
            if os.environ.get(name) != value:
                os.environ[name] = value
def filter_invalid(env_vars: dict) -> dict:
    """
    Drop environment variables whose value is None.

    Parameters
    ----------
    env_vars : dict
        A dictionary of environment variables.

    Returns
    -------
    dict
        A new dictionary holding only the entries whose value is not
        None. Other falsy values (such as an empty string) are kept.
    """
    kept = {}
    for name, value in env_vars.items():
        if value is None:
            continue
        kept[name] = value
    return kept