Source code for scaffolder.utils

#!/usr/bin/env python3
"""
scaffolder/utils.py - Utility functions for scaffolder/generator.py
"""
import json
import os
import pwd
import re
import textwrap
from datetime import date
from pathlib import Path

# Line terminator used when normalising generated text.
NL = "\n"
# Calendar date captured once, when this module is imported.
TODAY = date.today()
# "<grandparent directory of this file>/output"; permission and ownership
# helpers in this module only act on paths that resolve beneath it.
OUTPUT_ROOT = Path(__file__).resolve().parent.parent / "output"
# Directory that contains OUTPUT_ROOT.
REPO_ROOT = OUTPUT_ROOT.parent
# Permission bits requested for generated directories (rwx for user, group
# and others) and generated files (rw for user, group and others).
DEFAULT_DIR_MODE = 0o777
DEFAULT_FILE_MODE = 0o666


def _env_int(*names: str) -> int | None:
    for name in names:
        value = os.environ.get(name)
        if value is None:
            continue
        value = value.strip()
        if not value:
            continue
        try:
            return int(value)
        except ValueError:
            continue
    return None


def _env_user(*names: str) -> tuple[int, int] | None:
    for name in names:
        username = os.environ.get(name)
        if not username:
            continue
        try:
            entry = pwd.getpwnam(username)
        except KeyError:
            continue
        return entry.pw_uid, entry.pw_gid
    return None


def _resolve_target_ids() -> tuple[int | None, int | None]:
    """Decide which uid/gid generated output should be owned by.

    The uid and gid are resolved independently, each using the first source
    that yields a value:

    1. ``SKA_WIDGET_OUTPUT_UID`` / ``SKA_WIDGET_OUTPUT_GID`` - used as given,
       including ``0``.
    2. The first parseable value among ``HOST_*``, ``SUDO_*`` and
       ``PKEXEC_*`` ``UID`` / ``GID`` - ignored when it is ``0``.
    3. The password-database entry of the user named by
       ``SKA_WIDGET_OUTPUT_USER`` or ``USER``.
    4. The ids of the current process.

    If the result is ``0`` and it did not come from an id environment
    variable, it is replaced by the owner of ``REPO_ROOT`` when that owner is
    non-zero, and by ``None`` otherwise.

    Returns:
        ``(uid, gid)``; either element may be ``None``.
    """
    repo_stat = REPO_ROOT.stat()
    uid: int | None = None
    gid: int | None = None
    # Track where each id came from; only "env" ids are exempt from the
    # root fallback at the end of the function.
    uid_src: str | None = None
    gid_src: str | None = None

    # Explicit override wins and is accepted even when it is 0.
    ska_uid = _env_int("SKA_WIDGET_OUTPUT_UID")
    if ska_uid is not None:
        uid = ska_uid
        uid_src = "env"
    else:
        # _env_int returns the first value that parses, so a leading
        # HOST_UID=0 is discarded here without consulting the later names.
        host_uid = _env_int("HOST_UID", "SUDO_UID", "PKEXEC_UID")
        if host_uid not in (None, 0):
            uid = host_uid
        if uid is not None:
            uid_src = "env"

    # Same two-step lookup for the group id.
    ska_gid = _env_int("SKA_WIDGET_OUTPUT_GID")
    if ska_gid is not None:
        gid = ska_gid
        gid_src = "env"
    else:
        host_gid = _env_int("HOST_GID", "SUDO_GID", "PKEXEC_GID")
        if host_gid not in (None, 0):
            gid = host_gid
        if gid is not None:
            gid_src = "env"

    user_hint = _env_user("SKA_WIDGET_OUTPUT_USER", "USER")

    # A named user only fills in ids that the numeric variables left unset.
    if user_hint:
        user_uid, user_gid = user_hint
        if uid is None:
            uid = user_uid
            uid_src = "user"
        if gid is None:
            gid = user_gid
            gid_src = "user"

    current_uid = os.getuid()
    current_gid = os.getgid()

    # Last resort: whoever is running this process.
    if uid is None:
        uid = current_uid
        uid_src = "current"
    if gid is None:
        gid = current_gid
        gid_src = "current"

    # An id of 0 that was not requested through an id environment variable
    # is swapped for the REPO_ROOT owner, or dropped when that is 0 as well.
    if uid == 0 and uid_src != "env":
        repo_uid = repo_stat.st_uid or None
        if repo_uid and repo_uid != 0:
            uid = repo_uid
            uid_src = "repo"
        else:
            uid = None
    if gid == 0 and gid_src != "env":
        repo_gid = repo_stat.st_gid or None
        if repo_gid and repo_gid != 0:
            gid = repo_gid
            gid_src = "repo"
        else:
            gid = None

    return uid, gid


