# cortex-hub / ai-hub / app / core / services / sub_agent.py
import asyncio
import time
import logging
import json
from typing import Optional, List, Dict, Any
from app.protos import agent_pb2

logger = logging.getLogger(__name__)

class SubAgent:
    """
    A stateful watcher for a specific task on an agent node.
    Handles execution, result accumulation, and state monitoring.

    Lifecycle: ``status`` starts as "PENDING", becomes "RUNNING" when
    :meth:`run` starts, may pass through "RETRYING (i/n)" or
    "ERROR_RETRYING (i/n)", and ends as "COMPLETED" or "FAILED".

    Args:
        name: Skill/task name, used in UI messages and the summary prompt.
        task_fn: Synchronous callable, invoked as ``task_fn(**args)`` in a
            worker thread via ``asyncio.to_thread``.
        args: Keyword arguments for ``task_fn``. The keys ``node_id``,
            ``cmd``/``command``, ``path`` and ``action`` are also read to
            label UI events.
        retries: Extra attempts allowed after the first one. Negative values
            are treated as 0.
        llm_provider: Optional object exposing an async ``acompletion(...)``.
            When set, it is used to produce a one-sentence outcome summary.
        assistant: Object exposing ``wait_for_swarm(...)``. It is only used to
            monitor tasks that come back with status ``TIMEOUT_PENDING``.
        subagent_system_prompt: Stored on the instance; not used by this class.
        on_event: Optional async callback that receives UI event dicts.
    """

    # Seconds to wait before re-dispatching after task_fn raised.
    ERROR_RETRY_DELAY = 2
    # Linear backoff step in seconds after a transient error result
    # (attempt 0 waits 1x, attempt 1 waits 2x, ...).
    RETRY_BACKOFF_STEP = 3
    # Poll cadence (seconds) and poll budget for TIMEOUT_PENDING tasks:
    # 120 checks x 5 s is roughly 10 minutes max for an atomic task.
    MONITOR_POLL_INTERVAL = 5
    MONITOR_MAX_CHECKS = 120
    # Substrings (matched case-insensitively) in a result's "error" value that
    # mark the failure as transient and worth retrying.
    TRANSIENT_ERROR_MARKERS = ("timeout", "offline", "disconnected", "capacity", "rejected")

    def __init__(self, name: str, task_fn, args: dict, retries: int = 1, 
                 llm_provider=None, assistant=None, subagent_system_prompt: str = None,
                 on_event=None):
        self.name = name
        self.task_fn = task_fn
        self.args = args
        self.retries = retries
        self.llm = llm_provider
        self.assistant = assistant
        self.subagent_system_prompt = subagent_system_prompt
        self.on_event = on_event
        self.status = "PENDING"
        self.result = None
        self.start_time = None
        self.end_time = None
        self.error = None
        self.task_id = None

    async def run(self):
        """Execute the task and return the plain-text report for the main AI.

        Records ``start_time`` and sets ``status`` to "RUNNING", then delegates
        to :meth:`_run_standard`.
        """
        self.start_time = time.time()
        self.status = "RUNNING"
        
        # Every Sub-Agent task is now strictly atomic. 
        # The AI 'Observe-Think-Act' loop happens in the Main RagPipeline.
        return await self._run_standard()

    def _describe_task(self) -> str:
        """Return a short human-readable label for the dispatch UI event."""
        # Descriptive task name logic to reduce "Starting atomic step: ifconfig" noise
        if "command" in self.args or "cmd" in self.args:
            return f"Running: {self.args.get('command') or self.args.get('cmd')}"
        if "path" in self.args:
            return f"Inspecting: {self.args['path']}"
        if "action" in self.args:
            return f"Executing {self.args['action']}"
        return f"Performing {self.name}"

    def _is_transient_error(self, result) -> bool:
        """Return True if ``result`` is a dict whose ``error`` looks retryable."""
        if not (isinstance(result, dict) and result.get("error")):
            return False
        err_msg = str(result.get("error")).lower()
        return any(marker in err_msg for marker in self.TRANSIENT_ERROR_MARKERS)

    def _build_report(self, summary: Optional[str]) -> str:
        """Build the string handoff report (REPORT / STATUS / optional ERROR)."""
        # We ALWAYS return a string report for the Main AI to ensure it can "read" the outcome.
        if summary:
            report_lines = [f"REPORT: {summary}"]
        else:
            # Fallback if AI summary is unavailable
            raw_snippet = str(self.result)[:1000] if self.result else "No output returned."
            report_lines = [f"REPORT: (Auto-Summary Unavailable) Raw outcome: {raw_snippet}"]
        report_lines.append(f"STATUS: {self.status}")
        if self.error:
            report_lines.append(f"ERROR: {self.error}")
        return "\n".join(report_lines)

    async def _run_standard(self):
        """Atomic execution with monitoring and simple retry logic.

        - Exceptions raised by ``task_fn`` (or while monitoring) are retried
          after ``ERROR_RETRY_DELAY`` seconds.
        - Dict results whose ``error`` matches ``TRANSIENT_ERROR_MARKERS`` are
          retried with linear backoff.
        - Any other outcome ends the loop with status "COMPLETED".
        - If the last attempt raises, the status is "FAILED".

        Returns:
            The multi-line report produced by :meth:`_build_report`.
        """
        node_id = self.args.get("node_id") or "swarm"
        # A negative retry count would otherwise skip execution entirely.
        max_retries = max(self.retries, 0)

        # Phase 1: Dispatch
        if self.on_event:
            desc = self._describe_task()
            await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": f"✅ {desc}"})

        for attempt in range(max_retries + 1):
            # Forget the previous attempt's exception so that a later success
            # is not reported as a failure.
            self.error = None
            try:
                # Dispatch task via assistant (which handles gRPC signing/sending)
                self.result = await asyncio.to_thread(self.task_fn, **self.args)

                # If it's a long-running task (Wait-Pending), monitor it with heartbeats
                if isinstance(self.result, dict) and self.result.get("status") == "TIMEOUT_PENDING":
                    self.result = await self._monitor_atomic_task(self.result, node_id)

                if attempt < max_retries and self._is_transient_error(self.result):
                    self.status = f"RETRYING ({attempt+1}/{max_retries})"
                    await asyncio.sleep((attempt + 1) * self.RETRY_BACKOFF_STEP)
                    continue
                self.status = "COMPLETED"
                break
            except Exception as e:
                logger.error("SubAgent %s error: %s", self.name, e)
                self.error = str(e)
                if attempt < max_retries:
                    self.status = f"ERROR_RETRYING ({attempt+1}/{max_retries})"
                    await asyncio.sleep(self.ERROR_RETRY_DELAY)
                else:
                    self.status = "FAILED"
        
        self.end_time = time.time()
        
        # Phase 2: Completion & Intelligence Report
        summary = await self._generate_summary() if self.llm and self.result else None

        if self.on_event:
            # Emit to UI
            if summary:
                rep = f"🧠 {summary}"
            elif self.error:
                rep = f"❌ Step Failed: {self.error}"
            else:
                rep = "✅ Step Finished."
            await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": rep})

        # --- Final Intelligence Handoff ---
        return self._build_report(summary)

    @staticmethod
    def _strip_label_prefix(text: str) -> str:
        """Drop a short leading label such as "Summary:" from an LLM reply.

        Only a letter-only label (spaces, '_' and '-' allowed) ending with a
        colon within the first 15 characters is removed. Content such as
        "Port 8080: open" is kept intact.
        """
        label, sep, rest = text.partition(":")
        if sep and len(label) < 15 and label.strip() and all(c.isalpha() or c in " _-" for c in label):
            return rest.strip()
        return text

    async def _generate_summary(self) -> Optional[str]:
        """Uses LLM to summarize logs, file system changes, or errors into a single report sentence.

        Returns:
            The cleaned summary sentence, or None when the LLM call fails or
            the reply is empty after cleanup.
        """
        try:
            # Prepare a very compact prompt for the sub-agent reporter
            # We provide the command and the raw result
            raw_res = str(self.result)[:3000] # Slightly more context
            node_label = self.args.get("node_id") or "swarm"
            prompt = (
                f"You are a Sub-Agent reporter on node '{node_label}'.\n"
                f"Task Performed: {self.name} {self.args}\n"
                f"Raw Outcome: {raw_res}\n"
                f"Status: {self.status}\n"
                f"Errors: {self.error if self.error else 'None'}\n\n"
                "Task: Summarize the result for your Master-Architect in ONE CONCISE SENTENCE. "
                "Include key findings (e.g., 'Found 3 files', 'Command failed with exit 1', 'Subnet is 10.0.0.0/24'). "
                "Do not use conversational filler. Be the 'eyes and ears' on the ground."
            )
            
            response = await self.llm.acompletion(prompt=prompt, max_tokens=100, temperature=0)
            summary = response.choices[0].message.content.strip()
            summary = summary.strip('"').strip("'")
            summary = self._strip_label_prefix(summary)
            return summary or None
        except Exception as e:
            logger.warning("Failed to generate sub-agent summary: %s", e)
            return None

    async def _monitor_atomic_task(self, partial_res: dict, node_id: str):
        """Monitors a single atomic task until finished without making decisions.

        Peeks ``self.assistant.wait_for_swarm`` with ``timeout=0`` every
        ``MONITOR_POLL_INTERVAL`` seconds, up to ``MONITOR_MAX_CHECKS`` times.
        A heartbeat event is emitted between polls.

        Returns:
            - ``partial_res`` unchanged if it has no ``task_id``.
            - The final ``wait_for_swarm`` result once the node's status leaves
              RUNNING/TIMEOUT_PENDING.
            - An ``{"error": ...}`` dict once the poll budget is exhausted.
        """
        task_id = partial_res.get("task_id")
        if not task_id:
            return partial_res
        # Remember which remote task this sub-agent is tracking.
        self.task_id = task_id

        task_map = {node_id: task_id}
        for _ in range(self.MONITOR_MAX_CHECKS):
            # Non-blocking peek
            res = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=0, no_abort=True)
            node_res = res.get(node_id, {})
            
            if node_res.get("status") not in ("RUNNING", "TIMEOUT_PENDING"):
                # Finished! Collect final and return
                return await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=2)
            
            # Simple heartbeat for UI
            if self.on_event:
                await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": "*(Still executing...)*"})
            
            await asyncio.sleep(self.MONITOR_POLL_INTERVAL)
            
        return {"error": "SubAgent timed out waiting for atomic task."}

    def get_elapsed(self) -> int:
        """Return whole seconds since ``run`` started.

        Measures up to ``end_time`` once the run has finished; returns 0 if it
        never started.
        """
        if not self.start_time:
            return 0
        end = self.end_time or time.time()
        return int(end - self.start_time)