#!/usr/bin/env python3
"""
scaffolder/utils.py - Utility functions for scaffolder/generator.py
"""
import json
import os
import pwd
import re
import textwrap
from datetime import date
from pathlib import Path
NL = "\n"
TODAY = date.today()
OUTPUT_ROOT = Path(__file__).resolve().parent.parent / "output"
REPO_ROOT = OUTPUT_ROOT.parent
DEFAULT_DIR_MODE = 0o777
DEFAULT_FILE_MODE = 0o666
def _env_int(*names: str) -> int | None:
for name in names:
value = os.environ.get(name)
if value is None:
continue
value = value.strip()
if not value:
continue
try:
return int(value)
except ValueError:
continue
return None
def _env_user(*names: str) -> tuple[int, int] | None:
for name in names:
username = os.environ.get(name)
if not username:
continue
try:
entry = pwd.getpwnam(username)
except KeyError:
continue
return entry.pw_uid, entry.pw_gid
return None
def _resolve_target_ids() -> tuple[int | None, int | None]:
repo_stat = REPO_ROOT.stat()
uid: int | None = None
gid: int | None = None
uid_src: str | None = None
gid_src: str | None = None
ska_uid = _env_int("SKA_WIDGET_OUTPUT_UID")
if ska_uid is not None:
uid = ska_uid
uid_src = "env"
else:
host_uid = _env_int("HOST_UID", "SUDO_UID", "PKEXEC_UID")
if host_uid not in (None, 0):
uid = host_uid
if uid is not None:
uid_src = "env"
ska_gid = _env_int("SKA_WIDGET_OUTPUT_GID")
if ska_gid is not None:
gid = ska_gid
gid_src = "env"
else:
host_gid = _env_int("HOST_GID", "SUDO_GID", "PKEXEC_GID")
if host_gid not in (None, 0):
gid = host_gid
if gid is not None:
gid_src = "env"
user_hint = _env_user("SKA_WIDGET_OUTPUT_USER", "USER")
if user_hint:
user_uid, user_gid = user_hint
if uid is None:
uid = user_uid
uid_src = "user"
if gid is None:
gid = user_gid
gid_src = "user"
current_uid = os.getuid()
current_gid = os.getgid()
if uid is None:
uid = current_uid
uid_src = "current"
if gid is None:
gid = current_gid
gid_src = "current"
if uid == 0 and uid_src != "env":
repo_uid = repo_stat.st_uid or None
if repo_uid and repo_uid != 0:
uid = repo_uid
uid_src = "repo"
else:
uid = None
if gid == 0 and gid_src != "env":
repo_gid = repo_stat.st_gid or None
if repo_gid and repo_gid != 0:
gid = repo_gid
gid_src = "repo"
else:
gid = None
return uid, gid
TARGET_UID, TARGET_GID = _resolve_target_ids()
def _within_output(path: Path) -> bool:
try:
path.resolve().relative_to(OUTPUT_ROOT)
except ValueError:
return False
return True
def _set_mode(path: Path, mode: int) -> None:
try:
os.chmod(path, mode)
except (FileNotFoundError, PermissionError, OSError):
# If we cannot change permissions we continue gracefully; another
# process may already manage the permissions on this path.
pass
def _set_owner(path: Path) -> None:
if TARGET_UID is None and TARGET_GID is None:
return
try:
stat_result = path.stat()
except FileNotFoundError:
return
uid = TARGET_UID if TARGET_UID is not None else stat_result.st_uid
gid = TARGET_GID if TARGET_GID is not None else stat_result.st_gid
if stat_result.st_uid == uid and stat_result.st_gid == gid:
return
try:
os.chown(path, uid, gid)
except (PermissionError, OSError):
# Lack of privileges should not abort the run.
pass
[docs]
def ensure_dir_permissions(path: Path) -> None:
if _within_output(path):
_set_mode(path, DEFAULT_DIR_MODE)
_set_owner(path)
[docs]
def ensure_file_permissions(path: Path) -> None:
if _within_output(path):
_set_mode(path, DEFAULT_FILE_MODE)
_set_owner(path)
[docs]
def ensure_dir_chain_permissions(path: Path) -> None:
resolved = path.resolve()
for directory in [resolved, *resolved.parents]:
if not _within_output(directory):
break
ensure_dir_permissions(directory)
[docs]
def pascal_to_kebab(n: str) -> str:
s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1-\2", n)
return re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", s1).lower()
[docs]
def pascal_to_camel(n: str) -> str:
return n[0].lower() + n[1:] if n else n
[docs]
def js_lit(v):
def _js_key(key: str) -> str:
if re.match(r"^[A-Za-z_$][A-Za-z0-9_$]*$", key):
return key
return "'" + key.replace("'", r"\'") + "'"
if isinstance(v, str):
s = v.strip()
if (s.startswith("[") and s.endswith("]")) or (
s.startswith("{") and s.endswith("}")
):
try:
return js_lit(json.loads(s))
except Exception:
pass
return "'" + v.replace("'", r"\'") + "'"
if isinstance(v, bool):
return "true" if v else "false"
if v is None:
return "null"
if isinstance(v, list):
return "[" + ", ".join(js_lit(item) for item in v) + "]"
if isinstance(v, dict):
inner = ", ".join(
f"{_js_key(str(k))}: {js_lit(val)}" for k, val in v.items()
)
return "{ " + inner + " }"
return json.dumps(v, ensure_ascii=False)
[docs]
def gql_to_json_type(gql_type: str) -> str:
base = gql_type.strip("![]")
if gql_type.lstrip().startswith("["):
return "array"
if base in ("Int", "ID"):
return "integer"
if base == "Float":
return "number"
if base == "Boolean":
return "boolean"
return "string"
def _inject_header(op_kw: str, body: str, var_types: dict) -> str:
body = body.strip()
if not body:
return body
has_header = body.startswith(op_kw) and re.search(r"\(\s*\$", body)
if has_header or not var_types:
return body
defs = ", ".join(f"${n}: {var_types.get(n,'JSON')}" for n in var_types)
if body.startswith(op_kw):
brace = body.find("{")
header = body[:brace].rstrip()
selection = body[brace:].lstrip()
return f"{header}({defs}) {selection}"
if not body.startswith("{"):
body = "{\n" + body + "\n}"
return f"{op_kw} Execute({defs}) {body}"
def _prepare_query(body: str, var_types: dict) -> str:
return _inject_header("query", body, var_types)
def _prepare_subscription(body: str, var_types: dict) -> str:
return _inject_header("subscription", body, var_types)
def _prepare_mutation(body: str, var_types: dict) -> str:
return _inject_header("mutation", body, var_types)
def _repl(txt: str, ph: dict) -> str:
txt = re.sub(
r"\{\s*\{\s*([A-Z_]+)\s*\}\s*\}Widget",
lambda m: ph.get(m[1], m[0]) + "Widget",
txt,
)
return re.sub(
r"\{\s*\{\s*([A-Z_]+)\s*\}\s*\}", lambda m: ph.get(m[1], m[0]), txt
)
def _make_default_cfg(var_names, var_defaults, extra_defaults=None) -> str:
combined: dict = {}
for key in var_names:
combined[key] = var_defaults[key]
if isinstance(extra_defaults, dict):
for key in sorted(extra_defaults):
combined[key] = extra_defaults[key]
if not combined:
return "{}"
inner = ",\n".join(f" {k}: {js_lit(combined[k])}" for k in combined)
return "{\n" + inner + "\n}"
def _make_schema(var_names, var_types, extra_schema=None) -> str:
props = []
required: list[str] = []
for var in var_names:
gql_type = var_types.get(var, "String")
json_type = gql_to_json_type(gql_type)
required.append(var)
props.append(
" {name}: {{ type: '{json}', title: '{name}', description: 'GraphQL variable {name} ({gql}).' }}".format(
name=var, json=json_type, gql=gql_type
)
)
if isinstance(extra_schema, dict):
for key in sorted(extra_schema):
props.append(f" {key}: {extra_schema[key]}")
if not props:
return "{ type: 'object', properties: {} }"
req_literal = ""
if required:
req_literal = (
",\n required: ["
+ ", ".join(f"'{name}'" for name in required)
+ "]"
)
return (
"{\n type: 'object'"
+ req_literal
+ ",\n properties: {\n"
+ ",\n".join(props)
+ "\n }\n}"
)
def _build_readme(
pascal,
repo_slug,
names,
vdef,
vtype,
operations: list[dict] | None = None,
) -> str:
kebab = pascal_to_kebab(pascal)
package_name = f"@ska-octopus-widgets/{kebab}-widget"
if names:
rows = [
"| Name | Default | GraphQL type |",
"|------|---------|--------------|",
]
for var in names:
default = (
json.dumps(vdef[var]) if vdef[var] is not None else "null"
)
rows.append(f"| {var} | `{default}` | `{vtype.get(var, '')}` |")
var_tbl = "\n".join(rows)
else:
var_tbl = "_No variables_"
op_types = sorted(
{
(op.get("type") or "").lower()
for op in (operations or [])
if (op.get("type") or "").strip()
}
)
mode_map = {
"polling": "polling queries (HTTP)",
"stream": "subscriptions (WebSocket)",
"mutation": "mutations (commands)",
}
if op_types:
operation_modes = ", ".join(mode_map.get(op, op) for op in op_types)
else:
operation_modes = "none scaffolded (structure only)"
return textwrap.dedent(
f"""
# {pascal} Widget
_Generated on {TODAY.isoformat()}._
## Purpose
`{pascal}` is an Octopus detached widget package. It can run standalone
during development and can also be loaded dynamically by
`ska-octopus-frontend` at runtime.
## Bigger Picture (Octopus Suite)
The `ska-octopus-suite` combines backend, frontend, deployment charts, and
helper tooling. In that architecture:
- `ska-octopus-backend` exposes GraphQL over HTTP/WebSocket and handles
runtime widget delivery metadata.
- `ska-octopus-frontend` loads widget bundles from the backend catalog and
each bundle self-registers via `Octopus.registerWidget(...)`.
- Widgets stay decoupled from frontend releases, so teams can ship widget
updates without rebuilding the host app.
Reference:
https://developer.skao.int/projects/ska-octopus-suite/en/latest/overview.html
## Widget Metadata
- Repository folder: `{repo_slug}`
- Package name: `{package_name}`
- GraphQL modes scaffolded: {operation_modes}
## Runtime Configuration
The widget configuration schema is defined in `src/index.ts`.
Variables detected during scaffolding:
{var_tbl}
## Local Development (Standalone)
```bash
git clone <repo-url> {repo_slug}
cd {repo_slug}
npm install
npm run dev
```
Default dev endpoints are configured in `dev/main.tsx`:
- GraphQL HTTP: `http://localhost:8000/graphql`
- GraphQL WS: `ws://localhost:8000/graphql`
Build and test:
```bash
npm run build
npm run test
```
## Integrate with Local `ska-octopus-frontend`
1. Add a Vite alias to your local widget source:
`@ska-octopus-widgets/{kebab}-widget` -> `<path-to>/{repo_slug}/src`
2. Import the module in `src/dashboard.ts`:
`import '@ska-octopus-widgets/{kebab}-widget';`
3. Start frontend dev server and add the widget from the picker.
## Integrate with Deployed Octopus
1. Publish `{package_name}` to your npm registry.
2. Add/update it in Octopus Config UI -> Widgets Store.
3. The backend serves widget bundle metadata; frontend fetches and executes
the bundle at runtime.
Note: deployments can restrict allowed widget registries/scopes (for
example through `WIDGET_ALLOWED_NPM_SCOPES`).
"""
)
def _write(path: Path, txt: str):
txt = textwrap.dedent(txt).replace("\r\n", NL)
if not txt.endswith(NL):
txt += NL
path.parent.mkdir(parents=True, exist_ok=True)
ensure_dir_chain_permissions(path.parent)
path.write_text(txt, encoding="utf-8")
ensure_file_permissions(path)