import os
import uuid
import logging
import secrets
from typing import List, Optional
from sqlalchemy.orm import Session, joinedload
from fastapi import HTTPException
from app.api import schemas
from app.db import models
from app.db.models.agent import AgentTemplate, AgentInstance, AgentTrigger
from app.api.dependencies import ServiceContainer
logger = logging.getLogger(__name__)
class AgentService:
    """Service layer for agent templates, instances, triggers and workspace bindings.

    Every public method receives the SQLAlchemy session to operate on.
    ``self.services`` is an optional service container; the methods below read
    ``orchestrator``, ``user_service`` and ``session_service`` from it.
    """

    def __init__(self, services: Optional[ServiceContainer] = None):
        self.services = services

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _start_workspace_sync(self, mesh_node_id, workspace_id: str, failure_context: str) -> None:
        """Push ``workspace_id`` to a mesh node, then START and UNLOCK its sync.

        Best-effort: nothing happens when no orchestrator is configured or no
        node id is given, and any exception is logged (prefixed with
        ``failure_context``) instead of being propagated.
        """
        try:
            orchestrator = getattr(self.services, "orchestrator", None)
            if orchestrator and mesh_node_id:
                orchestrator.assistant.push_workspace(mesh_node_id, workspace_id)
                orchestrator.assistant.control_sync(mesh_node_id, workspace_id, action="START")
                orchestrator.assistant.control_sync(mesh_node_id, workspace_id, action="UNLOCK")
        except Exception as exc:
            logger.error("%s: %s", failure_context, exc)

    @staticmethod
    def _copy_present_fields(source, target, field_names) -> None:
        """Copy each named attribute from ``source`` to ``target`` when it is set.

        An attribute that is missing on ``source`` or whose value is ``None``
        leaves ``target`` untouched.
        """
        for field_name in field_names:
            value = getattr(source, field_name, None)
            if value is not None:
                setattr(target, field_name, value)

    @staticmethod
    def _token_matches(token: Optional[str], expected_secrets) -> bool:
        """Return True when ``token`` equals one of ``expected_secrets``.

        Each comparison uses ``secrets.compare_digest`` and every candidate is
        checked (no early exit) so timing does not reveal which secret, or how
        much of it, matched. Values are compared as UTF-8 bytes because
        ``compare_digest`` rejects non-ASCII ``str`` arguments.
        """
        if not isinstance(token, str):
            return False
        candidate = token.encode("utf-8")
        matched = False
        for secret in expected_secrets:
            if secrets.compare_digest(candidate, secret.encode("utf-8")):
                matched = True
        return matched

    def _derive_workspace_id(self, jail_path: Optional[str], session_id: Optional[int]) -> str:
        """Pick a workspace id for an agent.

        Preference order: the last path component of ``jail_path``, then
        ``session-<session_id>``, then a random ``agent-<8 hex chars>`` id.
        """
        if jail_path:
            base = os.path.basename(jail_path.rstrip("/"))
            if base:
                return base
        if session_id is not None:
            return f"session-{session_id}"
        return f"agent-{uuid.uuid4().hex[:8]}"

    # ------------------------------------------------------------------
    # Instances and workspace binding
    # ------------------------------------------------------------------
    def get_agent_instance(self, db: Session, agent_id: str, user_id: str) -> AgentInstance:
        """Load one of ``user_id``'s agents with its template and session.

        Raises:
            HTTPException: 404 when no instance matches both ids.
        """
        instance = (
            db.query(AgentInstance)
            .options(
                joinedload(AgentInstance.template),
                joinedload(AgentInstance.session),
            )
            .filter(
                AgentInstance.id == agent_id,
                AgentInstance.user_id == user_id,
            )
            .first()
        )
        if not instance:
            raise HTTPException(status_code=404, detail="Agent not found")
        # Persist a repaired binding, as list_user_agents does; a bare flush
        # would be dropped when the caller never commits.
        if self.ensure_workspace_binding(db, instance):
            db.commit()
            db.refresh(instance)
        return instance

    def list_user_agents(self, db: Session, user_id: str) -> List[AgentInstance]:
        """Return every agent owned by ``user_id``, repairing workspace bindings.

        Commits once if any binding had to be changed.
        """
        agents = (
            db.query(AgentInstance)
            .options(
                joinedload(AgentInstance.template),
                joinedload(AgentInstance.session),
            )
            .filter(AgentInstance.user_id == user_id)
            .all()
        )
        # A list (not a generator) so every instance is processed before any().
        if any([self.ensure_workspace_binding(db, instance) for instance in agents]):
            db.commit()
        return agents

    def ensure_workspace_binding(self, db: Session, instance: AgentInstance) -> bool:
        """Align an instance's session workspace id and jail path.

        The workspace id comes from ``_derive_workspace_id`` and the jail is
        ``/tmp/cortex/<workspace_id>/``. Changed fields are flushed but not
        committed. The mesh node, if any, is (re)synchronised on every call,
        not only when the database fields changed.

        Returns:
            True when a database field was modified; False otherwise,
            including when the instance or its session is missing.
        """
        if not instance or not instance.session:
            return False

        workspace_id = self._derive_workspace_id(instance.current_workspace_jail, instance.session_id)
        desired_jail = f"/tmp/cortex/{workspace_id}/"

        changed = False
        if instance.session.sync_workspace_id != workspace_id:
            instance.session.sync_workspace_id = workspace_id
            changed = True
        if instance.current_workspace_jail != desired_jail:
            instance.current_workspace_jail = desired_jail
            changed = True
        if changed:
            db.flush()

        self._start_workspace_sync(
            instance.mesh_node_id,
            workspace_id,
            f"Failed to heal workspace binding for agent {instance.id}",
        )
        return changed

    def deploy_agent(self, db: Session, user_id: str, request: schemas.DeployAgentRequest) -> dict:
        """Create a template, session, instance and trigger in one transaction.

        The workspace id is ``agent_<first 8 chars of the template id>``. The
        orchestrator bootstrap is best-effort and never aborts the deploy. Any
        other failure rolls the transaction back and re-raises.

        Returns:
            dict with ``instance_id``, ``session_id``, ``template_id`` and
            ``sync_workspace_id``.
        """
        from app.config import settings

        # Read once so every use agrees, even if the attribute is absent.
        mesh_node_id = getattr(request, "mesh_node_id", None)

        try:
            # 1. Create Template
            template = AgentTemplate(
                name=request.name,
                description=request.description,
                system_prompt_path=request.system_prompt,
                user_id=user_id,
                max_loop_iterations=request.max_loop_iterations,
                co_worker_quality_gate=request.co_worker_quality_gate,
                rework_threshold=request.rework_threshold,
                max_rework_attempts=request.max_rework_attempts,
            )
            db.add(template)
            db.flush()

            # Resolve provider: explicit request value, else system settings,
            # else the configured default.
            resolved_provider = request.provider_name
            if not resolved_provider:
                sys_prefs = self.services.user_service.get_system_settings(db)
                resolved_provider = sys_prefs.get('llm', {}).get('active_provider', settings.ACTIVE_LLM_PROVIDER)

            # 2. Create Session
            new_session = models.Session(
                user_id=user_id,
                provider_name=resolved_provider,
                feature_name="agent_harness",
                is_locked=True,
                system_prompt_override=request.system_prompt,
                attached_node_ids=[mesh_node_id] if mesh_node_id else [],
            )
            db.add(new_session)
            db.flush()

            workspace_id = f"agent_{template.id[:8]}"
            new_session.sync_workspace_id = workspace_id
            workspace_jail = f"/tmp/cortex/{workspace_id}/"
            db.flush()

            # Bootstrap Orchestrator
            self._start_workspace_sync(
                mesh_node_id,
                workspace_id,
                "Failed to bootstrap Orchestrator Sync for Agent Deploy",
            )

            # 3. Create Instance
            instance = AgentInstance(
                template_id=template.id,
                user_id=user_id,
                session_id=new_session.id,
                mesh_node_id=mesh_node_id,
                status="idle",
                current_workspace_jail=workspace_jail,
            )
            db.add(instance)
            db.flush()

            # 4. Trigger
            trigger = AgentTrigger(
                instance_id=instance.id,
                trigger_type=request.trigger_type or "manual",
                cron_expression=request.cron_expression,
                interval_seconds=request.interval_seconds,
                default_prompt=request.default_prompt,
            )
            if trigger.trigger_type == "webhook":
                trigger.webhook_secret = secrets.token_hex(16)
            db.add(trigger)
            db.flush()
            db.commit()
        except Exception:
            # Do not leave half of the deploy pending in the caller's session.
            db.rollback()
            raise

        db.refresh(instance)
        return {
            "instance_id": instance.id,
            "session_id": instance.session_id,
            "template_id": instance.template_id,
            "sync_workspace_id": workspace_id,
        }

    # ------------------------------------------------------------------
    # Templates / instances CRUD
    # ------------------------------------------------------------------
    def create_template(self, db: Session, user_id: str, request: schemas.AgentTemplateCreate) -> AgentTemplate:
        """Persist a template built from ``request`` and owned by ``user_id``."""
        template = AgentTemplate(**request.model_dump())
        template.user_id = user_id
        db.add(template)
        db.commit()
        db.refresh(template)
        return template

    def create_instance(self, db: Session, user_id: str, request: schemas.AgentInstanceCreate) -> AgentInstance:
        """Persist an instance of an existing template for ``user_id``.

        Raises:
            HTTPException: 404 when ``request.template_id`` does not exist.
        """
        template = db.query(AgentTemplate).filter(AgentTemplate.id == request.template_id).first()
        if not template:
            raise HTTPException(status_code=404, detail="Template not found")
        instance = AgentInstance(**request.model_dump())
        instance.user_id = user_id
        db.add(instance)
        db.commit()
        db.refresh(instance)
        return instance

    def update_status(self, db: Session, agent_id: str, user_id: str, status: str) -> AgentInstance:
        """Set an instance's status; moving to ``"idle"`` clears error/evaluation state.

        Raises:
            HTTPException: 404 when the instance is not found for this user.
        """
        instance = (
            db.query(AgentInstance)
            .filter(AgentInstance.id == agent_id, AgentInstance.user_id == user_id)
            .first()
        )
        if not instance:
            raise HTTPException(status_code=404, detail="Instance not found")
        instance.status = status
        if status == "idle":
            instance.last_error = None
            instance.evaluation_status = None
        db.commit()
        db.refresh(instance)
        return instance

    def update_config(self, db: Session, agent_id: str, user_id: str, request: schemas.AgentConfigUpdate) -> AgentInstance:
        """Apply the non-``None`` fields of ``request`` to an agent.

        Fields are written to the instance's template row, the instance itself
        (``mesh_node_id``) and its session. Node attachment goes through the
        session service and falls back to writing ``attached_node_ids``
        directly if that call fails.

        Raises:
            HTTPException: 404 when the instance is not found for this user.
        """
        from app.db.models.session import Session as SessionModel

        instance = (
            db.query(AgentInstance)
            .filter(AgentInstance.id == agent_id, AgentInstance.user_id == user_id)
            .first()
        )
        if not instance:
            raise HTTPException(status_code=404, detail="Instance not found")

        template = db.query(AgentTemplate).filter(AgentTemplate.id == instance.template_id).first()
        if template:
            if request.name is not None:
                template.name = request.name
            # The request calls it system_prompt; the template column is system_prompt_path.
            if request.system_prompt is not None:
                template.system_prompt_path = request.system_prompt
            self._copy_present_fields(
                request,
                template,
                ("max_loop_iterations", "co_worker_quality_gate", "rework_threshold", "max_rework_attempts"),
            )

        if request.mesh_node_id is not None:
            instance.mesh_node_id = request.mesh_node_id

        if instance.session_id:
            session = db.query(SessionModel).filter(SessionModel.id == instance.session_id).first()
            if session:
                if request.system_prompt is not None:
                    session.system_prompt_override = request.system_prompt
                self._copy_present_fields(request, session, ("provider_name", "model_name"))
                if request.mesh_node_id is not None:
                    # An empty (but not None) node id detaches every node.
                    node_ids = [request.mesh_node_id] if request.mesh_node_id else []
                    try:
                        self.services.session_service.attach_nodes(
                            db, session.id, schemas.NodeAttachRequest(node_ids=node_ids)
                        )
                    except Exception as exc:
                        logger.warning(
                            "attach_nodes failed for session %s; setting attached_node_ids directly: %s",
                            session.id,
                            exc,
                        )
                        session.attached_node_ids = node_ids
                self._copy_present_fields(
                    request,
                    session,
                    ("restrict_skills", "is_locked", "auto_clear_history"),
                )

        db.commit()
        db.refresh(instance)
        return instance

    def delete_agent(self, db: Session, agent_id: str, user_id: str):
        """Delete one of ``user_id``'s agents.

        Raises:
            HTTPException: 404 when the agent is not found for this user.
        """
        instance = (
            db.query(AgentInstance)
            .filter(AgentInstance.id == agent_id, AgentInstance.user_id == user_id)
            .first()
        )
        if not instance:
            raise HTTPException(status_code=404, detail="Agent not found")
        db.delete(instance)
        db.commit()

    # ------------------------------------------------------------------
    # Triggers
    # ------------------------------------------------------------------
    def list_agent_triggers(self, db: Session, agent_id: str, user_id: str) -> List[AgentTrigger]:
        """Return the triggers of an agent owned by ``user_id`` (404 otherwise)."""
        # Called only for its ownership check / 404.
        self.get_agent_instance(db, agent_id, user_id)
        return db.query(AgentTrigger).filter(AgentTrigger.instance_id == agent_id).all()

    def create_agent_trigger(self, db: Session, agent_id: str, user_id: str, request: schemas.AgentTriggerCreate) -> AgentTrigger:
        """Create a trigger on an agent owned by ``user_id``.

        A webhook trigger without a secret gets a random 32-hex-char one.
        """
        self.get_agent_instance(db, agent_id, user_id)
        trigger = AgentTrigger(**request.model_dump())
        trigger.instance_id = agent_id
        if trigger.trigger_type == "webhook" and not trigger.webhook_secret:
            trigger.webhook_secret = secrets.token_hex(16)
        db.add(trigger)
        db.commit()
        db.refresh(trigger)
        return trigger

    def delete_agent_trigger(self, db: Session, trigger_id: str, user_id: Optional[str] = None):
        """Delete a trigger by id.

        Args:
            user_id: When given, the trigger is deleted only if its agent
                belongs to this user. Omitting it keeps the previous
                unchecked behaviour, so callers should pass it.

        Raises:
            HTTPException: 404 when the trigger does not exist or, with
                ``user_id`` given, belongs to another user's agent.
        """
        trigger = db.query(AgentTrigger).filter(AgentTrigger.id == trigger_id).first()
        if not trigger:
            raise HTTPException(status_code=404, detail="Trigger not found")
        if user_id is not None:
            owner = (
                db.query(AgentInstance)
                .filter(
                    AgentInstance.id == trigger.instance_id,
                    AgentInstance.user_id == user_id,
                )
                .first()
            )
            if not owner:
                # Same answer as "missing" so ids of other users' triggers are not revealed.
                raise HTTPException(status_code=404, detail="Trigger not found")
        db.delete(trigger)
        db.commit()

    # ------------------------------------------------------------------
    # Metrics / webhooks
    # ------------------------------------------------------------------
    def reset_agent_metrics(self, db: Session, agent_id: str, user_id: str) -> AgentInstance:
        """Zero the run, token, running-time and tool-call counters of an agent."""
        instance = self.get_agent_instance(db, agent_id, user_id)
        for counter in (
            "total_runs",
            "successful_runs",
            "total_input_tokens",
            "total_output_tokens",
            "total_tokens_accumulated",
            "total_running_time_seconds",
        ):
            setattr(instance, counter, 0)
        instance.tool_call_counts = {}
        db.commit()
        db.refresh(instance)
        return instance

    def validate_webhook_trigger(self, db: Session, agent_id: str, token: Optional[str]) -> AgentInstance:
        """Check a webhook token against the agent's webhook trigger secrets.

        The token is enforced only when at least one webhook trigger of the
        agent has a secret; otherwise the instance is returned unchecked.

        Raises:
            HTTPException: 404 when the instance does not exist, 403 when
                secrets are configured and ``token`` matches none of them.
        """
        instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first()
        if not instance:
            raise HTTPException(status_code=404, detail="Instance not found")
        webhook_triggers = (
            db.query(AgentTrigger)
            .filter(
                AgentTrigger.instance_id == agent_id,
                AgentTrigger.trigger_type == "webhook",
            )
            .all()
        )
        expected_secrets = [t.webhook_secret for t in webhook_triggers if t.webhook_secret]
        if expected_secrets and not self._token_matches(token, expected_secrets):
            raise HTTPException(status_code=403, detail="Invalid webhook token")
        return instance