Newer
Older
cortex-hub / agent-node / src / agent_node / skills / terminal_backends.py
import os
import platform
import abc
import threading
import time

class BaseTerminal(abc.ABC):
    """Abstract Base Class for Terminal Backends."""
    
    @abc.abstractmethod
    def spawn(self, cwd=None, env=None):
        """Initializes the shell process in a PTY/ConPTY."""
        pass

    @abc.abstractmethod
    def read(self, size=4096) -> bytes:
        """Non-blocking read from the terminal."""
        pass

    @abc.abstractmethod
    def write(self, data: bytes):
        """Writes data (keystrokes) to the terminal."""
        pass

    @abc.abstractmethod
    def resize(self, cols: int, rows: int):
        """Resizes the terminal window."""
        pass

    @abc.abstractmethod
    def kill(self):
        """Terminates the shell process and cleans up resources."""
        pass

    @abc.abstractmethod
    def is_alive(self) -> bool:
        """Checks if the terminal process is still running."""
        pass


class PosixTerminal(BaseTerminal):
    """POSIX implementation using pty.fork() and standard fcntl/termios."""
    
    def __init__(self):
        self.fd = None
        self.pid = None

    def spawn(self, cwd=None, env=None):
        import pty
        self.pid, self.fd = pty.fork()
        
        if self.pid == 0:  # Child process
            # Environment prep
            if env:
                os.environ.update(env)
            os.environ["TERM"] = "xterm-256color"
            
            if cwd and os.path.exists(cwd):
                os.chdir(cwd)
            
            # Launch shell
            shell_path = "/bin/bash"
            if not os.path.exists(shell_path):
                shell_path = "/bin/sh"
            os.execv(shell_path, [shell_path, "--login"])
        
        # Parent process: Set non-blocking
        import fcntl
        fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

    def read(self, size=4096) -> bytes:
        if self.fd is None:
            return b""
        try:
            import select
            r, _, _ = select.select([self.fd], [], [], 0.05)
            if self.fd in r:
                return os.read(self.fd, size)
        except (OSError, EOFError):
            pass
        return b""

    def write(self, data: bytes):
        if self.fd is not None:
            os.write(self.fd, data)

    def resize(self, cols: int, rows: int):
        if self.fd is not None:
            import termios
            import struct
            import fcntl
            s = struct.pack('HHHH', rows, cols, 0, 0)
            fcntl.ioctl(self.fd, termios.TIOCSWINSZ, s)

    def kill(self):
        if self.fd is not None:
            try: os.close(self.fd)
            except: pass
            self.fd = None
        if self.pid is not None:
            try: os.kill(self.pid, 9)
            except: pass
            self.pid = None

    def is_alive(self) -> bool:
        if self.pid is None:
            return False
        try:
            # waitpid with WNOHANG to check if it's still running
            pid, status = os.waitpid(self.pid, os.WNOHANG)
            return pid == 0
        except OSError:
            return False


class WindowsTerminal(BaseTerminal):
    """Windows implementation using pywinpty (ConPTY)."""
    
    def __init__(self):
        self.pty = None

    def spawn(self, cwd=None, env=None):
        try:
            from winpty import PTY
        except ImportError:
            raise ImportError("pywinpty is required for Windows terminal support. Please install it with 'pip install pywinpty'.")
        
        # Default shell for Windows: CMD is much more stable for gRPC PTY streams
        shell_cmd = "cmd.exe"
        
        # M7: Force TERM=dumb to suppress complex ANSI sequences that clobber rendering
        if env is None: env = os.environ.copy()
        env["TERM"] = "dumb"
        
        self.pty = PTY(120, 30)
        self.pty.spawn(shell_cmd, cwd=cwd, env=None)

    def read(self, size=4096) -> bytes:
        if self.pty is None:
            return b""
        # pywinpty's read is usually non-blocking or can be polled
        try:
            # Newer pywinpty versions use 'blocking' instead of 'size'
            data = self.pty.read(blocking=False)
            return data.encode('utf-8') if isinstance(data, str) else data
        except EOFError:
            return b""

    def write(self, data: bytes):
        if self.pty is not None:
            import time
            # pywinpty expects strings for input
            text = data.decode('utf-8', errors='replace')
            # Chunk writes to prevent PyWinPTY/ConHost input buffer saturation drops on Windows
            # Conhost is highly sensitive to rapid buffer writes over 120 bytes.
            chunk_size = 32
            for i in range(0, len(text), chunk_size):
                self.pty.write(text[i:i+chunk_size])
                time.sleep(0.02)

    def resize(self, cols: int, rows: int):
        if self.pty is not None:
            self.pty.set_size(cols, rows)

    def kill(self):
        if self.pty is not None:
            # PTY object usually cleans up the process on deletion, 
            # but we can be explicit if the library supports it
            self.pty = None

    def is_alive(self) -> bool:
        if self.pty is None:
            return False
        # pywinpty's isalive method
        return self.pty.isalive()


def get_terminal_backend() -> BaseTerminal:
    """Factory function to return the correct terminal backend based on the platform."""
    if platform.system() == "Windows":
        return WindowsTerminal()
    else:
        return PosixTerminal()