diff --git a/ai-hub/app/core/pipelines/rag_pipeline.py b/ai-hub/app/core/pipelines/rag_pipeline.py index 445adad..6ab6a95 100644 --- a/ai-hub/app/core/pipelines/rag_pipeline.py +++ b/ai-hub/app/core/pipelines/rag_pipeline.py @@ -100,6 +100,9 @@ {"role": "user", "content": question} ] + import asyncio + import time + # 2. Agentic Tool Loop (Max 5 turns to prevent infinite loops) for turn in range(5): request_kwargs = {"stream": True} @@ -118,7 +121,6 @@ msg_lens.append(len(content)) total_chars = sum(msg_lens) - tool_count = len(tools) if tools else 0 logging.info(f"[RagPipeline] Turn {turn+1} starting (STREAMING). Model: {model}, Messages: {len(messages)}, Total Chars: {total_chars}") @@ -161,15 +163,12 @@ # Process completed turn if not tool_calls_map: # If no tools, this is the final answer for this forward pass. - # However, we might have accumulated content/reasoning. - # We stop the loop and return. return - # Reconstruct the tool call list and message object for the next turn + # Parallel dispatch logic for tools processed_tool_calls = list(tool_calls_map.values()) - # Format as an OpenAI-style message for the history - # But await completion returns thin models, we need a serializable dict/object + # Reconstruct the tool call list and message object for the next turn assistant_msg = { "role": "assistant", "content": accumulated_content or None, @@ -180,6 +179,8 @@ messages.append(assistant_msg) + # Dispatch all tool calls simultaneously + tool_tasks = [] for tc in processed_tool_calls: func_name = tc.function.name func_args = {} @@ -189,16 +190,34 @@ except: pass yield {"type": "status", "content": f"AI decided to use tool: {func_name}"} - logging.info(f"[🔧] Agent calling tool: {func_name} with {func_args}") + logging.info(f"[🔧] Agent calling tool (PARALLEL): {func_name} with {func_args}") if tool_service: # Notify UI about tool execution start yield {"type": "tool_start", "name": func_name, "args": func_args} - result = await 
import asyncio
import logging
import time

logger = logging.getLogger(__name__)


