diff --git a/agent-node/src/agent_node/skills/shell_bridge.py b/agent-node/src/agent_node/skills/shell_bridge.py index d5cb278..13820f3 100644 --- a/agent-node/src/agent_node/skills/shell_bridge.py +++ b/agent-node/src/agent_node/skills/shell_bridge.py @@ -56,7 +56,8 @@ "last_activity": time.time(), "buffer_file": None, "tail_buffer": "", - "active_task": None + "active_task": None, + "write_lock": threading.Lock() } def reader(): @@ -131,14 +132,20 @@ sess["buffer_file"].close() sess["buffer_file"] = None - sess["event"].set() + + # Signal completion via safe lookup - avoid racing with finally block + finish_event = sess.get("event") + if finish_event: + finish_event.set() # Strip the protocol fences from the live UI stream to keep it clean (ANSI and Bracket) decoded = re.sub(r'\x1b]1337;Task(Start|End);id=.*?\x07', '', decoded) decoded = re.sub(r'\[\[1337;Task(Start|End);id=.*?\]\]', '', decoded) except Exception as e: print(f" [🐚⚠️] Protocol parsing failed: {e}") - sess["event"].set() + finish_event = sess.get("event") + if finish_event: + finish_event.set() # Stream terminal output back to UI if on_event: @@ -282,119 +289,123 @@ sess = self._ensure_session(session_id, cwd, on_event) - is_raw = cmd.startswith("!RAW:") - if is_raw: - # M7 Fix: Agentic tasks (starting with 'task-') MUST use framing - # to ensure results are captured. Forced bypass is only allowed for manual UI typing. - if tid.startswith("task-"): - cmd = cmd[5:] - is_raw = False - else: - input_str = cmd[5:] + "\n" - print(f" [🐚⌨️] RAW Input Injection: {input_str.strip()}") - sess["backend"].write(input_str.encode("utf-8")) - return on_complete(tid, {"stdout": "INJECTED", "status": 0}, task.trace_id) - - marker_id = int(time.time()) - marker = f"__CORTEX_FIN_SH_{marker_id}__" - event = threading.Event() - result_container = {"stdout": "", "status": 0} - - with self.lock: - sess["active_task"] = tid - sess["event"] = event - sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False) - sess["tail_buffer"] = "" - sess["result"] = result_container - sess["cancel_event"] = threading.Event() - - try: - # M7: Protocol-Aware Command Framing (OSC 1337) - # We wrap the command in non-printable control sequences. - # Format: ESC ] 1337 ; ST (\x07) - start_marker = f"1337;TaskStart;id={tid}" - end_marker = f"1337;TaskEnd;id={tid}" - - import platform - if platform.system() == "Windows": - # M7: EncodedCommand for Windows (Bypasses Quote Hell) - # This ensures byte-accurate delivery of ESC ([char]27) and BEL ([char]7) - import base64 - - # M8: Ultimate Windows Shell Boundary Method (File Spooling) - # Bypasses Conhost VTP Redraw byte swallowing caused by line wrapping in PTY - # Bypasses powershell encoded limits. - import os - import tempfile as tf - spool_dir = os.path.join(tf.gettempdir(), "cortex_pty_tasks") - os.makedirs(spool_dir, exist_ok=True) - task_path = os.path.join(spool_dir, f"{tid}.bat") - - # We write the logic to a native shell file so the PTY simply executes a short path - with open(task_path, "w", encoding="utf-8") as f: - f.write(f"@echo off\r\n") - f.write(f"echo [[1337;TaskStart;id={tid}]]\r\n") - f.write(f"{cmd}\r\n") - f.write(f"echo [[1337;TaskEnd;id={tid};exit=%errorlevel%]]\r\n") - # optionally clean up itself - f.write(f"del \"%~f0\"\r\n") - - full_input = f"\"{task_path}\"\r\n" - else: - # On Linux, we use echo -e with octal escapes - s_m = f"\\033]{start_marker}\\007" - e_m = f"\\033]{end_marker};exit=$__ctx_exit\\007" - full_input = f"echo -e -n \"{s_m}\"; {cmd}; __ctx_exit=$?; echo -e -n \"{e_m}\"\n" - - sess["backend"].write(full_input.encode("utf-8")) - - timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0 - start_time = time.time() - while time.time() - start_time < timeout: - if event.is_set(): - return on_complete(tid, result_container, task.trace_id) - if sess["cancel_event"].is_set(): - print(f" [🐚🛑] Task {tid} cancelled on node.") - return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id) - time.sleep(0.1) - - print(f" [🐚⚠️] Task {tid} timed out on node.") - with self.lock: - if sess.get("buffer_file"): - try: - sess["buffer_file"].seek(0, 2) - file_len = sess["buffer_file"].tell() - HEAD, TAIL = 10_000, 30_000 - if file_len > HEAD + TAIL: - sess["buffer_file"].seek(0) - head_str = sess["buffer_file"].read(HEAD) - sess["buffer_file"].seek(file_len - TAIL) - tail_str = sess["buffer_file"].read() - omitted = file_len - HEAD - TAIL - partial_out = head_str + f"\n\n[... {omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str - else: - sess["buffer_file"].seek(0) - partial_out = sess["buffer_file"].read() - except: - partial_out = "" + with sess["write_lock"]: + is_raw = cmd.startswith("!RAW:") + if is_raw: + # M7 Fix: Agentic tasks (starting with 'task-') MUST use framing + # to ensure results are captured. Forced bypass is only allowed for manual UI typing. + if tid.startswith("task-"): + cmd = cmd[5:] + is_raw = False else: - partial_out = "" + input_str = cmd[5:] + "\n" + print(f" [🐚⌨️] RAW Input Injection: {input_str.strip()}") + sess["backend"].write(input_str.encode("utf-8")) + return on_complete(tid, {"stdout": "INJECTED", "status": 0}, task.trace_id) + + marker_id = int(time.time()) + marker = f"__CORTEX_FIN_SH_{marker_id}__" + event = threading.Event() + cancel_event = threading.Event() # Local snapshot for thread safety + result_container = {"stdout": "", "status": 0} - on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id) - - finally: with self.lock: - if sess.get("active_task") == tid: + sess["active_task"] = tid + sess["event"] = event + sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False) + sess["tail_buffer"] = "" + sess["result"] = result_container + sess["cancel_event"] = cancel_event + + try: + # M7: Protocol-Aware Command Framing (OSC 1337) + # We wrap the command in non-printable control sequences. + # Format: ESC ] 1337 ; ST (\x07) + start_marker = f"1337;TaskStart;id={tid}" + end_marker = f"1337;TaskEnd;id={tid}" + + import platform + if platform.system() == "Windows": + # M7: EncodedCommand for Windows (Bypasses Quote Hell) + # This ensures byte-accurate delivery of ESC ([char]27) and BEL ([char]7) + import base64 + + # M8: Ultimate Windows Shell Boundary Method (File Spooling) + # Bypasses Conhost VTP Redraw byte swallowing caused by line wrapping in PTY + # Bypasses powershell encoded limits. + import os + import tempfile as tf + spool_dir = os.path.join(tf.gettempdir(), "cortex_pty_tasks") + os.makedirs(spool_dir, exist_ok=True) + task_path = os.path.join(spool_dir, f"{tid}.bat") + + # We write the logic to a native shell file so the PTY simply executes a short path + with open(task_path, "w", encoding="utf-8") as f: + f.write(f"@echo off\r\n") + f.write(f"echo [[1337;TaskStart;id={tid}]]\r\n") + f.write(f"{cmd}\r\n") + f.write(f"echo [[1337;TaskEnd;id={tid};exit=%errorlevel%]]\r\n") + # optionally clean up itself + f.write(f"del \"%~f0\"\r\n") + + full_input = f"\"{task_path}\"\r\n" + else: + # On Linux, we use echo -e with octal escapes + s_m = f"\\033]{start_marker}\\007" + e_m = f"\\033]{end_marker};exit=$__ctx_exit\\007" + full_input = f"echo -e -n \"{s_m}\"; {cmd}; __ctx_exit=$?; echo -e -n \"{e_m}\"\n" + + sess["backend"].write(full_input.encode("utf-8")) + + timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0 + start_time = time.time() + while time.time() - start_time < timeout: + if event.is_set(): + return on_complete(tid, result_container, task.trace_id) + if cancel_event.is_set(): + print(f" [🐚🛑] Task {tid} cancelled on node.") + return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id) + time.sleep(0.1) + + print(f" [🐚⚠️] Task {tid} timed out on node.") + with self.lock: if sess.get("buffer_file"): try: - sess["buffer_file"].close() - except: pass - sess["buffer_file"] = None - sess["active_task"] = None - sess["marker"] = None - sess["event"] = None - sess["result"] = None - sess["cancel_event"] = None + sess["buffer_file"].seek(0, 2) + file_len = sess["buffer_file"].tell() + HEAD, TAIL = 10_000, 30_000 + if file_len > HEAD + TAIL: + sess["buffer_file"].seek(0) + head_str = sess["buffer_file"].read(HEAD) + sess["buffer_file"].seek(file_len - TAIL) + tail_str = sess["buffer_file"].read() + omitted = file_len - HEAD - TAIL + partial_out = head_str + f"\n\n[... {omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str + else: + sess["buffer_file"].seek(0) + partial_out = sess["buffer_file"].read() + except: + partial_out = "" + else: + partial_out = "" + + on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id) + + finally: + with self.lock: + if sess.get("active_task") == tid: + if sess.get("buffer_file"): + try: + sess["buffer_file"].close() + except: pass + sess["buffer_file"] = None + sess["active_task"] = None + sess["marker"] = None + if sess.get("event") == event: + sess["event"] = None + sess["result"] = None + if sess.get("cancel_event") == cancel_event: + sess["cancel_event"] = None except Exception as e: print(f" [🐚❌] Execute Error for {tid}: {e}")