Newer
Older
cortex-hub / agent-node / src / agent_node / skills / terminal_backends.py
import os
import platform
import abc
import threading
import time

class BaseTerminal(abc.ABC):
    """Interface every terminal backend has to implement.

    A backend owns one interactive shell attached to a pseudo-terminal and
    exposes byte-oriented I/O plus lifecycle control. The base methods do
    nothing themselves and return ``None``; concrete subclasses supply the
    platform-specific behaviour.
    """

    @abc.abstractmethod
    def spawn(self, cwd=None, env=None):
        """Start the shell process inside a PTY/ConPTY.

        Args:
            cwd: Working directory for the shell, or ``None``.
            env: Environment for the shell, or ``None``.
        """

    @abc.abstractmethod
    def read(self, size=4096) -> bytes:
        """Fetch pending terminal output without blocking.

        Args:
            size: Requested maximum number of bytes.
        """

    @abc.abstractmethod
    def write(self, data: bytes):
        """Send ``data`` (keystrokes) to the terminal."""

    @abc.abstractmethod
    def resize(self, cols: int, rows: int):
        """Change the terminal window to ``cols`` columns by ``rows`` rows."""

    @abc.abstractmethod
    def kill(self):
        """Stop the shell process and release the backend's resources."""

    @abc.abstractmethod
    def is_alive(self) -> bool:
        """Report whether the terminal process is still running."""


class PosixTerminal(BaseTerminal):
    """POSIX implementation using pty.fork for a true PTY.

    The shell runs as a forked child whose controlling terminal is the slave
    side of a PTY; this object keeps the child's PID and the master-side file
    descriptor, which is switched to non-blocking mode after the fork.
    """

    def __init__(self):
        self.pid = None  # PID of the shell child; None when no child is tracked
        self.fd = None   # PTY master descriptor; None when closed / not spawned

    def spawn(self, cwd=None, env=None):
        """Fork a shell (``/bin/bash``, else ``/bin/sh``) attached to a new PTY.

        Args:
            cwd: Directory to ``chdir`` into before exec. A failure to change
                directory is ignored and the shell starts in the inherited
                directory.
            env: Mapping of environment variables for the shell. Defaults to a
                copy of ``os.environ``. ``TERM`` is set to ``xterm-256color``
                when absent. The caller's mapping is never modified.
        """
        # POSIX-only modules are imported lazily so the module still imports
        # on platforms that lack them.
        import fcntl
        import pty

        shell_path = "/bin/bash"
        if not os.path.exists(shell_path):
            shell_path = "/bin/sh"

        # Copy so that adding TERM does not leak into the caller's dict.
        child_env = dict(os.environ if env is None else env)
        child_env.setdefault("TERM", "xterm-256color")

        pid, fd = pty.fork()
        if pid == 0:
            # Child process. It must never return into the caller's code:
            # either exec replaces the process image, or we exit immediately.
            try:
                if cwd:
                    try:
                        os.chdir(cwd)
                    except Exception:
                        pass  # best effort: start in the inherited directory
                os.execvpe(shell_path, [shell_path], child_env)
            finally:
                # Only reachable when exec failed.
                os._exit(127)

        # Parent process.
        self.pid = pid
        self.fd = fd

        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def read(self, size=4096) -> bytes:
        """Return up to ``size`` bytes of pending output, or ``b""``.

        Waits at most 50 ms for the descriptor to become readable. Read
        errors (``OSError``/``EOFError``) are reported as an empty result.
        """
        fd = self.fd
        if fd is None:
            return b""
        import select
        try:
            readable, _, _ = select.select([fd], [], [], 0.05)
            if fd in readable:
                return os.read(fd, size)
        except (OSError, EOFError):
            pass
        return b""

    def write(self, data: bytes):
        """Write all of ``data`` to the terminal, best effort.

        The descriptor is non-blocking, so a single ``os.write`` may accept
        only part of the buffer or raise ``BlockingIOError``. Short writes
        are continued, and a full buffer is retried after waiting up to one
        second for writability; if it is still not writable the remainder is
        dropped. Other ``OSError``s are swallowed.
        """
        fd = self.fd
        if fd is None or not data:
            return
        import select
        remaining = memoryview(data)
        try:
            while remaining:
                try:
                    written = os.write(fd, remaining)
                except BlockingIOError:
                    _, writable, _ = select.select([], [fd], [], 1.0)
                    if not writable:
                        break  # still full after the wait; give up
                    continue
                remaining = remaining[written:]
        except OSError:
            pass

    def resize(self, cols: int, rows: int):
        """Set the PTY window size via ``TIOCSWINSZ``; failures are ignored."""
        fd = self.fd
        if fd is None:
            return
        import fcntl
        import struct
        import termios
        try:
            # struct winsize: rows, cols, xpixel, ypixel (pixel sizes set to 0).
            winsize = struct.pack("HHHH", rows, cols, 0, 0)
            fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
        except (OSError, struct.error):
            pass

    def kill(self):
        """SIGKILL and reap the shell (if tracked) and close the PTY master.

        Safe to call repeatedly and after the child has already exited.
        """
        import signal

        # Detach state first so other methods see the terminal as gone.
        pid, self.pid = self.pid, None
        fd, self.fd = self.fd, None

        if pid:
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                pass  # process already gone
            try:
                os.waitpid(pid, 0)
            except OSError:
                pass  # already reaped

        if fd is not None:
            try:
                os.close(fd)  # release the descriptor instead of leaking it
            except OSError:
                pass

    def is_alive(self) -> bool:
        """Return ``True`` while the shell child has not exited.

        Uses a non-blocking ``waitpid``. Once the child has been reaped (or
        is no longer our child) the stored PID is cleared, so a later
        ``kill()`` cannot signal an unrelated process that reused the PID.
        """
        pid = self.pid
        if pid is None:
            return False
        try:
            reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
        except ChildProcessError:
            reaped_pid = pid
        if reaped_pid == pid:
            self.pid = None
            return False
        return True


