import codecs
import fcntl
import json
import os
import pty
import select
import signal
import struct
import termios
import threading

from protos import agent_pb2

from .base import BaseSkill
class ShellSkill(BaseSkill):
    """Admin Console Skill: persistent, stateful Bash sessions over a PTY.

    Every session id maps to one ``/bin/bash --login`` process attached to a
    pseudo-terminal. A daemon reader thread per session forwards the raw
    terminal output through the ``on_event`` callback that was supplied when
    the session was created.
    """

    # Maximum number of 0.1 s waits for the PTY to accept more input before a
    # write gives up (about five seconds in total).
    _MAX_WRITE_STALLS = 50

    def __init__(self, sync_mgr=None):
        """Store the optional sync manager and create the empty session table."""
        self.sync_mgr = sync_mgr
        # session_id -> {fd, pid, thread, stop, io_lock, closed}
        self.sessions = {}
        # Guards self.sessions (lookup, registration, removal).
        self.lock = threading.Lock()

    def _ensure_session(self, session_id, cwd, on_event):
        """Return the record for *session_id*, spawning a shell if none exists.

        Args:
            session_id: Key of the session in ``self.sessions``.
            cwd: Directory the new shell starts in; ignored when falsy or not
                an existing directory. Only used when a shell is spawned.
            on_event: Callback receiving ``ClientTaskMessage`` objects with
                terminal output. An already-running session keeps the
                callback it was created with; this argument is then ignored.

        Returns:
            The session dict (keys ``fd``, ``pid``, ``thread``, ``stop``,
            ``io_lock``, ``closed``).

        Raises:
            OSError: If the PTY cannot be allocated or the fork fails.
            RuntimeError: If the reader thread cannot be started.
        """
        with self.lock:
            existing = self.sessions.get(session_id)
            if existing is not None:
                return existing

            print(f" [🐚] Initializing Persistent Shell Session: {session_id}")
            pid, fd = pty.fork()
            if pid == 0:  # Child
                # The child must end in exec or _exit; falling through would
                # leave a second copy of the parent running its code.
                try:
                    if cwd and os.path.isdir(cwd):
                        os.chdir(cwd)
                    env = dict(
                        os.environ,
                        TERM="xterm-256color",
                        PS1="\\s-\\v\\$ ",
                    )
                    os.execve("/bin/bash", ["/bin/bash", "--login"], env)
                finally:
                    os._exit(127)

            # Parent: the reader polls with select(), so keep the master
            # side non-blocking.
            os.set_blocking(fd, False)
            sess = {
                "fd": fd,
                "pid": pid,
                "thread": None,
                "stop": threading.Event(),     # asks the reader to exit
                "io_lock": threading.Lock(),   # serialises writes vs. close
                "closed": False,               # True once fd is closed
            }
            worker = threading.Thread(
                target=self._read_loop,
                args=(session_id, sess, on_event),
                daemon=True,
            )
            sess["thread"] = worker
            try:
                worker.start()
            except RuntimeError:
                # No reader means nobody would ever clean this shell up.
                self._release_process(sess)
                raise
            self.sessions[session_id] = sess
            return sess

    def _read_loop(self, session_id, sess, on_event):
        """Stream PTY output to *on_event* until EOF, an error, or a stop request."""
        fd = sess["fd"]
        stop = sess["stop"]
        # Incremental decoding keeps multi-byte UTF-8 characters intact when
        # they straddle two read() chunks.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        try:
            while not stop.is_set():
                try:
                    readable, _, _ = select.select([fd], [], [], 0.1)
                    if fd not in readable:
                        continue
                    data = os.read(fd, 4096)
                except (BlockingIOError, InterruptedError):
                    # Spurious wake-up on the non-blocking fd: not an EOF.
                    continue
                except (EOFError, OSError):
                    break
                if not data:
                    break
                text = decoder.decode(data)
                # Stream raw terminal output back
                if text and on_event:
                    event = agent_pb2.SkillEvent(
                        session_id=session_id,
                        terminal_out=text,
                    )
                    on_event(agent_pb2.ClientTaskMessage(skill_event=event))
        finally:
            # Runs even if on_event raises, so a dead reader never leaves a
            # registered-but-silent session behind.
            self._teardown_session(session_id, sess)

    def _teardown_session(self, session_id, sess):
        """Unregister *sess* (if still registered) and release its resources."""
        print(f" [🐚] Shell Session Terminated: {session_id}")
        with self.lock:
            # Only remove our own record; the id may already belong to a
            # newer session created after shutdown() cleared the table.
            if self.sessions.get(session_id) is sess:
                del self.sessions[session_id]
        self._release_process(sess)

    @staticmethod
    def _release_process(sess):
        """Close the PTY master, kill the shell and reap it (all best-effort)."""
        with sess["io_lock"]:
            if not sess["closed"]:
                sess["closed"] = True
                try:
                    os.close(sess["fd"])
                except OSError:
                    pass
        try:
            os.kill(sess["pid"], signal.SIGKILL)
        except OSError:
            pass  # already gone
        try:
            os.waitpid(sess["pid"], 0)  # avoid leaving a zombie
        except OSError:
            pass  # already reaped

    def _write_all(self, sess, data, max_stalls=None):
        """Write all of *data* to the session's PTY.

        The fd is non-blocking, so a single ``os.write`` may accept only part
        of the data or raise ``BlockingIOError``; this loops until everything
        is written.

        Args:
            sess: Session dict as returned by ``_ensure_session``.
            data: Bytes to send.
            max_stalls: How many 0.1 s waits for writability to tolerate;
                defaults to ``_MAX_WRITE_STALLS``.

        Raises:
            BrokenPipeError: If the session's fd has already been closed.
            BlockingIOError: If the PTY stays full beyond *max_stalls* waits.
            OSError: For any other write failure.
        """
        if max_stalls is None:
            max_stalls = self._MAX_WRITE_STALLS
        fd = sess["fd"]
        pending = memoryview(data)
        stalls = 0
        # Holding io_lock guarantees the fd is not closed (and its number
        # not reused) in the middle of the write.
        with sess["io_lock"]:
            if sess["closed"]:
                raise BrokenPipeError("shell session is closed")
            while len(pending) > 0:
                try:
                    sent = os.write(fd, pending)
                except BlockingIOError:
                    if stalls >= max_stalls:
                        raise
                    stalls += 1
                    select.select([], [fd], [], 0.1)
                    continue
                pending = pending[sent:]

    def handle_transparent_tty(self, task, on_complete, on_event=None):
        """Handle raw keystroke and resize payloads without consulting a sandbox.

        ``task.payload_json`` is treated as a transparent TTY message when it
        is a JSON object containing either a ``"tty"`` key (text forwarded
        verbatim to the shell) or ``"action": "resize"`` with optional
        ``cols``/``rows`` (defaults 80x24).

        Returns:
            True if the payload was recognised and handled (``on_complete``
            has been called), False otherwise, including on any error.
        """
        cmd = task.payload_json
        session_id = task.session_id or "default-session"
        try:
            if not (cmd.startswith('{') and cmd.endswith('}')):
                return False
            raw_payload = json.loads(cmd)
            if not isinstance(raw_payload, dict):
                return False

            # 1. Raw Keystroke forward
            if "tty" in raw_payload:
                raw_bytes = raw_payload["tty"]
                sess = self._ensure_session(session_id, None, on_event)
                self._write_all(sess, raw_bytes.encode("utf-8"))
                on_complete(task.task_id, {"stdout": "", "status": 1}, task.trace_id)
                return True

            # 2. Window Resize
            if raw_payload.get("action") == "resize":
                cols = raw_payload.get("cols", 80)
                rows = raw_payload.get("rows", 24)
                sess = self._ensure_session(session_id, None, on_event)
                # struct winsize: rows, cols, xpixel, ypixel
                winsize = struct.pack('HHHH', rows, cols, 0, 0)
                with sess["io_lock"]:
                    if sess["closed"]:
                        raise BrokenPipeError("shell session is closed")
                    fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, winsize)
                print(f" [🐚] Terminal Resized to {cols}x{rows}")
                on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 1}, task.trace_id)
                return True
        except Exception as pe:
            # Boundary handler: anything unexpected means "not handled here".
            print(f" [🐚] Transparent TTY Fail: {pe}")
        return False

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Send ``task.payload_json`` as a command line to the persistent shell.

        The command is first checked with ``sandbox.verify``; a rejected
        command completes with status 2 and a ``SANDBOX_VIOLATION`` message.
        Otherwise the command (newline-terminated) is written to the session's
        PTY and the task completes immediately with status 1; the command's
        output arrives asynchronously via the session's event callback.
        Any exception is reported through ``on_complete`` with status 2.
        """
        try:
            cmd = task.payload_json
            session_id = task.session_id or "default-session"

            # --- Legacy Full-Command Execution (Sandboxed) ---
            allowed, status_msg = sandbox.verify(cmd)
            if not allowed:
                err_msg = f"SANDBOX_VIOLATION: {status_msg}"
                return on_complete(task.task_id, {"stderr": err_msg, "status": 2}, task.trace_id)

            # Resolve CWD jail
            cwd = None
            if self.sync_mgr and task.session_id:
                cwd = self.sync_mgr.get_session_dir(task.session_id)
            elif sandbox.policy.get("WORKING_DIR_JAIL"):
                cwd = sandbox.policy["WORKING_DIR_JAIL"]
                try:
                    os.makedirs(cwd, exist_ok=True)
                except OSError:
                    # Best effort: a missing directory just means the shell
                    # starts in the inherited working directory.
                    pass

            # Handle Session Persistent Process
            sess = self._ensure_session(session_id, cwd, on_event)

            # Input injection
            print(f" [🐚] Shell In (Legacy): {cmd}")
            full_cmd = cmd if cmd.endswith("\n") else cmd + "\n"
            self._write_all(sess, full_cmd.encode("utf-8"))

            # Return success immediately - output will stream via on_event
            on_complete(task.task_id, {"stdout": "", "status": 1}, task.trace_id)
        except Exception as e:
            print(f" [🐚] Execute Error: {e}")
            on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)

    def cancel(self, task_id: str):
        """Interrupt running commands by sending Ctrl+C to every shell session.

        There is no task-to-session mapping here, so *task_id* is not used
        and the interrupt is broadcast to all sessions. A session that cannot
        be written to is reported and skipped.

        Returns:
            Always True.
        """
        with self.lock:
            for sid, sess in self.sessions.items():
                print(f"[🛑] Sending SIGINT (Ctrl+C) to shell session: {sid}")
                try:
                    # \x03 on the master side is what the terminal turns
                    # into SIGINT; never stall while holding self.lock.
                    self._write_all(sess, b"\x03", max_stalls=0)
                except OSError as err:
                    print(f"[🛑] Failed to interrupt shell session {sid}: {err}")
        return True

    def shutdown(self):
        """Terminate every persistent shell and empty the session table.

        Each shell is killed and its reader thread is asked to stop; the
        reader closes the PTY and reaps the process. Readers are then joined
        for up to one second each.
        """
        with self.lock:
            doomed = list(self.sessions.items())
            self.sessions.clear()
            for sid, sess in doomed:
                print(f"[🛑] Cleaning up persistent shell: {sid}")
                sess["stop"].set()
                # Safe against pid reuse: a reader only reaps after taking
                # self.lock, which we are holding.
                try:
                    os.kill(sess["pid"], signal.SIGKILL)
                except OSError:
                    pass

        # Join outside the lock: readers need it during their teardown.
        current = threading.current_thread()
        for _, sess in doomed:
            worker = sess["thread"]
            if worker is not None and worker is not current:
                worker.join(timeout=1.0)