# Resolved once, when this module is imported. `_set_owner` treats a None
# element as "keep the path's existing id".
TARGET_UID, TARGET_GID = _resolve_target_ids()


def _within_output(path: Path) -> bool:
    """Report whether *path*, once resolved, is ``OUTPUT_ROOT`` or lies below it."""
    return path.resolve().is_relative_to(OUTPUT_ROOT)


def _set_mode(path: Path, mode: int) -> None:
    try:
        os.chmod(path, mode)
    except (FileNotFoundError, PermissionError, OSError):
        # If we cannot change permissions we continue gracefully; another
        # process may already manage the permissions on this path.
        pass


def _set_owner(path: Path) -> None:
    """Chown *path* to ``TARGET_UID``/``TARGET_GID`` on a best-effort basis.

    Does nothing when both targets are ``None`` or when *path* is missing. A
    target that is ``None`` keeps the id the path already has. ``os.chown``
    is skipped when ownership already matches, and its failures are ignored
    so that missing privileges do not abort the run.
    """
    if TARGET_UID is None and TARGET_GID is None:
        return

    try:
        current = path.stat()
    except FileNotFoundError:
        return

    wanted_uid = current.st_uid if TARGET_UID is None else TARGET_UID
    wanted_gid = current.st_gid if TARGET_GID is None else TARGET_GID

    if (current.st_uid, current.st_gid) != (wanted_uid, wanted_gid):
        try:
            os.chown(path, wanted_uid, wanted_gid)
        except OSError:
            # PermissionError is an OSError subclass; both are ignored.
            pass


def ensure_dir_permissions(path: Path) -> None:
    """Apply the default directory mode and the target ownership to *path*.

    Paths that do not resolve to a location under ``OUTPUT_ROOT`` are left
    untouched.
    """
    if not _within_output(path):
        return
    _set_mode(path, DEFAULT_DIR_MODE)
    _set_owner(path)
def ensure_file_permissions(path: Path) -> None:
    """Apply the default file mode and the target ownership to *path*.

    Paths that do not resolve to a location under ``OUTPUT_ROOT`` are left
    untouched.
    """
    if not _within_output(path):
        return
    _set_mode(path, DEFAULT_FILE_MODE)
    _set_owner(path)
def ensure_dir_chain_permissions(path: Path) -> None:
    """Fix permissions on *path* and on each ancestor inside the output tree.

    Starting from the resolved *path*, walks towards the filesystem root and
    calls ``ensure_dir_permissions`` on every directory, stopping at the
    first one that is not within ``OUTPUT_ROOT``.
    """
    resolved = path.resolve()
    for directory in (resolved, *resolved.parents):
        if not _within_output(directory):
            break
        ensure_dir_permissions(directory)
def pascal_to_kebab(n: str) -> str:
    """Convert a PascalCase or camelCase name to kebab-case.

    Example: ``"MyWidget"`` becomes ``"my-widget"`` and ``"HTTPServer"``
    becomes ``"http-server"``.
    """
    # First pass: split before a capitalised word ("HTTPServer" -> "HTTP-Server").
    split_words = re.sub(r"(.)([A-Z][a-z]+)", r"\1-\2", n)
    # Second pass: split a lower-case letter or digit from a following capital.
    return re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", split_words).lower()
def pascal_to_camel(n: str) -> str:
    """Lower-case the first character of *n*; empty input is returned as-is."""
    if not n:
        return n
    return n[0].lower() + n[1:]
