Source code for ska_oso_oet.activity.ui

"""
The ska_oso_oet.activity.ui module contains code that belongs to the activity
UI/presentation layer. This layer is the means by which external users or
systems would interact with activities.
"""

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from ska_aaa_authhelpers import Role

from ska_oso_oet.activity.application import ActivityCommand, ActivitySummary
from ska_oso_oet.auth import (
    OPERATOR_ROLE_FOR_TELESCOPE,
    OST_CLIENT_ID,
    Permissions,
    Scopes,
)
from ska_oso_oet.event import topics
from ska_oso_oet.utils.ui import (
    ProcedureInput,
    ScriptArgs,
    call_and_respond,
    convert_request_to_procedure_input,
)

activities_router = APIRouter(prefix="/activities", tags=["Activities"])


[docs] class ActivityPostRequest(BaseModel): sbd_id: str activity_name: str = Field(examples=["observe"]) script_args: ScriptArgs = Field( default=ScriptArgs(init=ProcedureInput(args=[], kwargs={})), examples=[ScriptArgs(init=ProcedureInput(kwargs={}))], ) prepare_only: bool = False create_env: bool = False
@activities_router.get( "/{activity_id}", response_model=ActivitySummary, summary="Get the Activity with the given activity_id", description=( "Returns a summary of the Activity if it exists " "within the OET, with details of its state and related Procedure" ), dependencies=[ Permissions( roles={Role.SW_ENGINEER, OPERATOR_ROLE_FOR_TELESCOPE, Role.APP2APP}, scopes={Scopes.ACTIVITY_READ}, app_ids={OST_CLIENT_ID}, ) ], ) def get_activity(activity_id: int) -> ActivitySummary: summaries = call_and_respond( topics.request.activity.list, topics.activity.pool.list, activity_ids=[activity_id], ) if not summaries: detail = { "type": "ResourceNotFound", "Message": f"No information available for ID={activity_id}", } raise HTTPException(404, detail=detail) else: return summaries[0] @activities_router.get( "", response_model=list[ActivitySummary], summary="Get all Activities", description="Returns a list of all the Activity summaries.", dependencies=[ Permissions( roles={Role.SW_ENGINEER, OPERATOR_ROLE_FOR_TELESCOPE, Role.APP2APP}, scopes={Scopes.ACTIVITY_READ}, app_ids={OST_CLIENT_ID}, ) ], ) def get_activities() -> list[ActivitySummary]: summaries = call_and_respond( topics.request.activity.list, topics.activity.pool.list ) return summaries @activities_router.post( "", status_code=201, response_model=ActivitySummary, summary="Create a new Activity and start its execution", description=( "Loads the SBDefinition from the ODA and the script from Git or the filesystem" " for the given Activity. It is prepared for execution as an associated" " Procedure and then executed (unless prepare_only=True)." ), dependencies=[ Permissions( roles={Role.SW_ENGINEER, OPERATOR_ROLE_FOR_TELESCOPE, Role.APP2APP}, scopes={Scopes.ACTIVITY_EXECUTE}, app_ids={OST_CLIENT_ID}, ) ], ) def run_activity( request_body: ActivityPostRequest, ) -> ActivitySummary: script_args = { fn: convert_request_to_procedure_input(fn_args) for (fn, fn_args) in request_body.script_args if fn_args is not None } cmd = ActivityCommand( request_body.activity_name, request_body.sbd_id, request_body.prepare_only, request_body.create_env, script_args, ) summary = call_and_respond( topics.request.activity.run, topics.activity.lifecycle.running, cmd=cmd ) return summary