class SubAgent:
    """
    A stateful watcher for a single task function.

    The task function is executed in a worker thread via ``asyncio.to_thread``
    so that a blocking call does not stall the event loop. The watcher records
    the outcome on itself:

    * ``status``  -- "PENDING", "RUNNING", "RETRYING (i/n)",
      "ERROR_RETRYING (i/n)", "COMPLETED" or "FAILED".
    * ``result``  -- the value returned by the most recent attempt that
      returned normally (``None`` if no attempt ever returned).
    * ``error``   -- ``str()`` of the most recent exception raised by the task
      function, if any. It is not cleared by a later successful attempt.
    * ``start_time`` / ``end_time`` -- ``time.time()`` stamps around ``run()``.

    Note that "COMPLETED" only means the task function returned without
    raising; the returned value may still be a dict carrying an ``"error"``
    key (non-transient errors, or transient ones once retries are exhausted).

    Args:
        name: Label used in log messages.
        task_fn: Synchronous callable to execute.
        args: Keyword arguments passed to ``task_fn`` (``None`` means none).
        retries: Number of extra attempts after the first one. Negative
            values are treated as 0 when running.
        retry_delay: Seconds to wait between attempts.
    """

    # Lower-case substrings that mark an error message as transient
    # (i.e. worth retrying).
    _TRANSIENT_MARKERS = ("timeout", "offline", "disconnected")

    def __init__(self, name: str, task_fn, args: dict, retries: int = 1,
                 retry_delay: float = 2.0):
        self.name = name
        self.task_fn = task_fn
        # Tolerate args=None so that run() can always unpack with **.
        self.args = args if args is not None else {}
        self.retries = retries
        self.retry_delay = retry_delay
        self.status = "PENDING"
        self.result = None
        self.start_time = None
        self.end_time = None
        self.error = None
        self.task_id = None

    @classmethod
    def _transient_error(cls, result):
        """Return the lower-cased error text if *result* is a dict reporting a
        transient failure, otherwise ``None``."""
        if not isinstance(result, dict) or not result.get("error"):
            return None
        message = str(result.get("error")).lower()
        if any(marker in message for marker in cls._TRANSIENT_MARKERS):
            return message
        return None

    async def run(self):
        """
        Execute the task function, retrying on transient failures.

        An attempt is retried when the task function raises, or when it
        returns a dict whose ``"error"`` text contains one of the transient
        markers, as long as retries remain.

        Returns:
            ``self.result`` -- the value of the last attempt that returned
            normally, or ``None`` if every attempt raised.
        """
        self.start_time = time.time()
        self.status = "RUNNING"

        # A negative retry budget would make the loop below empty: the task
        # would never run and the status would stay "RUNNING" forever.
        max_retries = max(0, self.retries)

        for attempt in range(max_retries + 1):
            can_retry = attempt < max_retries
            try:
                # Run the (potentially blocking) task function in a worker
                # thread to keep the async loop free.
                self.result = await asyncio.to_thread(self.task_fn, **self.args)
            except Exception as e:
                # logger.exception keeps the traceback, which the plain
                # message alone would lose.
                logger.exception("SubAgent %s execution error: %s", self.name, e)
                self.error = str(e)
                if can_retry:
                    self.status = f"ERROR_RETRYING ({attempt+1}/{max_retries})"
                    await asyncio.sleep(self.retry_delay)
                    continue
                self.status = "FAILED"
                break

            # Only retry on potentially transient network/node issues.
            err_msg = self._transient_error(self.result) if can_retry else None
            if err_msg is not None:
                self.status = f"RETRYING ({attempt+1}/{max_retries})"
                logger.info("[SubAgent] %s retrying due to: %s", self.name, err_msg)
                await asyncio.sleep(self.retry_delay)
                continue

            self.status = "COMPLETED"
            break

        self.end_time = time.time()
        return self.result

    def get_elapsed(self) -> int:
        """Return whole seconds since ``run()`` started (0 if never started).

        Uses ``end_time`` when the run has finished, otherwise the current time.
        """
        if not self.start_time:
            return 0
        end = self.end_time or time.time()
        return int(end - self.start_time)
