# cortex-hub / poc-grpc-agent / agent_node / skills / shell.py
import subprocess
import threading
from .base import BaseSkill

class ShellSkill(BaseSkill):
    """Default Skill: Executing shell commands with sandbox safety.

    Every command is first passed to ``sandbox.verify``; only commands the
    sandbox allows are spawned. Running processes are tracked by task id in
    ``self.processes`` (guarded by ``self.lock``) so that ``cancel`` and
    ``shutdown`` can kill them while ``execute`` is blocked waiting on them.

    Result payloads handed to ``on_complete`` use ``"status": 1`` for a zero
    exit code and ``"status": 2`` for every failure (non-zero exit, sandbox
    rejection, timeout, or internal error).
    """

    # Upper bound (seconds) on how long we wait for a killed process to hand
    # back its pipes. The shell's own children may keep the pipes open after
    # the shell itself is dead, so this wait must never be unbounded.
    _REAP_TIMEOUT_S = 5.0

    def __init__(self):
        self.processes = {}  # task_id -> Popen
        self.lock = threading.Lock()

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Processes shell-based commands for the Node.

        Args:
            task: Object providing ``payload_json`` (the command line),
                ``task_id``, ``trace_id`` and ``timeout_ms`` (values <= 0
                mean "no timeout").
            sandbox: Object whose ``verify(cmd)`` returns ``(allowed, message)``.
            on_complete: Callback invoked exactly once by this method as
                ``on_complete(task_id, result_dict, trace_id)``.
            on_event: Accepted for interface compatibility; currently unused.

        Returns:
            The value returned by ``on_complete`` when the sandbox rejects the
            command, otherwise ``None``.
        """
        # 1. Verification Logic
        try:
            cmd = task.payload_json
            allowed, status_msg = sandbox.verify(cmd)
        except Exception as e:
            on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)
            return None

        if not allowed:
            err_msg = f"SANDBOX_VIOLATION: {status_msg}"
            return on_complete(task.task_id, {"stderr": err_msg, "status": 2}, task.trace_id)

        # 2. Sequential Execution
        # The callback is deliberately invoked outside any try block: if it
        # raises, the error propagates instead of triggering a second,
        # contradictory completion for the same task.
        result = self._run_command(task, cmd)
        on_complete(task.task_id, result, task.trace_id)
        return None

    def _run_command(self, task, cmd):
        """Run an already-verified command and return its result payload.

        All failures are converted into a ``{"stderr": ..., "status": 2}``
        payload; the process is always removed from ``self.processes`` before
        returning.
        """
        proc = None
        try:
            print(f"    [🐚] Executing Shell: {cmd}", flush=True)
            # SECURITY NOTE: the payload is handed to the shell verbatim
            # (shell=True); sandbox.verify() above is the only gate.
            # errors="replace" keeps non-UTF-8 output from failing the task.
            proc = subprocess.Popen(
                cmd,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",
            )

            with self.lock:
                self.processes[task.task_id] = proc

            # 3. Timeout Handling
            timeout = task.timeout_ms / 1000.0 if task.timeout_ms > 0 else None
            try:
                stdout, stderr = proc.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                self.cancel(task.task_id)
                # communicate() does not kill or reap on timeout; finish the
                # job so the child does not linger as a zombie.
                self._kill_and_reap(proc)
                return {"stderr": "TASK_TIMEOUT", "status": 2}

            print(f"    [🐚] Shell Done: {cmd} | Stdout Size: {len(stdout)}", flush=True)
            return {
                "stdout": stdout,
                "stderr": stderr,
                "status": 1 if proc.returncode == 0 else 2,
            }
        except Exception as e:
            # Do not leave a spawned child running once we report failure.
            if proc is not None:
                self._kill_and_reap(proc)
            return {"stderr": str(e), "status": 2}
        finally:
            with self.lock:
                self.processes.pop(task.task_id, None)

    def _kill_and_reap(self, proc):
        """Kill ``proc`` (a no-op if it already exited) and collect it, best effort."""
        try:
            proc.kill()
            proc.communicate(timeout=self._REAP_TIMEOUT_S)
        except subprocess.TimeoutExpired:
            # Something else still holds the pipes open; reap the shell itself
            # without blocking and give up on the remaining output.
            proc.poll()
        except (OSError, ValueError) as exc:
            print(f"[🛑] Shell cleanup failed: {exc}", flush=True)

    def cancel(self, task_id: str):
        """Standard process termination for shell tasks.

        Returns:
            True if a tracked process for ``task_id`` was found and killed,
            False otherwise.
        """
        with self.lock:
            p = self.processes.get(task_id)
            if p:
                print(f"[🛑] Killing Shell Task: {task_id}", flush=True)
                p.kill()
                return True
        return False

    def shutdown(self):
        """Standard cleanup: Terminates all active shell processes."""
        with self.lock:
            for tid, p in self.processes.items():
                print(f"[🛑] Killing Orphan Shell Task: {tid}", flush=True)
                try:
                    p.kill()
                except Exception as exc:
                    # Best effort: one failing kill must not stop the others.
                    print(f"[🛑] Failed to kill Shell Task {tid}: {exc}", flush=True)
            self.processes.clear()