import os
import platform
import abc
import threading
import time
class BaseTerminal(abc.ABC):
    """Interface shared by all platform-specific terminal backends.

    A concrete backend must implement every method below before it can be
    instantiated; the abstract versions themselves do nothing and return
    ``None``.
    """

    @abc.abstractmethod
    def spawn(self, cwd=None, env=None):
        """Start the shell process inside a PTY/ConPTY.

        ``cwd`` is the working directory for the shell and ``env`` its
        environment; both are optional.
        """

    @abc.abstractmethod
    def read(self, size=4096) -> bytes:
        """Return output from the terminal without blocking the caller."""

    @abc.abstractmethod
    def write(self, data: bytes):
        """Send ``data`` (keystrokes) to the terminal."""

    @abc.abstractmethod
    def resize(self, cols: int, rows: int):
        """Change the terminal window size to ``cols`` x ``rows``."""

    @abc.abstractmethod
    def kill(self):
        """Stop the shell process and release the backend's resources."""

    @abc.abstractmethod
    def is_alive(self) -> bool:
        """Report whether the terminal process is still running."""
class PosixTerminal(BaseTerminal):
    """POSIX backend that runs a shell attached to a pseudo-terminal.

    ``spawn`` opens a PTY pair with ``os.openpty``, starts the shell with
    ``subprocess.Popen`` on the slave side, and keeps the master side in
    ``self.fd`` for all subsequent I/O.
    """

    # How many consecutive "would block" results ``write`` tolerates before
    # giving up on the remaining data (each one waits up to 0.05 s).
    _WRITE_STALL_LIMIT = 20

    def __init__(self):
        self.pid = None      # PID of the shell, or None when nothing is running
        self.fd = None       # master side of the PTY, or None when closed
        self.process = None  # subprocess.Popen handle for the shell, or None

    def spawn(self, cwd=None, env=None):
        """Start ``/bin/bash`` (or ``/bin/sh`` if bash is missing) on a new PTY.

        Args:
            cwd: Working directory for the shell, or None to inherit.
            env: Environment mapping for the shell. When None, a copy of
                ``os.environ`` is used. The mapping passed in is never
                modified; ``TERM`` defaults to ``xterm-256color`` when absent.

        Raises:
            OSError: If the PTY cannot be opened or the shell cannot start.
        """
        # fcntl is POSIX-only, so it is imported here rather than at module
        # level to keep the module importable on Windows.
        import fcntl
        import subprocess

        shell_path = "/bin/bash"
        if not os.path.exists(shell_path):
            shell_path = "/bin/sh"

        # Copy so that adding TERM never leaks into the caller's dict.
        child_env = os.environ.copy() if env is None else dict(env)
        child_env.setdefault("TERM", "xterm-256color")

        master_fd, slave_fd = os.openpty()
        try:
            process = subprocess.Popen(
                [shell_path],
                stdin=slave_fd,
                stdout=slave_fd,
                stderr=slave_fd,
                cwd=cwd,
                env=child_env,
                process_group=0,  # own process group (Python 3.11+)
                close_fds=True,
            )
        except BaseException:
            # The shell never started: do not leak the master side.
            os.close(master_fd)
            raise
        finally:
            # The child holds its own copy of the slave side (if it started);
            # the parent never needs it.
            os.close(slave_fd)

        self.process = process
        self.pid = process.pid
        self.fd = master_fd

        # Reads and writes on the master must never block the caller.
        flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def read(self, size=4096) -> bytes:
        """Return up to ``size`` bytes of shell output, or ``b""`` if none.

        Waits at most 0.05 s for data. I/O errors (including the error raised
        once the shell has exited) are reported as ``b""``.
        """
        fd = self.fd  # snapshot: kill() may clear self.fd concurrently
        if fd is None:
            return b""
        import select

        try:
            readable, _, _ = select.select([fd], [], [], 0.05)
            if readable:
                return os.read(fd, size)
        except (OSError, EOFError):
            pass
        return b""

    def write(self, data: bytes):
        """Send ``data`` to the shell, best-effort.

        The master fd is non-blocking, so a single ``os.write`` may accept
        only part of ``data`` or raise ``BlockingIOError``. This method keeps
        writing until everything is sent, waiting briefly for the PTY to
        drain; it gives up silently after ``_WRITE_STALL_LIMIT`` consecutive
        stalls or on any other ``OSError``.
        """
        fd = self.fd  # snapshot: kill() may clear self.fd concurrently
        if fd is None:
            return
        import select

        pending = memoryview(data)
        stalls = 0
        try:
            while pending:
                try:
                    sent = os.write(fd, pending)
                except BlockingIOError:
                    stalls += 1
                    if stalls > self._WRITE_STALL_LIMIT:
                        return
                    # Wait until the PTY can accept more input.
                    select.select([], [fd], [], 0.05)
                    continue
                stalls = 0
                pending = pending[sent:]
        except OSError:
            # Deliberately best-effort: a dead or closed PTY is not an error
            # for the caller.
            pass

    def resize(self, cols: int, rows: int):
        """Set the PTY window size to ``cols`` x ``rows`` (best-effort)."""
        fd = self.fd
        if fd is None:
            return
        try:
            import fcntl
            import struct
            import termios

            # struct winsize is (rows, cols, xpixel, ypixel).
            winsize = struct.pack("HHHH", rows, cols, 0, 0)
            fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
        except Exception:
            # Deliberately best-effort: a failed resize must not take down
            # the session.
            pass

    def kill(self):
        """Kill the shell, reap it, and close the PTY master fd.

        Safe to call repeatedly and when nothing was ever spawned.
        """
        try:
            if self.process:
                try:
                    self.process.kill()
                    self.process.wait(timeout=5)
                except Exception:
                    pass
            elif self.pid:
                import signal

                try:
                    os.kill(self.pid, signal.SIGKILL)
                    os.waitpid(self.pid, 0)
                except Exception:
                    pass
        finally:
            fd, self.fd = self.fd, None
            self.process = None
            self.pid = None
            if fd is not None:
                # Release the master side; without this every terminal
                # leaks one file descriptor.
                try:
                    os.close(fd)
                except OSError:
                    pass

    def is_alive(self) -> bool:
        """Return True while the shell process has not exited."""
        if self.process:
            return self.process.poll() is None
        if self.pid is None:
            return False
        try:
            reaped_pid, _status = os.waitpid(self.pid, os.WNOHANG)
        except ChildProcessError:
            return False
        # waitpid returns our pid once the child has exited, 0 otherwise.
        return reaped_pid != self.pid