skill execution to the appropriate internal service.""" + """Routes system skill execution to a stateful SubAgent.""" + from app.core.services.sub_agent import SubAgent + orchestrator = getattr(self._services, "orchestrator", None) if not orchestrator: return {"success": False, "error": "Orchestrator not available"} @@ -88,26 +90,17 @@ if not node_id: return {"success": False, "error": "node_id is required"} + # Define the task function and arguments for the SubAgent + task_fn = None + task_args = {} + try: if skill.name == "mesh_terminal_control": # Maps to TaskAssistant.dispatch_single cmd = args.get("command") timeout = int(args.get("timeout", 30)) - res = assistant.dispatch_single(node_id, cmd, timeout=timeout) - - # IMPORTANT: Shell commands run in a persistent PTY. - # The primary output is streamed live through the MultiNodeConsole. - # To fulfill the requirement that AI waits for the result, we now - # return the actual stdout/stderr to the AI context. - if isinstance(res, dict) and "error" in res: - return {"success": False, "error": res["error"]} - - return { - "success": True, - "stdout": res.get("stdout", ""), - "stderr": res.get("stderr", ""), - "status": res.get("status", "UNKNOWN") - } + task_fn = assistant.dispatch_single + task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout} elif skill.name == "browser_automation_agent": # Maps to TaskAssistant.dispatch_browser @@ -119,8 +112,8 @@ action=action_type, url=args.get("url", ""), ) - res = assistant.dispatch_browser(node_id, browser_action) - return {"success": True, "output": res} + task_fn = assistant.dispatch_browser + task_args = {"node_id": node_id, "action": browser_action} elif skill.name == "mesh_file_explorer": # Maps to TaskAssistant.ls, cat, write, rm @@ -128,38 +121,62 @@ path = args.get("path") if action == "list": - res = assistant.ls(node_id, path) - if isinstance(res, dict) and "files" in res: - # Format for readability and token efficiency - formatted = f"Directory listing 
for '{res.get('path', path)}' on node {node_id}:\n" - files = res.get("files") - if not files: # Handles None or empty list - formatted += "(Empty directory or failed to list files)" - else: - # Sort: directories first, then alphabetically - files.sort(key=lambda x: (not x.get("is_dir"), x.get("name", "").lower())) - limit = 100 - for f in files[:limit]: - icon = "📁" if f.get("is_dir") else "📄" - size_str = f" ({f.get('size')} bytes)" if not f.get("is_dir") else "" - formatted += f"{icon} {f.get('name')}{size_str}\n" - if len(files) > limit: - formatted += f"... and {len(files) - limit} more items." - res = formatted + task_fn = assistant.ls + task_args = {"node_id": node_id, "path": path} elif action == "read": - res = assistant.cat(node_id, path) + task_fn = assistant.cat + task_args = {"node_id": node_id, "path": path} elif action == "write": content = args.get("content", "").encode('utf-8') - res = assistant.write(node_id, path, content) + task_fn = assistant.write + task_args = {"node_id": node_id, "path": path, "content": content} elif action == "delete": - res = assistant.rm(node_id, path) + task_fn = assistant.rm + task_args = {"node_id": node_id, "path": path} else: return {"success": False, "error": f"Unsupported action: {action}"} + + if task_fn: + # Create and run the SubAgent + sub_agent = SubAgent( + name=f"{skill.name}_{node_id}", + task_fn=task_fn, + args=task_args, + retries=2 # Allow 2 retries for transient node issues + ) + res = await sub_agent.run() - return {"success": True, "output": res} + # Post-process specific results to be more AI-friendly + if skill.name == "mesh_file_explorer" and args.get("action") == "list": + if isinstance(res, dict) and "files" in res: + res = self._format_ls_result(res, node_id, path) + + # Standardize output for AI + if isinstance(res, dict) and "error" in res: + return {"success": False, "error": res["error"], "sub_agent_status": sub_agent.status} + + return {"success": True, "output": res, "sub_agent_status": 
def _format_ls_result(self, res: dict, node_id: str, path: str, limit: int = 100) -> str:
    """Format a raw directory listing as compact text for LLM consumption.

    Args:
        res: Listing dict. ``res["files"]`` is read as a list of dicts with
            ``name``, ``is_dir`` and ``size`` keys; ``res["path"]``, when
            present, is shown instead of ``path``.
        node_id: Node identifier shown in the header line.
        path: Fallback path for the header when ``res`` has no ``"path"``.
        limit: Maximum number of entries to print; the remainder is
            summarized in a trailing "... and N more items." line.

    Returns:
        A header line followed by one line per entry (directories first,
        then case-insensitive by name), or a placeholder line when the
        listing is empty or missing.
    """
    header = f"Directory listing for '{res.get('path', path)}' on node {node_id}:\n"
    files = res.get("files")
    if not files:  # Handles None as well as an empty list
        return header + "(Empty directory or failed to list files)"

    # sorted() instead of list.sort(): do not reorder the caller's list.
    # `or ""` guards against entries whose name is present but None.
    ordered = sorted(
        files,
        key=lambda entry: (not entry.get("is_dir"), (entry.get("name") or "").lower()),
    )

    limit = max(limit, 0)
    parts = [header]
    for entry in ordered[:limit]:
        is_dir = entry.get("is_dir")
        icon = "📁" if is_dir else "📄"
        size_str = "" if is_dir else f" ({entry.get('size')} bytes)"
        parts.append(f"{icon} {entry.get('name')}{size_str}\n")

    hidden = len(ordered) - limit
    if hidden > 0:
        parts.append(f"... and {hidden} more items.")
    return "".join(parts)