import asyncio
import time
import logging
import json
from typing import Optional, List, Dict, Any
from app.protos import agent_pb2
logger = logging.getLogger(__name__)
class SubAgent:
    """
    A stateful watcher for a specific task on an agent node.
    Handles execution, result accumulation, and state monitoring.

    Lifecycle: PENDING -> RUNNING -> (RETRYING/ERROR_RETRYING)* -> COMPLETED | FAILED.
    `run()` always returns a plain-text report string so the main agent can
    "read" the outcome regardless of what the underlying task returned.
    """

    def __init__(self, name: str, task_fn, args: dict, retries: int = 1,
                 llm_provider=None, assistant=None, subagent_system_prompt: str = None,
                 on_event=None):
        self.name = name                # skill/step name; used in UI text and fallbacks
        self.task_fn = task_fn          # synchronous callable, dispatched via asyncio.to_thread
        self.args = args                # kwargs passed verbatim to task_fn
        self.retries = retries          # extra attempts after the first (total = retries + 1)
        self.llm = llm_provider         # optional LLM used to compress raw results into one sentence
        self.assistant = assistant      # gRPC wrapper; only needed for long-running task monitoring
        self.subagent_system_prompt = subagent_system_prompt
        self.on_event = on_event        # optional async callback for UI progress events
        self.status = "PENDING"
        self.result = None              # raw return value of task_fn (often a dict)
        self.start_time = None
        self.end_time = None
        self.error = None               # last exception message, if any attempt raised
        self.task_id = None

    async def run(self) -> str:
        """Execute the task and return a textual intelligence report.

        Returns:
            A multi-line string: "REPORT: ..." then "STATUS: ..." and
            optionally "ERROR: ...".
        """
        self.start_time = time.time()
        self.status = "RUNNING"
        # Every Sub-Agent task is now strictly atomic.
        # The AI 'Observe-Think-Act' loop happens in the Main RagPipeline.
        return await self._run_standard()

    async def _run_standard(self) -> str:
        """Atomic execution with monitoring and simple retry logic."""
        node_id = self.args.get("node_id") or "swarm"
        # Phase 1: Dispatch — announce a human-readable description to the UI.
        if self.on_event:
            # Descriptive task name logic to reduce "Starting atomic step: ifconfig" noise
            desc = f"Performing {self.name}"
            if "command" in self.args or "cmd" in self.args:
                desc = f"Running: {self.args.get('command') or self.args.get('cmd')}"
            elif "path" in self.args:
                desc = f"Inspecting: {self.args['path']}"
            elif "action" in self.args:
                desc = f"Executing {self.args['action']}"
            await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": f"✅ {desc}"})
        for attempt in range(self.retries + 1):
            try:
                # Dispatch task via assistant (which handles gRPC signing/sending).
                # task_fn is blocking, so run it off the event loop.
                self.result = await asyncio.to_thread(self.task_fn, **self.args)
                # If it's a long-running task (Wait-Pending), monitor it with heartbeats
                if isinstance(self.result, dict) and self.result.get("status") == "TIMEOUT_PENDING":
                    self.result = await self._monitor_atomic_task(self.result, node_id)
                if isinstance(self.result, dict) and self.result.get("error"):
                    err_msg = str(self.result.get("error")).lower()
                    # Only transient transport/capacity failures are worth retrying.
                    if any(x in err_msg for x in ["timeout", "offline", "disconnected", "capacity", "rejected"]):
                        if attempt < self.retries:
                            backoff = (attempt + 1) * 3  # linear backoff: 3s, 6s, ...
                            self.status = f"RETRYING ({attempt+1}/{self.retries})"
                            await asyncio.sleep(backoff)
                            continue
                self.status = "COMPLETED"
                break
            except Exception as e:
                logger.error(f"SubAgent {self.name} error: {e}")
                self.error = str(e)
                if attempt < self.retries:
                    self.status = f"ERROR_RETRYING ({attempt+1}/{self.retries})"
                    await asyncio.sleep(2)
                else:
                    self.status = "FAILED"
        self.end_time = time.time()
        # Phase 2: Completion & Intelligence Report
        summary = None
        if self.llm and self.result:
            summary = await self._generate_summary()
        if self.on_event:
            # Emit final outcome to UI
            if summary:
                rep = f"🧠 {summary}"
            else:
                rep = "✅ Step Finished." if not self.error else f"❌ Step Failed: {self.error}"
            await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": rep})
        # --- Final Intelligence Handoff ---
        # We ALWAYS return a string report for the Main AI to ensure it can "read" the outcome.
        report_lines = []
        if summary:
            report_lines.append(f"REPORT: {summary}")
        else:
            # Fallback if AI summary is unavailable — truncate raw output to keep context small.
            raw_snippet = str(self.result)[:1000] if self.result else "No output returned."
            report_lines.append(f"REPORT: (Auto-Summary Unavailable) Raw outcome: {raw_snippet}")
        report_lines.append(f"STATUS: {self.status}")
        if self.error:
            report_lines.append(f"ERROR: {self.error}")
        return "\n".join(report_lines)

    async def _generate_summary(self) -> Optional[str]:
        """Uses LLM to summarize logs, file system changes, or errors into a single report sentence.

        Returns None on any failure so the caller can fall back to a raw snippet.
        """
        try:
            # Prepare a very compact prompt for the sub-agent reporter.
            # We provide the command and the raw result (truncated to bound token cost).
            raw_res = str(self.result)[:3000]  # Slightly more context
            prompt = (
                f"You are a Sub-Agent reporter on node '{self.args.get('node_id')}'.\n"
                f"Task Performed: {self.name} {self.args}\n"
                f"Raw Outcome: {raw_res}\n"
                f"Status: {self.status}\n"
                f"Errors: {self.error if self.error else 'None'}\n\n"
                "Task: Summarize the result for your Master-Architect in ONE CONCISE SENTENCE. "
                "Include key findings (e.g., 'Found 3 files', 'Command failed with exit 1', 'Subnet is 10.0.0.0/24'). "
                "Do not use conversational filler. Be the 'eyes and ears' on the ground."
            )
            response = await self.llm.acompletion(prompt=prompt, max_tokens=100, temperature=0)
            summary = response.choices[0].message.content.strip()
            summary = summary.strip('"').strip("'")
            # Strip a leading label like "Summary:" that some models prepend.
            if ":" in summary[:15]:
                summary = summary.split(":", 1)[1].strip()
            return summary
        except Exception as e:
            logger.warning(f"Failed to generate sub-agent summary: {e}")
            return None

    async def _monitor_atomic_task(self, partial_res: dict, node_id: str):
        """Monitors a single atomic task until finished without making decisions.

        Polls the swarm non-blockingly every 5s, emitting a heartbeat to the UI,
        until the task leaves RUNNING/TIMEOUT_PENDING or the 10-minute cap hits.
        """
        task_id = partial_res.get("task_id")
        if not task_id:
            return partial_res
        task_map = {node_id: task_id}
        max_checks = 120  # 10 minutes max for an atomic task (120 checks * 5s sleep)
        for _ in range(max_checks):
            # Non-blocking peek (timeout=0, no_abort keeps the task alive on the node)
            res = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=0, no_abort=True)
            node_res = res.get(node_id, {})
            if node_res.get("status") not in ["RUNNING", "TIMEOUT_PENDING"]:
                # Finished! Collect final result and return
                return await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=2)
            # Simple heartbeat for UI
            if self.on_event:
                await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": "*(Still executing...)*"})
            await asyncio.sleep(5)
        return {"error": "SubAgent timed out waiting for atomic task."}

    def get_elapsed(self) -> int:
        """Return whole seconds elapsed since start (live if still running), 0 if never started."""
        if not self.start_time:
            return 0
        end = self.end_time or time.time()
        return int(end - self.start_time)