import asyncio
import logging
import json
import sys
from typing import List, Dict, Any, AsyncGenerator, Optional
class ToolExecutor:
"""Handles parallel tool dispatching and event drainage."""
def __init__(self, tool_service: Any, user_id: str, db: Any, sync_workspace_id: str, session_db_id: int, provider_name: Optional[str] = None):
self.tool_service = tool_service
self.user_id = user_id
self.db = db
self.sync_workspace_id = sync_workspace_id
self.session_db_id = session_db_id
self.provider_name = provider_name
self.event_queue = asyncio.Queue()
async def _subagent_event_handler(self, event):
await self.event_queue.put(event)
async def run_tools(self, tool_calls: List[Any], safety_guard: Any, mesh_bridge: Any) -> AsyncGenerator[Dict[str, Any], None]:
"""Dispatches and monitors tools until all are complete or cancelled."""
tool_tasks = []
for tc in tool_calls:
# Safe access for both objects and dicts
tc_func = tc.get("function", {}) if isinstance(tc, dict) else getattr(tc, "function", None)
name = tc_func.get("name") if isinstance(tc_func, dict) else getattr(tc_func, "name", "")
args = self._parse_args(tc)
async for ev in self._prepare_and_start_tool(tc, name, args):
yield ev
task = asyncio.create_task(self.tool_service.call_tool(
name, args, db=self.db, user_id=self.user_id, session_id=self.sync_workspace_id,
session_db_id=self.session_db_id, on_event=self._subagent_event_handler, provider_name=self.provider_name
))
tool_tasks.append((tc, task))
# --- Wait & Monitor loop ---
_cycles = 0
while not all(t[1].done() for t in tool_tasks):
async for ev in self._drain_events(mesh_bridge): yield ev
if safety_guard.check_cancellation():
yield {"type": "status", "content": "Cancellation requested. Interrupting..."}
return
await asyncio.sleep(0.1)
_cycles += 1
if _cycles % 100 == 0: yield {"type": "status", "content": "Processing task..."}
# --- Finalize Results ---
async for ev in self._finalize_tool_results(tool_tasks):
yield ev
async def _prepare_and_start_tool(self, tc, name, args):
"""Yields visual UI details and initialization markers for a tool call."""
lines = [f"🔧 **Tool Call: `{name}`**"]
if args.get("command"): lines.append(f"- Command: `{args['command']}`")
if args.get("node_id"): lines.append(f"- Node: `{args['node_id']}`")
if args.get("node_ids"): lines.append(f"- Nodes: `{', '.join(args['node_ids'])}`")
yield {"type": "reasoning", "content": "\n" + "\n".join(lines) + "\n"}
yield {"type": "tool_start", "name": name, "args": args}
async def _drain_events(self, mesh_bridge):
"""Drains subagent thought events and mesh observations for UI feedback."""
while not self.event_queue.empty():
ev = await self.event_queue.get()
if ev["type"] == "subagent_thought":
yield {"type": "reasoning", "content": f"\n\n> **🧠 Sub-Agent [{ev.get('node_id', 'Swarm').capitalize()}]:** {ev.get('content')}\n"}
if mesh_bridge:
while not mesh_bridge.empty():
try:
ev = mesh_bridge.get_nowait()
e_type = ev.get("event")
data = ev.get("data", {})
if e_type == "mesh_observation":
yield {"type": "reasoning", "content": f"\n\n> **📡 Mesh Observation:** {data.get('message', 'Unspecified drift observed.')}\n\n"}
elif e_type == "skill_event":
# Relay terminal output and prompts to the thought trace
s_type = data.get("type")
if s_type == "output":
# Use status type to update the 'terminal' without clobbering history if needed,
# but for reasoning, we use a code block style.
clean_out = str(data.get("data", ""))
if clean_out.strip():
yield {"type": "reasoning", "content": clean_out}
elif s_type == "prompt":
yield {"type": "reasoning", "content": f"\n\n> **❓ Interaction Required:** `{data.get('data')}`\n\n"}
except: break
async def _finalize_tool_results(self, tool_tasks):
"""Awaits all tool tasks and yields results formatted for AI history and UI."""
for tc, task in tool_tasks:
tc_func = tc.get("function", {}) if isinstance(tc, dict) else getattr(tc, "function", None)
name = tc_func.get("name") if isinstance(tc_func, dict) else getattr(tc_func, "name", "")
try:
result = await task
except Exception as e:
result = {"success": False, "error": f"Tool crashed: {str(e)}"}
if result and (not isinstance(result, dict) or result.get("success") is False):
err = result.get("error") if isinstance(result, dict) else "Unknown failure"
yield {"type": "reasoning", "content": f"\n> **❌ Tool Error [{name}]:** {err}\n"}
yield {"type": "tool_result", "name": name, "result": result}
tc_id = tc.get("id") if isinstance(tc, dict) else getattr(tc, "id", None)
yield {"role": "tool", "tool_call_id": tc_id, "name": name, "content": self._truncate_result(result)}
def _parse_args(self, tc) -> Dict[str, Any]:
try:
tc_func = tc.get("function", {}) if isinstance(tc, dict) else getattr(tc, "function", None)
raw_args = tc_func.get("arguments", "{}") if isinstance(tc_func, dict) else getattr(tc_func, "arguments", "{}")
args = json.loads(raw_args)
except:
args = {}
# Parallel PTY Optimization preserved
name = tc_func.get("name") if isinstance(tc_func, dict) else getattr(tc_func, "name", "")
tc_id = tc.get("id") if isinstance(tc, dict) else getattr(tc, "id", "")
if name == "mesh_terminal_control" and "session_id" not in args:
args["session_id"] = f"subagent-{tc_id[:8]}"
return args
def _truncate_result(self, result: Any) -> str:
s = json.dumps(result) if isinstance(result, dict) else str(result)
limit = 128000 # Increased for better RAG/context
if len(s) > limit:
return s[:limit] + f"\n...[SYSTEM: Output Truncated at {limit} chars for safety. Use specific filters or file explorers if more detail is needed.]"
return s