diff --git a/agent-node/agent_node/skills/shell.py b/agent-node/agent_node/skills/shell.py index b1709eb..430eb61 100644 --- a/agent-node/agent_node/skills/shell.py +++ b/agent-node/agent_node/skills/shell.py @@ -13,12 +13,38 @@ """Admin Console Skill: Persistent stateful Bash via PTY.""" def __init__(self, sync_mgr=None): self.sync_mgr = sync_mgr - self.sessions = {} # session_id -> {fd, pid, thread} + self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...} self.lock = threading.Lock() + + # --- M7: Idle Session Reaper --- + # Automatically kills dormant bash processes to free up system resources. + self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper") + self.reaper_thread.start() + + def _session_reaper(self): + """Background thread that cleans up unused PTY sessions.""" + while True: + time.sleep(60) + with self.lock: + now = time.time() + for sid, sess in list(self.sessions.items()): + # Avoid reaping currently active tasks + if sess.get("active_task"): + continue + + # 10 minute idle timeout + if now - sess.get("last_activity", 0) > 600: + print(f" [๐Ÿš๐Ÿงน] Reaping idle shell session: {sid}") + try: + os.close(sess["fd"]) + os.kill(sess["pid"], 9) + except: pass + self.sessions.pop(sid, None) def _ensure_session(self, session_id, cwd, on_event): with self.lock: if session_id in self.sessions: + self.sessions[session_id]["last_activity"] = time.time() return self.sessions[session_id] print(f" [๐Ÿš] Initializing Persistent Shell Session: {session_id}") @@ -27,7 +53,6 @@ if pid == 0: # Child # Environment prep os.environ["TERM"] = "xterm-256color" - os.environ["PS1"] = "\\s-\\v\\$ " # Simple prompt for easier parsing maybe? No, let user have default. # Change to CWD if cwd and os.path.exists(cwd): @@ -41,6 +66,14 @@ fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + sess = { + "fd": fd, + "pid": pid, + "last_activity": time.time(), + "buffer": "", + "active_task": None + } + def reader(): while True: try: @@ -59,21 +92,15 @@ sess["buffer"] += decoded if marker in decoded: # Marker found! Extract exit code - # Format: ...marker [exit_code]\n try: parts = sess["buffer"].split(marker) - # The pure stdout is everything before the marker pure_stdout = parts[0] - # The exit code is right after the marker after_marker = parts[1].strip().split() exit_code = int(after_marker[0]) if after_marker else 0 sess["result"]["stdout"] = pure_stdout - sess["result"]["status"] = 1 if exit_code == 0 else 2 # Success=1 for Skill mgr + sess["result"]["status"] = 1 if exit_code == 0 else 2 sess["event"].set() - - # We don't want the marker itself to spam the UI stream - # So we only send the part before the marker decoded = pure_stdout except Exception as e: print(f" [๐Ÿšโš ๏ธ] Marker parsing failed: {e}") @@ -89,15 +116,19 @@ on_event(agent_pb2.ClientTaskMessage(skill_event=event)) except (EOFError, OSError): break + + # Thread Cleanup print(f" [๐Ÿš] Shell Session Terminated: {session_id}") with self.lock: self.sessions.pop(session_id, None) - t = threading.Thread(target=reader, daemon=True) + t = threading.Thread(target=reader, daemon=True, name=f"ShellReader-{session_id}") t.start() + sess["thread"] = t - self.sessions[session_id] = {"fd": fd, "pid": pid, "thread": t} - return self.sessions[session_id] + self.sessions[session_id] = sess + return sess + def handle_transparent_tty(self, task, on_complete, on_event=None): """Processes raw TTY/Resize events synchronously (bypasses threadpool/sandbox).""" diff --git a/ai-hub/app/core/pipelines/rag_pipeline.py b/ai-hub/app/core/pipelines/rag_pipeline.py index 6ab6a95..35e8243 100644 --- a/ai-hub/app/core/pipelines/rag_pipeline.py +++ b/ai-hub/app/core/pipelines/rag_pipeline.py @@ -165,32 +165,39 @@ # If no tools, this is the final answer for this forward pass. return - # Parallel dispatch logic for tools - processed_tool_calls = list(tool_calls_map.values()) - - # Reconstruct the tool call list and message object for the next turn - assistant_msg = { - "role": "assistant", - "content": accumulated_content or None, - "tool_calls": processed_tool_calls - } - if accumulated_reasoning: - assistant_msg["reasoning_content"] = accumulated_reasoning - - messages.append(assistant_msg) - - # Dispatch all tool calls simultaneously - tool_tasks = [] - for tc in processed_tool_calls: - func_name = tc.function.name - func_args = {} - try: - import json - func_args = json.loads(tc.function.arguments) - except: pass + # Parallel dispatch logic for tools + processed_tool_calls = list(tool_calls_map.values()) + + # Reconstruct the tool call list and message object for the next turn + assistant_msg = { + "role": "assistant", + "content": accumulated_content or None, + "tool_calls": processed_tool_calls + } + if accumulated_reasoning: + assistant_msg["reasoning_content"] = accumulated_reasoning + + messages.append(assistant_msg) + + # Dispatch all tool calls simultaneously + tool_tasks = [] + for tc in processed_tool_calls: + func_name = tc.function.name + func_args = {} + try: + import json + func_args = json.loads(tc.function.arguments) + except: pass - yield {"type": "status", "content": f"AI decided to use tool: {func_name}"} - logging.info(f"[๐Ÿ”ง] Agent calling tool (PARALLEL): {func_name} with {func_args}") + # --- M7 Parallel PTY Optimization --- + # If the tool is terminal control and no session is provided, + # use a unique session ID per SUBAGENT task to avoid PTY SERIALIZATION. + if func_name == "mesh_terminal_control" and "session_id" not in func_args: + func_args["session_id"] = f"subagent-{tc.id[:8]}" + + yield {"type": "status", "content": f"AI decided to use tool: {func_name}"} + logging.info(f"[๐Ÿ”ง] Agent calling tool (PARALLEL): {func_name} with {func_args}") + if tool_service: # Notify UI about tool execution start diff --git a/ai-hub/app/core/services/tool.py b/ai-hub/app/core/services/tool.py index a077117..7d18e70 100644 --- a/ai-hub/app/core/services/tool.py +++ b/ai-hub/app/core/services/tool.py @@ -86,6 +86,7 @@ assistant = orchestrator.assistant node_id = args.get("node_id") + session_id = args.get("session_id") # Explicit session if provided by AI if not node_id: return {"success": False, "error": "node_id is required"} @@ -100,7 +101,7 @@ cmd = args.get("command") timeout = int(args.get("timeout", 30)) task_fn = assistant.dispatch_single - task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout} + task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": session_id} elif skill.name == "browser_automation_agent": # Maps to TaskAssistant.dispatch_browser @@ -111,31 +112,34 @@ browser_action = agent_pb2.BrowserAction( action=action_type, url=args.get("url", ""), + session_id=session_id or "" # Bridge to Browser Session ) task_fn = assistant.dispatch_browser - task_args = {"node_id": node_id, "action": browser_action} + task_args = {"node_id": node_id, "action": browser_action, "session_id": session_id} elif skill.name == "mesh_file_explorer": # Maps to TaskAssistant.ls, cat, write, rm action = args.get("action") path = args.get("path") + explorer_sid = session_id or "__fs_explorer__" if action == "list": task_fn = assistant.ls - task_args = {"node_id": node_id, "path": path} + task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid} elif action == "read": task_fn = assistant.cat - task_args = {"node_id": node_id, "path": path} + task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid} elif action == "write": content = args.get("content", "").encode('utf-8') task_fn = assistant.write - task_args = {"node_id": node_id, "path": path, "content": content} + task_args = {"node_id": node_id, "path": path, "content": content, "session_id": explorer_sid} elif action == "delete": task_fn = assistant.rm - task_args = {"node_id": node_id, "path": path} + task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid} else: return {"success": False, "error": f"Unsupported action: {action}"} + if task_fn: # Create and run the SubAgent sub_agent = SubAgent( diff --git a/ai-hub/app/core/skills/definitions.py b/ai-hub/app/core/skills/definitions.py index d7d642d..f1ecae4 100644 --- a/ai-hub/app/core/skills/definitions.py +++ b/ai-hub/app/core/skills/definitions.py @@ -17,7 +17,8 @@ "properties": { "command": {"type": "string", "description": "The shell command to execute."}, "node_id": {"type": "string", "description": "The target node ID within the mesh."}, - "timeout": {"type": "integer", "description": "The max seconds to wait for result. Default 30. Use for long-running tasks."} + "timeout": {"type": "integer", "description": "The max seconds to wait for result. Default 30. Use for long-running tasks."}, + "session_id": {"type": "string", "description": "Optional persistent session ID. Use same ID to preserve bash state (cd, env). Leave empty for parallel execution."} }, "required": ["command", "node_id"] } @@ -40,7 +41,8 @@ "properties": { "url": {"type": "string", "description": "The URL to navigate to."}, "action": {"type": "string", "enum": ["navigate", "click", "type", "screenshot"], "description": "The browser action to perform."}, - "node_id": {"type": "string", "description": "The target node ID."} + "node_id": {"type": "string", "description": "The target node ID."}, + "session_id": {"type": "string", "description": "Optional session ID to persist browser state (cookies, login)."} }, "required": ["url", "action", "node_id"] } @@ -82,11 +84,13 @@ "action": {"type": "string", "enum": ["list", "read", "write", "delete"], "description": "File system action."}, "path": {"type": "string", "description": "Relative path to the file/directory."}, "node_id": {"type": "string", "description": "The target node ID."}, - "content": {"type": "string", "description": "Optional content for write action."} + "content": {"type": "string", "description": "Optional content for write action."}, + "session_id": {"type": "string", "description": "Target sync session workspace."} }, "required": ["action", "path", "node_id"] } }, + "is_system": True } ]