Developer Guidelines
Please refer to the Developer Guidelines for information on how to contribute to this project, including coding standards, testing procedures, and other best practices.
Authentication
The Global Execution API routes are added to the Permissions API configuration.
Users with a valid audience (global-execution-api) in their SRCNet IAM token can access the routes.
Additionally, the user needs to be a member of the src/production group to submit a PanDA job.
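When debugging access problems, it can help to inspect a token's claims locally. The sketch below decodes a JWT payload without verifying the signature, then checks the audience and group requirements described above. The claim names `aud` and `groups` are assumptions about how SRCNet IAM encodes them, and both helper names are hypothetical. This is for inspection only and does not replace the checks performed by the Permissions API.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature."""
    payload_b64 = token.split(".")[1]
    # Restore the base64 padding that JWTs strip.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def can_submit_panda_job(claims: dict) -> bool:
    """Check the audience and group requirements.

    Assumes the claims are named "aud" and "groups" (not confirmed by this guide).
    """
    aud = claims.get("aud", [])
    # "aud" may be a single string (possibly space-separated) or a list.
    audiences = aud.split() if isinstance(aud, str) else list(aud)
    groups = claims.get("groups", [])
    return "global-execution-api" in audiences and "src/production" in groups
```

Calling `can_submit_panda_job(decode_jwt_payload(token))` on an exchanged token gives a quick indication of whether a rejected request is likely an audience or a group problem.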
The following sections assume that the API has been integrated with both IAM and the Permissions API. This involves:
Creating an IAM client (for Services to obtain access via a client_credentials grant),
Passing the credentials (id/secret) in the values.yaml (helm), and
Creating the permissions policy and loading it into the Permissions API.
Access can then be granted for either a User or Service.
User
To access this API as a user, the user needs to have first authenticated with the SRCNet and to have exchanged the token resulting from this initial authentication with one that allows access to this specific service. See the Authentication Mechanism and Token Exchange Mechanism sections of the Authentication API for more specifics.
Service
For service-to-service interactions, it is possible to obtain a token via a client_credentials grant to the ska-src-api-global-execution IAM client.
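As a rough illustration of the client_credentials grant, the sketch below builds the token request using only the standard library. The token endpoint URL, the credentials, and the use of an `audience` form parameter are assumptions (placeholders, not values from this project), and `build_client_credentials_request` is a hypothetical helper. The request is constructed but not sent.

```python
import base64
import urllib.parse
import urllib.request


def build_client_credentials_request(
    token_url: str, client_id: str, client_secret: str, audience: str
) -> urllib.request.Request:
    """Build (but do not send) an OAuth2 client_credentials token request."""
    # Client authentication via HTTP Basic; assumes id/secret need no URL-encoding.
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    body = urllib.parse.urlencode(
        {"grant_type": "client_credentials", "audience": audience}
    ).encode()
    return urllib.request.Request(
        token_url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


# Placeholder values; the real endpoint and credentials come from your IAM client.
request = build_client_credentials_request(
    token_url="https://iam.example.org/token",
    client_id="my-client-id",
    client_secret="my-client-secret",
    audience="global-execution-api",
)
# Sending it would be: urllib.request.urlopen(request), then reading
# "access_token" from the JSON response.
```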
Deployment
Deployment is managed by docker-compose or helm. The helm chart is recommended for deployment onto a k8s cluster, as it allows for better management of the service, which uses Kubernetes Jobs to provision the Compute Jobs.
The docker-compose file can be used to bring up the necessary services locally, i.e. the REST API, with the mandatory
environment variables set. Sensitive environment variables, including those relating to the IAM client, should be kept in
.env files to avoid committing them to the repository.
If deployed locally, the rapidoc openapi operator interface will be made available at
http://localhost:8080/v1/www/docs/oper.
Example via docker-compose
Edit the .env.template file accordingly and rename to .env, then:
ska_src_api_global_execution$ docker-compose -f docker-compose.local.yaml up --build
Example via Helm
First build the docker image locally:
ska_src_api_global_execution$ make oci-image-build
Then install the chart (assumes Minikube):
ska_src_api_global_execution$ make k8s-install-chart
Alternatively, go to the helm chart directory and run:
ska_src_api_global_execution$ cd etc/helm
ska_src_api_global_execution$ helm install --create-namespace -n global-execution-api . -f values.yaml
Development
The Global Execution API currently supports two types of WorkSteps:
Batch Job: We use PanDA as the WMS backend for batch jobs. Job submission and monitoring are done using the PanDA client library.
Interactive Job: This is currently in the design phase and will be implemented in the future. We will use CANFAR for submitting interactive jobs.
At the start of job submission, we create an ExecutionModel instance with the defined stage and store it in the database.
workstep_model.created_by = username
execution_model = ExecutionModel(workstep=workstep_model)
await self.execution_repository.initialize_execution_data(execution_model=execution_model)
When submitting a compute job, we record the job provisioning step in the state field and store it in the database.
Currently, we use Redis for this purpose because it is lightweight and fast. However, we plan to migrate to MongoDB in the future,
which will allow us to store, retrieve, and query more detailed information about the job and its execution.
This API is WMS backend-agnostic.
You can switch between Batch Job backends by changing the WMS_BACKEND environment variable, without changing the code.
def get_background_tasks_manager() -> BackgroundTaskManager:
    """Create and return a BackgroundTaskManager instance."""
    return BackgroundTaskManager()


def panda_batch_job_executor(
    panda_client: PandaClient = Depends(get_panda_client),
    kubernetes_client: Kubernetes = Depends(get_kubernetes_client),
    site_capabilities_queue_client: QueueService = Depends(get_site_capabilities_queue_client),
    background_task_manager: BackgroundTaskManager = Depends(get_background_tasks_manager),
) -> BatchJobExecutor:
    """Create and return a Panda Job Factory instance."""
    return PandaBatchJobExecutor(
        panda_client=panda_client,
        kubernetes_client=kubernetes_client,
        site_capabilities_queue_client=site_capabilities_queue_client,
        background_task_manager=background_task_manager,
    )


def batch_job_executor_provider(provider: str = os.getenv("WMS_BACKEND", "test")):
    """Create and return a map of job factories functions."""
    return {
        "panda": panda_batch_job_executor,
        "test": test_batch_job_executor,
    }.get(provider, test_batch_job_executor)


def get_jobs_service(
    job_repository: ExecutionRepository = Depends(get_job_repository),
    batch_job_executor: BatchJobExecutor = Depends(batch_job_executor_provider()),  # this is not typo, it is batch_job_executor_provider()
    interactive_job_executor: InteractiveJobExecutor = Depends(interactive_job_executor_provider),
    background_task_manager: BackgroundTaskManager = Depends(get_background_tasks_manager),
) -> ExecutionService:
    """Create and return an ExecutionService instance."""
    return ExecutionService(
        execution_repository=job_repository,
        batch_job_executor=batch_job_executor,
        interactive_job_executor=interactive_job_executor,
        background_task_manager=background_task_manager,
    )
Each Batch Job Executor must implement the BatchJobExecutor abstract class, shown here together with the test implementation:
class BatchJobExecutor(ABC):
    """Abstract base class for job factories."""

    @abstractmethod
    async def submit(self, token: str, execution_model: ExecutionModel) -> ExecutionModel:
        """Submit a job request."""

    @abstractmethod
    async def fetch(self, token: str, execution_model: ExecutionModel) -> ExecutionModel:
        """Get a job request."""


class TestBatchJobExecutor(BatchJobExecutor, metaclass=Singleton):
    """Factory for creating Test jobs."""

    logger = LoggerClient.get_logger(__name__)

    async def submit(self, token: str, execution_model: ExecutionModel) -> ExecutionModel:
        """Submit a test job."""
        self.logger.info("Submitting test job with request: %s", execution_model)
        self.logger.info("Submitting job with ID: %s", execution_model.workstep.id)
        return execution_model

    async def fetch(self, token: str, execution_model: ExecutionModel) -> ExecutionModel:
        """Retrieve a task by its job ID."""
        self.logger.info("Getting test job with request: %s", execution_model)
        self.logger.debug("Getting job with ID: %s", execution_model.workstep.id)
        self.logger.debug("Token used for job submission: %s", token)
        return execution_model
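To make the extension point concrete, here is a self-contained sketch of what adding another backend might look like. It uses a simplified stand-in for ExecutionModel and a hypothetical `InMemoryBatchJobExecutor`; the `"memory"` backend name, the `"created"` initial state, and `executor_for` are illustrative assumptions rather than part of the project.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class ExecutionModel:
    """Simplified stand-in for the project's ExecutionModel."""

    execution_id: str
    state: str = "created"  # assumed initial state
    metadata: dict = field(default_factory=dict)


class BatchJobExecutor(ABC):
    """Same two-method contract as the abstract class above."""

    @abstractmethod
    async def submit(self, token: str, execution_model: ExecutionModel) -> ExecutionModel: ...

    @abstractmethod
    async def fetch(self, token: str, execution_model: ExecutionModel) -> ExecutionModel: ...


class InMemoryBatchJobExecutor(BatchJobExecutor):
    """Hypothetical backend that only tracks job states in a dict."""

    def __init__(self) -> None:
        self._jobs: dict[str, str] = {}

    async def submit(self, token: str, execution_model: ExecutionModel) -> ExecutionModel:
        self._jobs[execution_model.execution_id] = "queued"
        execution_model.state = "queued"
        return execution_model

    async def fetch(self, token: str, execution_model: ExecutionModel) -> ExecutionModel:
        # Unknown jobs are reported as failed.
        execution_model.state = self._jobs.get(execution_model.execution_id, "failed")
        return execution_model


def executor_for(backend: str) -> type[BatchJobExecutor]:
    """Pick an executor class by name, falling back to the in-memory one."""
    registry = {"memory": InMemoryBatchJobExecutor}
    return registry.get(backend, InMemoryBatchJobExecutor)


async def demo() -> str:
    executor = executor_for("memory")()
    model = await executor.submit("token", ExecutionModel(execution_id="abc"))
    model = await executor.fetch("token", model)
    return model.state


print(asyncio.run(demo()))  # queued
```

In the real service the new executor would be wired in through a factory function and an extra entry in the provider map, in the same way as the panda and test entries.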
API Testing
Submitting a WorkStep
Note: Parameters are optional.
If you provide any parameters, the default options in the Panda job submission will be overridden by these request parameters.
HTTP Request
POST /v1/worksteps
Headers:
Accept: application/json
Authorization: Bearer $GE_API_TOKEN
Content-Type: application/json
Request Body:
{
  "container_image": "docker://python:3.12-alpine",
  "executable_entrypoint": "echo 'Hello, World!'",
  "execution": "batch",
  "system_parameters": {
    "src_node": ["CHSRC"]
  }
}
Response:
{
  "response_code": 200,
  "response_text": "Job submitted successfully.",
  "id": "5ee72fd7-9968-46cd-aad4-c62ea30a6c12"
}
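The same submission can be scripted. The sketch below builds the POST request with the standard library only; `build_submit_request` is a hypothetical helper, the base URL assumes the local deployment described earlier, and the token is a placeholder. The request is constructed but not sent.

```python
import json
import urllib.request


def build_submit_request(base_url: str, token: str, workstep: dict) -> urllib.request.Request:
    """Build (but do not send) the POST /v1/worksteps request."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/v1/worksteps",
        data=json.dumps(workstep).encode(),
        method="POST",
        headers={
            "Accept": "application/json",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


workstep = {
    "container_image": "docker://python:3.12-alpine",
    "executable_entrypoint": "echo 'Hello, World!'",
    "execution": "batch",
    "system_parameters": {"src_node": ["CHSRC"]},
}
# Assumed local base URL; replace the token with a real exchanged token.
request = build_submit_request("http://localhost:8080", "REPLACE_WITH_TOKEN", workstep)
# To send: with urllib.request.urlopen(request) as resp: execution_id = json.load(resp)["id"]
```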
Getting a job status
HTTP Request
GET /v1/worksteps/{execution_id}
Headers:
Authorization: Bearer $TOKEN
Response:
{
  "id": "5ee72fd7-9968-46cd-aad4-c62ea30a6c12",
  "workflow_id": "216",
  "created_by": "abhishekghosh",
  "container_image": "docker://python:3.12-alpine",
  "executable_entrypoint": "echo 'Hello, World!'",
  "execution": "batch",
  "system_parameters": {
    "src_node": [
      "CHSRC"
    ],
    "cpu_cores": 1,
    "memory_gb": 1.953125
  },
  "input_data": [],
  "output_data": [],
  "state": "running",
  "started_at": "2026-02-19T07:29:19.322269",
  "provenance_json": {}
}
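For scripted checks, the status call and a minimal reading of its response could look like the sketch below. Both helper names are hypothetical, and only fields that appear in the sample response above are used.

```python
import json
import urllib.request


def build_status_request(base_url: str, token: str, execution_id: str) -> urllib.request.Request:
    """Build (but do not send) the GET /v1/worksteps/{execution_id} request."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/v1/worksteps/{execution_id}",
        method="GET",
        headers={"Authorization": f"Bearer {token}"},
    )


def summarize(response_body: str) -> str:
    """Reduce the status response to a one-line summary."""
    data = json.loads(response_body)
    return f"{data['id']} [{data['state']}] workflow={data.get('workflow_id')}"


# Trimmed copy of the sample response shown above.
sample = '{"id": "5ee72fd7-9968-46cd-aad4-c62ea30a6c12", "workflow_id": "216", "state": "running"}'
print(summarize(sample))
# 5ee72fd7-9968-46cd-aad4-c62ea30a6c12 [running] workflow=216
```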
Batch Jobs
Panda Jobs
Job Submission
To submit a job (including data product ID, container image, and runnable script), the token must be provided in the request header.
Accessing it from ~/.pathena/.token is not suitable for this use case.
A workaround is to run the submission code as a separate Kubernetes Job, passing the token and other job parameters through environment variables. For the full code example, refer to the panda.py file.
Submitted jobs can be viewed from the PanDA DOMA dashboard.
"""This python script will be used by the kubernetes job to submit a job to the PANDA API."""
import asyncio
import os
from ska_src_api_global_execution.cache.redis import RedisCache
from ska_src_api_global_execution.client.iam_client import IAMClient
from ska_src_api_global_execution.client.logger_client import LoggerClient
from ska_src_api_global_execution.client.panda_client import PandaClient
from ska_src_api_global_execution.models.constants import EXECUTION_ID, JEDI_TASK_ID, PANDA_AUTH_ID_TOKEN, USER_ACCESS_TOKEN, USER_NAME
from ska_src_api_global_execution.models.workstep import ExecutionState
from ska_src_api_global_execution.rest.beans import get_job_repository, get_panda_client, get_redis_cache
from ska_src_api_global_execution.service.utils import get_from_env, str_to_bool, str_to_int
LoggerClient.setup_logging()
logger = LoggerClient.get_logger(__name__)


def get_iam_client(redis_cache: RedisCache | None = None) -> IAMClient:
    """Create and return an IAMClient instance using environment variables."""
    use_cache = str_to_bool(get_from_env("TOKEN_EXCHANGE_CLIENT_USE_CACHE", "true"))
    return IAMClient(
        iam_url=get_from_env("TOKEN_EXCHANGE_CLIENT_IAM"),
        client_id=get_from_env("TOKEN_EXCHANGE_CLIENT_ID"),
        client_secret=get_from_env("TOKEN_EXCHANGE_CLIENT_SECRET", "DUMMY_CLIENT_SECRET"),
        scope=get_from_env("TOKEN_EXCHANGE_CLIENT_SCOPE"),
        global_timeout=str_to_int(get_from_env("TOKEN_EXCHANGE_CLIENT_TIMEOUT", "30")),
        redis_cache=redis_cache if use_cache else None,
    )


async def exchange_token(iam_client: IAMClient, user_access_token: str) -> dict:
    """Exchange the user access token for an ID token using the IAMClient."""
    token_response = await iam_client.exchange_token(
        subject_token=user_access_token,
        audience=get_from_env("TOKEN_EXCHANGE_CLIENT_AUDIENCE"),
        public_client=str_to_bool(get_from_env("TOKEN_EXCHANGE_CLIENT_IS_PUBLIC", "false")),
        refresh_id_token=True,
    )
    logger.info("token exchanged successfully")
    return token_response


async def main():
    """Main function to submit a job to the PANDA API."""
    user = get_from_env(USER_NAME)
    logger.info("submitting job for user %s", user)
    execution_id = get_from_env(EXECUTION_ID)
    logger.info("generated job id is %s", execution_id)

    redis_cache = get_redis_cache()
    job_repository = get_job_repository(redis_cache=redis_cache)
    panda_client = get_panda_client()
    iam_client = get_iam_client(redis_cache=redis_cache)

    for key, value in os.environ.items():
        logger.debug("%s: %s", key, value)

    try:
        await redis_cache.test_connection()
        logger.info("Connected to Redis cache successfully.")
    except Exception as e:
        logger.error("Failed to connect to Redis cache: %s", e)
        raise e

    # find job data from cache
    execution_model = await job_repository.get_execution_model(user_name=user, execution_id=execution_id)
    if not execution_model:
        logger.error("Internal Redis Error : Job data not found in cache for execution ID: %s", execution_id)
        raise ValueError(f"Job data not found in cache for execution ID: {execution_id}")

    metadata = execution_model.metadata
    workstep_model = execution_model.workstep

    try:
        # exchange user access token with PanDA IAM client
        exchanged_token_data = await exchange_token(iam_client, get_from_env(USER_ACCESS_TOKEN))
        os.environ[PANDA_AUTH_ID_TOKEN] = exchanged_token_data.get("id_token")

        # Test connection to Panda API
        await panda_client.test_connection()

        # Build job definition for Panda API
        job_definition = await PandaClient.build_job_definition(workstep=workstep_model, metadata=metadata)
        logger.info("Command to execute : %s", job_definition)

        # Submit the job to the Panda API
        status, task_dict = await panda_client.submit_task(job_definition, console_log=True)

        # Log the response from the Panda API
        logger.info("Panda API response: %s", status)
        logger.info("Panda API task dictionary: %r", task_dict)

        # Check if the job submission was successful
        if not status or not task_dict:
            logger.error("Failed to submit job to Panda API.")
            raise ValueError("Failed to submit job to Panda API.")

        # Update workstep state to queued and metadata
        workstep_model.state = ExecutionState.queued
        jedi_task_id = str(task_dict.get("jediTaskID"))
        workstep_model.workflow_id = jedi_task_id
        metadata[JEDI_TASK_ID] = jedi_task_id
        logger.info("Job %s submitted successfully with JEDI Task ID %s", execution_id, jedi_task_id)
    except Exception as e:
        workstep_model.state = ExecutionState.failed
        execution_model.error = str(e)
        logger.error("Error submitting job to Panda API: %s", e)
        raise e
    finally:
        await job_repository.save_execution_data(user_name=user, execution_id=execution_id, execution_model=execution_model)
        if PANDA_AUTH_ID_TOKEN in os.environ:
            del os.environ[PANDA_AUTH_ID_TOKEN]


if __name__ == "__main__":
    asyncio.run(main())
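The script above reads its inputs from environment variables, so the Kubernetes Job that runs it has to provide them. The sketch below builds such a Job manifest as a plain dictionary. Everything specific in it is an assumption for illustration: the environment variable names (taken to match the constant names), the Secret name and key, the image, the `panda.py` command path, and the `build_submission_job` helper itself. Reading the token from a Secret is one possible choice; it is still injected as an environment variable.

```python
def build_submission_job(execution_id: str, user_name: str, token_secret_name: str, image: str) -> dict:
    """Return a batch/v1 Job manifest that runs the submission script once."""
    env = [
        {"name": "EXECUTION_ID", "value": execution_id},
        {"name": "USER_NAME", "value": user_name},
        {
            # Injected from a Secret so the token is not written into the manifest.
            "name": "USER_ACCESS_TOKEN",
            "valueFrom": {"secretKeyRef": {"name": token_secret_name, "key": "token"}},
        },
    ]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"panda-submit-{execution_id}"},
        "spec": {
            "backoffLimit": 0,  # do not retry a failed submission
            "ttlSecondsAfterFinished": 300,  # clean up finished Jobs after 5 minutes
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "panda-submit",
                            "image": image,
                            "command": ["python", "panda.py"],
                            "env": env,
                        }
                    ],
                }
            },
        },
    }


# Placeholder values for illustration only.
manifest = build_submission_job(
    execution_id="5ee72fd7-9968-46cd-aad4-c62ea30a6c12",
    user_name="abhishekghosh",
    token_secret_name="user-token",
    image="ska-src-api-global-execution:latest",
)
```

The resulting dictionary can be serialized to YAML or passed to a Kubernetes client to create the Job.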
Job Configuration
This is the configuration currently used for submitting and fetching PanDA jobs. There are two sections in this config:
1. Connecting to the PanDA Server:
Since there is no PanDA server deployed in the SRCNet, the CERN DOMA PanDA server is used.
The default configuration can be found in the official PanDA client documentation.
PanDA server uses PanDA DOMA IAM, so you must first create an account in that IAM.
2. Job Submission Config:
Currently, there are two harvester setups: one at CHSRC and another at INSRC (Thoughtworks Kubernetes).
These can be used as queue names.
For working groups, you must be part of a specific group to access the corresponding queue or site.
You can apply for group access from the IAM Dashboard.
See the “Running ordinary analysis” section in the documentation for all job parameters.
panda:
  server:
    config:
      PANDA_URL_SSL: https://pandaserver-doma.cern.ch/server/panda
      PANDA_URL: https://pandaserver-doma.cern.ch/server/panda
      PANDACACHE_URL: https://pandaserver-doma.cern.ch/server/panda
      PANDAMON_URL: https://panda-doma.cern.ch
      PANDA_AUTH: oidc
      PANDA_AUTH_VO: Rubin
      PANDA_USE_NATIVE_HTTPLIB: "1"
      PANDA_BEHIND_REAL_LB: "true"
  jobRunOn:
    config:
      QUEUE_NAME: "CHSRC_TEST_IN_TWSRC"
      JOBS_COUNT: "1"
      PROD_SOURCE_LABEL: "test"
      WORKING_GROUP: "Rubin"
      VO: "wlcg"
      NO_BUILD: "true"
      NO_SEPARATE_LOG: "true"
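The server keys above look like environment variables consumed by the PanDA client, so one plausible way to apply them in a script is to export them before the client is used. The sketch below does that for the server section; `export_to_env` is a hypothetical helper, and how the service itself loads this configuration is not shown in this guide.

```python
import os

# Server section of the configuration above, copied as a flat mapping.
PANDA_SERVER_CONFIG = {
    "PANDA_URL_SSL": "https://pandaserver-doma.cern.ch/server/panda",
    "PANDA_URL": "https://pandaserver-doma.cern.ch/server/panda",
    "PANDACACHE_URL": "https://pandaserver-doma.cern.ch/server/panda",
    "PANDAMON_URL": "https://panda-doma.cern.ch",
    "PANDA_AUTH": "oidc",
    "PANDA_AUTH_VO": "Rubin",
    "PANDA_USE_NATIVE_HTTPLIB": "1",
    "PANDA_BEHIND_REAL_LB": "true",
}


def export_to_env(config: dict[str, str], overwrite: bool = False) -> list[str]:
    """Copy config entries into os.environ and return the keys that were set."""
    applied = []
    for key, value in config.items():
        # Keep values already present in the environment unless overwrite is requested.
        if overwrite or key not in os.environ:
            os.environ[key] = str(value)
            applied.append(key)
    return applied
```

Calling `export_to_env(PANDA_SERVER_CONFIG)` leaves any values already set in the environment untouched, which keeps deployment-level overrides in effect.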