def js_lit(v):
    """Render a Python value as JavaScript literal source text.

    * ``str``: a single-quoted JS string. A string that looks like a JSON
      array or object (after stripping whitespace) and parses as JSON is
      rendered as that structure instead.
    * ``bool``: ``true`` / ``false``; ``None``: ``null``.
    * ``list``: ``[a, b]``; ``dict``: ``{ key: value }``, with keys left bare
      when they are valid identifiers and quoted otherwise.
    * anything else: whatever ``json.dumps`` produces.
    """

    def _js_str(text: str) -> str:
        # Escape the backslash first so the escapes added afterwards are not
        # doubled. Line breaks must be escaped because a raw newline is not
        # allowed inside a single-quoted JS string.
        escaped = (
            text.replace("\\", "\\\\")
            .replace("'", "\\'")
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )
        return "'" + escaped + "'"

    def _js_key(key: str) -> str:
        # fullmatch, not match + "$": "$" would also accept a trailing newline.
        if re.fullmatch(r"[A-Za-z_$][A-Za-z0-9_$]*", key):
            return key
        return _js_str(key)

    if isinstance(v, str):
        s = v.strip()
        looks_like_json = (s.startswith("[") and s.endswith("]")) or (
            s.startswith("{") and s.endswith("}")
        )
        if looks_like_json:
            try:
                return js_lit(json.loads(s))
            except (ValueError, RecursionError):
                # Not valid JSON after all: fall back to a plain string.
                pass
        return _js_str(v)
    # bool is checked before the generic fallback because it is an int subclass.
    if isinstance(v, bool):
        return "true" if v else "false"
    if v is None:
        return "null"
    if isinstance(v, list):
        return "[" + ", ".join(js_lit(item) for item in v) + "]"
    if isinstance(v, dict):
        inner = ", ".join(
            f"{_js_key(str(k))}: {js_lit(val)}" for k, val in v.items()
        )
        return "{ " + inner + " }"
    return json.dumps(v, ensure_ascii=False)
def gql_to_json_type(gql_type: str) -> str:
    """Map a GraphQL type expression to a JSON-schema type name.

    List types (anything starting with ``[``) map to ``"array"``. ``Int`` and
    ``ID`` map to ``"integer"``, ``Float`` to ``"number"`` and ``Boolean`` to
    ``"boolean"``. Every other type maps to ``"string"``. Non-null markers
    and surrounding whitespace are ignored.
    """
    cleaned = gql_type.strip()
    if cleaned.startswith("["):
        return "array"
    # Strip whitespace as well as "!" so " Int! " is treated like "Int".
    base = cleaned.strip("![]").strip()
    scalar_map = {
        "Int": "integer",
        "ID": "integer",
        "Float": "number",
        "Boolean": "boolean",
    }
    return scalar_map.get(base, "string")