class WindowsTerminal(BaseTerminal):
    """Windows backend built on pywinpty's ``PTY`` class (ConPTY)."""

    # Input is fed to the PTY in small pieces with a pause after each one to
    # avoid dropped input when the console host is written to too quickly.
    _WRITE_CHUNK_CHARS = 32
    _WRITE_CHUNK_DELAY = 0.02

    def __init__(self):
        self.pty = None  # winpty.PTY instance, or None when not running

    def spawn(self, cwd=None, env=None):
        """Start ``cmd.exe`` in a 140x40 pseudo-console.

        Args:
            cwd: Working directory for the shell. If spawning there fails,
                a warning is printed and the spawn is retried without it.
            env: Environment for the shell. A mapping is copied (the caller's
                object is never modified), ``TERM`` is forced to ``dumb``,
                and the result is serialized for pywinpty. None means a copy
                of ``os.environ``. A pre-formatted string is passed through
                unchanged.

        Raises:
            ImportError: If pywinpty is not installed.
            Exception: Whatever pywinpty raises when the shell cannot start.
        """
        try:
            from winpty import PTY
        except ImportError as exc:
            raise ImportError("pywinpty is required for Windows terminal support. Please install it with 'pip install pywinpty'.") from exc

        # Default shell for Windows: CMD is much more stable for gRPC PTY streams
        shell_cmd = "cmd.exe"

        if isinstance(env, str):
            # Already in pywinpty's wire format; nothing to inject into.
            env_str = env
        else:
            # Build a private copy instead of touching os.environ or the
            # caller's mapping: the TERM override must only affect this shell.
            child_env = dict(os.environ if env is None else env)
            # M7: Force TERM=dumb to suppress complex ANSI sequences that clobber rendering
            child_env["TERM"] = "dumb"
            # pywinpty expects env to be a string formatted as "KEY=VALUE\0...KEY=VALUE\0\0"
            env_str = "".join(f"{k}={v}\0" for k, v in child_env.items()) + "\0"

        pty = PTY(140, 40)
        try:
            pty.spawn(shell_cmd, cwd=cwd, env=env_str)
        except Exception as e:
            if not cwd:
                raise
            # Fallback for cwd issues on Windows
            print(f"[!] Warning: Failed to spawn shell in {cwd} ({e}). Retrying in root...")
            pty.spawn(shell_cmd, cwd=None, env=env_str)
        # Publish the PTY only once a shell is actually attached to it.
        self.pty = pty

    def read(self, size=4096) -> bytes:
        """Return pending shell output as bytes, or ``b""`` if there is none.

        ``size`` is accepted for interface compatibility but not forwarded to
        pywinpty. Read errors are printed and reported as ``b""``.
        """
        pty = self.pty
        if pty is None:
            return b""
        # On Windows/winpty, read() can sometimes return empty immediately
        # even if data is coming, so an empty result is retried once.
        try:
            data = pty.read(blocking=False)
            if not data and os.name == 'nt':
                # Tiny sleep to allow winpty buffer to fill if it's lagging
                time.sleep(0.01)
                data = pty.read(blocking=False)
            if not data:
                # Normalize '' / None so the bytes contract always holds.
                return b""
            return data.encode('utf-8') if isinstance(data, str) else data
        except Exception as e:
            print(f"[WindowsTerminal] Read error: {e}")
            return b""

    def write(self, data: bytes):
        """Decode ``data`` as UTF-8 and feed it to the shell in small chunks."""
        pty = self.pty
        if pty is None:
            return
        # pywinpty expects strings for input
        text = data.decode('utf-8', errors='replace')
        # Fix backspace key: Xterm sends DEL (\x7f), but Windows expects Backspace (\x08)
        text = text.replace('\x7f', '\x08')
        # Chunk writes to prevent PyWinPTY/ConHost input buffer saturation drops on Windows
        step = self._WRITE_CHUNK_CHARS
        for start in range(0, len(text), step):
            pty.write(text[start:start + step])
            time.sleep(self._WRITE_CHUNK_DELAY)

    def resize(self, cols: int, rows: int):
        """Set the pseudo-console size to ``cols`` x ``rows``."""
        if self.pty is not None:
            self.pty.set_size(cols, rows)

    def kill(self):
        """Drop the PTY object.

        No explicit terminate call is made; this relies on the PTY object
        cleaning up its process when it is deleted.
        """
        self.pty = None

    def is_alive(self) -> bool:
        """Return True while a PTY exists and pywinpty reports it alive."""
        if self.pty is None:
            return False
        return self.pty.isalive()
def get_terminal_backend() -> BaseTerminal:
    """Create the terminal backend that matches the host operating system.

    Returns a ``WindowsTerminal`` when ``platform.system()`` reports
    ``"Windows"`` and a ``PosixTerminal`` on every other platform.
    """
    on_windows = platform.system() == "Windows"
    backend_cls = WindowsTerminal if on_windows else PosixTerminal
    return backend_cls()