diff --git a/agent-node/agent_node/skills/shell.py b/agent-node/agent_node/skills/shell.py
index d267657..b1709eb 100644
--- a/agent-node/agent_node/skills/shell.py
+++ b/agent-node/agent_node/skills/shell.py
@@ -164,6 +164,12 @@
 
             # Handle Session Persistent Process
             sess = self._ensure_session(session_id, cwd, on_event)
+            # --- 0. Busy Check: Serialize access to the PTY ---
+            with self.lock:
+                if sess.get("active_task"):
+                    curr_tid = sess.get("active_task")
+                    return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 2}, task.trace_id)
+
             # --- Blocking Wait Logic ---
             marker = f"__CORTEX_FIN_SH_{int(time.time())}__"
             event = threading.Event()
@@ -176,31 +182,43 @@
             sess["event"] = event
             sess["buffer"] = ""
             sess["result"] = result_container
+            sess["cancel_event"] = threading.Event()
 
             # Input injection: execute command then echo marker and exit code
-            print(f" [🐚] Executing (Blocking): {cmd}")
-            # We use a trick: execute command, then echo marker and return code.
-            # We use ';' to chain even if first fails, unless it's a structural error.
-            # 12-factor bash: ( cmd ) ; echo marker $?
-            full_input = f"({cmd}) ; echo \"{marker} $?\"\n"
-            os.write(sess["fd"], full_input.encode("utf-8"))
+            try:
+                # 12-factor bash: ( cmd ) ; echo marker $?
+                full_input = f"({cmd}) ; echo \"{marker} $?\"\n"
+                os.write(sess["fd"], full_input.encode("utf-8"))
 
-            # Wait for completion (triggered by reader)
-            # Use a slightly longer timeout than the Hub's limit to avoid race,
-            # though the Hub will cancel us if it gets tired first.
-            timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0
-            if event.wait(timeout):
-                # Success! reader found the marker
-                on_complete(tid, result_container, task.trace_id)
-            else:
-                # Timeout on node side
+                # Wait for completion (triggered by reader) OR cancellation
+                timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0
+                start_time = time.time()
+                while time.time() - start_time < timeout:
+                    # Check for completion (reader found marker)
+                    if event.is_set():
+                        return on_complete(tid, result_container, task.trace_id)
+
+                    # Check for cancellation (HUB sent cancel)
+                    if sess["cancel_event"].is_set():
+                        print(f" [🐚🛑] Task {tid} cancelled on node.")
+                        return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id)
+
+                    # Sleep slightly to avoid busy loop
+                    time.sleep(0.1)
+
+                # Timeout Case
                 print(f" [🐚⚠️] Task {tid} timed out on node.")
                 on_complete(tid, {"stdout": sess["buffer"], "stderr": "TIMEOUT", "status": 2}, task.trace_id)
-
-            # Cleanup session task state
-            with self.lock:
-                if sess.get("active_task") == tid:
-                    sess["active_task"] = None
+
+            finally:
+                # Cleanup session task state
+                with self.lock:
+                    if sess.get("active_task") == tid:
+                        sess["active_task"] = None
+                        sess["marker"] = None
+                        sess["event"] = None
+                        sess["result"] = None
+                        sess["cancel_event"] = None
 
         except Exception as e:
             print(f" [🐚❌] Execute Error for {tid}: {e}")
@@ -208,17 +226,18 @@
 
     def cancel(self, task_id: str):
         """Cancels an active task — for persistent shell, this sends a SIGINT (Ctrl+C)."""
-        # Note: We need a mapping from task_id to session_id to do this properly.
-        # For now, let's assume we can broadcast a SIGINT to all shells if specific task is unknown.
-        # Or better: track task-to-session mapping in the manager.
-        # For Phase 3, we'll try to find the session.
         with self.lock:
            for sid, sess in self.sessions.items():
-                print(f"[🛑] Sending SIGINT (Ctrl+C) to shell session: {sid}")
-                # Write \x03 (Ctrl+C) to the master FD
-                os.write(sess["fd"], b"\x03")
+                if sess.get("active_task") == task_id:
+                    print(f"[🛑] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}")
+                    # Write \x03 (Ctrl+C) to the master FD
+                    os.write(sess["fd"], b"\x03")
+                    # Break the wait loop in execute thread
+                    if sess.get("cancel_event"):
+                        sess["cancel_event"].set()
         return True
 
+
     def shutdown(self):
         """Cleanup: Terminates all persistent shells."""
         with self.lock:
diff --git a/ai-hub/app/core/services/sub_agent.py b/ai-hub/app/core/services/sub_agent.py
index 8343e2f..7445069 100644
--- a/ai-hub/app/core/services/sub_agent.py
+++ b/ai-hub/app/core/services/sub_agent.py
@@ -35,14 +35,16 @@
                 if isinstance(self.result, dict) and self.result.get("error"):
                     err_msg = str(self.result.get("error")).lower()
                     # Only retry on potentially transient network/node issues
-                    if any(x in err_msg for x in ["timeout", "offline", "disconnected", "capacity", "rejected"]):
+                    is_busy = "busy" in err_msg
+                    if is_busy or any(x in err_msg for x in ["timeout", "offline", "disconnected", "capacity", "rejected"]):
                         if attempt < self.retries:
-                            backoff = (attempt + 1) * 3
-                            # Incremental backoff: 3s, 6s
-                            self.status = f"RETRYING ({attempt+1}/{self.retries}) - {err_msg}"
+                            backoff = (attempt + 1) * 5 if is_busy else (attempt + 1) * 3
+                            self.status = f"RETRYING ({attempt+1}/{self.retries}) - {'Session Busy' if is_busy else err_msg}"
                             logger.info(f"[SubAgent] {self.name} retrying due to: {err_msg}. Backoff={backoff}s")
                             await asyncio.sleep(backoff)
                             continue
+
+                self.status = "COMPLETED"
                 break