def _inject_header(op_kw: str, body: str, var_types: dict) -> str: body = body.strip() if not body: return body has_header = body.startswith(op_kw) and re.search(r"\(\s*\$", body) if has_header or not var_types: return body defs = ", ".join(f"${n}: {var_types.get(n,'JSON')}" for n in var_types) if body.startswith(op_kw): brace = body.find("{") header = body[:brace].rstrip() selection = body[brace:].lstrip() return f"{header}({defs}) {selection}" if not body.startswith("{"): body = "{\n" + body + "\n}" return f"{op_kw} Execute({defs}) {body}" def _prepare_query(body: str, var_types: dict) -> str: return _inject_header("query", body, var_types) def _prepare_subscription(body: str, var_types: dict) -> str: return _inject_header("subscription", body, var_types) def _prepare_mutation(body: str, var_types: dict) -> str: return _inject_header("mutation", body, var_types) def _repl(txt: str, ph: dict) -> str: txt = re.sub( r"\{\s*\{\s*([A-Z_]+)\s*\}\s*\}Widget", lambda m: ph.get(m[1], m[0]) + "Widget", txt, ) return re.sub( r"\{\s*\{\s*([A-Z_]+)\s*\}\s*\}", lambda m: ph.get(m[1], m[0]), txt ) def _make_default_cfg(var_names, var_defaults, extra_defaults=None) -> str: combined: dict = {} for key in var_names: combined[key] = var_defaults[key] if isinstance(extra_defaults, dict): for key in sorted(extra_defaults): combined[key] = extra_defaults[key] if not combined: return "{}" inner = ",\n".join(f" {k}: {js_lit(combined[k])}" for k in combined) return "{\n" + inner + "\n}" def _make_schema(var_names, var_types, extra_schema=None) -> str: props = [] required: list[str] = [] for var in var_names: gql_type = var_types.get(var, "String") json_type = gql_to_json_type(gql_type) required.append(var) props.append( " {name}: {{ type: '{json}', title: '{name}', description: 'GraphQL variable {name} ({gql}).' 
}}".format( name=var, json=json_type, gql=gql_type ) ) if isinstance(extra_schema, dict): for key in sorted(extra_schema): props.append(f" {key}: {extra_schema[key]}") if not props: return "{ type: 'object', properties: {} }" req_literal = "" if required: req_literal = ( ",\n required: [" + ", ".join(f"'{name}'" for name in required) + "]" ) return ( "{\n type: 'object'" + req_literal + ",\n properties: {\n" + ",\n".join(props) + "\n }\n}" ) def _build_readme( pascal, repo_slug, names, vdef, vtype, operations: list[dict] | None = None, ) -> str: kebab = pascal_to_kebab(pascal) package_name = f"@ska-octopus-widgets/{kebab}-widget" if names: rows = [ "| Name | Default | GraphQL type |", "|------|---------|--------------|", ] for var in names: default = ( json.dumps(vdef[var]) if vdef[var] is not None else "null" ) rows.append(f"| {var} | `{default}` | `{vtype.get(var, '')}` |") var_tbl = "\n".join(rows) else: var_tbl = "_No variables_" op_types = sorted( { (op.get("type") or "").lower() for op in (operations or []) if (op.get("type") or "").strip() } ) mode_map = { "polling": "polling queries (HTTP)", "stream": "subscriptions (WebSocket)", "mutation": "mutations (commands)", } if op_types: operation_modes = ", ".join(mode_map.get(op, op) for op in op_types) else: operation_modes = "none scaffolded (structure only)" return textwrap.dedent( f""" # {pascal} Widget _Generated on {TODAY.isoformat()}._ ## Purpose `{pascal}` is an Octopus detached widget package. It can run standalone during development and can also be loaded dynamically by `ska-octopus-frontend` at runtime. ## Bigger Picture (Octopus Suite) The `ska-octopus-suite` combines backend, frontend, deployment charts, and helper tooling. In that architecture: - `ska-octopus-backend` exposes GraphQL over HTTP/WebSocket and handles runtime widget delivery metadata. - `ska-octopus-frontend` loads widget bundles from the backend catalog and each bundle self-registers via `Octopus.registerWidget(...)`. 
- Widgets stay decoupled from frontend releases, so teams can ship widget updates without rebuilding the host app. Reference: https://developer.skao.int/projects/ska-octopus-suite/en/latest/overview.html ## Widget Metadata - Repository folder: `{repo_slug}` - Package name: `{package_name}` - GraphQL modes scaffolded: {operation_modes} ## Runtime Configuration The widget configuration schema is defined in `src/index.ts`. Variables detected during scaffolding: {var_tbl} ## Local Development (Standalone) ```bash git clone <repo-url> {repo_slug} cd {repo_slug} npm install npm run dev ``` Default dev endpoints are configured in `dev/main.tsx`: - GraphQL HTTP: `http://localhost:8000/graphql` - GraphQL WS: `ws://localhost:8000/graphql` Build and test: ```bash npm run build npm run test ``` ## Integrate with Local `ska-octopus-frontend` 1. Add a Vite alias to your local widget source: `@ska-octopus-widgets/{kebab}-widget` -> `<path-to>/{repo_slug}/src` 2. Import the module in `src/dashboard.ts`: `import '@ska-octopus-widgets/{kebab}-widget';` 3. Start frontend dev server and add the widget from the picker. ## Integrate with Deployed Octopus 1. Publish `{package_name}` to your npm registry. 2. Add/update it in Octopus Config UI -> Widgets Store. 3. The backend serves widget bundle metadata; frontend fetches and executes the bundle at runtime. Note: deployments can restrict allowed widget registries/scopes (for example through `WIDGET_ALLOWED_NPM_SCOPES`). """ ) def _write(path: Path, txt: str): txt = textwrap.dedent(txt).replace("\r\n", NL) if not txt.endswith(NL): txt += NL path.parent.mkdir(parents=True, exist_ok=True) ensure_dir_chain_permissions(path.parent) path.write_text(txt, encoding="utf-8") ensure_file_permissions(path)