import subprocess
import threading
from .base import BaseSkill
class ShellSkill(BaseSkill):
    """Default Skill: Executing shell commands with sandbox safety.

    Each command is checked with ``sandbox.verify`` before it is spawned.
    Running processes are tracked in ``self.processes`` (keyed by task id)
    so they can be killed through ``cancel`` or ``shutdown``; access to that
    mapping is guarded by ``self.lock``.
    """

    # Upper bound (seconds) spent draining the pipes of a killed process.
    # With shell=True a descendant of the shell may survive the kill and keep
    # the pipes open, so an unbounded communicate() could block indefinitely.
    _REAP_GRACE_S = 1.0

    def __init__(self):
        self.processes = {}  # task_id -> Popen
        self.lock = threading.Lock()

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Processes shell-based commands for the Node.

        Args:
            task: Object providing ``payload_json`` (the command string),
                ``task_id``, ``trace_id`` and ``timeout_ms``.
            sandbox: Object whose ``verify(cmd)`` returns an
                ``(allowed, status_msg)`` pair.
            on_complete: Callback invoked as
                ``on_complete(task_id, result_dict, trace_id)``. The result
                carries ``"status": 1`` when the command exits with code 0
                and ``"status": 2`` for every failure path.
            on_event: Accepted for interface compatibility; not used here.

        Returns:
            The return value of ``on_complete`` when the sandbox rejects the
            command, otherwise ``None``.
        """
        proc = None
        try:
            cmd = task.payload_json

            # 1. Verification Logic
            allowed, status_msg = sandbox.verify(cmd)
            if not allowed:
                err_msg = f"SANDBOX_VIOLATION: {status_msg}"
                return on_complete(task.task_id, {"stderr": err_msg, "status": 2}, task.trace_id)

            # 2. Sequential Execution
            # NOTE(security): the command runs through the system shell
            # (shell=True); sandbox.verify above is the only gate in this
            # method on what gets executed.
            print(f" [🐚] Executing Shell: {cmd}", flush=True)
            proc = subprocess.Popen(
                cmd,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            with self.lock:
                self.processes[task.task_id] = proc

            # 3. Timeout Handling
            # A non-positive timeout_ms means "wait without a limit".
            timeout = task.timeout_ms / 1000.0 if task.timeout_ms > 0 else None
            stdout, stderr = proc.communicate(timeout=timeout)
            print(f" [🐚] Shell Done: {cmd} | Stdout Size: {len(stdout)}", flush=True)
            on_complete(task.task_id, {
                "stdout": stdout, "stderr": stderr,
                "status": 1 if proc.returncode == 0 else 2
            }, task.trace_id)
        except subprocess.TimeoutExpired:
            self.cancel(task.task_id)
            # communicate() does not kill or reap the child on timeout; finish
            # the cleanup so no zombie process or open pipe is left behind.
            if proc is not None:
                self._kill_and_reap(proc)
            on_complete(task.task_id, {"stderr": "TASK_TIMEOUT", "status": 2}, task.trace_id)
        except Exception as e:
            # If the failure happened after the process was spawned, do not
            # leave it running untracked once the finally block drops it.
            if proc is not None and proc.poll() is None:
                self._kill_and_reap(proc)
            on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)
        finally:
            with self.lock:
                self.processes.pop(task.task_id, None)

    def _kill_and_reap(self, proc):
        """Kill ``proc`` if it is still running and collect its exit status."""
        try:
            if proc.poll() is None:
                proc.kill()
            proc.communicate(timeout=self._REAP_GRACE_S)
        except subprocess.TimeoutExpired:
            # Something else still holds the pipes open. Stop draining and
            # just collect the exit status of the killed direct child.
            proc.poll()
        except (OSError, ValueError):
            # Best-effort cleanup: the process or its pipes are already gone,
            # and this must not mask the failure being reported by execute().
            pass

    def cancel(self, task_id: str):
        """Standard process termination for shell tasks.

        Returns:
            True if a tracked process for ``task_id`` was found and killed,
            False if no such process is currently tracked.
        """
        with self.lock:
            p = self.processes.get(task_id)
            if p:
                print(f"[🛑] Killing Shell Task: {task_id}")
                p.kill()
                return True
        return False

    def shutdown(self):
        """Standard cleanup: Terminates all active shell processes."""
        with self.lock:
            for tid, p in self.processes.items():
                print(f"[🛑] Killing Orphan Shell Task: {tid}")
                try:
                    p.kill()
                except OSError:
                    # Best-effort: keep killing the remaining processes.
                    pass
            self.processes.clear()