Newer
Older
cortex-hub / agent-node / src / agent_node / skills / terminal_backends.py
import os
import platform
import abc
import threading
import time

class BaseTerminal(abc.ABC):
    """Abstract Base Class for Terminal Backends."""
    
    @abc.abstractmethod
    def spawn(self, cwd=None, env=None):
        """Initializes the shell process in a PTY/ConPTY."""
        pass

    @abc.abstractmethod
    def read(self, size=4096) -> bytes:
        """Non-blocking read from the terminal."""
        pass

    @abc.abstractmethod
    def write(self, data: bytes):
        """Writes data (keystrokes) to the terminal."""
        pass

    @abc.abstractmethod
    def resize(self, cols: int, rows: int):
        """Resizes the terminal window."""
        pass

    @abc.abstractmethod
    def kill(self):
        """Terminates the shell process and cleans up resources."""
        pass

    @abc.abstractmethod
    def is_alive(self) -> bool:
        """Checks if the terminal process is still running."""
        pass


class PosixTerminal(BaseTerminal):
    """POSIX implementation using subprocess.Popen for stability."""
    
    def __init__(self):
        self.proc = None
        self.fd = None

    def spawn(self, cwd=None, env=None):
        import subprocess
        import os
        
        shell_path = "/bin/bash"
        if not os.path.exists(shell_path):
            shell_path = "/bin/sh"
            
        self.proc = subprocess.Popen(
            [shell_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=cwd,
            env=env,
            bufsize=0
        )
        self.fd = self.proc.stdout.fileno()
        
        import fcntl
        fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

    def read(self, size=4096) -> bytes:
        if self.fd is None:
            return b""
        try:
            import select
            r, _, _ = select.select([self.fd], [], [], 0.05)
            if self.fd in r:
                return os.read(self.fd, size)
        except (OSError, EOFError):
            pass
        return b""

    def write(self, data: bytes):
        if self.proc and self.proc.stdin:
            try:
                self.proc.stdin.write(data)
                self.proc.stdin.flush()
            except BrokenPipeError:
                pass

    def resize(self, cols: int, rows: int):
        pass

    def kill(self):
        if self.proc:
            try:
                self.proc.kill()
                self.proc.wait(timeout=1)
            except:
                pass
            self.proc = None
            self.fd = None

    def is_alive(self) -> bool:
        if self.proc is None:
            return False
        return self.proc.poll() is None


class WindowsTerminal(BaseTerminal):
    """Windows implementation using pywinpty (ConPTY)."""
    
    def __init__(self):
        self.pty = None

    def spawn(self, cwd=None, env=None):
        try:
            from winpty import PTY
        except ImportError:
            raise ImportError("pywinpty is required for Windows terminal support. Please install it with 'pip install pywinpty'.")
        
        # Default shell for Windows: CMD is much more stable for gRPC PTY streams
        shell_cmd = "cmd.exe"
        
        # M7: Force TERM=dumb to suppress complex ANSI sequences that clobber rendering
        os.environ["TERM"] = "dumb"
        
        self.pty = PTY(140, 40)
        try:
            self.pty.spawn(shell_cmd, cwd=cwd, env=env)
        except Exception as e:
            # Fallback for cwd issues on Windows
            if cwd:
                print(f"[!] Warning: Failed to spawn shell in {cwd} ({e}). Retrying in root...")
                self.pty.spawn(shell_cmd, cwd=None, env=env)
            else:
                raise e

    def read(self, size=4096) -> bytes:
        if self.pty is None:
            return b""
        # On Windows/winpty, read() can sometimes return empty immediately even if data is coming.
        # We use a small amount of blocking or a retry loop if needed.
        try:
            data = self.pty.read(blocking=False)
            if not data and os.name == 'nt':
                # Tiny sleep to allow winpty buffer to fill if it's lagging
                time.sleep(0.01)
                data = self.pty.read(blocking=False)
            
            return data.encode('utf-8') if isinstance(data, str) else data
        except Exception as e:
            print(f"[WindowsTerminal] Read error: {e}")
            return b""

    def write(self, data: bytes):
        if self.pty is not None:
            import time
            # pywinpty expects strings for input
            text = data.decode('utf-8', errors='replace')
            # Chunk writes to prevent PyWinPTY/ConHost input buffer saturation drops on Windows
            # Conhost is highly sensitive to rapid buffer writes over 120 bytes.
            chunk_size = 32
            for i in range(0, len(text), chunk_size):
                self.pty.write(text[i:i+chunk_size])
                time.sleep(0.02)

    def resize(self, cols: int, rows: int):
        if self.pty is not None:
            self.pty.set_size(cols, rows)

    def kill(self):
        if self.pty is not None:
            # PTY object usually cleans up the process on deletion, 
            # but we can be explicit if the library supports it
            self.pty = None

    def is_alive(self) -> bool:
        if self.pty is None:
            return False
        # pywinpty's isalive method
        return self.pty.isalive()


def get_terminal_backend() -> BaseTerminal:
    """Factory function to return the correct terminal backend based on the platform."""
    if platform.system() == "Windows":
        return WindowsTerminal()
    else:
        return PosixTerminal()