class WindowsTerminal(BaseTerminal):
    """Windows implementation using pywinpty (ConPTY).

    Runs ``cmd.exe`` inside a 140x40 pseudo console. pywinpty exchanges
    ``str`` data, so this class converts to and from UTF-8 bytes at the
    boundary.
    """

    # Conhost is highly sensitive to rapid buffer writes over 120 bytes, so
    # input is fed in small chunks with a short pause after each one.
    _WRITE_CHUNK_CHARS = 32
    _WRITE_CHUNK_DELAY = 0.02

    def __init__(self):
        import codecs

        self.pty = None  # winpty PTY object; None when not spawned / killed
        # Incremental decoder so a multi-byte UTF-8 character split across
        # two write() calls is reassembled instead of turned into U+FFFD.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def spawn(self, cwd=None, env=None):
        """Start ``cmd.exe`` in a new pseudo console.

        Args:
            cwd: Working directory for the shell. If spawning there fails,
                a warning is printed and the spawn is retried without it.
            env: Mapping of environment variables, a pre-formatted
                ``"KEY=VALUE\\0...\\0\\0"`` string (passed through as is), or
                ``None`` to start from a copy of ``os.environ``. For mappings
                and ``None``, ``TERM`` is forced to ``dumb``. Neither the
                caller's mapping nor ``os.environ`` is modified.

        Raises:
            ImportError: If pywinpty is not installed.
            Exception: Whatever pywinpty raises when the spawn fails and no
                ``cwd`` fallback applies (or the fallback fails too).
        """
        try:
            from winpty import PTY
        except ImportError as exc:
            raise ImportError("pywinpty is required for Windows terminal support. Please install it with 'pip install pywinpty'.") from exc

        # Default shell for Windows: CMD is much more stable for gRPC PTY streams
        shell_cmd = "cmd.exe"

        if isinstance(env, str):
            env_str = env
        else:
            child_env = dict(os.environ if env is None else env)
            # M7: Force TERM=dumb to suppress complex ANSI sequences that clobber rendering
            child_env["TERM"] = "dumb"
            # pywinpty expects env to be a string formatted as "KEY=VALUE\0...KEY=VALUE\0\0"
            env_str = "".join(f"{k}={v}\0" for k, v in child_env.items()) + "\0"

        self._decoder.reset()
        pty = PTY(140, 40)
        try:
            pty.spawn(shell_cmd, cwd=cwd, env=env_str)
        except Exception as e:
            if not cwd:
                raise
            # Fallback for cwd issues on Windows
            print(f"[!] Warning: Failed to spawn shell in {cwd} ({e}). Retrying in root...")
            pty.spawn(shell_cmd, cwd=None, env=env_str)
        # Publish the PTY only once a shell is actually attached to it.
        self.pty = pty

    def read(self, size=4096) -> bytes:
        """Return pending output as UTF-8 bytes, or ``b""`` if none.

        ``size`` is accepted for interface compatibility but is not forwarded
        to pywinpty. An empty first read is retried once after 10 ms because
        winpty can report no data while output is still arriving. Errors are
        printed and reported as an empty result.
        """
        pty = self.pty
        if pty is None:
            return b""
        try:
            data = pty.read(blocking=False)
            if not data:
                # Tiny sleep to allow winpty buffer to fill if it's lagging
                time.sleep(0.01)
                data = pty.read(blocking=False)
            if not data:
                return b""  # never hand back None/"" where bytes are expected
            return data.encode('utf-8') if isinstance(data, str) else data
        except Exception as e:
            print(f"[WindowsTerminal] Read error: {e}")
            return b""

    def write(self, data: bytes):
        """Decode ``data`` as UTF-8 and feed it to the console in chunks.

        DEL (``\\x7f``), which xterm sends for the backspace key, is mapped to
        Backspace (``\\x08``) as Windows expects. Undecodable bytes become
        U+FFFD; an incomplete trailing sequence is held until the next call.
        """
        pty = self.pty
        if pty is None:
            return
        # pywinpty expects strings for input
        text = self._decoder.decode(data).replace('\x7f', '\x08')
        step = self._WRITE_CHUNK_CHARS
        for start in range(0, len(text), step):
            pty.write(text[start:start + step])
            time.sleep(self._WRITE_CHUNK_DELAY)

    def resize(self, cols: int, rows: int):
        """Resize the pseudo console; failures are printed, not raised.

        Best effort, matching the POSIX backend, so a resize racing with
        shell exit does not crash the caller.
        """
        pty = self.pty
        if pty is None:
            return
        try:
            pty.set_size(cols, rows)
        except Exception as e:
            print(f"[WindowsTerminal] Resize error: {e}")

    def kill(self):
        """Drop the PTY object and discard any partially decoded input."""
        # PTY object usually cleans up the process on deletion,
        # so releasing our reference is the teardown step here.
        self.pty = None
        self._decoder.reset()

    def is_alive(self) -> bool:
        """Return pywinpty's ``isalive()`` result, or ``False`` if not spawned."""
        pty = self.pty
        if pty is None:
            return False
        return pty.isalive()


def get_terminal_backend() -> BaseTerminal:
    """Build the terminal backend that matches the host operating system.

    Returns:
        A new ``WindowsTerminal`` when ``platform.system()`` reports
        ``"Windows"``; a new ``PosixTerminal`` otherwise.
    """
    on_windows = platform.system() == "Windows"
    backend_cls = WindowsTerminal if on_windows else PosixTerminal
    return backend_cls()